aura_core/effects/reactive.rs
1//! Reactive Effect Traits (FRP as Algebraic Effects)
2//!
3//! This module unifies functional reactive programming with the algebraic effect system.
4//! FRP operations (reading signals, emitting values, subscribing to changes) are modeled
5//! as effects that handlers can interpret.
6//!
7//! # Effect Classification
8//!
9//! - **Category**: Infrastructure Effect
10//! - **Implementation**: `aura-effects` (Layer 3)
11//! - **Usage**: All crates needing reactive state management
12//!
13//! # Core Concepts
14//!
15//! - **Signal<T>**: A time-varying value of type T (behavior in FRP terminology)
16//! - **SignalStream<T>**: A stream of value changes (event stream in FRP terminology)
17//! - **ReactiveEffects**: The effect trait for all reactive operations
18//! - **Query-bound signals**: Signals that automatically update when facts change
19//!
20//! # Design Principles
21//!
22//! 1. **Effects all the way down**: Reading a signal is an effect, not a direct memory access
23//! 2. **Composition**: Reactive effects compose with other effects (auth, journaling, etc.)
24//! 3. **Testability**: Mock handlers enable deterministic testing of reactive flows
25//! 4. **Type safety**: Signals are phantom-typed for compile-time correctness
26//! 5. **Query binding**: Signals can be bound to queries for automatic updates
27
28use async_trait::async_trait;
29use serde::{Deserialize, Serialize};
30use std::fmt;
31use std::hash::Hash;
32use std::marker::PhantomData;
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::sync::Arc;
35use tokio::sync::broadcast;
36
37use crate::query::{FactPredicate, Query};
38
39// ─────────────────────────────────────────────────────────────────────────────
40// Error Types
41// ─────────────────────────────────────────────────────────────────────────────
42
43/// Error type for reactive operations.
44#[derive(Debug, Clone, thiserror::Error, Serialize, Deserialize)]
45pub enum ReactiveError {
46 /// Signal not found in the reactive graph
47 #[error("Signal not found: {id}")]
48 SignalNotFound { id: String },
49
50 /// Type mismatch when reading or emitting
51 #[error("Type mismatch for signal {id}: expected {expected}, got {actual}")]
52 TypeMismatch {
53 id: String,
54 expected: String,
55 actual: String,
56 },
57
58 /// Subscription channel closed
59 #[error("Subscription channel closed for signal: {id}")]
60 SubscriptionClosed { id: String },
61
62 /// Emission failed (e.g., no subscribers, channel full)
63 #[error("Failed to emit to signal {id}: {reason}")]
64 EmissionFailed { id: String, reason: String },
65
66 /// Derivation cycle detected
67 #[error("Cycle detected in signal derivation: {path}")]
68 CycleDetected { path: String },
69
70 /// Handler not available
71 #[error("Reactive handler not available")]
72 HandlerUnavailable,
73
74 /// Internal error
75 #[error("Internal reactive error: {reason}")]
76 Internal { reason: String },
77}
78
79// ─────────────────────────────────────────────────────────────────────────────
80// Signal Types
81// ─────────────────────────────────────────────────────────────────────────────
82
83/// Unique identifier for a signal.
84///
85/// SignalIds are globally unique and can be used to look up signals
86/// in the reactive graph.
87#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
88pub struct SignalId(String);
89
90impl SignalId {
91 /// Create a new signal ID from a string.
92 pub fn new(id: impl Into<String>) -> Self {
93 Self(id.into())
94 }
95
96 /// Create a unique signal ID with an auto-generated suffix.
97 pub fn unique(prefix: &str) -> Self {
98 static COUNTER: AtomicU64 = AtomicU64::new(0);
99 let id = COUNTER.fetch_add(1, Ordering::SeqCst);
100 Self(format!("{prefix}:{id}"))
101 }
102
103 /// Get the string representation.
104 pub fn as_str(&self) -> &str {
105 &self.0
106 }
107}
108
109impl fmt::Display for SignalId {
110 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111 write!(f, "{}", self.0)
112 }
113}
114
115impl From<&str> for SignalId {
116 fn from(s: &str) -> Self {
117 Self::new(s)
118 }
119}
120
121impl From<String> for SignalId {
122 fn from(s: String) -> Self {
123 Self::new(s)
124 }
125}
126
127/// A typed signal representing a time-varying value.
128///
129/// Signals are the core primitive of the reactive system. They represent
130/// values that change over time and can be:
131/// - Read (get current value)
132/// - Emitted to (update value)
133/// - Subscribed to (receive change notifications)
134/// - Derived from (computed from other signals)
135///
136/// The type parameter T is phantom - actual storage is handled by the
137/// reactive handler, enabling type-safe access without runtime type tags.
138#[derive(Debug)]
139pub struct Signal<T> {
140 /// Unique identifier for this signal
141 id: SignalId,
142 /// Phantom type marker
143 _phantom: PhantomData<T>,
144}
145
146impl<T> Signal<T> {
147 /// Create a new signal with the given ID.
148 pub fn new(id: impl Into<SignalId>) -> Self {
149 Self {
150 id: id.into(),
151 _phantom: PhantomData,
152 }
153 }
154
155 /// Create a signal with a unique auto-generated ID.
156 pub fn unique(prefix: &str) -> Self {
157 Self {
158 id: SignalId::unique(prefix),
159 _phantom: PhantomData,
160 }
161 }
162
163 /// Get the signal's ID.
164 pub fn id(&self) -> &SignalId {
165 &self.id
166 }
167}
168
169impl<T> Clone for Signal<T> {
170 fn clone(&self) -> Self {
171 Self {
172 id: self.id.clone(),
173 _phantom: PhantomData,
174 }
175 }
176}
177
178impl<T> PartialEq for Signal<T> {
179 fn eq(&self, other: &Self) -> bool {
180 self.id == other.id
181 }
182}
183
184impl<T> Eq for Signal<T> {}
185
186impl<T> Hash for Signal<T> {
187 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
188 self.id.hash(state);
189 }
190}
191
192// ─────────────────────────────────────────────────────────────────────────────
193// Signal Stream
194// ─────────────────────────────────────────────────────────────────────────────
195
196/// A stream of signal value changes.
197///
198/// SignalStream wraps a broadcast receiver and provides filtering capabilities.
199/// It implements async iteration for use in event loops.
200#[allow(clippy::type_complexity)]
201pub struct SignalStream<T: Clone> {
202 /// The underlying broadcast receiver
203 receiver: broadcast::Receiver<T>,
204 /// Optional filter predicate
205 filter: Option<Box<dyn Fn(&T) -> bool + Send + Sync>>,
206 /// Signal ID for error reporting
207 signal_id: SignalId,
208}
209
210impl<T: Clone> SignalStream<T> {
211 /// Create a new signal stream.
212 pub fn new(receiver: broadcast::Receiver<T>, signal_id: SignalId) -> Self {
213 Self {
214 receiver,
215 filter: None,
216 signal_id,
217 }
218 }
219
220 /// Add a filter to this stream.
221 ///
222 /// Only values matching the predicate will be yielded.
223 pub fn filter<F>(mut self, predicate: F) -> Self
224 where
225 F: Fn(&T) -> bool + Send + Sync + 'static,
226 {
227 self.filter = Some(Box::new(predicate));
228 self
229 }
230
231 /// Try to receive the next value without blocking.
232 ///
233 /// Returns `None` if no value is available or the channel is closed.
234 pub fn try_recv(&mut self) -> Option<T> {
235 loop {
236 match self.receiver.try_recv() {
237 Ok(value) => {
238 if let Some(ref filter) = self.filter {
239 if filter(&value) {
240 return Some(value);
241 }
242 // Value didn't match filter, try again
243 continue;
244 }
245 return Some(value);
246 }
247 Err(_) => return None,
248 }
249 }
250 }
251
252 /// Receive the next value, waiting if necessary.
253 ///
254 /// Returns an error if the channel is closed.
255 ///
256 /// Signal subscriptions are eventually consistent rather than lossless:
257 /// if a subscriber lags behind the broadcast buffer under sustained load,
258 /// intermediate values may be dropped and the next received value will be a
259 /// newer snapshot. Handlers are expected to log lag events so the loss is
260 /// diagnosable.
261 pub async fn recv(&mut self) -> Result<T, ReactiveError> {
262 loop {
263 match self.receiver.recv().await {
264 Ok(value) => {
265 if let Some(ref filter) = self.filter {
266 if filter(&value) {
267 return Ok(value);
268 }
269 // Value didn't match filter, try again
270 continue;
271 }
272 return Ok(value);
273 }
274 Err(broadcast::error::RecvError::Closed) => {
275 return Err(ReactiveError::SubscriptionClosed {
276 id: self.signal_id.to_string(),
277 });
278 }
279 Err(broadcast::error::RecvError::Lagged(skipped)) => {
280 tracing::warn!(
281 signal_id = %self.signal_id,
282 skipped,
283 "signal stream lagged; continuing with a newer snapshot"
284 );
285 continue;
286 }
287 }
288 }
289 }
290}
291
292// ─────────────────────────────────────────────────────────────────────────────
293// Reactive Effects Trait
294// ─────────────────────────────────────────────────────────────────────────────
295
296/// Reactive effects for FRP-style state management.
297///
298/// This trait unifies the effect system with functional reactive programming.
299/// All state access and mutation goes through these effects, enabling:
300/// - Transparent reactivity (handlers track dependencies)
301/// - Effect composition (reactive ops compose with other effects)
302/// - Testability (mock handlers for deterministic testing)
303///
304/// # Example
305///
306/// ```ignore
307/// // Reading a signal
308/// let chat_state = effects.read(&CHAT_SIGNAL).await?;
309///
310/// // Emitting to a signal
311/// effects.emit(&CHAT_SIGNAL, new_state).await?;
312///
313/// // Subscribing to changes
314/// let mut stream = effects.subscribe(&CHAT_SIGNAL);
315/// while let Ok(value) = stream.recv().await {
316/// println!("Chat updated: {:?}", value);
317/// }
318/// ```
319#[async_trait]
320pub trait ReactiveEffects: Send + Sync {
321 /// Read the current value of a signal.
322 ///
323 /// This is a point-in-time snapshot. For continuous updates, use `subscribe`.
324 ///
325 /// # Errors
326 ///
327 /// Returns `ReactiveError::SignalNotFound` if the signal doesn't exist.
328 async fn read<T>(&self, signal: &Signal<T>) -> Result<T, ReactiveError>
329 where
330 T: Clone + Send + Sync + 'static;
331
332 /// Emit a new value to a signal.
333 ///
334 /// This updates the signal's value and notifies all subscribers.
335 /// Derived signals that depend on this signal are also updated.
336 ///
337 /// # Errors
338 ///
339 /// Returns `ReactiveError::SignalNotFound` if the signal doesn't exist.
340 async fn emit<T>(&self, signal: &Signal<T>, value: T) -> Result<(), ReactiveError>
341 where
342 T: Clone + Send + Sync + 'static;
343
344 /// Subscribe to signal changes.
345 ///
346 /// Returns a stream that yields values when the signal changes.
347 /// The stream can be filtered to only receive specific updates.
348 ///
349 /// # Errors
350 ///
351 /// Returns `ReactiveError::SignalNotFound` if the signal has not been
352 /// registered yet. Callers must handle this rather than assuming an empty
353 /// stream means "no updates". There is no implicit registration wait.
354 ///
355 /// Subscription delivery is eventually consistent rather than lossless.
356 /// If a subscriber falls behind the bounded broadcast buffer under
357 /// sustained load, intermediate values may be dropped and the stream will
358 /// resume from a newer snapshot after logging the lag event.
359 fn subscribe<T>(&self, signal: &Signal<T>) -> Result<SignalStream<T>, ReactiveError>
360 where
361 T: Clone + Send + Sync + 'static;
362
363 /// Register a signal with an initial value.
364 ///
365 /// This creates the signal in the reactive graph. Signals must be
366 /// registered before they can be read, emitted to, or subscribed.
367 ///
368 /// # Errors
369 ///
370 /// Returns an error if a signal with the same ID already exists.
371 async fn register<T>(&self, signal: &Signal<T>, initial: T) -> Result<(), ReactiveError>
372 where
373 T: Clone + Send + Sync + 'static;
374
375 /// Check if a signal is registered.
376 fn is_registered(&self, signal_id: &SignalId) -> bool;
377
378 /// Register a signal bound to a query.
379 ///
380 /// This creates a signal whose value is derived from executing the query.
381 /// When facts matching the query's `dependencies()` change, the query is
382 /// automatically re-evaluated and the signal is updated.
383 ///
384 /// # Query-Signal Flow
385 ///
386 /// ```text
387 /// Facts Change → Check dependencies() → Re-execute query → Emit to signal
388 /// ```
389 ///
390 /// # Example
391 ///
392 /// ```ignore
393 /// use aura_app::queries::ChannelsQuery;
394 ///
395 /// // Register a signal bound to a query
396 /// effects.register_query(&CHANNELS_SIGNAL, ChannelsQuery::default()).await?;
397 ///
398 /// // Signal automatically updates when channel facts change
399 /// let channels = effects.read(&CHANNELS_SIGNAL).await?;
400 /// ```
401 ///
402 /// # Errors
403 ///
404 /// Returns an error if the signal already exists or query binding fails.
405 async fn register_query<Q: Query>(
406 &self,
407 signal: &Signal<Q::Result>,
408 query: Q,
409 ) -> Result<(), ReactiveError>;
410
411 /// Get the fact predicates that a signal depends on.
412 ///
413 /// Returns `None` if the signal is not bound to a query.
414 /// Returns `Some(predicates)` if the signal was registered with `register_query`.
415 fn query_dependencies(&self, signal_id: &SignalId) -> Option<Vec<FactPredicate>>;
416
417 /// Notify that facts matching a predicate have changed.
418 ///
419 /// This triggers re-evaluation of all query-bound signals whose
420 /// dependencies intersect with the changed predicate.
421 ///
422 /// Called by `JournalEffects` when facts are committed.
423 async fn invalidate_queries(&self, changed: &FactPredicate);
424}
425
426/// Extension trait for derived signals.
427///
428/// This trait provides combinators for creating derived signals that
429/// automatically update when their sources change.
430#[async_trait]
431pub trait ReactiveDeriveEffects: ReactiveEffects {
432 /// Create a derived signal that maps values from a source signal.
433 ///
434 /// The derived signal automatically updates when the source changes.
435 ///
436 /// # Example
437 ///
438 /// ```ignore
439 /// let unread_count = effects.map(&CHAT_SIGNAL, |chat| {
440 /// chat.channels.iter().map(|c| c.unread_count).sum()
441 /// });
442 /// ```
443 async fn map<A, B, F>(&self, source: &Signal<A>, f: F) -> Result<Signal<B>, ReactiveError>
444 where
445 A: Clone + Send + Sync + 'static,
446 B: Clone + Send + Sync + 'static,
447 F: Fn(A) -> B + Send + Sync + 'static;
448
449 /// Create a derived signal that combines two source signals.
450 ///
451 /// The derived signal updates when either source changes.
452 ///
453 /// # Example
454 ///
455 /// ```ignore
456 /// let status = effects.combine(&CONNECTION_SIGNAL, &SYNC_SIGNAL, |conn, sync| {
457 /// format!("Connected: {}, Syncing: {}", conn.is_connected, sync.in_progress)
458 /// });
459 /// ```
460 async fn combine<A, B, C, F>(
461 &self,
462 a: &Signal<A>,
463 b: &Signal<B>,
464 f: F,
465 ) -> Result<Signal<C>, ReactiveError>
466 where
467 A: Clone + Send + Sync + 'static,
468 B: Clone + Send + Sync + 'static,
469 C: Clone + Send + Sync + 'static,
470 F: Fn(A, B) -> C + Send + Sync + 'static;
471
472 /// Create a derived signal that filters and maps values.
473 ///
474 /// Only emits when the filter returns Some.
475 async fn filter_map<A, B, F>(
476 &self,
477 source: &Signal<A>,
478 f: F,
479 ) -> Result<Signal<Option<B>>, ReactiveError>
480 where
481 A: Clone + Send + Sync + 'static,
482 B: Clone + Send + Sync + 'static,
483 F: Fn(A) -> Option<B> + Send + Sync + 'static;
484}
485
486// ─────────────────────────────────────────────────────────────────────────────
487// Blanket Implementations
488// ─────────────────────────────────────────────────────────────────────────────
489
490/// Blanket implementation for Arc<T> where T: ReactiveEffects
491#[async_trait]
492impl<T: ReactiveEffects + ?Sized> ReactiveEffects for Arc<T> {
493 async fn read<V>(&self, signal: &Signal<V>) -> Result<V, ReactiveError>
494 where
495 V: Clone + Send + Sync + 'static,
496 {
497 (**self).read(signal).await
498 }
499
500 async fn emit<V>(&self, signal: &Signal<V>, value: V) -> Result<(), ReactiveError>
501 where
502 V: Clone + Send + Sync + 'static,
503 {
504 (**self).emit(signal, value).await
505 }
506
507 fn subscribe<V>(&self, signal: &Signal<V>) -> Result<SignalStream<V>, ReactiveError>
508 where
509 V: Clone + Send + Sync + 'static,
510 {
511 (**self).subscribe(signal)
512 }
513
514 async fn register<V>(&self, signal: &Signal<V>, initial: V) -> Result<(), ReactiveError>
515 where
516 V: Clone + Send + Sync + 'static,
517 {
518 (**self).register(signal, initial).await
519 }
520
521 fn is_registered(&self, signal_id: &SignalId) -> bool {
522 (**self).is_registered(signal_id)
523 }
524
525 async fn register_query<Q: Query>(
526 &self,
527 signal: &Signal<Q::Result>,
528 query: Q,
529 ) -> Result<(), ReactiveError> {
530 (**self).register_query(signal, query).await
531 }
532
533 fn query_dependencies(&self, signal_id: &SignalId) -> Option<Vec<FactPredicate>> {
534 (**self).query_dependencies(signal_id)
535 }
536
537 async fn invalidate_queries(&self, changed: &FactPredicate) {
538 (**self).invalidate_queries(changed).await;
539 }
540}
541
542// ─────────────────────────────────────────────────────────────────────────────
543// Tests
544// ─────────────────────────────────────────────────────────────────────────────
545
546#[cfg(test)]
547mod tests {
548 use super::*;
549
550 #[test]
551 fn test_signal_id_creation() {
552 let id1 = SignalId::new("chat");
553 assert_eq!(id1.as_str(), "chat");
554
555 let id2 = SignalId::from("recovery");
556 assert_eq!(id2.as_str(), "recovery");
557 }
558
559 #[test]
560 fn test_signal_id_unique() {
561 let id1 = SignalId::unique("test");
562 let id2 = SignalId::unique("test");
563 assert_ne!(id1, id2);
564 }
565
566 #[test]
567 fn test_signal_creation() {
568 let signal: Signal<String> = Signal::new("chat");
569 assert_eq!(signal.id().as_str(), "chat");
570 }
571
572 #[test]
573 fn test_signal_clone() {
574 let signal1: Signal<u32> = Signal::new("counter");
575 let signal2 = signal1.clone();
576 assert_eq!(signal1.id(), signal2.id());
577 }
578
579 #[test]
580 fn test_signal_equality() {
581 let signal1: Signal<String> = Signal::new("chat");
582 let signal2: Signal<String> = Signal::new("chat");
583 let signal3: Signal<String> = Signal::new("recovery");
584
585 assert_eq!(signal1, signal2);
586 assert_ne!(signal1, signal3);
587 }
588
589 #[test]
590 fn test_reactive_error_display() {
591 let err = ReactiveError::SignalNotFound {
592 id: "chat".to_string(),
593 };
594 assert!(err.to_string().contains("chat"));
595
596 let err = ReactiveError::TypeMismatch {
597 id: "counter".to_string(),
598 expected: "u32".to_string(),
599 actual: "String".to_string(),
600 };
601 assert!(err.to_string().contains("counter"));
602 assert!(err.to_string().contains("u32"));
603 }
604}