Skip to main content

aura_core/effects/
reactive.rs

1//! Reactive Effect Traits (FRP as Algebraic Effects)
2//!
3//! This module unifies functional reactive programming with the algebraic effect system.
4//! FRP operations (reading signals, emitting values, subscribing to changes) are modeled
5//! as effects that handlers can interpret.
6//!
7//! # Effect Classification
8//!
9//! - **Category**: Infrastructure Effect
10//! - **Implementation**: `aura-effects` (Layer 3)
11//! - **Usage**: All crates needing reactive state management
12//!
13//! # Core Concepts
14//!
15//! - **Signal<T>**: A time-varying value of type T (behavior in FRP terminology)
16//! - **SignalStream<T>**: A stream of value changes (event stream in FRP terminology)
17//! - **ReactiveEffects**: The effect trait for all reactive operations
18//! - **Query-bound signals**: Signals that automatically update when facts change
19//!
20//! # Design Principles
21//!
22//! 1. **Effects all the way down**: Reading a signal is an effect, not a direct memory access
23//! 2. **Composition**: Reactive effects compose with other effects (auth, journaling, etc.)
24//! 3. **Testability**: Mock handlers enable deterministic testing of reactive flows
25//! 4. **Type safety**: Signals are phantom-typed for compile-time correctness
26//! 5. **Query binding**: Signals can be bound to queries for automatic updates
27
28use async_trait::async_trait;
29use serde::{Deserialize, Serialize};
30use std::fmt;
31use std::hash::Hash;
32use std::marker::PhantomData;
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::sync::Arc;
35use tokio::sync::broadcast;
36
37use crate::query::{FactPredicate, Query};
38
39// ─────────────────────────────────────────────────────────────────────────────
40// Error Types
41// ─────────────────────────────────────────────────────────────────────────────
42
/// Error type for reactive operations.
///
/// Every variant renders a human-readable message through `thiserror`, and the
/// type derives `Serialize`/`Deserialize`. Signal identifiers are carried as
/// plain `String`s rather than as `SignalId` values.
#[derive(Debug, Clone, thiserror::Error, Serialize, Deserialize)]
pub enum ReactiveError {
    /// Signal not found in the reactive graph
    #[error("Signal not found: {id}")]
    SignalNotFound {
        /// Identifier of the signal that could not be found.
        id: String,
    },

    /// Type mismatch when reading or emitting
    #[error("Type mismatch for signal {id}: expected {expected}, got {actual}")]
    TypeMismatch {
        /// Identifier of the signal that was accessed.
        id: String,
        /// Description of the type that was expected.
        expected: String,
        /// Description of the type that was actually encountered.
        actual: String,
    },

    /// Subscription channel closed
    #[error("Subscription channel closed for signal: {id}")]
    SubscriptionClosed {
        /// Identifier of the signal whose subscription channel closed.
        id: String,
    },

    /// Emission failed (e.g., no subscribers, channel full)
    #[error("Failed to emit to signal {id}: {reason}")]
    EmissionFailed {
        /// Identifier of the signal that was being emitted to.
        id: String,
        /// Free-form explanation of why the emission failed.
        reason: String,
    },

    /// Derivation cycle detected
    #[error("Cycle detected in signal derivation: {path}")]
    CycleDetected {
        /// Textual rendering of the cycle, embedded verbatim in the message.
        path: String,
    },

    /// Handler not available
    #[error("Reactive handler not available")]
    HandlerUnavailable,

