1use std::sync::atomic::{AtomicU64, Ordering};
25use std::sync::Arc;
26use std::time::Duration;
27
28use tokio::sync::watch;
29
30use crate::subscription::event::{ChangeEvent, NotificationRef};
31use crate::subscription::notification::NotificationRing;
32use crate::subscription::registry::SubscriptionRegistry;
33
34pub trait NotificationDataSource: Send + Sync {
43 fn resolve(&self, notif: &NotificationRef) -> Option<ChangeEvent>;
49}
50
51#[derive(Debug, Clone)]
57pub struct DispatcherConfig {
58 pub max_drain_per_cycle: usize,
60 pub idle_sleep: Duration,
62 pub spin_iterations: usize,
64 pub batch_by_source: bool,
66 pub max_batch_per_source: usize,
68}
69
70impl Default for DispatcherConfig {
71 fn default() -> Self {
72 Self {
73 max_drain_per_cycle: 4096,
74 idle_sleep: Duration::from_micros(10),
75 spin_iterations: 100,
76 batch_by_source: true,
77 max_batch_per_source: 256,
78 }
79 }
80}
81
82#[derive(Debug, Default)]
88pub struct DispatcherMetrics {
89 pub notifications_drained: AtomicU64,
91 pub events_dispatched: AtomicU64,
93 pub events_dropped: AtomicU64,
95 pub dispatch_cycles: AtomicU64,
97 pub idle_cycles: AtomicU64,
99 pub max_dispatch_latency_ns: AtomicU64,
101}
102
103impl DispatcherMetrics {
104 #[must_use]
106 pub fn notifications_drained(&self) -> u64 {
107 self.notifications_drained.load(Ordering::Relaxed)
108 }
109
110 #[must_use]
112 pub fn events_dispatched(&self) -> u64 {
113 self.events_dispatched.load(Ordering::Relaxed)
114 }
115
116 #[must_use]
118 pub fn events_dropped(&self) -> u64 {
119 self.events_dropped.load(Ordering::Relaxed)
120 }
121
122 #[must_use]
124 pub fn dispatch_cycles(&self) -> u64 {
125 self.dispatch_cycles.load(Ordering::Relaxed)
126 }
127
128 #[must_use]
130 pub fn idle_cycles(&self) -> u64 {
131 self.idle_cycles.load(Ordering::Relaxed)
132 }
133}
134
135pub struct SubscriptionDispatcher {
159 notification_rings: Vec<Arc<NotificationRing>>,
161 registry: Arc<SubscriptionRegistry>,
163 data_source: Arc<dyn NotificationDataSource>,
165 config: DispatcherConfig,
167 metrics: Arc<DispatcherMetrics>,
169 shutdown: watch::Receiver<bool>,
171}
172
173impl SubscriptionDispatcher {
174 #[must_use]
176 pub fn new(
177 notification_rings: Vec<Arc<NotificationRing>>,
178 registry: Arc<SubscriptionRegistry>,
179 data_source: Arc<dyn NotificationDataSource>,
180 config: DispatcherConfig,
181 shutdown: watch::Receiver<bool>,
182 ) -> Self {
183 Self {
184 notification_rings,
185 registry,
186 data_source,
187 config,
188 metrics: Arc::new(DispatcherMetrics::default()),
189 shutdown,
190 }
191 }
192
193 pub async fn run(self) {
197 let mut batch_buffer: Vec<(u32, Vec<NotificationRef>)> = Vec::new();
198
199 loop {
200 if *self.shutdown.borrow() {
201 break;
202 }
203
204 let drained = self.drain_and_dispatch(&mut batch_buffer);
205 self.metrics.dispatch_cycles.fetch_add(1, Ordering::Relaxed);
206
207 if drained == 0 {
208 self.metrics.idle_cycles.fetch_add(1, Ordering::Relaxed);
209
210 for _ in 0..self.config.spin_iterations {
212 std::hint::spin_loop();
213 }
214 tokio::time::sleep(self.config.idle_sleep).await;
215 }
216 }
217 }
218
219 pub fn drain_and_dispatch(&self, batch_buffer: &mut Vec<(u32, Vec<NotificationRef>)>) -> usize {
223 batch_buffer.clear();
224 let mut total_drained: usize = 0;
225
226 for ring in &self.notification_rings {
228 ring.drain_into(|notif| {
229 if total_drained >= self.config.max_drain_per_cycle {
230 return;
231 }
232 total_drained += 1;
233
234 if self.config.batch_by_source {
235 let source_id = notif.source_id;
236 if let Some((_, batch)) =
237 batch_buffer.iter_mut().find(|(id, _)| *id == source_id)
238 {
239 if batch.len() < self.config.max_batch_per_source {
240 batch.push(notif);
241 }
242 } else {
243 batch_buffer.push((source_id, vec![notif]));
244 }
245 } else {
246 self.dispatch_single(¬if);
247 }
248 });
249 }
250
251 #[allow(clippy::cast_possible_truncation)]
252 let drained_u64 = total_drained as u64;
253 self.metrics
254 .notifications_drained
255 .fetch_add(drained_u64, Ordering::Relaxed);
256
257 if self.config.batch_by_source {
259 for (source_id, notifs) in batch_buffer.drain(..) {
260 self.dispatch_batch(source_id, ¬ifs);
261 }
262 }
263
264 total_drained
265 }
266
267 fn dispatch_single(&self, notif: &NotificationRef) {
269 let Some(event) = self.data_source.resolve(notif) else {
270 self.metrics.events_dropped.fetch_add(1, Ordering::Relaxed);
271 return;
272 };
273
274 let senders = self.registry.get_senders_for_source(notif.source_id);
275 if senders.is_empty() {
276 return;
277 }
278
279 for sender in senders {
280 match sender.send(event.clone()) {
281 Ok(_) => {
282 self.metrics
283 .events_dispatched
284 .fetch_add(1, Ordering::Relaxed);
285 }
286 Err(_) => {
287 self.metrics.events_dropped.fetch_add(1, Ordering::Relaxed);
288 }
289 }
290 }
291 }
292
293 fn dispatch_batch(&self, source_id: u32, notifs: &[NotificationRef]) {
295 let senders = self.registry.get_senders_for_source(source_id);
296 if senders.is_empty() {
297 return;
298 }
299
300 for notif in notifs {
301 let Some(event) = self.data_source.resolve(notif) else {
302 self.metrics.events_dropped.fetch_add(1, Ordering::Relaxed);
303 continue;
304 };
305
306 for sender in &senders {
307 match sender.send(event.clone()) {
308 Ok(_) => {
309 self.metrics
310 .events_dispatched
311 .fetch_add(1, Ordering::Relaxed);
312 }
313 Err(_) => {
314 self.metrics.events_dropped.fetch_add(1, Ordering::Relaxed);
315 }
316 }
317 }
318 }
319 }
320
321 #[must_use]
323 pub fn metrics(&self) -> &Arc<DispatcherMetrics> {
324 &self.metrics
325 }
326}
327
328#[cfg(test)]
333#[allow(clippy::cast_possible_wrap)]
334#[allow(clippy::field_reassign_with_default)]
335mod tests {
336 use super::*;
337 use std::sync::Arc;
338
339 use arrow_array::Int64Array;
340 use arrow_schema::{DataType, Field, Schema};
341
342 use crate::subscription::event::EventType;
343 use crate::subscription::notification::NotificationRing;
344 use crate::subscription::registry::{SubscriptionConfig, SubscriptionRegistry};
345
346 fn make_batch(n: usize) -> arrow_array::RecordBatch {
349 let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
350 let values: Vec<i64> = (0..n as i64).collect();
351 let array = Int64Array::from(values);
352 arrow_array::RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
353 }
354
355 struct MockDataSource;
358
359 impl NotificationDataSource for MockDataSource {
360 fn resolve(&self, notif: &NotificationRef) -> Option<ChangeEvent> {
361 let batch = Arc::new(make_batch(notif.row_count as usize));
362 Some(ChangeEvent::insert(batch, notif.timestamp, notif.sequence))
363 }
364 }
365
366 struct FailingDataSource;
368
369 impl NotificationDataSource for FailingDataSource {
370 fn resolve(&self, _notif: &NotificationRef) -> Option<ChangeEvent> {
371 None
372 }
373 }
374
375 fn make_notif(seq: u64, source_id: u32, rows: u32, ts: i64) -> NotificationRef {
376 NotificationRef::new(seq, source_id, EventType::Insert, rows, ts, 0)
377 }
378
379 fn make_dispatcher(
380 rings: Vec<Arc<NotificationRing>>,
381 registry: Arc<SubscriptionRegistry>,
382 data_source: Arc<dyn NotificationDataSource>,
383 config: DispatcherConfig,
384 ) -> SubscriptionDispatcher {
385 let (_tx, rx) = watch::channel(false);
386 SubscriptionDispatcher::new(rings, registry, data_source, config, rx)
387 }
388
389 #[test]
392 fn test_dispatcher_config_default() {
393 let cfg = DispatcherConfig::default();
394 assert_eq!(cfg.max_drain_per_cycle, 4096);
395 assert_eq!(cfg.idle_sleep, Duration::from_micros(10));
396 assert_eq!(cfg.spin_iterations, 100);
397 assert!(cfg.batch_by_source);
398 assert_eq!(cfg.max_batch_per_source, 256);
399 }
400
401 #[test]
404 fn test_dispatcher_drain_single_ring() {
405 let ring = Arc::new(NotificationRing::new(64));
406 for i in 0..5u64 {
407 ring.push(make_notif(i, 0, 1, 1000 + i as i64));
408 }
409
410 let registry = Arc::new(SubscriptionRegistry::new());
411 let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
412 let dispatcher = make_dispatcher(
413 vec![Arc::clone(&ring)],
414 registry,
415 ds,
416 DispatcherConfig::default(),
417 );
418
419 let mut buf = Vec::new();
420 let drained = dispatcher.drain_and_dispatch(&mut buf);
421 assert_eq!(drained, 5);
422 assert_eq!(dispatcher.metrics().notifications_drained(), 5);
423 }
424
425 #[test]
426 fn test_dispatcher_drain_multiple_rings() {
427 let ring0 = Arc::new(NotificationRing::new(64));
428 let ring1 = Arc::new(NotificationRing::new(64));
429
430 for i in 0..3u64 {
431 ring0.push(make_notif(i, 0, 1, 1000));
432 }
433 for i in 0..4u64 {
434 ring1.push(make_notif(i, 1, 1, 2000));
435 }
436
437 let registry = Arc::new(SubscriptionRegistry::new());
438 let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
439 let dispatcher = make_dispatcher(
440 vec![ring0, ring1],
441 registry,
442 ds,
443 DispatcherConfig::default(),
444 );
445
446 let mut buf = Vec::new();
447 let drained = dispatcher.drain_and_dispatch(&mut buf);
448 assert_eq!(drained, 7);
449 assert_eq!(dispatcher.metrics().notifications_drained(), 7);
450 }
451
452 #[test]
455 fn test_dispatcher_dispatch_single() {
456 let ring = Arc::new(NotificationRing::new(64));
457 ring.push(make_notif(1, 0, 5, 1000));
458
459 let registry = Arc::new(SubscriptionRegistry::new());
460 let (_, mut rx) = registry.create("mv_a".into(), 0, SubscriptionConfig::default());
461 let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
462
463 let mut cfg = DispatcherConfig::default();
465 cfg.batch_by_source = false;
466 let dispatcher = make_dispatcher(vec![ring], Arc::clone(®istry), ds, cfg);
467
468 let mut buf = Vec::new();
469 dispatcher.drain_and_dispatch(&mut buf);
470
471 assert_eq!(dispatcher.metrics().events_dispatched(), 1);
472
473 let event = rx.try_recv().unwrap();
474 assert_eq!(event.timestamp(), 1000);
475 assert_eq!(event.sequence(), Some(1));
476 assert_eq!(event.row_count(), 5);
477 }
478
479 #[test]
480 fn test_dispatcher_dispatch_batch() {
481 let ring = Arc::new(NotificationRing::new(64));
482 for i in 0..3u64 {
483 ring.push(make_notif(i + 1, 0, 2, 1000 + i as i64));
484 }
485
486 let registry = Arc::new(SubscriptionRegistry::new());
487 let (_, mut rx) = registry.create("mv_a".into(), 0, SubscriptionConfig::default());
488 let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
489 let dispatcher = make_dispatcher(
490 vec![ring],
491 Arc::clone(®istry),
492 ds,
493 DispatcherConfig::default(),
494 );
495
496 let mut buf = Vec::new();
497 let drained = dispatcher.drain_and_dispatch(&mut buf);
498 assert_eq!(drained, 3);
499 assert_eq!(dispatcher.metrics().events_dispatched(), 3);
500
501 for i in 0..3u64 {
503 let event = rx.try_recv().unwrap();
504 assert_eq!(event.sequence(), Some(i + 1));
505 }
506 }
507
508 #[test]
509 fn test_dispatcher_no_subscribers() {
510 let ring = Arc::new(NotificationRing::new(64));
511 ring.push(make_notif(1, 0, 1, 1000));
512
513 let registry = Arc::new(SubscriptionRegistry::new());
514 let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
516 let dispatcher = make_dispatcher(vec![ring], registry, ds, DispatcherConfig::default());
517
518 let mut buf = Vec::new();
519 let drained = dispatcher.drain_and_dispatch(&mut buf);
520 assert_eq!(drained, 1);
521 assert_eq!(dispatcher.metrics().events_dispatched(), 0);
523 assert_eq!(dispatcher.metrics().events_dropped(), 0);
524 }
525
526 #[test]
527 fn test_dispatcher_paused_subscriber_skipped() {
528 let ring = Arc::new(NotificationRing::new(64));
529 ring.push(make_notif(1, 0, 1, 1000));
530
531 let registry = Arc::new(SubscriptionRegistry::new());
532 let (id, mut rx) = registry.create("mv_a".into(), 0, SubscriptionConfig::default());
533 registry.pause(id);
534
535 let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
536 let dispatcher = make_dispatcher(
537 vec![ring],
538 Arc::clone(®istry),
539 ds,
540 DispatcherConfig::default(),
541 );
542
543 let mut buf = Vec::new();
544 dispatcher.drain_and_dispatch(&mut buf);
545
546 assert_eq!(dispatcher.metrics().events_dispatched(), 0);
548 assert!(rx.try_recv().is_err());
549 }
550
551 #[test]
552 fn test_dispatcher_metrics() {
553 let ring = Arc::new(NotificationRing::new(64));
554 for i in 0..10u64 {
555 ring.push(make_notif(i, 0, 1, 1000));
556 }
557
558 let registry = Arc::new(SubscriptionRegistry::new());
559 let (_, _rx) = registry.create("mv_a".into(), 0, SubscriptionConfig::default());
560 let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
561 let dispatcher = make_dispatcher(
562 vec![ring],
563 Arc::clone(®istry),
564 ds,
565 DispatcherConfig::default(),
566 );
567
568 let mut buf = Vec::new();
569 dispatcher.drain_and_dispatch(&mut buf);
570
571 let m = dispatcher.metrics();
572 assert_eq!(m.notifications_drained(), 10);
573 assert_eq!(m.events_dispatched(), 10);
574 assert_eq!(m.events_dropped(), 0);
575 assert_eq!(m.dispatch_cycles(), 0); }
577
578 #[test]
579 fn test_dispatcher_resolve_failure() {
580 let ring = Arc::new(NotificationRing::new(64));
581 ring.push(make_notif(1, 0, 1, 1000));
582 ring.push(make_notif(2, 0, 1, 2000));
583
584 let registry = Arc::new(SubscriptionRegistry::new());
585 let (_, _rx) = registry.create("mv_a".into(), 0, SubscriptionConfig::default());
586 let ds = Arc::new(FailingDataSource) as Arc<dyn NotificationDataSource>;
587 let dispatcher = make_dispatcher(
588 vec![ring],
589 Arc::clone(®istry),
590 ds,
591 DispatcherConfig::default(),
592 );
593
594 let mut buf = Vec::new();
595 let drained = dispatcher.drain_and_dispatch(&mut buf);
596 assert_eq!(drained, 2);
597 assert_eq!(dispatcher.metrics().events_dispatched(), 0);
598 assert_eq!(dispatcher.metrics().events_dropped(), 2);
599 }
600
601 #[test]
602 fn test_dispatcher_lagged_subscriber() {
603 let ring = Arc::new(NotificationRing::new(64));
605 for i in 0..10u64 {
606 ring.push(make_notif(i, 0, 1, 1000));
607 }
608
609 let registry = Arc::new(SubscriptionRegistry::new());
610 let mut cfg = SubscriptionConfig::default();
611 cfg.buffer_size = 2; let (_, _rx) = registry.create("mv_a".into(), 0, cfg);
613
614 let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
615 let dispatcher = make_dispatcher(
616 vec![ring],
617 Arc::clone(®istry),
618 ds,
619 DispatcherConfig::default(),
620 );
621
622 let mut buf = Vec::new();
623 dispatcher.drain_and_dispatch(&mut buf);
624
625 assert_eq!(dispatcher.metrics().notifications_drained(), 10);
629 assert_eq!(dispatcher.metrics().events_dispatched(), 10);
630 }
631
632 #[tokio::test]
635 async fn test_dispatcher_shutdown() {
636 let ring = Arc::new(NotificationRing::new(64));
637 let registry = Arc::new(SubscriptionRegistry::new());
638 let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
639
640 let (shutdown_tx, shutdown_rx) = watch::channel(false);
641 let mut cfg = DispatcherConfig::default();
642 cfg.idle_sleep = Duration::from_millis(1); cfg.spin_iterations = 0;
644
645 let dispatcher = SubscriptionDispatcher::new(vec![ring], registry, ds, cfg, shutdown_rx);
646
647 let metrics = Arc::clone(dispatcher.metrics());
648
649 let handle = tokio::spawn(dispatcher.run());
650
651 tokio::time::sleep(Duration::from_millis(20)).await;
653
654 shutdown_tx.send(true).unwrap();
656 handle.await.unwrap();
657
658 assert!(metrics.dispatch_cycles() > 0);
660 assert!(metrics.idle_cycles() > 0);
661 }
662
663 #[test]
666 fn test_end_to_end_notification_to_subscriber() {
667 use crate::subscription::notification::NotificationHub;
668
669 let mut hub = NotificationHub::new(4, 64);
670 let source_id = hub.register_source().unwrap();
671
672 let registry = Arc::new(SubscriptionRegistry::new());
673 let (_, mut rx) =
674 registry.create("mv_orders".into(), source_id, SubscriptionConfig::default());
675
676 hub.notify_source(source_id, EventType::Insert, 10, 5000, 0);
678 hub.notify_source(source_id, EventType::Delete, 3, 6000, 64);
679
680 let ring = Arc::new(NotificationRing::new(64));
682 hub.drain_notifications(|n| {
683 ring.push(n);
684 });
685
686 let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
687 let dispatcher = make_dispatcher(
688 vec![ring],
689 Arc::clone(®istry),
690 ds,
691 DispatcherConfig::default(),
692 );
693
694 let mut buf = Vec::new();
695 dispatcher.drain_and_dispatch(&mut buf);
696
697 assert_eq!(dispatcher.metrics().events_dispatched(), 2);
698
699 let e1 = rx.try_recv().unwrap();
700 assert_eq!(e1.timestamp(), 5000);
701 assert_eq!(e1.row_count(), 10);
702
703 let e2 = rx.try_recv().unwrap();
704 assert_eq!(e2.timestamp(), 6000);
705 assert_eq!(e2.row_count(), 3);
706 }
707
708 #[test]
709 fn test_dispatcher_multiple_subscribers_same_source() {
710 let ring = Arc::new(NotificationRing::new(64));
711 ring.push(make_notif(1, 0, 5, 1000));
712
713 let registry = Arc::new(SubscriptionRegistry::new());
714 let (_, mut rx1) = registry.create("mv_a".into(), 0, SubscriptionConfig::default());
715 let (_, mut rx2) = registry.create("mv_a".into(), 0, SubscriptionConfig::default());
716 let (_, mut rx3) = registry.create("mv_a".into(), 0, SubscriptionConfig::default());
717
718 let ds = Arc::new(MockDataSource) as Arc<dyn NotificationDataSource>;
719 let dispatcher = make_dispatcher(
720 vec![ring],
721 Arc::clone(®istry),
722 ds,
723 DispatcherConfig::default(),
724 );
725
726 let mut buf = Vec::new();
727 dispatcher.drain_and_dispatch(&mut buf);
728
729 assert_eq!(dispatcher.metrics().events_dispatched(), 3);
731
732 for rx in [&mut rx1, &mut rx2, &mut rx3] {
734 let event = rx.try_recv().unwrap();
735 assert_eq!(event.timestamp(), 1000);
736 assert_eq!(event.row_count(), 5);
737 }
738 }
739}