Skip to main content

laminar_core/subscription/
registry.rs

1//! Subscription Registry — Ring 2 lifecycle management.
2//!
3//! Manages the lifecycle of all active subscriptions: create, pause, resume,
4//! cancel. Provides the mapping from `source_id` (used by Ring 0 notifications)
5//! to the set of active subscriptions and their broadcast channels (used by the
6//! Ring 1 dispatcher).
7//!
8//! # Thread Safety
9//!
10//! The registry is designed for concurrent access:
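//! - **Read** operations (`get_senders_for_source`, `metrics`) take a read lock
//!   and can proceed concurrently with each other.
//! - **Write** operations (`create`, `cancel`, `pause`, `resume`) take a write
//!   lock but are infrequent Ring 2 operations with no latency requirements.
//! - **Counter updates** (`record_delivery`, `record_drop`) also take the write
//!   lock on the subscription map, even though the Ring 1 dispatcher calls them.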
15//!
16//! # Architecture
17//!
18//! ```text
19//! Ring 2 (Control Plane)          Ring 1 (Dispatcher)
20//! ┌─────────────────────┐         ┌──────────────────────┐
21//! │ create / cancel     │         │ get_senders_for_source│
22//! │ pause / resume      │  ────►  │ (read lock, O(1))     │
23//! │ (write lock, rare)  │         └──────────────────────┘
24//! └─────────────────────┘
25//! ```
26
27use std::collections::HashMap;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::sync::RwLock;
30use std::time::{Duration, Instant};
31
32use tokio::sync::broadcast;
33
34use crate::subscription::event::ChangeEvent;
35
36// ---------------------------------------------------------------------------
37// SubscriptionId
38// ---------------------------------------------------------------------------
39
/// Handle that identifies a single subscription.
///
/// Values are handed out by [`SubscriptionRegistry`] from a monotonically
/// increasing counter. The wrapped `u64` is public, so callers may also build
/// an ID directly (for example to look up a subscription by a known number).
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct SubscriptionId(pub u64);

impl std::fmt::Display for SubscriptionId {
    /// Renders the ID as `sub-<n>`, e.g. `sub-42`.
    ///
    /// Width, fill, and alignment flags on the formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        f.write_fmt(format_args!("sub-{raw}"))
    }
}
51
52// ---------------------------------------------------------------------------
53// SubscriptionState
54// ---------------------------------------------------------------------------
55
/// Where a subscription currently sits in its lifecycle.
///
/// The registry creates subscriptions as [`Active`](Self::Active) and moves
/// them between `Active` and [`Paused`](Self::Paused) via `pause` / `resume`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SubscriptionState {
    /// Receiving events; included in `get_senders_for_source` results.
    Active,
    /// Temporarily suspended; the registry leaves paused subscriptions out of
    /// `get_senders_for_source` results.
    Paused,
    /// Cancelled and pending cleanup.
    ///
    /// NOTE(review): no registry method in this file assigns this state —
    /// `cancel` removes the entry outright. Confirm whether other code uses it.
    Cancelled,
}
66
67// ---------------------------------------------------------------------------
68// BackpressureStrategy
69// ---------------------------------------------------------------------------
70
/// What to do with new events once a subscription's channel buffer is full.
///
/// The registry only stores the chosen strategy as part of the subscription's
/// configuration; nothing in this file interprets it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BackpressureStrategy {
    /// Discard the oldest buffered events first (favours real-time delivery).
    DropOldest,
    /// Discard the incoming events instead (favours completeness of what is
    /// already buffered).
    DropNewest,
    /// Block the dispatcher until there is room. This is meant to stall only
    /// Ring 1 dispatch, NOT Ring 0.
    Block,
    /// Sampling: deliver only every Nth event, where N is the wrapped value.
    Sample(usize),
}
83
84// ---------------------------------------------------------------------------
85// SubscriptionConfig
86// ---------------------------------------------------------------------------
87
88/// Configuration for a subscription.
89#[derive(Debug, Clone)]
90pub struct SubscriptionConfig {
91    /// Channel buffer capacity.
92    pub buffer_size: usize,
93    /// Backpressure strategy when buffer is full.
94    pub backpressure: BackpressureStrategy,
95    /// Optional filter predicate (evaluated in Ring 1).
96    pub filter: Option<String>,
97    /// Whether to send an initial snapshot on subscribe.
98    pub send_snapshot: bool,
99    /// Maximum batch size for delivery.
100    pub max_batch_size: usize,
101    /// Maximum batch delay before flushing (microseconds).
102    pub max_batch_delay_us: u64,
103}
104
105impl Default for SubscriptionConfig {
106    fn default() -> Self {
107        Self {
108            buffer_size: 1024,
109            backpressure: BackpressureStrategy::DropOldest,
110            filter: None,
111            send_snapshot: false,
112            max_batch_size: 64,
113            max_batch_delay_us: 100,
114        }
115    }
116}
117
118// ---------------------------------------------------------------------------
119// SubscriptionEntry
120// ---------------------------------------------------------------------------
121
122/// A registered subscription entry with state, config, channel, and metrics.
123#[derive(Debug)]
124pub struct SubscriptionEntry {
125    /// Unique ID.
126    pub id: SubscriptionId,
127    /// Target source/MV name.
128    pub source_name: String,
129    /// Source ID for Ring 0 notification matching.
130    pub source_id: u32,
131    /// Current lifecycle state.
132    pub state: SubscriptionState,
133    /// Configuration.
134    pub config: SubscriptionConfig,
135    /// Broadcast channel sender for this subscription.
136    pub sender: broadcast::Sender<ChangeEvent>,
137    /// Creation timestamp.
138    pub created_at: Instant,
139    /// Total events delivered.
140    pub events_delivered: u64,
141    /// Total events dropped (backpressure).
142    pub events_dropped: u64,
143    /// Current lag (events pending in channel).
144    pub current_lag: u64,
145}
146
147// ---------------------------------------------------------------------------
148// SubscriptionMetrics
149// ---------------------------------------------------------------------------
150
151/// Point-in-time metrics snapshot for a subscription.
152#[derive(Debug, Clone)]
153pub struct SubscriptionMetrics {
154    /// Subscription ID.
155    pub id: SubscriptionId,
156    /// Source name.
157    pub source_name: String,
158    /// Current state.
159    pub state: SubscriptionState,
160    /// Total events delivered.
161    pub events_delivered: u64,
162    /// Total events dropped.
163    pub events_dropped: u64,
164    /// Current lag.
165    pub current_lag: u64,
166    /// Time since creation.
167    pub age: Duration,
168}
169
170// ---------------------------------------------------------------------------
171// SubscriptionRegistry
172// ---------------------------------------------------------------------------
173
174/// Registry managing all active subscriptions.
175///
176/// Thread-safe via internal [`RwLock`]. Read operations (used by the Ring 1
177/// dispatcher) take a read lock. Write operations (create/cancel) take a write
178/// lock but are rare Ring 2 operations.
179///
180/// Three indices provide O(1) lookups:
181/// - `subscriptions`: by [`SubscriptionId`]
182/// - `by_source`: by `source_id` (u32) — used by the dispatcher
183/// - `by_name`: by source name (String) — used by admin API
184///
185/// # Panics
186///
187/// All methods on this type panic if an internal `RwLock` has been poisoned
188/// (i.e., a thread panicked while holding the lock). This should not occur
189/// under normal operation.
190pub struct SubscriptionRegistry {
191    /// All subscriptions by ID.
192    subscriptions: RwLock<HashMap<SubscriptionId, SubscriptionEntry>>,
193    /// Index: `source_id` → subscription IDs.
194    by_source: RwLock<HashMap<u32, Vec<SubscriptionId>>>,
195    /// Index: source name → subscription IDs.
196    by_name: RwLock<HashMap<String, Vec<SubscriptionId>>>,
197    /// Next subscription ID (monotonically increasing).
198    next_id: AtomicU64,
199}
200
201#[allow(clippy::missing_panics_doc)] // All methods panic only on poisoned RwLock
202impl SubscriptionRegistry {
203    /// Creates a new empty registry.
204    #[must_use]
205    pub fn new() -> Self {
206        Self {
207            subscriptions: RwLock::new(HashMap::new()),
208            by_source: RwLock::new(HashMap::new()),
209            by_name: RwLock::new(HashMap::new()),
210            next_id: AtomicU64::new(1),
211        }
212    }
213
214    /// Creates a new subscription for the given source.
215    ///
216    /// Returns the subscription ID and a broadcast [`Receiver`](broadcast::Receiver)
217    /// that the subscriber uses to receive [`ChangeEvent`]s.
218    ///
219    /// # Arguments
220    ///
221    /// * `source_name` — Name of the MV or streaming query.
222    /// * `source_id` — Ring 0 source identifier (from `NotificationHub`).
223    /// * `config` — Subscription configuration (buffer size, backpressure, etc.).
224    pub fn create(
225        &self,
226        source_name: String,
227        source_id: u32,
228        config: SubscriptionConfig,
229    ) -> (SubscriptionId, broadcast::Receiver<ChangeEvent>) {
230        let id = SubscriptionId(self.next_id.fetch_add(1, Ordering::Relaxed));
231        let (tx, rx) = broadcast::channel(config.buffer_size);
232
233        let entry = SubscriptionEntry {
234            id,
235            source_name: source_name.clone(),
236            source_id,
237            state: SubscriptionState::Active,
238            config,
239            sender: tx,
240            created_at: Instant::now(),
241            events_delivered: 0,
242            events_dropped: 0,
243            current_lag: 0,
244        };
245
246        // Insert into main map
247        self.subscriptions.write().unwrap().insert(id, entry);
248
249        // Insert into source index
250        self.by_source
251            .write()
252            .unwrap()
253            .entry(source_id)
254            .or_default()
255            .push(id);
256
257        // Insert into name index
258        self.by_name
259            .write()
260            .unwrap()
261            .entry(source_name)
262            .or_default()
263            .push(id);
264
265        (id, rx)
266    }
267
268    /// Pauses an active subscription.
269    ///
270    /// Returns `true` if the subscription was `Active` and is now `Paused`.
271    /// Returns `false` if the subscription does not exist or is not active.
272    pub fn pause(&self, id: SubscriptionId) -> bool {
273        let mut subs = self.subscriptions.write().unwrap();
274        if let Some(entry) = subs.get_mut(&id) {
275            if entry.state == SubscriptionState::Active {
276                entry.state = SubscriptionState::Paused;
277                return true;
278            }
279        }
280        false
281    }
282
283    /// Resumes a paused subscription.
284    ///
285    /// Returns `true` if the subscription was `Paused` and is now `Active`.
286    /// Returns `false` if the subscription does not exist or is not paused.
287    pub fn resume(&self, id: SubscriptionId) -> bool {
288        let mut subs = self.subscriptions.write().unwrap();
289        if let Some(entry) = subs.get_mut(&id) {
290            if entry.state == SubscriptionState::Paused {
291                entry.state = SubscriptionState::Active;
292                return true;
293            }
294        }
295        false
296    }
297
298    /// Cancels a subscription and removes it from all indices.
299    ///
300    /// Returns `true` if the subscription existed and was removed.
301    pub fn cancel(&self, id: SubscriptionId) -> bool {
302        let entry = self.subscriptions.write().unwrap().remove(&id);
303
304        if let Some(entry) = entry {
305            // Remove from source index
306            if let Some(ids) = self.by_source.write().unwrap().get_mut(&entry.source_id) {
307                ids.retain(|&i| i != id);
308            }
309
310            // Remove from name index
311            if let Some(ids) = self.by_name.write().unwrap().get_mut(&entry.source_name) {
312                ids.retain(|&i| i != id);
313            }
314
315            true
316        } else {
317            false
318        }
319    }
320
321    /// Returns broadcast senders for all active subscriptions of a source.
322    ///
323    /// Called by the Ring 1 dispatcher on every notification. Uses a read lock
324    /// for fast concurrent access.
325    #[must_use]
326    pub fn get_senders_for_source(&self, source_id: u32) -> Vec<broadcast::Sender<ChangeEvent>> {
327        let by_source = self.by_source.read().unwrap();
328        let Some(ids) = by_source.get(&source_id) else {
329            return Vec::new();
330        };
331
332        let subs = self.subscriptions.read().unwrap();
333        ids.iter()
334            .filter_map(|id| {
335                subs.get(id).and_then(|entry| {
336                    if entry.state == SubscriptionState::Active {
337                        Some(entry.sender.clone())
338                    } else {
339                        None
340                    }
341                })
342            })
343            .collect()
344    }
345
346    /// Returns subscription IDs for the given source name.
347    #[must_use]
348    pub fn get_subscriptions_by_name(&self, name: &str) -> Vec<SubscriptionId> {
349        let by_name = self.by_name.read().unwrap();
350        by_name.get(name).cloned().unwrap_or_default()
351    }
352
353    /// Returns the total number of registered subscriptions.
354    #[must_use]
355    pub fn subscription_count(&self) -> usize {
356        self.subscriptions.read().unwrap().len()
357    }
358
359    /// Returns the number of active subscriptions.
360    #[must_use]
361    pub fn active_count(&self) -> usize {
362        self.subscriptions
363            .read()
364            .unwrap()
365            .values()
366            .filter(|e| e.state == SubscriptionState::Active)
367            .count()
368    }
369
370    /// Returns a metrics snapshot for the given subscription.
371    #[must_use]
372    pub fn metrics(&self, id: SubscriptionId) -> Option<SubscriptionMetrics> {
373        let subs = self.subscriptions.read().unwrap();
374        subs.get(&id).map(|entry| SubscriptionMetrics {
375            id: entry.id,
376            source_name: entry.source_name.clone(),
377            state: entry.state,
378            events_delivered: entry.events_delivered,
379            events_dropped: entry.events_dropped,
380            current_lag: entry.current_lag,
381            age: entry.created_at.elapsed(),
382        })
383    }
384
385    /// Returns the state of a subscription.
386    #[must_use]
387    pub fn state(&self, id: SubscriptionId) -> Option<SubscriptionState> {
388        self.subscriptions.read().unwrap().get(&id).map(|e| e.state)
389    }
390
391    /// Increments the delivered event count for a subscription.
392    ///
393    /// Called by the Ring 1 dispatcher after successful delivery.
394    pub fn record_delivery(&self, id: SubscriptionId, count: u64) {
395        if let Some(entry) = self.subscriptions.write().unwrap().get_mut(&id) {
396            entry.events_delivered += count;
397        }
398    }
399
400    /// Increments the dropped event count for a subscription.
401    ///
402    /// Called by the Ring 1 dispatcher on backpressure.
403    pub fn record_drop(&self, id: SubscriptionId, count: u64) {
404        if let Some(entry) = self.subscriptions.write().unwrap().get_mut(&id) {
405            entry.events_dropped += count;
406        }
407    }
408}
409
410impl Default for SubscriptionRegistry {
411    fn default() -> Self {
412        Self::new()
413    }
414}
415
416// ===========================================================================
417// Tests
418// ===========================================================================
419
#[cfg(test)]
#[allow(clippy::cast_possible_wrap)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field, Schema};

    /// Builds a batch with one non-nullable `Int64` column `v` holding `0..n`.
    fn make_batch(n: usize) -> arrow_array::RecordBatch {
        let field = Field::new("v", DataType::Int64, false);
        let schema = Arc::new(Schema::new(vec![field]));
        let column = Int64Array::from((0..n as i64).collect::<Vec<i64>>());
        arrow_array::RecordBatch::try_new(schema, vec![Arc::new(column)]).unwrap()
    }

    /// Registers a subscription using the default configuration.
    fn subscribe(
        reg: &SubscriptionRegistry,
        name: &str,
        source_id: u32,
    ) -> (SubscriptionId, broadcast::Receiver<ChangeEvent>) {
        reg.create(name.to_owned(), source_id, SubscriptionConfig::default())
    }

    // --- Config tests ---

    #[test]
    fn test_registry_config_default() {
        let SubscriptionConfig {
            buffer_size,
            backpressure,
            filter,
            send_snapshot,
            max_batch_size,
            max_batch_delay_us,
        } = SubscriptionConfig::default();

        assert_eq!(buffer_size, 1024);
        assert_eq!(backpressure, BackpressureStrategy::DropOldest);
        assert!(filter.is_none());
        assert!(!send_snapshot);
        assert_eq!(max_batch_size, 64);
        assert_eq!(max_batch_delay_us, 100);
    }

    // --- Create tests ---

    #[test]
    fn test_registry_create() {
        let registry = SubscriptionRegistry::new();
        let (id, _rx) = subscribe(&registry, "mv_orders", 0);

        // The first ID handed out is 1.
        assert_eq!(id.0, 1);
        assert_eq!(registry.subscription_count(), 1);
        assert_eq!(registry.active_count(), 1);
    }

    #[test]
    fn test_registry_create_multiple() {
        let registry = SubscriptionRegistry::new();
        let (first, _rx1) = subscribe(&registry, "mv_orders", 0);
        let (second, _rx2) = subscribe(&registry, "mv_orders", 0);
        let (third, _rx3) = subscribe(&registry, "mv_trades", 1);

        assert_ne!(first, second);
        assert_ne!(second, third);
        assert_eq!(registry.subscription_count(), 3);

        // Source 0 has two subscribers, source 1 has one.
        assert_eq!(registry.get_senders_for_source(0).len(), 2);
        assert_eq!(registry.get_senders_for_source(1).len(), 1);
    }

    // --- Pause / Resume tests ---

    #[test]
    fn test_registry_pause_resume() {
        let registry = SubscriptionRegistry::new();
        let (id, _rx) = subscribe(&registry, "mv_orders", 0);

        // Active -> Paused succeeds.
        assert!(registry.pause(id));
        assert_eq!(registry.state(id), Some(SubscriptionState::Paused));
        assert_eq!(registry.active_count(), 0);

        // A second pause is rejected: already paused.
        assert!(!registry.pause(id));

        // Paused -> Active succeeds.
        assert!(registry.resume(id));
        assert_eq!(registry.state(id), Some(SubscriptionState::Active));
        assert_eq!(registry.active_count(), 1);

        // A second resume is rejected: already active.
        assert!(!registry.resume(id));
    }

    // --- Cancel tests ---

    #[test]
    fn test_registry_cancel() {
        let registry = SubscriptionRegistry::new();
        let (id, _rx) = subscribe(&registry, "mv_orders", 0);
        assert_eq!(registry.subscription_count(), 1);

        assert!(registry.cancel(id));
        assert_eq!(registry.subscription_count(), 0);
        assert_eq!(registry.active_count(), 0);

        // Neither index still refers to the cancelled subscription.
        assert!(registry.get_senders_for_source(0).is_empty());
        assert!(registry.get_subscriptions_by_name("mv_orders").is_empty());
    }

    #[test]
    fn test_registry_cancel_nonexistent() {
        let registry = SubscriptionRegistry::new();
        let unknown = SubscriptionId(999);
        assert!(!registry.cancel(unknown));
    }

    // --- Sender lookup tests ---

    #[test]
    fn test_registry_get_senders() {
        let registry = SubscriptionRegistry::new();
        let (_, _rx1) = subscribe(&registry, "mv_a", 0);
        let (_, _rx2) = subscribe(&registry, "mv_b", 0);

        assert_eq!(registry.get_senders_for_source(0).len(), 2);
    }

    #[test]
    fn test_registry_get_senders_paused_excluded() {
        let registry = SubscriptionRegistry::new();
        let (paused, _rx1) = subscribe(&registry, "mv_a", 0);
        let (_, _rx2) = subscribe(&registry, "mv_b", 0);

        registry.pause(paused);
        assert_eq!(registry.get_senders_for_source(0).len(), 1);
    }

    #[test]
    fn test_registry_get_senders_no_source() {
        let registry = SubscriptionRegistry::new();
        assert!(registry.get_senders_for_source(42).is_empty());
    }

    // --- Metrics tests ---

    #[test]
    fn test_registry_subscription_count() {
        let registry = SubscriptionRegistry::new();
        assert_eq!(registry.subscription_count(), 0);
        assert_eq!(registry.active_count(), 0);

        let (first, _rx1) = subscribe(&registry, "mv_a", 0);
        let (_, _rx2) = subscribe(&registry, "mv_b", 1);
        assert_eq!(registry.subscription_count(), 2);
        assert_eq!(registry.active_count(), 2);

        // Pausing changes the active count but not the total.
        registry.pause(first);
        assert_eq!(registry.subscription_count(), 2);
        assert_eq!(registry.active_count(), 1);
    }

    #[test]
    fn test_registry_metrics() {
        let registry = SubscriptionRegistry::new();
        let (id, _rx) = subscribe(&registry, "mv_orders", 0);

        let snapshot = registry.metrics(id).unwrap();
        assert_eq!(snapshot.id, id);
        assert_eq!(snapshot.source_name, "mv_orders");
        assert_eq!(snapshot.state, SubscriptionState::Active);
        assert_eq!(snapshot.events_delivered, 0);
        assert_eq!(snapshot.events_dropped, 0);
        assert_eq!(snapshot.current_lag, 0);

        // Unknown IDs yield no snapshot.
        assert!(registry.metrics(SubscriptionId(999)).is_none());
    }

    #[test]
    fn test_registry_record_delivery_and_drop() {
        let registry = SubscriptionRegistry::new();
        let (id, _rx) = subscribe(&registry, "mv_a", 0);

        for delivered in [10, 5] {
            registry.record_delivery(id, delivered);
        }
        registry.record_drop(id, 2);

        let snapshot = registry.metrics(id).unwrap();
        assert_eq!(snapshot.events_delivered, 15);
        assert_eq!(snapshot.events_dropped, 2);
    }

    // --- Thread safety tests ---

    #[test]
    fn test_registry_thread_safety() {
        let registry = Arc::new(SubscriptionRegistry::new());

        // Four worker threads, each registering 100 subscriptions on its own source.
        let workers: Vec<_> = (0..4u32)
            .map(|t| {
                let registry = Arc::clone(&registry);
                std::thread::spawn(move || {
                    (0..100u32)
                        .map(|i| {
                            let (id, _rx) = registry.create(
                                format!("mv_{t}_{i}"),
                                t,
                                SubscriptionConfig::default(),
                            );
                            id
                        })
                        .collect::<Vec<SubscriptionId>>()
                })
            })
            .collect();

        let ids_per_thread: Vec<Vec<SubscriptionId>> = workers
            .into_iter()
            .map(|worker| worker.join().unwrap())
            .collect();

        // Every create call registered an entry.
        assert_eq!(registry.subscription_count(), 400);

        // No ID was handed out twice.
        let distinct: std::collections::HashSet<u64> =
            ids_per_thread.iter().flatten().map(|id| id.0).collect();
        assert_eq!(distinct.len(), 400);

        // Each source ended up with its 100 senders.
        for source in 0..4u32 {
            assert_eq!(registry.get_senders_for_source(source).len(), 100);
        }

        // Cancel the first half of thread 0's subscriptions.
        for id in ids_per_thread[0].iter().take(50) {
            assert!(registry.cancel(*id));
        }
        assert_eq!(registry.subscription_count(), 350);
        assert_eq!(registry.get_senders_for_source(0).len(), 50);
    }

    // --- Integration tests ---

    #[test]
    fn test_registry_with_notification_hub() {
        use crate::subscription::NotificationHub;

        let mut hub = NotificationHub::new(4, 64);
        let registry = SubscriptionRegistry::new();

        // Register a source with the hub and subscribe to it via the registry.
        let source_id = hub.register_source().unwrap();
        let (sub_id, _rx) = subscribe(&registry, "mv_orders", source_id);

        // The registry maps the hub's source ID to the new subscription.
        assert_eq!(registry.get_senders_for_source(source_id).len(), 1);

        // Push one notification through the hub.
        let accepted = hub.notify_source(
            source_id,
            crate::subscription::EventType::Insert,
            10,
            1000,
            0,
        );
        assert!(accepted);

        // Exactly one notification comes back out when draining.
        let mut drained = 0;
        hub.drain_notifications(|_n| drained += 1);
        assert_eq!(drained, 1);

        // After cancelling, the source has no senders left.
        registry.cancel(sub_id);
        assert!(registry.get_senders_for_source(source_id).is_empty());
    }

    #[test]
    fn test_registry_broadcast_delivery() {
        let registry = SubscriptionRegistry::new();
        let (_, mut rx_a) = subscribe(&registry, "mv_a", 0);
        let (_, mut rx_b) = subscribe(&registry, "mv_a", 0);

        // Look up the senders for the shared source.
        let senders = registry.get_senders_for_source(0);
        assert_eq!(senders.len(), 2);

        let event = ChangeEvent::insert(Arc::new(make_batch(5)), 1000, 1);
        senders
            .iter()
            .for_each(|sender| {
                sender.send(event.clone()).unwrap();
            });

        // Each receiver sees the event on its own channel.
        let got_a = rx_a.try_recv().unwrap();
        assert_eq!(got_a.timestamp(), 1000);
        assert_eq!(got_a.sequence(), Some(1));
        assert_eq!(got_a.row_count(), 5);

        let got_b = rx_b.try_recv().unwrap();
        assert_eq!(got_b.timestamp(), 1000);
        assert_eq!(got_b.sequence(), Some(1));
    }

    #[test]
    fn test_subscription_id_display() {
        let rendered = SubscriptionId(42).to_string();
        assert_eq!(rendered, "sub-42");
    }
}
725}