    /// Internal error
    #[error("Internal reactive error: {reason}")]
    Internal {
        /// Free-form explanation of the internal failure.
        reason: String,
    },
}
78
79// ─────────────────────────────────────────────────────────────────────────────
80// Signal Types
81// ─────────────────────────────────────────────────────────────────────────────
82
/// Unique identifier for a signal.
///
/// SignalIds are globally unique and can be used to look up signals
/// in the reactive graph.
///
/// This is a newtype over the identifier string. Equality, hashing and
/// serialization are all derived from that string, so two IDs built from the
/// same text compare equal. The type itself does not enforce uniqueness:
/// `SignalId::new` accepts any string.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SignalId(String);
89
90impl SignalId {
91    /// Create a new signal ID from a string.
92    pub fn new(id: impl Into<String>) -> Self {
93        Self(id.into())
94    }
95
96    /// Create a unique signal ID with an auto-generated suffix.
97    pub fn unique(prefix: &str) -> Self {
98        static COUNTER: AtomicU64 = AtomicU64::new(0);
99        let id = COUNTER.fetch_add(1, Ordering::SeqCst);
100        Self(format!("{prefix}:{id}"))
101    }
102
103    /// Get the string representation.
104    pub fn as_str(&self) -> &str {
105        &self.0
106    }
107}
108
109impl fmt::Display for SignalId {
110    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111        write!(f, "{}", self.0)
112    }
113}
114
115impl From<&str> for SignalId {
116    fn from(s: &str) -> Self {
117        Self::new(s)
118    }
119}
120
121impl From<String> for SignalId {
122    fn from(s: String) -> Self {
123        Self::new(s)
124    }
125}
126
127/// A typed signal representing a time-varying value.
128///
129/// Signals are the core primitive of the reactive system. They represent
130/// values that change over time and can be:
131/// - Read (get current value)
132/// - Emitted to (update value)
133/// - Subscribed to (receive change notifications)
134/// - Derived from (computed from other signals)
135///
136/// The type parameter T is phantom - actual storage is handled by the
137/// reactive handler, enabling type-safe access without runtime type tags.
138#[derive(Debug)]
139pub struct Signal<T> {
140    /// Unique identifier for this signal
141    id: SignalId,
142    /// Phantom type marker
143    _phantom: PhantomData<T>,
144}
145
146impl<T> Signal<T> {
147    /// Create a new signal with the given ID.
148    pub fn new(id: impl Into<SignalId>) -> Self {
149        Self {
150            id: id.into(),
151            _phantom: PhantomData,
152        }
153    }
154
155    /// Create a signal with a unique auto-generated ID.
156    pub fn unique(prefix: &str) -> Self {
157        Self {
158            id: SignalId::unique(prefix),
159            _phantom: PhantomData,
160        }
161    }
162
163    /// Get the signal's ID.
164    pub fn id(&self) -> &SignalId {
165        &self.id
166    }
167}
168
169impl<T> Clone for Signal<T> {
170    fn clone(&self) -> Self {
171        Self {
172            id: self.id.clone(),
173            _phantom: PhantomData,
174        }
175    }
176}
177
178impl<T> PartialEq for Signal<T> {
179    fn eq(&self, other: &Self) -> bool {
180        self.id == other.id
181    }
182}
183
184impl<T> Eq for Signal<T> {}
185
186impl<T> Hash for Signal<T> {
187    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
188        self.id.hash(state);
189    }
190}
191
192// ─────────────────────────────────────────────────────────────────────────────
193// Signal Stream
194// ─────────────────────────────────────────────────────────────────────────────
195
/// A stream of signal value changes.
///
/// SignalStream wraps a broadcast receiver and provides filtering capabilities.
/// Values are pulled with the inherent `recv` (async) and `try_recv`
/// (non-blocking) methods, which makes it usable in event loops. No
/// `Stream`/`Iterator` trait impl is provided in this file.
// The boxed `dyn Fn` predicate type in `filter` is what the
// `type_complexity` lint is silenced for.
#[allow(clippy::type_complexity)]
pub struct SignalStream<T: Clone> {
    /// The underlying broadcast receiver
    receiver: broadcast::Receiver<T>,
    /// Optional filter predicate; `None` means every value is yielded
    filter: Option<Box<dyn Fn(&T) -> bool + Send + Sync>>,
    /// Signal ID for error reporting
    signal_id: SignalId,
}
209
210impl<T: Clone> SignalStream<T> {
211    /// Create a new signal stream.
212    pub fn new(receiver: broadcast::Receiver<T>, signal_id: SignalId) -> Self {
213        Self {
214            receiver,
215            filter: None,
216            signal_id,
217        }
218    }
219
220    /// Add a filter to this stream.
221    ///
222    /// Only values matching the predicate will be yielded.
223    pub fn filter<F>(mut self, predicate: F) -> Self
224    where
225        F: Fn(&T) -> bool + Send + Sync + 'static,
226    {
227        self.filter = Some(Box::new(predicate));
228        self
229    }
230
231    /// Try to receive the next value without blocking.
232    ///
233    /// Returns `None` if no value is available or the channel is closed.
234    pub fn try_recv(&mut self) -> Option<T> {
235        loop {
236            match self.receiver.try_recv() {
237                Ok(value) => {
238                    if let Some(ref filter) = self.filter {
239                        if filter(&value) {
240                            return Some(value);
241                        }
242                        // Value didn't match filter, try again
243                        continue;
244                    }
245                    return Some(value);
246                }
247                Err(_) => return None,
248            }
249        }
250    }
251
252    /// Receive the next value, waiting if necessary.
253    ///
254    /// Returns an error if the channel is closed.
255    ///
256    /// Signal subscriptions are eventually consistent rather than lossless:
257    /// if a subscriber lags behind the broadcast buffer under sustained load,
258    /// intermediate values may be dropped and the next received value will be a
259    /// newer snapshot. Handlers are expected to log lag events so the loss is
260    /// diagnosable.
261    pub async fn recv(&mut self) -> Result<T, ReactiveError> {
262        loop {
263            match self.receiver.recv().await {
264                Ok(value) => {
265                    if let Some(ref filter) = self.filter {
266                        if filter(&value) {
267                            return Ok(value);
268                        }
269                        // Value didn't match filter, try again
270                        continue;
271                    }
272                    return Ok(value);
273                }
274                Err(broadcast::error::RecvError::Closed) => {
275                    return Err(ReactiveError::SubscriptionClosed {
276                        id: self.signal_id.to_string(),
277                    });
278                }
279                Err(broadcast::error::RecvError::Lagged(skipped)) => {
280                    tracing::warn!(
281                        signal_id = %self.signal_id,
282                        skipped,
283                        "signal stream lagged; continuing with a newer snapshot"
284                    );
285                    continue;
286                }
287            }
288        }
289    }
290}
291
292// ─────────────────────────────────────────────────────────────────────────────
293// Reactive Effects Trait
294// ─────────────────────────────────────────────────────────────────────────────
295
/// Reactive effects for FRP-style state management.
///
/// This trait unifies the effect system with functional reactive programming.
/// All state access and mutation goes through these effects, enabling:
/// - Transparent reactivity (handlers track dependencies)
/// - Effect composition (reactive ops compose with other effects)
/// - Testability (mock handlers for deterministic testing)
///
/// # Example
///
/// ```ignore
/// // Reading a signal
/// let chat_state = effects.read(&CHAT_SIGNAL).await?;
///
/// // Emitting to a signal
/// effects.emit(&CHAT_SIGNAL, new_state).await?;
///
/// // Subscribing to changes (`subscribe` is fallible, so unwrap the Result)
/// let mut stream = effects.subscribe(&CHAT_SIGNAL)?;
/// while let Ok(value) = stream.recv().await {
///     println!("Chat updated: {:?}", value);
/// }
/// ```
#[async_trait]
pub trait ReactiveEffects: Send + Sync {
    /// Read the current value of a signal.
    ///
    /// This is a point-in-time snapshot. For continuous updates, use `subscribe`.
    ///
    /// # Errors
    ///
    /// Returns `ReactiveError::SignalNotFound` if the signal doesn't exist.
    async fn read<T>(&self, signal: &Signal<T>) -> Result<T, ReactiveError>
    where
        T: Clone + Send + Sync + 'static;

