Skip to main content

drasi_lib/channels/
dispatcher.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use serde::{Deserialize, Serialize};
4use std::fmt::Debug;
5use std::sync::Arc;
6use tokio::sync::{broadcast, mpsc};
7
8/// Event routing mode for distributing changes to subscribers
9///
10/// `DispatchMode` determines how events are routed from sources to queries and from
11/// queries to reactions. It affects memory usage, throughput, and fanout behavior.
12///
13/// # Modes
14///
15/// ## Broadcast Mode
16///
17/// Uses a single shared channel with multiple receivers (1-to-N fanout):
18///
19/// ```text
20/// Source → [Broadcast Channel] → Query 1
21///                              → Query 2
22///                              → Query 3
23/// ```
24///
25/// **Advantages**:
26/// - Lower memory usage (one copy of each event)
27/// - Better for high-fanout scenarios (many subscribers)
28/// - Automatic backpressure when slowest subscriber lags
29///
30/// **Disadvantages**:
31/// - All subscribers receive all events (no filtering)
32/// - Slowest subscriber can slow down entire system
33/// - Events may be dropped if buffers fill
34///
35/// ## Channel Mode (Default)
36///
37/// Uses dedicated channels per subscriber (1-to-1):
38///
39/// ```text
40/// Source → [Channel 1] → Query 1
41///       → [Channel 2] → Query 2
42///       → [Channel 3] → Query 3
43/// ```
44///
45/// **Advantages**:
46/// - Subscribers process independently
47/// - Slow subscriber doesn't affect others
48/// - More predictable behavior
49///
50/// **Disadvantages**:
51/// - Higher memory usage (one copy per subscriber)
52/// - More overhead for high-fanout scenarios
53///
54/// # Configuration
55///
56/// Set in YAML configuration or via builder API:
57///
58/// ```yaml
59/// sources:
60///   - id: data_source
61///     source_type: postgres
62///     dispatch_mode: broadcast  # or channel (default)
63///
64/// queries:
65///   - id: my_query
66///     query: "MATCH (n) RETURN n"
67///     sources: [data_source]
68///     dispatch_mode: channel
69/// ```
70///
71/// # Choosing a Mode
72///
73/// **Use Broadcast when**:
74/// - High fanout (10+ subscribers)
75/// - All subscribers need all events
76/// - Memory is constrained
77/// - All subscribers process at similar speeds
78///
79/// **Use Channel (default) when**:
80/// - Few subscribers (1-5)
81/// - Subscribers have different processing speeds
82/// - Isolation between subscribers is important
83/// - Memory is not constrained
84///
85/// # Examples
86///
87/// ## Builder API Configuration
88///
89/// ```no_run
90/// use drasi_lib::{DrasiLib, Query, DispatchMode};
91///
92/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
93/// // Sources are now instance-based - create them externally and use .with_source()
94/// let core = DrasiLib::builder()
95///     // .with_source(my_source_instance)
96///     .with_query(
97///         Query::cypher("active_orders")
98///             .query("MATCH (o:Order) WHERE o.status = 'active' RETURN o")
99///             .from_source("orders_db")
100///             .with_dispatch_mode(DispatchMode::Channel)    // Default, independent processing
101///             .build()
102///     )
103///     .build()
104///     .await?;
105/// # Ok(())
106/// # }
107/// ```
108///
109/// ## High-Fanout Scenario (Use Broadcast)
110///
111/// ```yaml
112/// sources:
113///   - id: event_stream
114///     source_type: http
115///     host: localhost
116///     port: 8080
117///     dispatch_mode: broadcast  # Many queries subscribe to this source
118///
119/// queries:
120///   - id: query1
121///     query: "MATCH (n:Type1) RETURN n"
122///     sources: [event_stream]
123///   - id: query2
124///     query: "MATCH (n:Type2) RETURN n"
125///     sources: [event_stream]
126///   # ... 20 more queries subscribing to event_stream
127/// ```
128///
129/// ## Independent Processing (Use Channel)
130///
131/// ```yaml
132/// sources:
133///   - id: sensor_data
134///     source_type: mock
135///     dispatch_mode: channel  # Default - each query processes independently
136///
137/// queries:
138///   - id: real_time_alerts
139///     query: "MATCH (s:Sensor) WHERE s.value > 100 RETURN s"
140///     sources: [sensor_data]
141///     # Fast processing
142///
143///   - id: historical_analysis
144///     query: "MATCH (s:Sensor) RETURN s"
145///     sources: [sensor_data]
146///     # Slow processing - won't affect real_time_alerts
147/// ```
148///
149/// # Performance Considerations
150///
151/// **Broadcast Memory Usage**: `O(buffer_size)` - single buffer shared
152/// **Channel Memory Usage**: `O(buffer_size * subscribers)` - buffer per subscriber
153///
154/// For 10 subscribers with 1000-event buffers:
155/// - Broadcast: ~1,000 events in memory
156/// - Channel: ~10,000 events in memory
157#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
158#[serde(rename_all = "lowercase")]
159pub enum DispatchMode {
160    /// Broadcast mode: single channel with multiple receivers (1-to-N fanout)
161    Broadcast,
162    /// Channel mode: dedicated channel per subscriber (1-to-1)
163    #[default]
164    Channel,
165}
166
167/// Trait for dispatching changes to subscribers
168#[async_trait]
169pub trait ChangeDispatcher<T>: Send + Sync
170where
171    T: Clone + Send + Sync + 'static,
172{
173    /// Dispatch a single change to all subscribers
174    async fn dispatch_change(&self, change: Arc<T>) -> Result<()>;
175
176    /// Dispatch multiple changes to all subscribers
177    async fn dispatch_changes(&self, changes: Vec<Arc<T>>) -> Result<()> {
178        for change in changes {
179            self.dispatch_change(change).await?;
180        }
181        Ok(())
182    }
183
184    /// Create a new receiver for this dispatcher
185    async fn create_receiver(&self) -> Result<Box<dyn ChangeReceiver<T>>>;
186}
187
188/// Trait for receiving changes from a dispatcher
189#[async_trait]
190pub trait ChangeReceiver<T>: Send + Sync
191where
192    T: Clone + Send + Sync + 'static,
193{
194    /// Receive the next change
195    async fn recv(&mut self) -> Result<Arc<T>>;
196}
197
198/// Broadcast-based implementation of ChangeDispatcher
199pub struct BroadcastChangeDispatcher<T>
200where
201    T: Clone + Send + Sync + 'static,
202{
203    tx: broadcast::Sender<Arc<T>>,
204    _capacity: usize,
205}
206
207impl<T> BroadcastChangeDispatcher<T>
208where
209    T: Clone + Send + Sync + 'static,
210{
211    /// Create a new broadcast dispatcher with specified capacity
212    pub fn new(capacity: usize) -> Self {
213        let (tx, _) = broadcast::channel(capacity);
214        Self {
215            tx,
216            _capacity: capacity,
217        }
218    }
219}
220
221#[async_trait]
222impl<T> ChangeDispatcher<T> for BroadcastChangeDispatcher<T>
223where
224    T: Clone + Send + Sync + 'static,
225{
226    async fn dispatch_change(&self, change: Arc<T>) -> Result<()> {
227        // Ignore send errors if there are no receivers
228        let _ = self.tx.send(change);
229        Ok(())
230    }
231
232    async fn create_receiver(&self) -> Result<Box<dyn ChangeReceiver<T>>> {
233        let rx = self.tx.subscribe();
234        Ok(Box::new(BroadcastChangeReceiver { rx }))
235    }
236}
237
238/// Broadcast-based implementation of ChangeReceiver
239pub struct BroadcastChangeReceiver<T>
240where
241    T: Clone + Send + Sync + 'static,
242{
243    rx: broadcast::Receiver<Arc<T>>,
244}
245
246#[async_trait]
247impl<T> ChangeReceiver<T> for BroadcastChangeReceiver<T>
248where
249    T: Clone + Send + Sync + 'static,
250{
251    async fn recv(&mut self) -> Result<Arc<T>> {
252        match self.rx.recv().await {
253            Ok(change) => Ok(change),
254            Err(broadcast::error::RecvError::Closed) => {
255                Err(anyhow::anyhow!("Broadcast channel closed"))
256            }
257            Err(broadcast::error::RecvError::Lagged(n)) => {
258                // Log the lag but try to continue
259                log::warn!("Broadcast receiver lagged by {n} messages");
260                // Try to receive the next message
261                self.recv().await
262            }
263        }
264    }
265}
266
267/// Channel-based (MPSC) implementation of ChangeDispatcher
268pub struct ChannelChangeDispatcher<T>
269where
270    T: Clone + Send + Sync + 'static,
271{
272    tx: mpsc::Sender<Arc<T>>,
273    rx: Arc<tokio::sync::Mutex<Option<mpsc::Receiver<Arc<T>>>>>,
274    _capacity: usize,
275}
276
277impl<T> ChannelChangeDispatcher<T>
278where
279    T: Clone + Send + Sync + 'static,
280{
281    /// Create a new channel dispatcher with specified capacity
282    pub fn new(capacity: usize) -> Self {
283        let (tx, rx) = mpsc::channel(capacity);
284        Self {
285            tx,
286            rx: Arc::new(tokio::sync::Mutex::new(Some(rx))),
287            _capacity: capacity,
288        }
289    }
290}
291
292#[async_trait]
293impl<T> ChangeDispatcher<T> for ChannelChangeDispatcher<T>
294where
295    T: Clone + Send + Sync + 'static,
296{
297    async fn dispatch_change(&self, change: Arc<T>) -> Result<()> {
298        self.tx
299            .send(change)
300            .await
301            .map_err(|_| anyhow::anyhow!("Failed to send on channel"))?;
302        Ok(())
303    }
304
305    async fn create_receiver(&self) -> Result<Box<dyn ChangeReceiver<T>>> {
306        // For channel mode, we can only create one receiver
307        // Take the receiver out of the option
308        let mut rx_opt = self.rx.lock().await;
309        let rx = rx_opt.take().ok_or_else(|| {
310            anyhow::anyhow!("Receiver already created for this channel dispatcher")
311        })?;
312        Ok(Box::new(ChannelChangeReceiver { rx }))
313    }
314}
315
316/// Channel-based (MPSC) implementation of ChangeReceiver
317pub struct ChannelChangeReceiver<T>
318where
319    T: Clone + Send + Sync + 'static,
320{
321    rx: mpsc::Receiver<Arc<T>>,
322}
323
324#[async_trait]
325impl<T> ChangeReceiver<T> for ChannelChangeReceiver<T>
326where
327    T: Clone + Send + Sync + 'static,
328{
329    async fn recv(&mut self) -> Result<Arc<T>> {
330        self.rx
331            .recv()
332            .await
333            .ok_or_else(|| anyhow::anyhow!("Channel closed"))
334    }
335}
336
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal payload used to exercise the dispatchers.
    #[derive(Clone, Debug, PartialEq)]
    struct TestMessage {
        id: u32,
        content: String,
    }

    /// Build a reference-counted test message.
    fn message(id: u32, content: &str) -> Arc<TestMessage> {
        Arc::new(TestMessage {
            id,
            content: content.to_string(),
        })
    }

    #[tokio::test]
    async fn test_broadcast_dispatcher_single_receiver() {
        let dispatcher = BroadcastChangeDispatcher::<TestMessage>::new(100);
        let mut receiver = dispatcher.create_receiver().await.unwrap();

        let msg = message(1, "test");
        dispatcher.dispatch_change(msg.clone()).await.unwrap();

        let received = receiver.recv().await.unwrap();
        assert_eq!(*received, *msg);
    }

    #[tokio::test]
    async fn test_broadcast_dispatcher_multiple_receivers() {
        let dispatcher = BroadcastChangeDispatcher::<TestMessage>::new(100);
        let mut receiver1 = dispatcher.create_receiver().await.unwrap();
        let mut receiver2 = dispatcher.create_receiver().await.unwrap();

        let msg = message(1, "broadcast");
        dispatcher.dispatch_change(msg.clone()).await.unwrap();

        // Both subscribers must see the same event.
        let received1 = receiver1.recv().await.unwrap();
        let received2 = receiver2.recv().await.unwrap();

        assert_eq!(*received1, *msg);
        assert_eq!(*received2, *msg);
    }

    #[tokio::test]
    async fn test_broadcast_dispatcher_dispatch_changes() {
        let dispatcher = BroadcastChangeDispatcher::<TestMessage>::new(100);
        let mut receiver = dispatcher.create_receiver().await.unwrap();

        let messages = vec![
            message(1, "first"),
            message(2, "second"),
            message(3, "third"),
        ];

        dispatcher.dispatch_changes(messages.clone()).await.unwrap();

        // Batch dispatch must preserve order.
        for expected in messages {
            let received = receiver.recv().await.unwrap();
            assert_eq!(*received, *expected);
        }
    }

    #[tokio::test]
    async fn test_channel_dispatcher_single_receiver() {
        let dispatcher = ChannelChangeDispatcher::<TestMessage>::new(100);
        let mut receiver = dispatcher.create_receiver().await.unwrap();

        let msg = message(1, "channel");
        dispatcher.dispatch_change(msg.clone()).await.unwrap();

        let received = receiver.recv().await.unwrap();
        assert_eq!(*received, *msg);
    }

    #[tokio::test]
    async fn test_channel_dispatcher_only_one_receiver() {
        let dispatcher = ChannelChangeDispatcher::<TestMessage>::new(100);
        let _receiver1 = dispatcher.create_receiver().await.unwrap();

        // A second receiver must be refused with the documented message.
        // (`unwrap_err` is not usable here: the `Ok` type is not `Debug`.)
        match dispatcher.create_receiver().await {
            Ok(_) => panic!("second create_receiver call unexpectedly succeeded"),
            Err(e) => assert!(e.to_string().contains("Receiver already created")),
        }
    }

    #[tokio::test]
    async fn test_channel_dispatcher_dispatch_changes() {
        let dispatcher = ChannelChangeDispatcher::<TestMessage>::new(100);
        let mut receiver = dispatcher.create_receiver().await.unwrap();

        let messages = vec![message(1, "first"), message(2, "second")];

        dispatcher.dispatch_changes(messages.clone()).await.unwrap();

        for expected in messages {
            let received = receiver.recv().await.unwrap();
            assert_eq!(*received, *expected);
        }
    }

    #[tokio::test]
    async fn test_broadcast_receiver_handles_lag() {
        // A capacity of 2 with 5 unread messages forces the receiver to lag.
        let dispatcher = BroadcastChangeDispatcher::<TestMessage>::new(2);
        let mut receiver = dispatcher.create_receiver().await.unwrap();

        for i in 0..5 {
            dispatcher
                .dispatch_change(message(i, &format!("msg{i}")))
                .await
                .unwrap();
        }

        // Broadcast sends complete synchronously, so no waiting is needed before
        // reading. The receiver must skip the overwritten messages (0..=2) and
        // resume with the two that are still buffered.
        let received = receiver.recv().await.unwrap();
        assert_eq!(received.id, 3);
        let received = receiver.recv().await.unwrap();
        assert_eq!(received.id, 4);
    }

    #[test]
    fn test_dispatch_mode_default() {
        assert_eq!(DispatchMode::default(), DispatchMode::Channel);
    }

    #[test]
    fn test_dispatch_mode_serialization() {
        // Each variant must round-trip through its lowercase JSON name.
        let cases = [
            (DispatchMode::Broadcast, "\"broadcast\""),
            (DispatchMode::Channel, "\"channel\""),
        ];

        for (mode, expected_json) in cases {
            let json = serde_json::to_string(&mode).unwrap();
            assert_eq!(json, expected_json);

            let deserialized: DispatchMode = serde_json::from_str(&json).unwrap();
            assert_eq!(deserialized, mode);
        }
    }
}