use std::collections::{HashSet, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use chrono::Utc;
use dashmap::DashMap;
use parking_lot::Mutex;
use serde_json::Value;
use tokio::sync::mpsc;
use tracing::warn;
use crate::message::HubEvent;
use crate::topic::TopicMatcher;
#[derive(Debug, Clone)]
pub struct HubConfig {
pub max_connections: usize,
pub heartbeat_interval_ms: u64,
pub replay_buffer_size: usize,
}
impl Default for HubConfig {
fn default() -> Self {
Self {
max_connections: 10_000,
heartbeat_interval_ms: 30_000,
replay_buffer_size: 1_000,
}
}
}
#[derive(Debug)]
pub struct Subscriber {
pub id: u64,
pub topics: Vec<String>,
pub sender: mpsc::Sender<HubEvent>,
pub created_at: chrono::DateTime<Utc>,
}
pub struct HubStats {
pub total_published: AtomicU64,
pub total_delivered: AtomicU64,
pub active_connections: AtomicU64,
}
impl Default for HubStats {
fn default() -> Self {
Self {
total_published: AtomicU64::new(0),
total_delivered: AtomicU64::new(0),
active_connections: AtomicU64::new(0),
}
}
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct HubStatsSnapshot {
pub active_connections: u64,
pub total_published: u64,
pub total_delivered: u64,
pub topic_count: usize,
pub subscriber_count: usize,
pub uptime_secs: f64,
}
pub struct BextHub {
subscribers: DashMap<u64, Subscriber>,
topics: DashMap<String, Vec<u64>>,
next_id: AtomicU64,
next_subscriber_id: AtomicU64,
replay_buffer: Mutex<VecDeque<HubEvent>>,
stats: HubStats,
config: HubConfig,
created_at: chrono::DateTime<Utc>,
}
impl BextHub {
pub fn new(config: HubConfig) -> Self {
Self {
subscribers: DashMap::new(),
topics: DashMap::new(),
next_id: AtomicU64::new(1),
next_subscriber_id: AtomicU64::new(1),
replay_buffer: Mutex::new(VecDeque::with_capacity(config.replay_buffer_size)),
stats: HubStats::default(),
config,
created_at: Utc::now(),
}
}
pub fn subscribe(
&self,
topics: Vec<String>,
) -> Option<(u64, mpsc::Receiver<HubEvent>)> {
if self.config.max_connections > 0 {
let current = self.stats.active_connections.load(Ordering::Relaxed);
if current >= self.config.max_connections as u64 {
warn!(
limit = self.config.max_connections,
"hub: max connections reached"
);
return None;
}
}
let id = self.next_subscriber_id.fetch_add(1, Ordering::Relaxed);
let (tx, rx) = mpsc::channel(256);
let subscriber = Subscriber {
id,
topics: topics.clone(),
sender: tx,
created_at: Utc::now(),
};
self.subscribers.insert(id, subscriber);
for topic in &topics {
self.topics.entry(topic.clone()).or_default().push(id);
}
self.stats
.active_connections
.fetch_add(1, Ordering::Relaxed);
Some((id, rx))
}
pub fn unsubscribe(&self, subscriber_id: u64) {
if let Some((_, subscriber)) = self.subscribers.remove(&subscriber_id) {
for topic in &subscriber.topics {
if let Some(mut subs) = self.topics.get_mut(topic) {
subs.retain(|&id| id != subscriber_id);
if subs.is_empty() {
drop(subs);
self.topics.remove(topic);
}
}
}
self.stats
.active_connections
.fetch_sub(1, Ordering::Relaxed);
}
}
pub fn add_topics(&self, subscriber_id: u64, topics: Vec<String>) {
if let Some(mut sub) = self.subscribers.get_mut(&subscriber_id) {
for topic in &topics {
if !sub.topics.contains(topic) {
sub.topics.push(topic.clone());
self.topics
.entry(topic.clone())
.or_default()
.push(subscriber_id);
}
}
}
}
pub fn remove_topics(&self, subscriber_id: u64, topics: Vec<String>) {
if let Some(mut sub) = self.subscribers.get_mut(&subscriber_id) {
for topic in &topics {
sub.topics.retain(|t| t != topic);
if let Some(mut subs) = self.topics.get_mut(topic) {
subs.retain(|&id| id != subscriber_id);
if subs.is_empty() {
drop(subs);
self.topics.remove(topic);
}
}
}
}
}
pub fn publish(&self, topic: &str, data: Value) {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let event = HubEvent {
id,
topic: topic.to_string(),
data,
timestamp: Utc::now(),
};
{
let mut buf = self.replay_buffer.lock();
if buf.len() >= self.config.replay_buffer_size {
buf.pop_front();
}
buf.push_back(event.clone());
}
self.stats.total_published.fetch_add(1, Ordering::Relaxed);
let mut delivered_to: HashSet<u64> = HashSet::new();
let mut dead_subscribers: Vec<u64> = Vec::new();
for entry in self.topics.iter() {
let pattern = entry.key();
if TopicMatcher::matches(pattern, topic) {
for &sub_id in entry.value() {
if delivered_to.contains(&sub_id) {
continue; }
if let Some(sub) = self.subscribers.get(&sub_id) {
match sub.sender.try_send(event.clone()) {
Ok(()) => {
self.stats.total_delivered.fetch_add(1, Ordering::Relaxed);
delivered_to.insert(sub_id);
}
Err(mpsc::error::TrySendError::Closed(_)) => {
dead_subscribers.push(sub_id);
}
Err(mpsc::error::TrySendError::Full(_)) => {
warn!(subscriber_id = sub_id, "dropping slow subscriber (channel full)");
dead_subscribers.push(sub_id);
}
}
}
}
}
}
for dead_id in dead_subscribers {
self.remove_subscriber_from_topics(dead_id);
self.subscribers.remove(&dead_id);
self.stats
.active_connections
.fetch_sub(1, Ordering::Relaxed);
}
}
pub fn publish_event(&self, event: HubEvent) {
{
let mut buf = self.replay_buffer.lock();
if buf.len() >= self.config.replay_buffer_size {
buf.pop_front();
}
buf.push_back(event.clone());
}
self.stats.total_published.fetch_add(1, Ordering::Relaxed);
let mut delivered_to: HashSet<u64> = HashSet::new();
let mut dead_subscribers: Vec<u64> = Vec::new();
for entry in self.topics.iter() {
let pattern = entry.key();
if TopicMatcher::matches(pattern, &event.topic) {
for &sub_id in entry.value() {
if delivered_to.contains(&sub_id) {
continue;
}
if let Some(sub) = self.subscribers.get(&sub_id) {
match sub.sender.try_send(event.clone()) {
Ok(()) => {
self.stats.total_delivered.fetch_add(1, Ordering::Relaxed);
delivered_to.insert(sub_id);
}
Err(mpsc::error::TrySendError::Closed(_)) => {
dead_subscribers.push(sub_id);
}
Err(mpsc::error::TrySendError::Full(_)) => {
warn!(subscriber_id = sub_id, "dropping slow subscriber (channel full)");
dead_subscribers.push(sub_id);
}
}
}
}
}
}
for dead_id in dead_subscribers {
self.remove_subscriber_from_topics(dead_id);
self.subscribers.remove(&dead_id);
self.stats
.active_connections
.fetch_sub(1, Ordering::Relaxed);
}
}
fn remove_subscriber_from_topics(&self, subscriber_id: u64) {
if let Some(sub) = self.subscribers.get(&subscriber_id) {
for topic in &sub.topics {
if let Some(mut subs) = self.topics.get_mut(topic) {
subs.retain(|&id| id != subscriber_id);
if subs.is_empty() {
drop(subs);
self.topics.remove(topic);
}
}
}
}
}
pub fn replay_since(&self, last_event_id: u64) -> Vec<HubEvent> {
let buf = self.replay_buffer.lock();
buf.iter()
.filter(|e| e.id > last_event_id)
.cloned()
.collect()
}
pub fn subscriber_count(&self) -> usize {
self.subscribers.len()
}
pub fn topic_count(&self) -> usize {
self.topics.len()
}
pub fn stats(&self) -> HubStatsSnapshot {
let uptime = Utc::now()
.signed_duration_since(self.created_at)
.num_milliseconds() as f64
/ 1_000.0;
HubStatsSnapshot {
active_connections: self.stats.active_connections.load(Ordering::Relaxed),
total_published: self.stats.total_published.load(Ordering::Relaxed),
total_delivered: self.stats.total_delivered.load(Ordering::Relaxed),
topic_count: self.topics.len(),
subscriber_count: self.subscribers.len(),
uptime_secs: uptime,
}
}
pub fn config(&self) -> &HubConfig {
&self.config
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
use std::sync::Arc;
fn default_hub() -> BextHub {
BextHub::new(HubConfig::default())
}
#[test]
fn subscribe_returns_id_and_receiver() {
let hub = default_hub();
let result = hub.subscribe(vec!["test".to_string()]);
assert!(result.is_some());
let (id, _rx) = result.unwrap();
assert!(id > 0);
}
#[test]
fn subscribe_increments_active_connections() {
let hub = default_hub();
assert_eq!(hub.subscriber_count(), 0);
hub.subscribe(vec!["a".to_string()]);
assert_eq!(hub.subscriber_count(), 1);
hub.subscribe(vec!["b".to_string()]);
assert_eq!(hub.subscriber_count(), 2);
}
#[test]
fn unsubscribe_decrements_active_connections() {
let hub = default_hub();
let (id, _rx) = hub.subscribe(vec!["a".to_string()]).unwrap();
assert_eq!(hub.subscriber_count(), 1);
hub.unsubscribe(id);
assert_eq!(hub.subscriber_count(), 0);
}
#[test]
fn unsubscribe_nonexistent_is_noop() {
let hub = default_hub();
hub.unsubscribe(999); }
#[test]
fn unsubscribe_cleans_up_topic_entries() {
let hub = default_hub();
let (id, _rx) = hub.subscribe(vec!["topic/a".to_string()]).unwrap();
assert_eq!(hub.topic_count(), 1);
hub.unsubscribe(id);
assert_eq!(hub.topic_count(), 0);
}
#[test]
fn max_connections_enforced() {
let hub = BextHub::new(HubConfig {
max_connections: 2,
..Default::default()
});
let _s1 = hub.subscribe(vec!["a".to_string()]).unwrap();
let _s2 = hub.subscribe(vec!["b".to_string()]).unwrap();
let s3 = hub.subscribe(vec!["c".to_string()]);
assert!(s3.is_none());
}
#[test]
fn max_connections_zero_means_unlimited() {
let hub = BextHub::new(HubConfig {
max_connections: 0,
..Default::default()
});
for i in 0..100 {
let r = hub.subscribe(vec![format!("t/{}", i)]);
assert!(r.is_some());
}
}
#[tokio::test]
async fn publish_delivers_to_exact_subscriber() {
let hub = default_hub();
let (_id, mut rx) = hub.subscribe(vec!["deploy".to_string()]).unwrap();
hub.publish("deploy", json!({"v": 1}));
let evt = rx.recv().await.unwrap();
assert_eq!(evt.topic, "deploy");
assert_eq!(evt.data, json!({"v": 1}));
}
#[tokio::test]
async fn publish_does_not_deliver_non_matching() {
let hub = default_hub();
let (_id, mut rx) = hub.subscribe(vec!["deploy".to_string()]).unwrap();
hub.publish("restart", json!({"v": 1}));
let result = rx.try_recv();
assert!(result.is_err());
}
#[tokio::test]
async fn publish_with_wildcard_subscriber() {
let hub = default_hub();
let (_id, mut rx) = hub.subscribe(vec!["app/*".to_string()]).unwrap();
hub.publish("app/marketing", json!({"action": "send"}));
let evt = rx.recv().await.unwrap();
assert_eq!(evt.topic, "app/marketing");
}
#[tokio::test]
async fn publish_with_multi_wildcard_subscriber() {
let hub = default_hub();
let (_id, mut rx) = hub.subscribe(vec!["app/#".to_string()]).unwrap();
hub.publish("app/marketing/events/click", json!({}));
let evt = rx.recv().await.unwrap();
assert_eq!(evt.topic, "app/marketing/events/click");
}
#[tokio::test]
async fn publish_to_multiple_subscribers() {
let hub = default_hub();
let (_id1, mut rx1) = hub.subscribe(vec!["events".to_string()]).unwrap();
let (_id2, mut rx2) = hub.subscribe(vec!["events".to_string()]).unwrap();
hub.publish("events", json!({"n": 1}));
let e1 = rx1.recv().await.unwrap();
let e2 = rx2.recv().await.unwrap();
assert_eq!(e1.data, json!({"n": 1}));
assert_eq!(e2.data, json!({"n": 1}));
}
#[tokio::test]
async fn publish_no_duplicate_delivery_from_overlapping_patterns() {
let hub = default_hub();
let (_id, mut rx) = hub
.subscribe(vec!["app/deploy".to_string(), "app/#".to_string()])
.unwrap();
hub.publish("app/deploy", json!({"v": 1}));
let evt = rx.recv().await.unwrap();
assert_eq!(evt.topic, "app/deploy");
let result = rx.try_recv();
assert!(result.is_err());
}
#[tokio::test]
async fn add_topics_enables_new_subscriptions() {
let hub = default_hub();
let (id, mut rx) = hub.subscribe(vec!["a".to_string()]).unwrap();
hub.publish("b", json!(1));
assert!(rx.try_recv().is_err());
hub.add_topics(id, vec!["b".to_string()]);
hub.publish("b", json!(2));
let evt = rx.recv().await.unwrap();
assert_eq!(evt.data, json!(2));
}
#[tokio::test]
async fn remove_topics_disables_subscriptions() {
let hub = default_hub();
let (id, mut rx) = hub
.subscribe(vec!["a".to_string(), "b".to_string()])
.unwrap();
hub.publish("b", json!(1));
let _ = rx.recv().await.unwrap();
hub.remove_topics(id, vec!["b".to_string()]);
hub.publish("b", json!(2));
assert!(rx.try_recv().is_err());
hub.publish("a", json!(3));
let evt = rx.recv().await.unwrap();
assert_eq!(evt.data, json!(3));
}
#[test]
fn add_topics_deduplicates() {
let hub = default_hub();
let (id, _rx) = hub.subscribe(vec!["a".to_string()]).unwrap();
hub.add_topics(id, vec!["a".to_string()]);
let sub = hub.subscribers.get(&id).unwrap();
assert_eq!(sub.topics.iter().filter(|t| *t == "a").count(), 1);
}
#[test]
fn replay_returns_events_after_id() {
let hub = default_hub();
hub.publish("a", json!(1));
hub.publish("a", json!(2));
hub.publish("a", json!(3));
let replayed = hub.replay_since(1);
assert_eq!(replayed.len(), 2);
assert_eq!(replayed[0].data, json!(2));
assert_eq!(replayed[1].data, json!(3));
}
#[test]
fn replay_since_zero_returns_all() {
let hub = default_hub();
hub.publish("a", json!(1));
hub.publish("a", json!(2));
let replayed = hub.replay_since(0);
assert_eq!(replayed.len(), 2);
}
#[test]
fn replay_since_future_id_returns_empty() {
let hub = default_hub();
hub.publish("a", json!(1));
let replayed = hub.replay_since(999);
assert!(replayed.is_empty());
}
#[test]
fn replay_buffer_wraps_around() {
let hub = BextHub::new(HubConfig {
replay_buffer_size: 3,
..Default::default()
});
hub.publish("a", json!(1)); hub.publish("a", json!(2)); hub.publish("a", json!(3)); hub.publish("a", json!(4));
let replayed = hub.replay_since(0);
assert_eq!(replayed.len(), 3);
assert_eq!(replayed[0].data, json!(2));
assert_eq!(replayed[2].data, json!(4));
}
#[tokio::test]
async fn stats_track_published_and_delivered() {
let hub = default_hub();
let (_id, mut rx) = hub.subscribe(vec!["x".to_string()]).unwrap();
hub.publish("x", json!(1));
hub.publish("x", json!(2));
let _ = rx.recv().await;
let _ = rx.recv().await;
let s = hub.stats();
assert_eq!(s.total_published, 2);
assert_eq!(s.total_delivered, 2);
assert_eq!(s.active_connections, 1);
assert_eq!(s.subscriber_count, 1);
assert!(s.uptime_secs >= 0.0);
}
#[test]
fn stats_topic_count() {
let hub = default_hub();
hub.subscribe(vec!["a".to_string(), "b".to_string()]);
assert_eq!(hub.stats().topic_count, 2);
}
#[tokio::test]
async fn concurrent_publish_subscribe() {
let hub = Arc::new(default_hub());
let mut handles = Vec::new();
let mut receivers = Vec::new();
for _ in 0..10 {
let (_id, rx) = hub.subscribe(vec!["concurrent".to_string()]).unwrap();
receivers.push(rx);
}
for i in 0..10 {
let hub_clone = Arc::clone(&hub);
handles.push(tokio::spawn(async move {
hub_clone.publish("concurrent", json!(i));
}));
}
for h in handles {
h.await.unwrap();
}
for rx in &mut receivers {
let mut count = 0;
while rx.try_recv().is_ok() {
count += 1;
}
assert_eq!(count, 10);
}
}
#[tokio::test]
async fn concurrent_subscribe_unsubscribe() {
let hub = Arc::new(default_hub());
let mut handles = Vec::new();
for _ in 0..50 {
let hub_clone = Arc::clone(&hub);
handles.push(tokio::spawn(async move {
let (id, _rx) = hub_clone.subscribe(vec!["t".to_string()]).unwrap();
hub_clone.unsubscribe(id);
}));
}
for h in handles {
h.await.unwrap();
}
assert_eq!(hub.subscriber_count(), 0);
}
#[tokio::test]
async fn publish_event_delivers_to_subscribers() {
let hub = default_hub();
let (_id, mut rx) = hub.subscribe(vec!["relay".to_string()]).unwrap();
let event = HubEvent {
id: 100,
topic: "relay".to_string(),
data: json!({"from": "remote"}),
timestamp: Utc::now(),
};
hub.publish_event(event.clone());
let evt = rx.recv().await.unwrap();
assert_eq!(evt.id, 100);
assert_eq!(evt.data, json!({"from": "remote"}));
}
#[test]
fn publish_event_stored_in_replay() {
let hub = default_hub();
let event = HubEvent {
id: 200,
topic: "relay".to_string(),
data: json!("test"),
timestamp: Utc::now(),
};
hub.publish_event(event);
let replayed = hub.replay_since(199);
assert_eq!(replayed.len(), 1);
assert_eq!(replayed[0].id, 200);
}
}