    /// Emit a new value to a signal.
    ///
    /// This updates the signal's value and notifies all subscribers.
    /// Derived signals that depend on this signal are also updated.
    ///
    /// # Errors
    ///
    /// Returns `ReactiveError::SignalNotFound` if the signal doesn't exist.
    async fn emit<T>(&self, signal: &Signal<T>, value: T) -> Result<(), ReactiveError>
    where
        T: Clone + Send + Sync + 'static;

    /// Subscribe to signal changes.
    ///
    /// Returns a stream that yields values when the signal changes.
    /// The stream can be filtered to only receive specific updates.
    /// Unlike the other signal operations this method is synchronous.
    ///
    /// # Errors
    ///
    /// Returns `ReactiveError::SignalNotFound` if the signal has not been
    /// registered yet. Callers must handle this rather than assuming an empty
    /// stream means "no updates". There is no implicit registration wait.
    ///
    /// Subscription delivery is eventually consistent rather than lossless.
    /// If a subscriber falls behind the bounded broadcast buffer under
    /// sustained load, intermediate values may be dropped and the stream will
    /// resume from a newer snapshot after logging the lag event.
    fn subscribe<T>(&self, signal: &Signal<T>) -> Result<SignalStream<T>, ReactiveError>
    where
        T: Clone + Send + Sync + 'static;

    /// Register a signal with an initial value.
    ///
    /// This creates the signal in the reactive graph. Signals must be
    /// registered before they can be read, emitted to, or subscribed.
    ///
    /// # Errors
    ///
    /// Returns an error if a signal with the same ID already exists.
    async fn register<T>(&self, signal: &Signal<T>, initial: T) -> Result<(), ReactiveError>
    where
        T: Clone + Send + Sync + 'static;

