use std::collections::HashMap;
use std::sync::Arc;
use arc_swap::ArcSwap;
use futures::future::{join_all, BoxFuture};
pub trait Listener: Send + Sync + 'static {
fn on_event(&self, payload: &serde_json::Value) -> BoxFuture<'static, ()>;
}
type Listeners = HashMap<&'static str, Vec<Arc<dyn Listener>>>;
#[derive(Default, Clone)]
pub struct EventBus {
inner: Arc<ArcSwap<Listeners>>,
}
impl EventBus {
pub fn new() -> Self {
Self::default()
}
pub fn subscribe(&self, event: &'static str, listener: Arc<dyn Listener>) {
self.inner.rcu(|cur| {
let mut next: Listeners = (**cur).clone();
next.entry(event).or_default().push(Arc::clone(&listener));
next
});
}
pub async fn emit(&self, event: &'static str, payload: serde_json::Value) {
let snapshot = self.inner.load();
let Some(listeners) = snapshot.get(event) else {
return;
};
let futs = listeners.iter().map(|l| l.on_event(&payload));
join_all(futs).await;
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU32, Ordering};
struct Count(&'static AtomicU32);
impl Listener for Count {
fn on_event(&self, _p: &serde_json::Value) -> BoxFuture<'static, ()> {
self.0.fetch_add(1, Ordering::Relaxed);
Box::pin(async {})
}
}
#[tokio::test]
async fn emit_reaches_all_subscribers_and_ignores_unknown_events() {
static HITS: AtomicU32 = AtomicU32::new(0);
let bus = EventBus::new();
bus.subscribe("user.created", Arc::new(Count(&HITS)));
bus.subscribe("user.created", Arc::new(Count(&HITS)));
bus.emit("user.created", serde_json::json!({"id": 1})).await;
assert_eq!(HITS.load(Ordering::Relaxed), 2);
bus.emit("other.event", serde_json::Value::Null).await; assert_eq!(HITS.load(Ordering::Relaxed), 2);
bus.subscribe("user.created", Arc::new(Count(&HITS)));
bus.emit("user.created", serde_json::Value::Null).await;
assert_eq!(HITS.load(Ordering::Relaxed), 5);
}
}