Skip to main content

bext_realtime/
hub.rs

1//! Core pub/sub hub that manages subscribers, topic subscriptions, event
2//! broadcasting, and replay buffers for the realtime subsystem.
3
4use std::collections::{HashSet, VecDeque};
5use std::sync::atomic::{AtomicU64, Ordering};
6
7use chrono::Utc;
8use dashmap::DashMap;
9use parking_lot::Mutex;
10use serde_json::Value;
11use tokio::sync::mpsc;
12use tracing::warn;
13
14use crate::message::HubEvent;
15use crate::topic::TopicMatcher;
16
/// Tunable limits for a hub instance.
#[derive(Debug, Clone)]
pub struct HubConfig {
    /// Upper bound on simultaneously connected subscribers; `0` disables the limit.
    pub max_connections: usize,
    /// Milliseconds between heartbeats (used by SSE/WS layers above; nothing
    /// in this file reads it).
    pub heartbeat_interval_ms: u64,
    /// How many recent events the replay buffer retains for Last-Event-ID catch-up.
    pub replay_buffer_size: usize,
}

impl Default for HubConfig {
    /// Defaults: 10 000 connections, a 30 000 ms heartbeat and 1 000 replayable events.
    fn default() -> Self {
        const MAX_CONNECTIONS: usize = 10_000;
        const HEARTBEAT_INTERVAL_MS: u64 = 30_000;
        const REPLAY_BUFFER_SIZE: usize = 1_000;

        HubConfig {
            max_connections: MAX_CONNECTIONS,
            heartbeat_interval_ms: HEARTBEAT_INTERVAL_MS,
            replay_buffer_size: REPLAY_BUFFER_SIZE,
        }
    }
}
37
38/// Represents a single connected subscriber.
39#[derive(Debug)]
40pub struct Subscriber {
41    pub id: u64,
42    /// Topic patterns this subscriber is interested in.
43    pub topics: Vec<String>,
44    /// Bounded channel sender for delivering events.
45    /// Capacity is limited to prevent OOM from slow/stalled clients.
46    pub sender: mpsc::Sender<HubEvent>,
47    /// When this subscriber connected.
48    pub created_at: chrono::DateTime<Utc>,
49}
50
/// Live counters for the hub.
///
/// Every field is an atomic so the counters can be updated through a shared
/// reference. `Default` yields all-zero counters.
#[derive(Debug, Default)]
pub struct HubStats {
    /// Number of events accepted for publication.
    pub total_published: AtomicU64,
    /// Number of successful per-subscriber deliveries.
    pub total_delivered: AtomicU64,
    /// Number of currently registered subscribers.
    pub active_connections: AtomicU64,
}
67
68/// Point-in-time snapshot of hub statistics.
69#[derive(Debug, Clone, serde::Serialize)]
70pub struct HubStatsSnapshot {
71    pub active_connections: u64,
72    pub total_published: u64,
73    pub total_delivered: u64,
74    pub topic_count: usize,
75    pub subscriber_count: usize,
76    pub uptime_secs: f64,
77}
78
79/// Core pub/sub hub. Thread-safe, designed to be shared via `Arc<BextHub>`.
80pub struct BextHub {
81    /// subscriber_id -> Subscriber
82    subscribers: DashMap<u64, Subscriber>,
83    /// topic_pattern -> list of subscriber IDs registered for that exact pattern string
84    topics: DashMap<String, Vec<u64>>,
85    /// Monotonic event ID counter.
86    next_id: AtomicU64,
87    /// Monotonic subscriber ID counter.
88    next_subscriber_id: AtomicU64,
89    /// Bounded ring buffer for event replay.
90    replay_buffer: Mutex<VecDeque<HubEvent>>,
91    /// Live statistics.
92    stats: HubStats,
93    /// Configuration.
94    config: HubConfig,
95    /// When the hub was created.
96    created_at: chrono::DateTime<Utc>,
97}
98
99impl BextHub {
100    /// Create a new hub with the given configuration.
101    pub fn new(config: HubConfig) -> Self {
102        Self {
103            subscribers: DashMap::new(),
104            topics: DashMap::new(),
105            next_id: AtomicU64::new(1),
106            next_subscriber_id: AtomicU64::new(1),
107            replay_buffer: Mutex::new(VecDeque::with_capacity(config.replay_buffer_size)),
108            stats: HubStats::default(),
109            config,
110            created_at: Utc::now(),
111        }
112    }
113
114    /// Subscribe to a set of topic patterns.
115    ///
116    /// Returns `(subscriber_id, receiver)`. The receiver yields `HubEvent`s
117    /// that match the requested patterns.
118    ///
119    /// Returns `None` if `max_connections` has been reached.
120    pub fn subscribe(
121        &self,
122        topics: Vec<String>,
123    ) -> Option<(u64, mpsc::Receiver<HubEvent>)> {
124        // Enforce connection limit
125        if self.config.max_connections > 0 {
126            let current = self.stats.active_connections.load(Ordering::Relaxed);
127            if current >= self.config.max_connections as u64 {
128                warn!(
129                    limit = self.config.max_connections,
130                    "hub: max connections reached"
131                );
132                return None;
133            }
134        }
135
136        let id = self.next_subscriber_id.fetch_add(1, Ordering::Relaxed);
137        // Bounded channel: 256 pending events per subscriber. If a client
138        // falls behind (slow reader / stalled SSE connection), sends will
139        // fail and the subscriber is removed — preventing unbounded memory
140        // growth that could lead to OOM.
141        let (tx, rx) = mpsc::channel(256);
142
143        let subscriber = Subscriber {
144            id,
145            topics: topics.clone(),
146            sender: tx,
147            created_at: Utc::now(),
148        };
149        self.subscribers.insert(id, subscriber);
150
151        // Register subscriber under each topic pattern.
152        for topic in &topics {
153            self.topics.entry(topic.clone()).or_default().push(id);
154        }
155
156        self.stats
157            .active_connections
158            .fetch_add(1, Ordering::Relaxed);
159        Some((id, rx))
160    }
161
162    /// Remove a subscriber and unregister from all topics.
163    pub fn unsubscribe(&self, subscriber_id: u64) {
164        if let Some((_, subscriber)) = self.subscribers.remove(&subscriber_id) {
165            for topic in &subscriber.topics {
166                if let Some(mut subs) = self.topics.get_mut(topic) {
167                    subs.retain(|&id| id != subscriber_id);
168                    // If the vec is empty, we can clean up the topic entry
169                    if subs.is_empty() {
170                        drop(subs);
171                        self.topics.remove(topic);
172                    }
173                }
174            }
175            self.stats
176                .active_connections
177                .fetch_sub(1, Ordering::Relaxed);
178        }
179    }
180
181    /// Add more topic subscriptions for an existing subscriber.
182    pub fn add_topics(&self, subscriber_id: u64, topics: Vec<String>) {
183        if let Some(mut sub) = self.subscribers.get_mut(&subscriber_id) {
184            for topic in &topics {
185                if !sub.topics.contains(topic) {
186                    sub.topics.push(topic.clone());
187                    self.topics
188                        .entry(topic.clone())
189                        .or_default()
190                        .push(subscriber_id);
191                }
192            }
193        }
194    }
195
196    /// Remove specific topic subscriptions from an existing subscriber.
197    pub fn remove_topics(&self, subscriber_id: u64, topics: Vec<String>) {
198        if let Some(mut sub) = self.subscribers.get_mut(&subscriber_id) {
199            for topic in &topics {
200                sub.topics.retain(|t| t != topic);
201                if let Some(mut subs) = self.topics.get_mut(topic) {
202                    subs.retain(|&id| id != subscriber_id);
203                    if subs.is_empty() {
204                        drop(subs);
205                        self.topics.remove(topic);
206                    }
207                }
208            }
209        }
210    }
211
212    /// Publish an event to all subscribers whose topic patterns match.
213    pub fn publish(&self, topic: &str, data: Value) {
214        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
215        let event = HubEvent {
216            id,
217            topic: topic.to_string(),
218            data,
219            timestamp: Utc::now(),
220        };
221
222        // Store in replay buffer
223        {
224            let mut buf = self.replay_buffer.lock();
225            if buf.len() >= self.config.replay_buffer_size {
226                buf.pop_front();
227            }
228            buf.push_back(event.clone());
229        }
230
231        self.stats.total_published.fetch_add(1, Ordering::Relaxed);
232
233        // Fan out: walk all topic patterns and match against the published topic.
234        // We iterate over the topics DashMap (which contains subscriber patterns)
235        // and use TopicMatcher to check if the pattern matches the concrete topic.
236        let mut delivered_to: HashSet<u64> = HashSet::new();
237        let mut dead_subscribers: Vec<u64> = Vec::new();
238
239        for entry in self.topics.iter() {
240            let pattern = entry.key();
241            if TopicMatcher::matches(pattern, topic) {
242                for &sub_id in entry.value() {
243                    if delivered_to.contains(&sub_id) {
244                        continue; // Don't deliver the same event twice
245                    }
246                    if let Some(sub) = self.subscribers.get(&sub_id) {
247                        match sub.sender.try_send(event.clone()) {
248                            Ok(()) => {
249                                self.stats.total_delivered.fetch_add(1, Ordering::Relaxed);
250                                delivered_to.insert(sub_id);
251                            }
252                            Err(mpsc::error::TrySendError::Closed(_)) => {
253                                dead_subscribers.push(sub_id);
254                            }
255                            Err(mpsc::error::TrySendError::Full(_)) => {
256                                warn!(subscriber_id = sub_id, "dropping slow subscriber (channel full)");
257                                dead_subscribers.push(sub_id);
258                            }
259                        }
260                    }
261                }
262            }
263        }
264
265        // Eagerly clean up dead subscribers to prevent memory leaks.
266        for dead_id in dead_subscribers {
267            self.remove_subscriber_from_topics(dead_id);
268            self.subscribers.remove(&dead_id);
269            self.stats
270                .active_connections
271                .fetch_sub(1, Ordering::Relaxed);
272        }
273    }
274
275    /// Publish a pre-built event (used by Redis relay to inject remote events).
276    pub fn publish_event(&self, event: HubEvent) {
277        // Store in replay buffer
278        {
279            let mut buf = self.replay_buffer.lock();
280            if buf.len() >= self.config.replay_buffer_size {
281                buf.pop_front();
282            }
283            buf.push_back(event.clone());
284        }
285
286        self.stats.total_published.fetch_add(1, Ordering::Relaxed);
287
288        let mut delivered_to: HashSet<u64> = HashSet::new();
289        let mut dead_subscribers: Vec<u64> = Vec::new();
290
291        for entry in self.topics.iter() {
292            let pattern = entry.key();
293            if TopicMatcher::matches(pattern, &event.topic) {
294                for &sub_id in entry.value() {
295                    if delivered_to.contains(&sub_id) {
296                        continue;
297                    }
298                    if let Some(sub) = self.subscribers.get(&sub_id) {
299                        match sub.sender.try_send(event.clone()) {
300                            Ok(()) => {
301                                self.stats.total_delivered.fetch_add(1, Ordering::Relaxed);
302                                delivered_to.insert(sub_id);
303                            }
304                            Err(mpsc::error::TrySendError::Closed(_)) => {
305                                dead_subscribers.push(sub_id);
306                            }
307                            Err(mpsc::error::TrySendError::Full(_)) => {
308                                warn!(subscriber_id = sub_id, "dropping slow subscriber (channel full)");
309                                dead_subscribers.push(sub_id);
310                            }
311                        }
312                    }
313                }
314            }
315        }
316
317        // Eagerly clean up dead subscribers to prevent memory leaks.
318        for dead_id in dead_subscribers {
319            self.remove_subscriber_from_topics(dead_id);
320            self.subscribers.remove(&dead_id);
321            self.stats
322                .active_connections
323                .fetch_sub(1, Ordering::Relaxed);
324        }
325    }
326
327    /// Remove a subscriber ID from all topic subscription lists.
328    ///
329    /// This is used during eager dead-channel cleanup. It mirrors the
330    /// topic-cleanup logic in `unsubscribe()` but works from just the
331    /// subscriber's stored topics.
332    fn remove_subscriber_from_topics(&self, subscriber_id: u64) {
333        if let Some(sub) = self.subscribers.get(&subscriber_id) {
334            for topic in &sub.topics {
335                if let Some(mut subs) = self.topics.get_mut(topic) {
336                    subs.retain(|&id| id != subscriber_id);
337                    if subs.is_empty() {
338                        drop(subs);
339                        self.topics.remove(topic);
340                    }
341                }
342            }
343        }
344    }
345
346    /// Replay events since a given `last_event_id` (exclusive).
347    ///
348    /// Returns events with id > `last_event_id`, in order.
349    pub fn replay_since(&self, last_event_id: u64) -> Vec<HubEvent> {
350        let buf = self.replay_buffer.lock();
351        buf.iter()
352            .filter(|e| e.id > last_event_id)
353            .cloned()
354            .collect()
355    }
356
357    /// Number of active subscribers.
358    pub fn subscriber_count(&self) -> usize {
359        self.subscribers.len()
360    }
361
362    /// Number of distinct topic patterns with at least one subscriber.
363    pub fn topic_count(&self) -> usize {
364        self.topics.len()
365    }
366
367    /// Point-in-time statistics snapshot.
368    pub fn stats(&self) -> HubStatsSnapshot {
369        let uptime = Utc::now()
370            .signed_duration_since(self.created_at)
371            .num_milliseconds() as f64
372            / 1_000.0;
373
374        HubStatsSnapshot {
375            active_connections: self.stats.active_connections.load(Ordering::Relaxed),
376            total_published: self.stats.total_published.load(Ordering::Relaxed),
377            total_delivered: self.stats.total_delivered.load(Ordering::Relaxed),
378            topic_count: self.topics.len(),
379            subscriber_count: self.subscribers.len(),
380            uptime_secs: uptime,
381        }
382    }
383
384    /// Access the config (read-only).
385    pub fn config(&self) -> &HubConfig {
386        &self.config
387    }
388}
389
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::Arc;

    /// Hub built from `HubConfig::default()`.
    fn default_hub() -> BextHub {
        BextHub::new(HubConfig::default())
    }

    /// Turn string literals into the owned pattern list `subscribe` expects.
    fn pats(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    // ── subscribe / unsubscribe ─────────────────────────────────────

    #[test]
    fn subscribe_returns_id_and_receiver() {
        let hub = default_hub();
        let outcome = hub.subscribe(pats(&["test"]));
        assert!(outcome.is_some());
        let (id, _rx) = outcome.unwrap();
        assert!(id > 0);
    }

    #[test]
    fn subscribe_increments_active_connections() {
        let hub = default_hub();
        assert_eq!(hub.subscriber_count(), 0);

        // The receivers are dropped right away; only registration is counted.
        for (pattern, expected) in [("a", 1), ("b", 2)] {
            let _ = hub.subscribe(pats(&[pattern]));
            assert_eq!(hub.subscriber_count(), expected);
        }
    }

    #[test]
    fn unsubscribe_decrements_active_connections() {
        let hub = default_hub();
        let (id, _rx) = hub.subscribe(pats(&["a"])).unwrap();
        assert_eq!(hub.subscriber_count(), 1);

        hub.unsubscribe(id);
        assert_eq!(hub.subscriber_count(), 0);
    }

    #[test]
    fn unsubscribe_nonexistent_is_noop() {
        // Must simply return without panicking.
        default_hub().unsubscribe(999);
    }

    #[test]
    fn unsubscribe_cleans_up_topic_entries() {
        let hub = default_hub();
        let (id, _rx) = hub.subscribe(pats(&["topic/a"])).unwrap();
        assert_eq!(hub.topic_count(), 1);

        hub.unsubscribe(id);
        assert_eq!(hub.topic_count(), 0);
    }

    // ── max_connections ─────────────────────────────────────────────

    #[test]
    fn max_connections_enforced() {
        let config = HubConfig {
            max_connections: 2,
            ..Default::default()
        };
        let hub = BextHub::new(config);

        let _first = hub.subscribe(pats(&["a"])).unwrap();
        let _second = hub.subscribe(pats(&["b"])).unwrap();
        let third = hub.subscribe(pats(&["c"]));
        assert!(third.is_none());
    }

    #[test]
    fn max_connections_zero_means_unlimited() {
        let config = HubConfig {
            max_connections: 0,
            ..Default::default()
        };
        let hub = BextHub::new(config);

        for i in 0..100 {
            let attempt = hub.subscribe(vec![format!("t/{}", i)]);
            assert!(attempt.is_some());
        }
    }

    // ── publish / receive ───────────────────────────────────────────

    #[tokio::test]
    async fn publish_delivers_to_exact_subscriber() {
        let hub = default_hub();
        let (_id, mut rx) = hub.subscribe(pats(&["deploy"])).unwrap();

        hub.publish("deploy", json!({"v": 1}));

        let received = rx.recv().await.unwrap();
        assert_eq!(received.topic, "deploy");
        assert_eq!(received.data, json!({"v": 1}));
    }

    #[tokio::test]
    async fn publish_does_not_deliver_non_matching() {
        let hub = default_hub();
        let (_id, mut rx) = hub.subscribe(pats(&["deploy"])).unwrap();

        hub.publish("restart", json!({"v": 1}));

        // Nothing should be queued for this subscriber.
        assert!(rx.try_recv().is_err());
    }

    #[tokio::test]
    async fn publish_with_wildcard_subscriber() {
        let hub = default_hub();
        let (_id, mut rx) = hub.subscribe(pats(&["app/*"])).unwrap();

        hub.publish("app/marketing", json!({"action": "send"}));

        let received = rx.recv().await.unwrap();
        assert_eq!(received.topic, "app/marketing");
    }

    #[tokio::test]
    async fn publish_with_multi_wildcard_subscriber() {
        let hub = default_hub();
        let (_id, mut rx) = hub.subscribe(pats(&["app/#"])).unwrap();

        hub.publish("app/marketing/events/click", json!({}));

        let received = rx.recv().await.unwrap();
        assert_eq!(received.topic, "app/marketing/events/click");
    }

    #[tokio::test]
    async fn publish_to_multiple_subscribers() {
        let hub = default_hub();
        let (_id1, mut first_rx) = hub.subscribe(pats(&["events"])).unwrap();
        let (_id2, mut second_rx) = hub.subscribe(pats(&["events"])).unwrap();

        hub.publish("events", json!({"n": 1}));

        let first = first_rx.recv().await.unwrap();
        let second = second_rx.recv().await.unwrap();
        assert_eq!(first.data, json!({"n": 1}));
        assert_eq!(second.data, json!({"n": 1}));
    }

    #[tokio::test]
    async fn publish_no_duplicate_delivery_from_overlapping_patterns() {
        let hub = default_hub();
        // Both patterns of this single subscriber match the published topic.
        let (_id, mut rx) = hub.subscribe(pats(&["app/deploy", "app/#"])).unwrap();

        hub.publish("app/deploy", json!({"v": 1}));

        let received = rx.recv().await.unwrap();
        assert_eq!(received.topic, "app/deploy");

        // A second copy must not be waiting in the channel.
        assert!(rx.try_recv().is_err());
    }

    // ── add_topics / remove_topics ──────────────────────────────────

    #[tokio::test]
    async fn add_topics_enables_new_subscriptions() {
        let hub = default_hub();
        let (id, mut rx) = hub.subscribe(pats(&["a"])).unwrap();

        // "b" is not subscribed yet, so this event is not delivered.
        hub.publish("b", json!(1));
        assert!(rx.try_recv().is_err());

        // Once "b" is added, later events arrive.
        hub.add_topics(id, pats(&["b"]));
        hub.publish("b", json!(2));
        let received = rx.recv().await.unwrap();
        assert_eq!(received.data, json!(2));
    }

    #[tokio::test]
    async fn remove_topics_disables_subscriptions() {
        let hub = default_hub();
        let (id, mut rx) = hub.subscribe(pats(&["a", "b"])).unwrap();

        // "b" is delivered while subscribed.
        hub.publish("b", json!(1));
        let _ = rx.recv().await.unwrap();

        // After removing "b" nothing more arrives for it.
        hub.remove_topics(id, pats(&["b"]));
        hub.publish("b", json!(2));
        assert!(rx.try_recv().is_err());

        // "a" is unaffected.
        hub.publish("a", json!(3));
        let received = rx.recv().await.unwrap();
        assert_eq!(received.data, json!(3));
    }

    #[test]
    fn add_topics_deduplicates() {
        let hub = default_hub();
        let (id, _rx) = hub.subscribe(pats(&["a"])).unwrap();
        hub.add_topics(id, pats(&["a"]));

        // The subscriber must still list "a" exactly once.
        let sub = hub.subscribers.get(&id).unwrap();
        let occurrences = sub.topics.iter().filter(|t| *t == "a").count();
        assert_eq!(occurrences, 1);
    }

    // ── replay ──────────────────────────────────────────────────────

    #[test]
    fn replay_returns_events_after_id() {
        let hub = default_hub();
        for n in 1..=3 {
            hub.publish("a", json!(n));
        }

        let replayed = hub.replay_since(1);
        assert_eq!(replayed.len(), 2);
        assert_eq!(replayed[0].data, json!(2));
        assert_eq!(replayed[1].data, json!(3));
    }

    #[test]
    fn replay_since_zero_returns_all() {
        let hub = default_hub();
        for n in 1..=2 {
            hub.publish("a", json!(n));
        }

        assert_eq!(hub.replay_since(0).len(), 2);
    }

    #[test]
    fn replay_since_future_id_returns_empty() {
        let hub = default_hub();
        hub.publish("a", json!(1));

        assert!(hub.replay_since(999).is_empty());
    }

    #[test]
    fn replay_buffer_wraps_around() {
        let config = HubConfig {
            replay_buffer_size: 3,
            ..Default::default()
        };
        let hub = BextHub::new(config);

        // IDs 1..=4; the fourth publish evicts the event with id 1.
        for n in 1..=4 {
            hub.publish("a", json!(n));
        }

        let replayed = hub.replay_since(0);
        assert_eq!(replayed.len(), 3);
        assert_eq!(replayed[0].data, json!(2));
        assert_eq!(replayed[2].data, json!(4));
    }

    // ── stats ───────────────────────────────────────────────────────

    #[tokio::test]
    async fn stats_track_published_and_delivered() {
        let hub = default_hub();
        let (_id, mut rx) = hub.subscribe(pats(&["x"])).unwrap();

        hub.publish("x", json!(1));
        hub.publish("x", json!(2));

        // Drain both events from the receiver.
        for _ in 0..2 {
            let _ = rx.recv().await;
        }

        let snapshot = hub.stats();
        assert_eq!(snapshot.total_published, 2);
        assert_eq!(snapshot.total_delivered, 2);
        assert_eq!(snapshot.active_connections, 1);
        assert_eq!(snapshot.subscriber_count, 1);
        assert!(snapshot.uptime_secs >= 0.0);
    }

    #[test]
    fn stats_topic_count() {
        let hub = default_hub();
        let _ = hub.subscribe(pats(&["a", "b"]));
        assert_eq!(hub.stats().topic_count, 2);
    }

    // ── concurrent usage ────────────────────────────────────────────

    #[tokio::test]
    async fn concurrent_publish_subscribe() {
        let hub = Arc::new(default_hub());

        // Ten subscribers on the same topic.
        let mut receivers: Vec<_> = (0..10)
            .map(|_| hub.subscribe(pats(&["concurrent"])).unwrap().1)
            .collect();

        // Ten publisher tasks, one event each.
        let publishers: Vec<_> = (0..10)
            .map(|i| {
                let hub = Arc::clone(&hub);
                tokio::spawn(async move {
                    hub.publish("concurrent", json!(i));
                })
            })
            .collect();

        for task in publishers {
            task.await.unwrap();
        }

        // Every subscriber must have all ten events queued.
        for rx in receivers.iter_mut() {
            let mut received = 0;
            while rx.try_recv().is_ok() {
                received += 1;
            }
            assert_eq!(received, 10);
        }
    }

    #[tokio::test]
    async fn concurrent_subscribe_unsubscribe() {
        let hub = Arc::new(default_hub());

        let tasks: Vec<_> = (0..50)
            .map(|_| {
                let hub = Arc::clone(&hub);
                tokio::spawn(async move {
                    let (id, _rx) = hub.subscribe(pats(&["t"])).unwrap();
                    hub.unsubscribe(id);
                })
            })
            .collect();

        for task in tasks {
            task.await.unwrap();
        }

        assert_eq!(hub.subscriber_count(), 0);
    }

    // ── publish_event ───────────────────────────────────────────────

    #[tokio::test]
    async fn publish_event_delivers_to_subscribers() {
        let hub = default_hub();
        let (_id, mut rx) = hub.subscribe(pats(&["relay"])).unwrap();

        hub.publish_event(HubEvent {
            id: 100,
            topic: "relay".to_string(),
            data: json!({"from": "remote"}),
            timestamp: Utc::now(),
        });

        let received = rx.recv().await.unwrap();
        assert_eq!(received.id, 100);
        assert_eq!(received.data, json!({"from": "remote"}));
    }

    #[test]
    fn publish_event_stored_in_replay() {
        let hub = default_hub();
        hub.publish_event(HubEvent {
            id: 200,
            topic: "relay".to_string(),
            data: json!("test"),
            timestamp: Utc::now(),
        });

        let replayed = hub.replay_since(199);
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0].id, 200);
    }
}