    /// Check if a signal is registered.
    ///
    /// Takes the untyped `SignalId`, so the check is independent of the
    /// signal's value type.
    fn is_registered(&self, signal_id: &SignalId) -> bool;

    /// Register a signal bound to a query.
    ///
    /// This creates a signal whose value is derived from executing the query.
    /// When facts matching the query's `dependencies()` change, the query is
    /// automatically re-evaluated and the signal is updated.
    ///
    /// # Query-Signal Flow
    ///
    /// ```text
    /// Facts Change → Check dependencies() → Re-execute query → Emit to signal
    /// ```
    ///
    /// # Example
    ///
    /// ```ignore
    /// use aura_app::queries::ChannelsQuery;
    ///
    /// // Register a signal bound to a query
    /// effects.register_query(&CHANNELS_SIGNAL, ChannelsQuery::default()).await?;
    ///
    /// // Signal automatically updates when channel facts change
    /// let channels = effects.read(&CHANNELS_SIGNAL).await?;
    /// ```
    ///
    /// # Errors
    ///
    /// Returns an error if the signal already exists or query binding fails.
    async fn register_query<Q: Query>(
        &self,
        signal: &Signal<Q::Result>,
        query: Q,
    ) -> Result<(), ReactiveError>;

    /// Get the fact predicates that a signal depends on.
    ///
    /// Returns `None` if the signal is not bound to a query.
    /// Returns `Some(predicates)` if the signal was registered with `register_query`.
    fn query_dependencies(&self, signal_id: &SignalId) -> Option<Vec<FactPredicate>>;

    /// Notify that facts matching a predicate have changed.
    ///
    /// This triggers re-evaluation of all query-bound signals whose
    /// dependencies intersect with the changed predicate.
    ///
    /// Called by `JournalEffects` when facts are committed.
    ///
    /// This method has no return value, so failures during re-evaluation
    /// cannot be reported to the caller through it.
    async fn invalidate_queries(&self, changed: &FactPredicate);
}
425
/// Extension trait for derived signals.
///
/// This trait provides combinators for creating derived signals that
/// automatically update when their sources change.
///
/// Every combinator is async and fallible: each returns the new derived
/// signal's handle wrapped in a `Result`, so callers need `.await` and must
/// handle the `ReactiveError`.
#[async_trait]
pub trait ReactiveDeriveEffects: ReactiveEffects {
    /// Create a derived signal that maps values from a source signal.
    ///
    /// The derived signal automatically updates when the source changes.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let unread_count = effects.map(&CHAT_SIGNAL, |chat| {
    ///     chat.channels.iter().map(|c| c.unread_count).sum()
    /// }).await?;
    /// ```
    ///
    /// # Errors
    ///
    /// Returns a `ReactiveError` if the derived signal cannot be created;
    /// which variants are used is up to the handler.
    async fn map<A, B, F>(&self, source: &Signal<A>, f: F) -> Result<Signal<B>, ReactiveError>
    where
        A: Clone + Send + Sync + 'static,
        B: Clone + Send + Sync + 'static,
        F: Fn(A) -> B + Send + Sync + 'static;

    /// Create a derived signal that combines two source signals.
    ///
    /// The derived signal updates when either source changes.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let status = effects.combine(&CONNECTION_SIGNAL, &SYNC_SIGNAL, |conn, sync| {
    ///     format!("Connected: {}, Syncing: {}", conn.is_connected, sync.in_progress)
    /// }).await?;
    /// ```
    ///
    /// # Errors
    ///
    /// Returns a `ReactiveError` if the derived signal cannot be created;
    /// which variants are used is up to the handler.
    async fn combine<A, B, C, F>(
        &self,
        a: &Signal<A>,
        b: &Signal<B>,
        f: F,
    ) -> Result<Signal<C>, ReactiveError>
    where
        A: Clone + Send + Sync + 'static,
        B: Clone + Send + Sync + 'static,
        C: Clone + Send + Sync + 'static,
        F: Fn(A, B) -> C + Send + Sync + 'static;

