1use std::collections::{HashSet, VecDeque};
5use std::sync::atomic::{AtomicU64, Ordering};
6
7use chrono::Utc;
8use dashmap::DashMap;
9use parking_lot::Mutex;
10use serde_json::Value;
11use tokio::sync::mpsc;
12use tracing::warn;
13
14use crate::message::HubEvent;
15use crate::topic::TopicMatcher;
16
/// Tunable limits for a hub instance.
#[derive(Debug, Clone)]
pub struct HubConfig {
    /// Upper bound on simultaneously registered subscribers; `0` turns the
    /// limit off.
    pub max_connections: usize,
    /// Heartbeat period in milliseconds.
    // NOTE(review): not read anywhere in this file — presumably consumed by
    // the transport layer; confirm against callers.
    pub heartbeat_interval_ms: u64,
    /// Number of recent events the replay buffer is sized for.
    pub replay_buffer_size: usize,
}

impl Default for HubConfig {
    /// Returns the stock configuration: 10 000 connections, a 30 s heartbeat
    /// and a 1 000-event replay buffer.
    fn default() -> Self {
        const MAX_CONNECTIONS: usize = 10_000;
        const HEARTBEAT_INTERVAL_MS: u64 = 30_000;
        const REPLAY_BUFFER_SIZE: usize = 1_000;

        HubConfig {
            max_connections: MAX_CONNECTIONS,
            heartbeat_interval_ms: HEARTBEAT_INTERVAL_MS,
            replay_buffer_size: REPLAY_BUFFER_SIZE,
        }
    }
}
37
/// A registered subscriber together with the sending half of its delivery
/// channel.
#[derive(Debug)]
pub struct Subscriber {
    /// Hub-assigned identifier; also the key under which the subscriber is
    /// stored in the hub's subscriber map.
    pub id: u64,
    /// Topic patterns this subscriber is registered under.
    pub topics: Vec<String>,
    /// Sending half of the subscriber's event channel; the hub delivers
    /// events through it with `try_send`.
    pub sender: mpsc::Sender<HubEvent>,
    /// UTC wall-clock time at which the subscriber was registered.
    pub created_at: chrono::DateTime<Utc>,
}
50
/// Live, lock-free counters maintained by the hub.
///
/// All counters start at zero; `Default` is derived because
/// `AtomicU64::default()` is already `0`.
#[derive(Debug, Default)]
pub struct HubStats {
    /// Number of events accepted for publication.
    pub total_published: AtomicU64,
    /// Number of successful per-subscriber deliveries.
    pub total_delivered: AtomicU64,
    /// Number of currently registered subscribers.
    pub active_connections: AtomicU64,
}
67
/// Plain-value, serializable copy of the hub's counters at one point in time.
#[derive(Debug, Clone, serde::Serialize)]
pub struct HubStatsSnapshot {
    /// Value of the `active_connections` counter when the snapshot was taken.
    pub active_connections: u64,
    /// Value of the `total_published` counter when the snapshot was taken.
    pub total_published: u64,
    /// Value of the `total_delivered` counter when the snapshot was taken.
    pub total_delivered: u64,
    /// Number of entries in the hub's topic map.
    pub topic_count: usize,
    /// Number of entries in the hub's subscriber map.
    pub subscriber_count: usize,
    /// Seconds elapsed since the hub was created (millisecond resolution).
    pub uptime_secs: f64,
}
78
/// In-process publish/subscribe hub with topic-pattern matching and a
/// bounded replay buffer.
pub struct BextHub {
    /// Registered subscribers, keyed by subscriber id.
    subscribers: DashMap<u64, Subscriber>,
    /// Topic pattern -> ids of the subscribers registered under it.
    topics: DashMap<String, Vec<u64>>,
    /// Source of event ids assigned by `publish`; starts at 1.
    next_id: AtomicU64,
    /// Source of subscriber ids assigned by `subscribe`; starts at 1.
    next_subscriber_id: AtomicU64,
    /// Most recent events, oldest first, served by `replay_since`.
    replay_buffer: Mutex<VecDeque<HubEvent>>,
    /// Live counters.
    stats: HubStats,
    /// Configuration the hub was created with.
    config: HubConfig,
    /// UTC wall-clock time of construction; used to compute uptime.
    created_at: chrono::DateTime<Utc>,
}
98
99impl BextHub {
100 pub fn new(config: HubConfig) -> Self {
102 Self {
103 subscribers: DashMap::new(),
104 topics: DashMap::new(),
105 next_id: AtomicU64::new(1),
106 next_subscriber_id: AtomicU64::new(1),
107 replay_buffer: Mutex::new(VecDeque::with_capacity(config.replay_buffer_size)),
108 stats: HubStats::default(),
109 config,
110 created_at: Utc::now(),
111 }
112 }
113
114 pub fn subscribe(
121 &self,
122 topics: Vec<String>,
123 ) -> Option<(u64, mpsc::Receiver<HubEvent>)> {
124 if self.config.max_connections > 0 {
126 let current = self.stats.active_connections.load(Ordering::Relaxed);
127 if current >= self.config.max_connections as u64 {
128 warn!(
129 limit = self.config.max_connections,
130 "hub: max connections reached"
131 );
132 return None;
133 }
134 }
135
136 let id = self.next_subscriber_id.fetch_add(1, Ordering::Relaxed);
137 let (tx, rx) = mpsc::channel(256);
142
143 let subscriber = Subscriber {
144 id,
145 topics: topics.clone(),
146 sender: tx,
147 created_at: Utc::now(),
148 };
149 self.subscribers.insert(id, subscriber);
150
151 for topic in &topics {
153 self.topics.entry(topic.clone()).or_default().push(id);
154 }
155
156 self.stats
157 .active_connections
158 .fetch_add(1, Ordering::Relaxed);
159 Some((id, rx))
160 }
161
162 pub fn unsubscribe(&self, subscriber_id: u64) {
164 if let Some((_, subscriber)) = self.subscribers.remove(&subscriber_id) {
165 for topic in &subscriber.topics {
166 if let Some(mut subs) = self.topics.get_mut(topic) {
167 subs.retain(|&id| id != subscriber_id);
168 if subs.is_empty() {
170 drop(subs);
171 self.topics.remove(topic);
172 }
173 }
174 }
175 self.stats
176 .active_connections
177 .fetch_sub(1, Ordering::Relaxed);
178 }
179 }
180
181 pub fn add_topics(&self, subscriber_id: u64, topics: Vec<String>) {
183 if let Some(mut sub) = self.subscribers.get_mut(&subscriber_id) {
184 for topic in &topics {
185 if !sub.topics.contains(topic) {
186 sub.topics.push(topic.clone());
187 self.topics
188 .entry(topic.clone())
189 .or_default()
190 .push(subscriber_id);
191 }
192 }
193 }
194 }
195
196 pub fn remove_topics(&self, subscriber_id: u64, topics: Vec<String>) {
198 if let Some(mut sub) = self.subscribers.get_mut(&subscriber_id) {
199 for topic in &topics {
200 sub.topics.retain(|t| t != topic);
201 if let Some(mut subs) = self.topics.get_mut(topic) {
202 subs.retain(|&id| id != subscriber_id);
203 if subs.is_empty() {
204 drop(subs);
205 self.topics.remove(topic);
206 }
207 }
208 }
209 }
210 }
211
212 pub fn publish(&self, topic: &str, data: Value) {
214 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
215 let event = HubEvent {
216 id,
217 topic: topic.to_string(),
218 data,
219 timestamp: Utc::now(),
220 };
221
222 {
224 let mut buf = self.replay_buffer.lock();
225 if buf.len() >= self.config.replay_buffer_size {
226 buf.pop_front();
227 }
228 buf.push_back(event.clone());
229 }
230
231 self.stats.total_published.fetch_add(1, Ordering::Relaxed);
232
233 let mut delivered_to: HashSet<u64> = HashSet::new();
237 let mut dead_subscribers: Vec<u64> = Vec::new();
238
239 for entry in self.topics.iter() {
240 let pattern = entry.key();
241 if TopicMatcher::matches(pattern, topic) {
242 for &sub_id in entry.value() {
243 if delivered_to.contains(&sub_id) {
244 continue; }
246 if let Some(sub) = self.subscribers.get(&sub_id) {
247 match sub.sender.try_send(event.clone()) {
248 Ok(()) => {
249 self.stats.total_delivered.fetch_add(1, Ordering::Relaxed);
250 delivered_to.insert(sub_id);
251 }
252 Err(mpsc::error::TrySendError::Closed(_)) => {
253 dead_subscribers.push(sub_id);
254 }
255 Err(mpsc::error::TrySendError::Full(_)) => {
256 warn!(subscriber_id = sub_id, "dropping slow subscriber (channel full)");
257 dead_subscribers.push(sub_id);
258 }
259 }
260 }
261 }
262 }
263 }
264
265 for dead_id in dead_subscribers {
267 self.remove_subscriber_from_topics(dead_id);
268 self.subscribers.remove(&dead_id);
269 self.stats
270 .active_connections
271 .fetch_sub(1, Ordering::Relaxed);
272 }
273 }
274
275 pub fn publish_event(&self, event: HubEvent) {
277 {
279 let mut buf = self.replay_buffer.lock();
280 if buf.len() >= self.config.replay_buffer_size {
281 buf.pop_front();
282 }
283 buf.push_back(event.clone());
284 }
285
286 self.stats.total_published.fetch_add(1, Ordering::Relaxed);
287
288 let mut delivered_to: HashSet<u64> = HashSet::new();
289 let mut dead_subscribers: Vec<u64> = Vec::new();
290
291 for entry in self.topics.iter() {
292 let pattern = entry.key();
293 if TopicMatcher::matches(pattern, &event.topic) {
294 for &sub_id in entry.value() {
295 if delivered_to.contains(&sub_id) {
296 continue;
297 }
298 if let Some(sub) = self.subscribers.get(&sub_id) {
299 match sub.sender.try_send(event.clone()) {
300 Ok(()) => {
301 self.stats.total_delivered.fetch_add(1, Ordering::Relaxed);
302 delivered_to.insert(sub_id);
303 }
304 Err(mpsc::error::TrySendError::Closed(_)) => {
305 dead_subscribers.push(sub_id);
306 }
307 Err(mpsc::error::TrySendError::Full(_)) => {
308 warn!(subscriber_id = sub_id, "dropping slow subscriber (channel full)");
309 dead_subscribers.push(sub_id);
310 }
311 }
312 }
313 }
314 }
315 }
316
317 for dead_id in dead_subscribers {
319 self.remove_subscriber_from_topics(dead_id);
320 self.subscribers.remove(&dead_id);
321 self.stats
322 .active_connections
323 .fetch_sub(1, Ordering::Relaxed);
324 }
325 }
326
327 fn remove_subscriber_from_topics(&self, subscriber_id: u64) {
333 if let Some(sub) = self.subscribers.get(&subscriber_id) {
334 for topic in &sub.topics {
335 if let Some(mut subs) = self.topics.get_mut(topic) {
336 subs.retain(|&id| id != subscriber_id);
337 if subs.is_empty() {
338 drop(subs);
339 self.topics.remove(topic);
340 }
341 }
342 }
343 }
344 }
345
346 pub fn replay_since(&self, last_event_id: u64) -> Vec<HubEvent> {
350 let buf = self.replay_buffer.lock();
351 buf.iter()
352 .filter(|e| e.id > last_event_id)
353 .cloned()
354 .collect()
355 }
356
357 pub fn subscriber_count(&self) -> usize {
359 self.subscribers.len()
360 }
361
362 pub fn topic_count(&self) -> usize {
364 self.topics.len()
365 }
366
367 pub fn stats(&self) -> HubStatsSnapshot {
369 let uptime = Utc::now()
370 .signed_duration_since(self.created_at)
371 .num_milliseconds() as f64
372 / 1_000.0;
373
374 HubStatsSnapshot {
375 active_connections: self.stats.active_connections.load(Ordering::Relaxed),
376 total_published: self.stats.total_published.load(Ordering::Relaxed),
377 total_delivered: self.stats.total_delivered.load(Ordering::Relaxed),
378 topic_count: self.topics.len(),
379 subscriber_count: self.subscribers.len(),
380 uptime_secs: uptime,
381 }
382 }
383
384 pub fn config(&self) -> &HubConfig {
386 &self.config
387 }
388}
389
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::Arc;

    /// Builds a hub with the default configuration.
    fn default_hub() -> BextHub {
        BextHub::new(HubConfig::default())
    }

    // ---- subscribe / unsubscribe -------------------------------------

    #[test]
    fn subscribe_returns_id_and_receiver() {
        let hub = default_hub();
        let result = hub.subscribe(vec!["test".to_string()]);
        assert!(result.is_some());
        let (id, _rx) = result.unwrap();
        assert!(id > 0);
    }

    #[test]
    fn subscribe_increments_active_connections() {
        let hub = default_hub();
        assert_eq!(hub.subscriber_count(), 0);
        assert_eq!(hub.stats().active_connections, 0);

        let _first = hub.subscribe(vec!["a".to_string()]);
        assert_eq!(hub.subscriber_count(), 1);
        assert_eq!(hub.stats().active_connections, 1);

        let _second = hub.subscribe(vec!["b".to_string()]);
        assert_eq!(hub.subscriber_count(), 2);
        assert_eq!(hub.stats().active_connections, 2);
    }

    #[test]
    fn unsubscribe_decrements_active_connections() {
        let hub = default_hub();
        let (id, _rx) = hub.subscribe(vec!["a".to_string()]).unwrap();
        assert_eq!(hub.subscriber_count(), 1);
        assert_eq!(hub.stats().active_connections, 1);

        hub.unsubscribe(id);
        assert_eq!(hub.subscriber_count(), 0);
        assert_eq!(hub.stats().active_connections, 0);
    }

    #[test]
    fn unsubscribe_nonexistent_is_noop() {
        let hub = default_hub();
        hub.unsubscribe(999);

        // Nothing was registered, so nothing may have been counted down.
        assert_eq!(hub.subscriber_count(), 0);
        assert_eq!(hub.topic_count(), 0);
        assert_eq!(hub.stats().active_connections, 0);
    }

    #[test]
    fn unsubscribe_cleans_up_topic_entries() {
        let hub = default_hub();
        let (id, _rx) = hub.subscribe(vec!["topic/a".to_string()]).unwrap();
        assert_eq!(hub.topic_count(), 1);

        hub.unsubscribe(id);
        assert_eq!(hub.topic_count(), 0);
    }

    // ---- connection limit --------------------------------------------

    #[test]
    fn max_connections_enforced() {
        let hub = BextHub::new(HubConfig {
            max_connections: 2,
            ..Default::default()
        });

        let _s1 = hub.subscribe(vec!["a".to_string()]).unwrap();
        let _s2 = hub.subscribe(vec!["b".to_string()]).unwrap();
        let s3 = hub.subscribe(vec!["c".to_string()]);
        assert!(s3.is_none());

        // The rejected attempt must not leave any trace behind.
        assert_eq!(hub.subscriber_count(), 2);
        assert_eq!(hub.stats().active_connections, 2);
    }

    #[test]
    fn max_connections_zero_means_unlimited() {
        let hub = BextHub::new(HubConfig {
            max_connections: 0,
            ..Default::default()
        });

        for i in 0..100 {
            let r = hub.subscribe(vec![format!("t/{}", i)]);
            assert!(r.is_some());
        }
        assert_eq!(hub.subscriber_count(), 100);
    }

    // ---- publish -----------------------------------------------------

    #[tokio::test]
    async fn publish_delivers_to_exact_subscriber() {
        let hub = default_hub();
        let (_id, mut rx) = hub.subscribe(vec!["deploy".to_string()]).unwrap();

        hub.publish("deploy", json!({"v": 1}));

        let evt = rx.recv().await.unwrap();
        assert_eq!(evt.topic, "deploy");
        assert_eq!(evt.data, json!({"v": 1}));
    }

    #[tokio::test]
    async fn publish_does_not_deliver_non_matching() {
        let hub = default_hub();
        let (_id, mut rx) = hub.subscribe(vec!["deploy".to_string()]).unwrap();

        hub.publish("restart", json!({"v": 1}));

        let result = rx.try_recv();
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn publish_with_wildcard_subscriber() {
        let hub = default_hub();
        let (_id, mut rx) = hub.subscribe(vec!["app/*".to_string()]).unwrap();

        hub.publish("app/marketing", json!({"action": "send"}));

        let evt = rx.recv().await.unwrap();
        assert_eq!(evt.topic, "app/marketing");
    }

    #[tokio::test]
    async fn publish_with_multi_wildcard_subscriber() {
        let hub = default_hub();
        let (_id, mut rx) = hub.subscribe(vec!["app/#".to_string()]).unwrap();

        hub.publish("app/marketing/events/click", json!({}));

        let evt = rx.recv().await.unwrap();
        assert_eq!(evt.topic, "app/marketing/events/click");
    }

    #[tokio::test]
    async fn publish_to_multiple_subscribers() {
        let hub = default_hub();
        let (_id1, mut rx1) = hub.subscribe(vec!["events".to_string()]).unwrap();
        let (_id2, mut rx2) = hub.subscribe(vec!["events".to_string()]).unwrap();

        hub.publish("events", json!({"n": 1}));

        let e1 = rx1.recv().await.unwrap();
        let e2 = rx2.recv().await.unwrap();
        assert_eq!(e1.data, json!({"n": 1}));
        assert_eq!(e2.data, json!({"n": 1}));
    }

    #[tokio::test]
    async fn publish_no_duplicate_delivery_from_overlapping_patterns() {
        let hub = default_hub();
        let (_id, mut rx) = hub
            .subscribe(vec!["app/deploy".to_string(), "app/#".to_string()])
            .unwrap();

        hub.publish("app/deploy", json!({"v": 1}));

        let evt = rx.recv().await.unwrap();
        assert_eq!(evt.topic, "app/deploy");

        // Both patterns match, but the event must arrive only once.
        let result = rx.try_recv();
        assert!(result.is_err());
    }

    // ---- topic management --------------------------------------------

    #[tokio::test]
    async fn add_topics_enables_new_subscriptions() {
        let hub = default_hub();
        let (id, mut rx) = hub.subscribe(vec!["a".to_string()]).unwrap();

        hub.publish("b", json!(1));
        assert!(rx.try_recv().is_err());

        hub.add_topics(id, vec!["b".to_string()]);
        hub.publish("b", json!(2));
        let evt = rx.recv().await.unwrap();
        assert_eq!(evt.data, json!(2));
    }

    #[tokio::test]
    async fn remove_topics_disables_subscriptions() {
        let hub = default_hub();
        let (id, mut rx) = hub
            .subscribe(vec!["a".to_string(), "b".to_string()])
            .unwrap();

        hub.publish("b", json!(1));
        let _ = rx.recv().await.unwrap();

        hub.remove_topics(id, vec!["b".to_string()]);
        hub.publish("b", json!(2));
        assert!(rx.try_recv().is_err());

        // The remaining topic keeps working.
        hub.publish("a", json!(3));
        let evt = rx.recv().await.unwrap();
        assert_eq!(evt.data, json!(3));
    }

    #[test]
    fn add_topics_deduplicates() {
        let hub = default_hub();
        let (id, _rx) = hub.subscribe(vec!["a".to_string()]).unwrap();
        hub.add_topics(id, vec!["a".to_string()]);
        let sub = hub.subscribers.get(&id).unwrap();
        assert_eq!(sub.topics.iter().filter(|t| *t == "a").count(), 1);
    }

    // ---- replay ------------------------------------------------------

    #[test]
    fn replay_returns_events_after_id() {
        let hub = default_hub();
        hub.publish("a", json!(1));
        hub.publish("a", json!(2));
        hub.publish("a", json!(3));

        let replayed = hub.replay_since(1);
        assert_eq!(replayed.len(), 2);
        assert_eq!(replayed[0].data, json!(2));
        assert_eq!(replayed[1].data, json!(3));
    }

    #[test]
    fn replay_since_zero_returns_all() {
        let hub = default_hub();
        hub.publish("a", json!(1));
        hub.publish("a", json!(2));

        let replayed = hub.replay_since(0);
        assert_eq!(replayed.len(), 2);
    }

    #[test]
    fn replay_since_future_id_returns_empty() {
        let hub = default_hub();
        hub.publish("a", json!(1));

        let replayed = hub.replay_since(999);
        assert!(replayed.is_empty());
    }

    #[test]
    fn replay_buffer_wraps_around() {
        let hub = BextHub::new(HubConfig {
            replay_buffer_size: 3,
            ..Default::default()
        });

        hub.publish("a", json!(1));
        hub.publish("a", json!(2));
        hub.publish("a", json!(3));
        hub.publish("a", json!(4));

        // The first event has been evicted; the newest three remain.
        let replayed = hub.replay_since(0);
        assert_eq!(replayed.len(), 3);
        assert_eq!(replayed[0].data, json!(2));
        assert_eq!(replayed[2].data, json!(4));
    }

    // ---- stats -------------------------------------------------------

    #[tokio::test]
    async fn stats_track_published_and_delivered() {
        let hub = default_hub();
        let (_id, mut rx) = hub.subscribe(vec!["x".to_string()]).unwrap();

        hub.publish("x", json!(1));
        hub.publish("x", json!(2));

        let _ = rx.recv().await;
        let _ = rx.recv().await;

        let s = hub.stats();
        assert_eq!(s.total_published, 2);
        assert_eq!(s.total_delivered, 2);
        assert_eq!(s.active_connections, 1);
        assert_eq!(s.subscriber_count, 1);
        assert!(s.uptime_secs >= 0.0);
    }

    #[test]
    fn stats_topic_count() {
        let hub = default_hub();
        let _sub = hub.subscribe(vec!["a".to_string(), "b".to_string()]);
        assert_eq!(hub.stats().topic_count, 2);
    }

    // ---- concurrency -------------------------------------------------

    #[tokio::test]
    async fn concurrent_publish_subscribe() {
        let hub = Arc::new(default_hub());
        let mut handles = Vec::new();

        let mut receivers = Vec::new();
        for _ in 0..10 {
            let (_id, rx) = hub.subscribe(vec!["concurrent".to_string()]).unwrap();
            receivers.push(rx);
        }

        for i in 0..10 {
            let hub_clone = Arc::clone(&hub);
            handles.push(tokio::spawn(async move {
                hub_clone.publish("concurrent", json!(i));
            }));
        }

        for h in handles {
            h.await.unwrap();
        }

        // Every receiver must have seen every published event.
        for rx in &mut receivers {
            let mut count = 0;
            while rx.try_recv().is_ok() {
                count += 1;
            }
            assert_eq!(count, 10);
        }
    }

    #[tokio::test]
    async fn concurrent_subscribe_unsubscribe() {
        let hub = Arc::new(default_hub());
        let mut handles = Vec::new();

        for _ in 0..50 {
            let hub_clone = Arc::clone(&hub);
            handles.push(tokio::spawn(async move {
                let (id, _rx) = hub_clone.subscribe(vec!["t".to_string()]).unwrap();
                hub_clone.unsubscribe(id);
            }));
        }

        for h in handles {
            h.await.unwrap();
        }

        assert_eq!(hub.subscriber_count(), 0);
        assert_eq!(hub.stats().active_connections, 0);
    }

    // ---- publish_event -----------------------------------------------

    #[tokio::test]
    async fn publish_event_delivers_to_subscribers() {
        let hub = default_hub();
        let (_id, mut rx) = hub.subscribe(vec!["relay".to_string()]).unwrap();

        let event = HubEvent {
            id: 100,
            topic: "relay".to_string(),
            data: json!({"from": "remote"}),
            timestamp: Utc::now(),
        };
        hub.publish_event(event.clone());

        // The caller-supplied id is preserved on delivery.
        let evt = rx.recv().await.unwrap();
        assert_eq!(evt.id, 100);
        assert_eq!(evt.data, json!({"from": "remote"}));
    }

    #[test]
    fn publish_event_stored_in_replay() {
        let hub = default_hub();
        let event = HubEvent {
            id: 200,
            topic: "relay".to_string(),
            data: json!("test"),
            timestamp: Utc::now(),
        };
        hub.publish_event(event);

        let replayed = hub.replay_since(199);
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0].id, 200);
    }
}