Skip to main content

drasi_lib/channels/
dispatcher.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use serde::{Deserialize, Serialize};
4use std::fmt::Debug;
5use std::sync::Arc;
6use tokio::sync::{broadcast, mpsc};
7
8/// Event routing mode for distributing changes to subscribers
9///
10/// `DispatchMode` determines how events are routed from sources to queries and from
11/// queries to reactions. It affects memory usage, throughput, and fanout behavior.
12///
13/// # Modes
14///
15/// ## Broadcast Mode
16///
17/// Uses a single shared channel with multiple receivers (1-to-N fanout):
18///
19/// ```text
20/// Source → [Broadcast Channel] → Query 1
21///                              → Query 2
22///                              → Query 3
23/// ```
24///
25/// **Advantages**:
26/// - Lower memory usage (one copy of each event)
27/// - Better for high-fanout scenarios (many subscribers)
28/// - Automatic backpressure when slowest subscriber lags
29///
30/// **Disadvantages**:
31/// - All subscribers receive all events (no filtering)
32/// - Slowest subscriber can slow down entire system
33/// - Events may be dropped if buffers fill
34///
35/// ## Channel Mode (Default)
36///
37/// Uses dedicated channels per subscriber (1-to-1):
38///
39/// ```text
40/// Source → [Channel 1] → Query 1
41///       → [Channel 2] → Query 2
42///       → [Channel 3] → Query 3
43/// ```
44///
45/// **Advantages**:
46/// - Subscribers process independently
47/// - Slow subscriber doesn't affect others
48/// - More predictable behavior
49///
50/// **Disadvantages**:
51/// - Higher memory usage (one copy per subscriber)
52/// - More overhead for high-fanout scenarios
53///
54/// # Configuration
55///
56/// Set in YAML configuration or via builder API:
57///
58/// ```yaml
59/// sources:
60///   - id: data_source
61///     source_type: postgres
62///     dispatch_mode: broadcast  # or channel (default)
63///
64/// queries:
65///   - id: my_query
66///     query: "MATCH (n) RETURN n"
67///     sources: [data_source]
68///     dispatch_mode: channel
69/// ```
70///
71/// # Choosing a Mode
72///
73/// **Use Broadcast when**:
74/// - High fanout (10+ subscribers)
75/// - All subscribers need all events
76/// - Memory is constrained
77/// - All subscribers process at similar speeds
78///
79/// **Use Channel (default) when**:
80/// - Few subscribers (1-5)
81/// - Subscribers have different processing speeds
82/// - Isolation between subscribers is important
83/// - Memory is not constrained
84///
85/// # Examples
86///
87/// ## Builder API Configuration
88///
89/// ```no_run
90/// use drasi_lib::{DrasiLib, Query, DispatchMode};
91///
92/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
93/// // Sources are now instance-based - create them externally and use .with_source()
94/// let core = DrasiLib::builder()
95///     // .with_source(my_source_instance)
96///     .with_query(
97///         Query::cypher("active_orders")
98///             .query("MATCH (o:Order) WHERE o.status = 'active' RETURN o")
99///             .from_source("orders_db")
100///             .with_dispatch_mode(DispatchMode::Channel)    // Default, independent processing
101///             .build()
102///     )
103///     .build()
104///     .await?;
105/// # Ok(())
106/// # }
107/// ```
108///
109/// ## High-Fanout Scenario (Use Broadcast)
110///
111/// ```yaml
112/// sources:
113///   - id: event_stream
114///     source_type: http
115///     host: localhost
116///     port: 8080
117///     dispatch_mode: broadcast  # Many queries subscribe to this source
118///
119/// queries:
120///   - id: query1
121///     query: "MATCH (n:Type1) RETURN n"
122///     sources: [event_stream]
123///   - id: query2
124///     query: "MATCH (n:Type2) RETURN n"
125///     sources: [event_stream]
126///   # ... 20 more queries subscribing to event_stream
127/// ```
128///
129/// ## Independent Processing (Use Channel)
130///
131/// ```yaml
132/// sources:
133///   - id: sensor_data
134///     source_type: mock
135///     dispatch_mode: channel  # Default - each query processes independently
136///
137/// queries:
138///   - id: real_time_alerts
139///     query: "MATCH (s:Sensor) WHERE s.value > 100 RETURN s"
140///     sources: [sensor_data]
141///     # Fast processing
142///
143///   - id: historical_analysis
144///     query: "MATCH (s:Sensor) RETURN s"
145///     sources: [sensor_data]
146///     # Slow processing - won't affect real_time_alerts
147/// ```
148///
149/// # Performance Considerations
150///
151/// **Broadcast Memory Usage**: `O(buffer_size)` - single buffer shared
152/// **Channel Memory Usage**: `O(buffer_size * subscribers)` - buffer per subscriber
153///
154/// For 10 subscribers with 1000-event buffers:
155/// - Broadcast: ~1,000 events in memory
156/// - Channel: ~10,000 events in memory
157#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
158#[serde(rename_all = "lowercase")]
159pub enum DispatchMode {
160    /// Broadcast mode: single channel with multiple receivers (1-to-N fanout)
161    Broadcast,
162    /// Channel mode: dedicated channel per subscriber (1-to-1)
163    #[default]
164    Channel,
165}
166
167/// Trait for dispatching changes to subscribers
168#[async_trait]
169pub trait ChangeDispatcher<T>: Send + Sync
170where
171    T: Clone + Send + Sync + 'static,
172{
173    /// Dispatch a single change to all subscribers
174    async fn dispatch_change(&self, change: Arc<T>) -> Result<()>;
175
176    /// Dispatch multiple changes to all subscribers
177    async fn dispatch_changes(&self, changes: Vec<Arc<T>>) -> Result<()> {
178        for change in changes {
179            self.dispatch_change(change).await?;
180        }
181        Ok(())
182    }
183
184    /// Create a new receiver for this dispatcher
185    async fn create_receiver(&self) -> Result<Box<dyn ChangeReceiver<T>>>;
186}
187
188/// Trait for receiving changes from a dispatcher
189#[async_trait]
190pub trait ChangeReceiver<T>: Send + Sync
191where
192    T: Clone + Send + Sync + 'static,
193{
194    /// Receive the next change
195    async fn recv(&mut self) -> Result<Arc<T>>;
196}
197
198/// Broadcast-based implementation of ChangeDispatcher
199pub struct BroadcastChangeDispatcher<T>
200where
201    T: Clone + Send + Sync + 'static,
202{
203    tx: broadcast::Sender<Arc<T>>,
204    _capacity: usize,
205}
206
207impl<T> BroadcastChangeDispatcher<T>
208where
209    T: Clone + Send + Sync + 'static,
210{
211    /// Create a new broadcast dispatcher with specified capacity
212    pub fn new(capacity: usize) -> Self {
213        let (tx, _) = broadcast::channel(capacity);
214        Self {
215            tx,
216            _capacity: capacity,
217        }
218    }
219}
220
221#[async_trait]
222impl<T> ChangeDispatcher<T> for BroadcastChangeDispatcher<T>
223where
224    T: Clone + Send + Sync + 'static,
225{
226    async fn dispatch_change(&self, change: Arc<T>) -> Result<()> {
227        // Ignore send errors if there are no receivers
228        let _ = self.tx.send(change);
229        Ok(())
230    }
231
232    async fn create_receiver(&self) -> Result<Box<dyn ChangeReceiver<T>>> {
233        let rx = self.tx.subscribe();
234        Ok(Box::new(BroadcastChangeReceiver { rx }))
235    }
236}
237
238/// Broadcast-based implementation of ChangeReceiver
239pub struct BroadcastChangeReceiver<T>
240where
241    T: Clone + Send + Sync + 'static,
242{
243    rx: broadcast::Receiver<Arc<T>>,
244}
245
246#[async_trait]
247impl<T> ChangeReceiver<T> for BroadcastChangeReceiver<T>
248where
249    T: Clone + Send + Sync + 'static,
250{
251    async fn recv(&mut self) -> Result<Arc<T>> {
252        loop {
253            match self.rx.recv().await {
254                Ok(change) => return Ok(change),
255                Err(broadcast::error::RecvError::Closed) => {
256                    return Err(anyhow::anyhow!("Broadcast channel closed"));
257                }
258                Err(broadcast::error::RecvError::Lagged(n)) => {
259                    log::warn!("Broadcast receiver lagged by {n} messages");
260                    // Continue the loop to receive the next available message
261                }
262            }
263        }
264    }
265}
266
267/// Channel-based (MPSC) implementation of ChangeDispatcher
268pub struct ChannelChangeDispatcher<T>
269where
270    T: Clone + Send + Sync + 'static,
271{
272    tx: mpsc::Sender<Arc<T>>,
273    rx: Arc<tokio::sync::Mutex<Option<mpsc::Receiver<Arc<T>>>>>,
274    _capacity: usize,
275}
276
277impl<T> ChannelChangeDispatcher<T>
278where
279    T: Clone + Send + Sync + 'static,
280{
281    /// Create a new channel dispatcher with specified capacity
282    pub fn new(capacity: usize) -> Self {
283        let (tx, rx) = mpsc::channel(capacity);
284        Self {
285            tx,
286            rx: Arc::new(tokio::sync::Mutex::new(Some(rx))),
287            _capacity: capacity,
288        }
289    }
290}
291
292#[async_trait]
293impl<T> ChangeDispatcher<T> for ChannelChangeDispatcher<T>
294where
295    T: Clone + Send + Sync + 'static,
296{
297    async fn dispatch_change(&self, change: Arc<T>) -> Result<()> {
298        self.tx
299            .send(change)
300            .await
301            .map_err(|_| anyhow::anyhow!("Failed to send on channel"))?;
302        Ok(())
303    }
304
305    async fn create_receiver(&self) -> Result<Box<dyn ChangeReceiver<T>>> {
306        // For channel mode, we can only create one receiver
307        // Take the receiver out of the option
308        let mut rx_opt = self.rx.lock().await;
309        let rx = rx_opt.take().ok_or_else(|| {
310            anyhow::anyhow!("Receiver already created for this channel dispatcher")
311        })?;
312        Ok(Box::new(ChannelChangeReceiver { rx }))
313    }
314}
315
316/// Channel-based (MPSC) implementation of ChangeReceiver
317pub struct ChannelChangeReceiver<T>
318where
319    T: Clone + Send + Sync + 'static,
320{
321    rx: mpsc::Receiver<Arc<T>>,
322}
323
324#[async_trait]
325impl<T> ChangeReceiver<T> for ChannelChangeReceiver<T>
326where
327    T: Clone + Send + Sync + 'static,
328{
329    async fn recv(&mut self) -> Result<Arc<T>> {
330        self.rx
331            .recv()
332            .await
333            .ok_or_else(|| anyhow::anyhow!("Channel closed"))
334    }
335}
336
337#[cfg(test)]
338mod tests {
339    use super::*;
340    #[derive(Clone, Debug, PartialEq)]
341    struct TestMessage {
342        id: u32,
343        content: String,
344    }
345
346    #[tokio::test]
347    async fn test_broadcast_dispatcher_single_receiver() {
348        let dispatcher = BroadcastChangeDispatcher::<TestMessage>::new(100);
349        let mut receiver = dispatcher.create_receiver().await.unwrap();
350
351        let msg = Arc::new(TestMessage {
352            id: 1,
353            content: "test".to_string(),
354        });
355
356        dispatcher.dispatch_change(msg.clone()).await.unwrap();
357
358        let received = receiver.recv().await.unwrap();
359        assert_eq!(*received, *msg);
360    }
361
362    #[tokio::test]
363    async fn test_broadcast_dispatcher_multiple_receivers() {
364        let dispatcher = BroadcastChangeDispatcher::<TestMessage>::new(100);
365        let mut receiver1 = dispatcher.create_receiver().await.unwrap();
366        let mut receiver2 = dispatcher.create_receiver().await.unwrap();
367
368        let msg = Arc::new(TestMessage {
369            id: 1,
370            content: "broadcast".to_string(),
371        });
372
373        dispatcher.dispatch_change(msg.clone()).await.unwrap();
374
375        let received1 = receiver1.recv().await.unwrap();
376        let received2 = receiver2.recv().await.unwrap();
377
378        assert_eq!(*received1, *msg);
379        assert_eq!(*received2, *msg);
380    }
381
382    #[tokio::test]
383    async fn test_broadcast_dispatcher_dispatch_changes() {
384        let dispatcher = BroadcastChangeDispatcher::<TestMessage>::new(100);
385        let mut receiver = dispatcher.create_receiver().await.unwrap();
386
387        let messages = vec![
388            Arc::new(TestMessage {
389                id: 1,
390                content: "first".to_string(),
391            }),
392            Arc::new(TestMessage {
393                id: 2,
394                content: "second".to_string(),
395            }),
396            Arc::new(TestMessage {
397                id: 3,
398                content: "third".to_string(),
399            }),
400        ];
401
402        dispatcher.dispatch_changes(messages.clone()).await.unwrap();
403
404        for expected in messages {
405            let received = receiver.recv().await.unwrap();
406            assert_eq!(*received, *expected);
407        }
408    }
409
410    #[tokio::test]
411    async fn test_channel_dispatcher_single_receiver() {
412        let dispatcher = ChannelChangeDispatcher::<TestMessage>::new(100);
413        let mut receiver = dispatcher.create_receiver().await.unwrap();
414
415        let msg = Arc::new(TestMessage {
416            id: 1,
417            content: "channel".to_string(),
418        });
419
420        dispatcher.dispatch_change(msg.clone()).await.unwrap();
421
422        let received = receiver.recv().await.unwrap();
423        assert_eq!(*received, *msg);
424    }
425
426    #[tokio::test]
427    async fn test_channel_dispatcher_only_one_receiver() {
428        let dispatcher = ChannelChangeDispatcher::<TestMessage>::new(100);
429        let _receiver1 = dispatcher.create_receiver().await.unwrap();
430
431        // Attempting to create a second receiver should fail
432        let result = dispatcher.create_receiver().await;
433        assert!(result.is_err());
434        if let Err(e) = result {
435            assert!(e.to_string().contains("Receiver already created"));
436        }
437    }
438
439    #[tokio::test]
440    async fn test_channel_dispatcher_dispatch_changes() {
441        let dispatcher = ChannelChangeDispatcher::<TestMessage>::new(100);
442        let mut receiver = dispatcher.create_receiver().await.unwrap();
443
444        let messages = vec![
445            Arc::new(TestMessage {
446                id: 1,
447                content: "first".to_string(),
448            }),
449            Arc::new(TestMessage {
450                id: 2,
451                content: "second".to_string(),
452            }),
453        ];
454
455        dispatcher.dispatch_changes(messages.clone()).await.unwrap();
456
457        for expected in messages {
458            let received = receiver.recv().await.unwrap();
459            assert_eq!(*received, *expected);
460        }
461    }
462
463    #[tokio::test]
464    async fn test_broadcast_receiver_handles_lag() {
465        // Create a small capacity broadcaster to force lag
466        let dispatcher = BroadcastChangeDispatcher::<TestMessage>::new(2);
467        let mut receiver = dispatcher.create_receiver().await.unwrap();
468
469        // Send more messages than capacity without reading
470        for i in 0..5 {
471            let msg = Arc::new(TestMessage {
472                id: i,
473                content: format!("msg{i}"),
474            });
475            dispatcher.dispatch_change(msg).await.unwrap();
476        }
477
478        // Give some time for messages to accumulate
479        tokio::task::yield_now().await;
480
481        // Try to receive - should handle lag and continue
482        let result = receiver.recv().await;
483        assert!(result.is_ok());
484    }
485
486    #[tokio::test]
487    async fn test_dispatch_mode_default() {
488        assert_eq!(DispatchMode::default(), DispatchMode::Channel);
489    }
490
491    #[tokio::test]
492    async fn test_dispatch_mode_serialization() {
493        let mode = DispatchMode::Broadcast;
494        let json = serde_json::to_string(&mode).unwrap();
495        assert_eq!(json, "\"broadcast\"");
496
497        let deserialized: DispatchMode = serde_json::from_str(&json).unwrap();
498        assert_eq!(deserialized, DispatchMode::Broadcast);
499
500        let mode = DispatchMode::Channel;
501        let json = serde_json::to_string(&mode).unwrap();
502        assert_eq!(json, "\"channel\"");
503
504        let deserialized: DispatchMode = serde_json::from_str(&json).unwrap();
505        assert_eq!(deserialized, DispatchMode::Channel);
506    }
507}