    /// Create a derived signal that filters and maps values.
    ///
    /// Only emits when the filter returns Some.
    ///
    /// The derived signal's value type is `Option<B>`.
    ///
    /// # Errors
    ///
    /// Returns a `ReactiveError` if the derived signal cannot be created;
    /// which variants are used is up to the handler.
    async fn filter_map<A, B, F>(
        &self,
        source: &Signal<A>,
        f: F,
    ) -> Result<Signal<Option<B>>, ReactiveError>
    where
        A: Clone + Send + Sync + 'static,
        B: Clone + Send + Sync + 'static,
        F: Fn(A) -> Option<B> + Send + Sync + 'static;
}
485
486// ─────────────────────────────────────────────────────────────────────────────
487// Blanket Implementations
488// ─────────────────────────────────────────────────────────────────────────────
489
490/// Blanket implementation for Arc<T> where T: ReactiveEffects
491#[async_trait]
492impl<T: ReactiveEffects + ?Sized> ReactiveEffects for Arc<T> {
493    async fn read<V>(&self, signal: &Signal<V>) -> Result<V, ReactiveError>
494    where
495        V: Clone + Send + Sync + 'static,
496    {
497        (**self).read(signal).await
498    }
499
500    async fn emit<V>(&self, signal: &Signal<V>, value: V) -> Result<(), ReactiveError>
501    where
502        V: Clone + Send + Sync + 'static,
503    {
504        (**self).emit(signal, value).await
505    }
506
507    fn subscribe<V>(&self, signal: &Signal<V>) -> Result<SignalStream<V>, ReactiveError>
508    where
509        V: Clone + Send + Sync + 'static,
510    {
511        (**self).subscribe(signal)
512    }
513
514    async fn register<V>(&self, signal: &Signal<V>, initial: V) -> Result<(), ReactiveError>
515    where
516        V: Clone + Send + Sync + 'static,
517    {
518        (**self).register(signal, initial).await
519    }
520
521    fn is_registered(&self, signal_id: &SignalId) -> bool {
522        (**self).is_registered(signal_id)
523    }
524
525    async fn register_query<Q: Query>(
526        &self,
527        signal: &Signal<Q::Result>,
528        query: Q,
529    ) -> Result<(), ReactiveError> {
530        (**self).register_query(signal, query).await
531    }
532
533    fn query_dependencies(&self, signal_id: &SignalId) -> Option<Vec<FactPredicate>> {
534        (**self).query_dependencies(signal_id)
535    }
536
537    async fn invalidate_queries(&self, changed: &FactPredicate) {
538        (**self).invalidate_queries(changed).await;
539    }
540}
541
542// ─────────────────────────────────────────────────────────────────────────────
543// Tests
544// ─────────────────────────────────────────────────────────────────────────────
545
#[cfg(test)]
mod tests {
    use super::*;

    /// `SignalId::new` and `From<&str>` both keep the given text verbatim.
    #[test]
    fn test_signal_id_creation() {
        let from_new = SignalId::new("chat");
        let from_conversion = SignalId::from("recovery");

        assert_eq!(from_new.as_str(), "chat");
        assert_eq!(from_conversion.as_str(), "recovery");
    }

    /// Two `unique` calls with the same prefix yield different IDs.
    #[test]
    fn test_signal_id_unique() {
        let first = SignalId::unique("test");
        let second = SignalId::unique("test");

        assert_ne!(first, second);
    }

    /// A signal exposes the ID it was constructed with.
    #[test]
    fn test_signal_creation() {
        let chat: Signal<String> = Signal::new("chat");

        assert_eq!(chat.id().as_str(), "chat");
    }

    /// Cloning a signal preserves its ID.
    #[test]
    fn test_signal_clone() {
        let counter: Signal<u32> = Signal::new("counter");
        let duplicate = counter.clone();

        assert_eq!(counter.id(), duplicate.id());
    }

    /// Signals compare equal exactly when their IDs match.
    #[test]
    fn test_signal_equality() {
        let chat_a: Signal<String> = Signal::new("chat");
        let chat_b: Signal<String> = Signal::new("chat");
        let recovery: Signal<String> = Signal::new("recovery");

        assert_eq!(chat_a, chat_b);
        assert_ne!(chat_a, recovery);
    }

    /// Error messages mention the fields they were built from.
    #[test]
    fn test_reactive_error_display() {
        let not_found = ReactiveError::SignalNotFound {
            id: "chat".to_string(),
        };
        assert!(not_found.to_string().contains("chat"));

        let mismatch = ReactiveError::TypeMismatch {
            id: "counter".to_string(),
            expected: "u32".to_string(),
            actual: "String".to_string(),
        };
        let rendered = mismatch.to_string();
        assert!(rendered.contains("counter"));
        assert!(rendered.contains("u32"));
    }
}