Skip to main content

laminar_core/subscription/
dispatcher.rs

1//! Subscription Dispatcher — Ring 1 notification routing.
2//!
3//! Bridges Ring 0 [`NotificationRef`] signals to subscriber broadcast channels.
4//! Runs as an async task that:
5//!
6//! 1. **Drains** all [`NotificationRing`]s from Ring 0 (one per core).
7//! 2. **Batches** notifications by `source_id` for efficient lookup.
8//! 3. **Resolves** each [`NotificationRef`] into a [`ChangeEvent`] via
9//!    [`NotificationDataSource`].
10//! 4. **Broadcasts** the event to all active subscribers found in the
11//!    [`SubscriptionRegistry`].
12//!
13//! # Architecture
14//!
15//! ```text
16//! Ring 0 (per core)           Ring 1 (Dispatcher)           Ring 2 (Subscribers)
17//! ┌──────────────┐            ┌───────────────────┐         ┌──────────────┐
18//! │ NotifRing[0] │──drain──►  │                   │──send──►│ Receiver A   │
19//! │ NotifRing[1] │──drain──►  │  Dispatcher loop  │──send──►│ Receiver B   │
20//! │ NotifRing[N] │──drain──►  │                   │──send──►│ Receiver C   │
21//! └──────────────┘            └───────────────────┘         └──────────────┘
22//! ```
23
24use std::sync::atomic::{AtomicU64, Ordering};
25use std::sync::Arc;
26use std::time::Duration;
27
28use tokio::sync::watch;
29
30use crate::subscription::event::{ChangeEvent, NotificationRef};
31use crate::subscription::notification::NotificationRing;
32use crate::subscription::registry::SubscriptionRegistry;
33
34// ---------------------------------------------------------------------------
35// NotificationDataSource — resolver trait
36// ---------------------------------------------------------------------------
37
38/// Resolves a [`NotificationRef`] into a full [`ChangeEvent`].
39///
40/// Implemented by the DAG executor or streaming pipeline to provide zero-copy
41/// access to the source data referenced by [`NotificationRef::batch_offset`].
42pub trait NotificationDataSource: Send + Sync {
43    /// Resolves a notification reference to a full change event.
44    ///
45    /// Returns `None` if the referenced data is no longer available (e.g.,
46    /// the ring buffer has been overwritten). The dispatcher will skip the
47    /// notification and increment the drop counter.
48    fn resolve(&self, notif: &NotificationRef) -> Option<ChangeEvent>;
49}
50
51// ---------------------------------------------------------------------------
52// DispatcherConfig
53// ---------------------------------------------------------------------------
54
55/// Configuration for the [`SubscriptionDispatcher`].
56#[derive(Debug, Clone)]
57pub struct DispatcherConfig {
58    /// Maximum notifications to drain per poll cycle.
59    pub max_drain_per_cycle: usize,
60    /// Idle sleep duration when no notifications are available.
61    pub idle_sleep: Duration,
62    /// Number of spin iterations before yielding on idle.
63    pub spin_iterations: usize,
64    /// Whether to batch notifications by source before dispatch.
65    pub batch_by_source: bool,
66    /// Maximum batch size per source per cycle.
67    pub max_batch_per_source: usize,
68}
69
70impl Default for DispatcherConfig {
71    fn default() -> Self {
72        Self {
73            max_drain_per_cycle: 4096,
74            idle_sleep: Duration::from_micros(10),
75            spin_iterations: 100,
76            batch_by_source: true,
77            max_batch_per_source: 256,
78        }
79    }
80}
81
82// ---------------------------------------------------------------------------
83// DispatcherMetrics
84// ---------------------------------------------------------------------------
85
86/// Atomic counters for monitoring the dispatcher.
87#[derive(Debug, Default)]
88pub struct DispatcherMetrics {
89    /// Total notifications drained from Ring 0.
90    pub notifications_drained: AtomicU64,
91    /// Total events dispatched to subscribers.
92    pub events_dispatched: AtomicU64,
93    /// Total events dropped (resolve failure or no receivers).
94    pub events_dropped: AtomicU64,
95    /// Total dispatch cycles.
96    pub dispatch_cycles: AtomicU64,
97    /// Total idle cycles (nothing to dispatch).
98    pub idle_cycles: AtomicU64,
99    /// Maximum dispatch latency observed (nanoseconds).
100    pub max_dispatch_latency_ns: AtomicU64,
101}
102
103impl DispatcherMetrics {
104    /// Returns total notifications drained.
105    #[must_use]
106    pub fn notifications_drained(&self) -> u64 {
107        self.notifications_drained.load(Ordering::Relaxed)
108    }
109
110    /// Returns total events dispatched.
111    #[must_use]
112    pub fn events_dispatched(&self) -> u64 {
113        self.events_dispatched.load(Ordering::Relaxed)
114    }
115
116    /// Returns total events dropped.
117    #[must_use]
118    pub fn events_dropped(&self) -> u64 {
119        self.events_dropped.load(Ordering::Relaxed)
120    }
121
122    /// Returns total dispatch cycles.
123    #[must_use]
124    pub fn dispatch_cycles(&self) -> u64 {
125        self.dispatch_cycles.load(Ordering::Relaxed)
126    }
127
128    /// Returns total idle cycles.
129    #[must_use]
130    pub fn idle_cycles(&self) -> u64 {
131        self.idle_cycles.load(Ordering::Relaxed)
132    }
133}
134
135// ---------------------------------------------------------------------------
136// SubscriptionDispatcher
137// ---------------------------------------------------------------------------
138
139/// Ring 1 dispatcher that routes notifications to subscriber channels.
140///
141/// The dispatcher drains [`NotificationRing`]s from Ring 0, resolves
142/// [`NotificationRef`] values to [`ChangeEvent`] instances, and broadcasts
143/// them to active subscriber channels via [`SubscriptionRegistry`].
144///
145/// # Usage
146///
147/// ```rust,ignore
148/// let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
149/// let dispatcher = SubscriptionDispatcher::new(
150///     rings, registry, data_source, config, shutdown_rx,
151/// );
152/// let handle = tokio::spawn(dispatcher.run());
153///
154/// // ... later ...
155/// shutdown_tx.send(true).unwrap();
156/// handle.await.unwrap();
157/// ```
158pub struct SubscriptionDispatcher {
159    /// Notification rings from Ring 0 (one per core).
160    notification_rings: Vec<Arc<NotificationRing>>,
161    /// Subscription registry for subscriber lookup.
162    registry: Arc<SubscriptionRegistry>,
163    /// Data source for resolving notifications to events.
164    data_source: Arc<dyn NotificationDataSource>,
165    /// Configuration.
166    config: DispatcherConfig,
167    /// Metrics.
168    metrics: Arc<DispatcherMetrics>,
169    /// Shutdown signal receiver.
170    shutdown: watch::Receiver<bool>,
171}
172
173impl SubscriptionDispatcher {
174    /// Creates a new dispatcher.
175    #[must_use]
176    pub fn new(
177        notification_rings: Vec<Arc<NotificationRing>>,
178        registry: Arc<SubscriptionRegistry>,
179        data_source: Arc<dyn NotificationDataSource>,
180        config: DispatcherConfig,
181        shutdown: watch::Receiver<bool>,
182    ) -> Self {
183        Self {
184            notification_rings,
185            registry,
186            data_source,
187            config,
188            metrics: Arc::new(DispatcherMetrics::default()),
189            shutdown,
190        }
191    }
192
193    /// Runs the dispatcher loop until shutdown.
194    ///
195    /// Should be spawned as a tokio task.
196    pub async fn run(self) {
197        let mut batch_buffer: Vec<(u32, Vec<NotificationRef>)> = Vec::new();
198
199        loop {
200            if *self.shutdown.borrow() {
201                break;
202            }
203
204            let drained = self.drain_and_dispatch(&mut batch_buffer);
205            self.metrics.dispatch_cycles.fetch_add(1, Ordering::Relaxed);
206
207            if drained == 0 {
208                self.metrics.idle_cycles.fetch_add(1, Ordering::Relaxed);
209
210                // Adaptive wait: spin then sleep
211                for _ in 0..self.config.spin_iterations {
212                    std::hint::spin_loop();
213                }
214                tokio::time::sleep(self.config.idle_sleep).await;
215            }
216        }
217    }
218
219    /// Drains all notification rings and dispatches events.
220    ///
221    /// Returns the total number of notifications drained.
222    pub fn drain_and_dispatch(&self, batch_buffer: &mut Vec<(u32, Vec<NotificationRef>)>) -> usize {
223        batch_buffer.clear();
224        let mut total_drained: usize = 0;
225
226        // Phase 1: Drain all notification rings
227        for ring in &self.notification_rings {
228            ring.drain_into(|notif| {
229                if total_drained >= self.config.max_drain_per_cycle {
230                    return;
231                }
232                total_drained += 1;
233
234                if self.config.batch_by_source {
235                    let source_id = notif.source_id;
236                    if let Some((_, batch)) =
237                        batch_buffer.iter_mut().find(|(id, _)| *id == source_id)
238                    {
239                        if batch.len() < self.config.max_batch_per_source {
240                            batch.push(notif);
241                        }
242                    } else {
243                        batch_buffer.push((source_id, vec![notif]));
244                    }
245                } else {
246                    self.dispatch_single(&notif);
247                }
248            });
249        }
250
251        #[allow(clippy::cast_possible_truncation)]
252        let drained_u64 = total_drained as u64;
253        self.metrics
254            .notifications_drained
255            .fetch_add(drained_u64, Ordering::Relaxed);
256
257        // Phase 2: Dispatch batched notifications
258        if self.config.batch_by_source {
259            for (source_id, notifs) in batch_buffer.drain(..) {
260                self.dispatch_batch(source_id, &notifs);
261            }
262        }
263
264        total_drained
265    }
266
267    /// Dispatches a single notification (unbatched mode).
268    fn dispatch_single(&self, notif: &NotificationRef) {
269        let Some(event) = self.data_source.resolve(notif) else {
270            self.metrics.events_dropped.fetch_add(1, Ordering::Relaxed);
271            return;
272        };
273
274        let senders = self.registry.get_senders_for_source(notif.source_id);
275        if senders.is_empty() {
276            return;
277        }
278
279        for sender in senders {
280            match sender.send(event.clone()) {
281                Ok(_) => {
282                    self.metrics
283                        .events_dispatched
284                        .fetch_add(1, Ordering::Relaxed);
285                }
286                Err(_) => {
287                    self.metrics.events_dropped.fetch_add(1, Ordering::Relaxed);
288                }
289            }
290        }
291    }
292
293    /// Dispatches a batch of notifications for one source.
294    fn dispatch_batch(&self, source_id: u32, notifs: &[NotificationRef]) {
295        let senders = self.registry.get_senders_for_source(source_id);
296        if senders.is_empty() {
297            return;
298        }
299
300        for notif in notifs {
301            let Some(event) = self.data_source.resolve(notif) else {
302                self.metrics.events_dropped.fetch_add(1, Ordering::Relaxed);
303                continue;
304            };
305
306            for sender in &senders {
307                match sender.send(event.clone()) {
308                    Ok(_) => {
309                        self.metrics
310                            .events_dispatched
311                            .fetch_add(1, Ordering::Relaxed);
312                    }
313                    Err(_) => {
314                        self.metrics.events_dropped.fetch_add(1, Ordering::Relaxed);
315                    }
316                }
317            }
318        }
319    }
320
321    /// Returns the dispatcher metrics.
322    #[must_use]
323    pub fn metrics(&self) -> &Arc<DispatcherMetrics> {
324        &self.metrics
325    }
326}
327
328// ===========================================================================
329// Tests
330// ===========================================================================
331
332#[cfg(test)]
333#[allow(clippy::cast_possible_wrap)]
334#[allow(clippy::field_reassign_with_default)]
335mod tests {
336    use super::*;
337    use std::sync::Arc;
338
339    use arrow_array::Int64Array;
340    use arrow_schema::{DataType, Field, Schema};
341
342    use crate::subscription::event::EventType;
343    use crate::subscription::notification::NotificationRing;
344    use crate::subscription::registry::{SubscriptionConfig, SubscriptionRegistry};
345
346    // -- helpers --
347
348    fn make_batch(n: usize) -> arrow_array::RecordBatch {
349        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
350        let values: Vec<i64> = (0..n as i64).collect();
351        let array = Int64Array::from(values);
352        arrow_array::RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
353    }
354
355    /// Mock data source that creates a `ChangeEvent::Insert` from the
356    /// notification metadata.
357    struct MockDataSource;
358
359    impl NotificationDataSource for MockDataSource {
360        fn resolve(&self, notif: &NotificationRef) -> Option<ChangeEvent> {
361            let batch = Arc::new(make_batch(notif.row_count as usize));
362            Some(ChangeEvent::insert(batch, notif.timestamp, notif.sequence))
363        }
364    }
365
366    /// Mock data source that always returns `None` (simulates expired data).
367    struct FailingDataSource;
368
369    impl NotificationDataSource for FailingDataSource {
370        fn resolve(&self, _notif: &NotificationRef) -> Option<ChangeEvent> {
371            None
372        }
373    }
374
375    fn make_notif(seq: u64, source_id: u32, rows: u32, ts: i64) -> NotificationRef {
376        NotificationRef::new(seq, source_id, EventType::Insert, rows, ts, 0)
377    }
378
379    fn make_dispatcher(
380        rings: Vec<Arc<NotificationRing>>,
381        registry: Arc<SubscriptionRegistry>,
382        data_source: Arc<dyn NotificationDataSource>,
383        config: DispatcherConfig,
384    ) -> SubscriptionDispatcher {
385        let (_tx, rx) = watch::channel(false);
386        SubscriptionDispatcher::new(rings, registry, data_source, config, rx)
387    }
388
389    // -- Config tests --
390
391    #[test]
392    fn test_dispatcher_config_default() {
393        let cfg = DispatcherConfig::default();
394        assert_eq!(cfg.max_drain_per_cycle, 4096);
395        assert_eq!(cfg.idle_sleep, Duration::from_micros(10));
396        assert_eq!(cfg.spin_iterations, 100);
397        assert!(cfg.batch_by_source);
398        assert_eq!(cfg.max_batch_per_source, 256);
399    }
400
401    // -- Drain tests --
402
403    #[test]
404    fn test_dispatcher_drain_single_ring() {
405        let ring = Arc::new(NotificationRing::new(64));
406        for i in 0..5u64 {
407            ring.push(make_notif(i, 0, 1, 1000 + i as i64));
408        }
409
410        let registry = Arc::new(SubscriptionRegistry::new());
411        let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
412        let dispatcher = make_dispatcher(
413            vec![Arc::clone(&ring)],
414            registry,
415            ds,
416            DispatcherConfig::default(),
417        );
418
419        let mut buf = Vec::new();
420        let drained = dispatcher.drain_and_dispatch(&mut buf);
421        assert_eq!(drained, 5);
422        assert_eq!(dispatcher.metrics().notifications_drained(), 5);
423    }
424
425    #[test]
426    fn test_dispatcher_drain_multiple_rings() {
427        let ring0 = Arc::new(NotificationRing::new(64));
428        let ring1 = Arc::new(NotificationRing::new(64));
429
430        for i in 0..3u64 {
431            ring0.push(make_notif(i, 0, 1, 1000));
432        }
433        for i in 0..4u64 {
434            ring1.push(make_notif(i, 1, 1, 2000));
435        }
436
437        let registry = Arc::new(SubscriptionRegistry::new());
438        let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
439        let dispatcher = make_dispatcher(
440            vec![ring0, ring1],
441            registry,
442            ds,
443            DispatcherConfig::default(),
444        );
445
446        let mut buf = Vec::new();
447        let drained = dispatcher.drain_and_dispatch(&mut buf);
448        assert_eq!(drained, 7);
449        assert_eq!(dispatcher.metrics().notifications_drained(), 7);
450    }
451
452    // -- Dispatch tests --
453
454    #[test]
455    fn test_dispatcher_dispatch_single() {
456        let ring = Arc::new(NotificationRing::new(64));
457        ring.push(make_notif(1, 0, 5, 1000));
458
459        let registry = Arc::new(SubscriptionRegistry::new());
460        let (_, mut rx) = registry.create("mv_a".into(), 0, SubscriptionConfig::default());
461        let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
462
463        // Use unbatched mode
464        let mut cfg = DispatcherConfig::default();
465        cfg.batch_by_source = false;
466        let dispatcher = make_dispatcher(vec![ring], Arc::clone(&registry), ds, cfg);
467
468        let mut buf = Vec::new();
469        dispatcher.drain_and_dispatch(&mut buf);
470
471        assert_eq!(dispatcher.metrics().events_dispatched(), 1);
472
473        let event = rx.try_recv().unwrap();
474        assert_eq!(event.timestamp(), 1000);
475        assert_eq!(event.sequence(), Some(1));
476        assert_eq!(event.row_count(), 5);
477    }
478
479    #[test]
480    fn test_dispatcher_dispatch_batch() {
481        let ring = Arc::new(NotificationRing::new(64));
482        for i in 0..3u64 {
483            ring.push(make_notif(i + 1, 0, 2, 1000 + i as i64));
484        }
485
486        let registry = Arc::new(SubscriptionRegistry::new());
487        let (_, mut rx) = registry.create("mv_a".into(), 0, SubscriptionConfig::default());
488        let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
489        let dispatcher = make_dispatcher(
490            vec![ring],
491            Arc::clone(&registry),
492            ds,
493            DispatcherConfig::default(),
494        );
495
496        let mut buf = Vec::new();
497        let drained = dispatcher.drain_and_dispatch(&mut buf);
498        assert_eq!(drained, 3);
499        assert_eq!(dispatcher.metrics().events_dispatched(), 3);
500
501        // All 3 events arrive in order
502        for i in 0..3u64 {
503            let event = rx.try_recv().unwrap();
504            assert_eq!(event.sequence(), Some(i + 1));
505        }
506    }
507
508    #[test]
509    fn test_dispatcher_no_subscribers() {
510        let ring = Arc::new(NotificationRing::new(64));
511        ring.push(make_notif(1, 0, 1, 1000));
512
513        let registry = Arc::new(SubscriptionRegistry::new());
514        // No subscriptions registered
515        let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
516        let dispatcher = make_dispatcher(vec![ring], registry, ds, DispatcherConfig::default());
517
518        let mut buf = Vec::new();
519        let drained = dispatcher.drain_and_dispatch(&mut buf);
520        assert_eq!(drained, 1);
521        // Drained but not dispatched (no subscribers)
522        assert_eq!(dispatcher.metrics().events_dispatched(), 0);
523        assert_eq!(dispatcher.metrics().events_dropped(), 0);
524    }
525
526    #[test]
527    fn test_dispatcher_paused_subscriber_skipped() {
528        let ring = Arc::new(NotificationRing::new(64));
529        ring.push(make_notif(1, 0, 1, 1000));
530
531        let registry = Arc::new(SubscriptionRegistry::new());
532        let (id, mut rx) = registry.create("mv_a".into(), 0, SubscriptionConfig::default());
533        registry.pause(id);
534
535        let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
536        let dispatcher = make_dispatcher(
537            vec![ring],
538            Arc::clone(&registry),
539            ds,
540            DispatcherConfig::default(),
541        );
542
543        let mut buf = Vec::new();
544        dispatcher.drain_and_dispatch(&mut buf);
545
546        // Paused subscriber does not receive
547        assert_eq!(dispatcher.metrics().events_dispatched(), 0);
548        assert!(rx.try_recv().is_err());
549    }
550
551    #[test]
552    fn test_dispatcher_metrics() {
553        let ring = Arc::new(NotificationRing::new(64));
554        for i in 0..10u64 {
555            ring.push(make_notif(i, 0, 1, 1000));
556        }
557
558        let registry = Arc::new(SubscriptionRegistry::new());
559        let (_, _rx) = registry.create("mv_a".into(), 0, SubscriptionConfig::default());
560        let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
561        let dispatcher = make_dispatcher(
562            vec![ring],
563            Arc::clone(&registry),
564            ds,
565            DispatcherConfig::default(),
566        );
567
568        let mut buf = Vec::new();
569        dispatcher.drain_and_dispatch(&mut buf);
570
571        let m = dispatcher.metrics();
572        assert_eq!(m.notifications_drained(), 10);
573        assert_eq!(m.events_dispatched(), 10);
574        assert_eq!(m.events_dropped(), 0);
575        assert_eq!(m.dispatch_cycles(), 0); // drain_and_dispatch doesn't bump this
576    }
577
578    #[test]
579    fn test_dispatcher_resolve_failure() {
580        let ring = Arc::new(NotificationRing::new(64));
581        ring.push(make_notif(1, 0, 1, 1000));
582        ring.push(make_notif(2, 0, 1, 2000));
583
584        let registry = Arc::new(SubscriptionRegistry::new());
585        let (_, _rx) = registry.create("mv_a".into(), 0, SubscriptionConfig::default());
586        let ds = Arc::new(FailingDataSource) as Arc<dyn NotificationDataSource>;
587        let dispatcher = make_dispatcher(
588            vec![ring],
589            Arc::clone(&registry),
590            ds,
591            DispatcherConfig::default(),
592        );
593
594        let mut buf = Vec::new();
595        let drained = dispatcher.drain_and_dispatch(&mut buf);
596        assert_eq!(drained, 2);
597        assert_eq!(dispatcher.metrics().events_dispatched(), 0);
598        assert_eq!(dispatcher.metrics().events_dropped(), 2);
599    }
600
601    #[test]
602    fn test_dispatcher_lagged_subscriber() {
603        // Use a very small buffer so the receiver lags
604        let ring = Arc::new(NotificationRing::new(64));
605        for i in 0..10u64 {
606            ring.push(make_notif(i, 0, 1, 1000));
607        }
608
609        let registry = Arc::new(SubscriptionRegistry::new());
610        let mut cfg = SubscriptionConfig::default();
611        cfg.buffer_size = 2; // very small buffer
612        let (_, _rx) = registry.create("mv_a".into(), 0, cfg);
613
614        let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
615        let dispatcher = make_dispatcher(
616            vec![ring],
617            Arc::clone(&registry),
618            ds,
619            DispatcherConfig::default(),
620        );
621
622        let mut buf = Vec::new();
623        dispatcher.drain_and_dispatch(&mut buf);
624
625        // All 10 drained. broadcast::send succeeds even when the internal
626        // buffer overwrites old entries (it returns Ok with receiver count).
627        // The lagged receiver will get RecvError::Lagged on next recv.
628        assert_eq!(dispatcher.metrics().notifications_drained(), 10);
629        assert_eq!(dispatcher.metrics().events_dispatched(), 10);
630    }
631
632    // -- Shutdown test --
633
634    #[tokio::test]
635    async fn test_dispatcher_shutdown() {
636        let ring = Arc::new(NotificationRing::new(64));
637        let registry = Arc::new(SubscriptionRegistry::new());
638        let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
639
640        let (shutdown_tx, shutdown_rx) = watch::channel(false);
641        let mut cfg = DispatcherConfig::default();
642        cfg.idle_sleep = Duration::from_millis(1); // fast idle for test
643        cfg.spin_iterations = 0;
644
645        let dispatcher = SubscriptionDispatcher::new(vec![ring], registry, ds, cfg, shutdown_rx);
646
647        let metrics = Arc::clone(dispatcher.metrics());
648
649        let handle = tokio::spawn(dispatcher.run());
650
651        // Let it run a few cycles
652        tokio::time::sleep(Duration::from_millis(20)).await;
653
654        // Signal shutdown
655        shutdown_tx.send(true).unwrap();
656        handle.await.unwrap();
657
658        // Ran at least a few cycles
659        assert!(metrics.dispatch_cycles() > 0);
660        assert!(metrics.idle_cycles() > 0);
661    }
662
663    // -- Integration tests --
664
665    #[test]
666    fn test_end_to_end_notification_to_subscriber() {
667        use crate::subscription::notification::NotificationHub;
668
669        let mut hub = NotificationHub::new(4, 64);
670        let source_id = hub.register_source().unwrap();
671
672        let registry = Arc::new(SubscriptionRegistry::new());
673        let (_, mut rx) =
674            registry.create("mv_orders".into(), source_id, SubscriptionConfig::default());
675
676        // Notify through the hub
677        hub.notify_source(source_id, EventType::Insert, 10, 5000, 0);
678        hub.notify_source(source_id, EventType::Delete, 3, 6000, 64);
679
680        // Drain hub into a standalone ring for the dispatcher
681        let ring = Arc::new(NotificationRing::new(64));
682        hub.drain_notifications(|n| {
683            ring.push(n);
684        });
685
686        let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
687        let dispatcher = make_dispatcher(
688            vec![ring],
689            Arc::clone(&registry),
690            ds,
691            DispatcherConfig::default(),
692        );
693
694        let mut buf = Vec::new();
695        dispatcher.drain_and_dispatch(&mut buf);
696
697        assert_eq!(dispatcher.metrics().events_dispatched(), 2);
698
699        let e1 = rx.try_recv().unwrap();
700        assert_eq!(e1.timestamp(), 5000);
701        assert_eq!(e1.row_count(), 10);
702
703        let e2 = rx.try_recv().unwrap();
704        assert_eq!(e2.timestamp(), 6000);
705        assert_eq!(e2.row_count(), 3);
706    }
707
708    #[test]
709    fn test_dispatcher_multiple_subscribers_same_source() {
710        let ring = Arc::new(NotificationRing::new(64));
711        ring.push(make_notif(1, 0, 5, 1000));
712
713        let registry = Arc::new(SubscriptionRegistry::new());
714        let (_, mut rx1) = registry.create("mv_a".into(), 0, SubscriptionConfig::default());
715        let (_, mut rx2) = registry.create("mv_a".into(), 0, SubscriptionConfig::default());
716        let (_, mut rx3) = registry.create("mv_a".into(), 0, SubscriptionConfig::default());
717
718        let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
719        let dispatcher = make_dispatcher(
720            vec![ring],
721            Arc::clone(&registry),
722            ds,
723            DispatcherConfig::default(),
724        );
725
726        let mut buf = Vec::new();
727        dispatcher.drain_and_dispatch(&mut buf);
728
729        // 1 notification × 3 subscribers = 3 dispatched
730        assert_eq!(dispatcher.metrics().events_dispatched(), 3);
731
732        // All three receive
733        for rx in [&mut rx1, &mut rx2, &mut rx3] {
734            let event = rx.try_recv().unwrap();
735            assert_eq!(event.timestamp(), 1000);
736            assert_eq!(event.row_count(), 5);
737        }
738    }
739}