Skip to main content

arcly_http/messaging/
event.rs

1//! In-process async event bus — **actually** lock-free on the emit path.
2//!
3//! Listeners live in an `ArcSwap` snapshot (the same proven pattern as
4//! tenants / secrets / masking / dynamic routes): `subscribe` is a
5//! clone-and-swap on the control plane, `emit` is ONE atomic pointer load
6//! plus a frozen `HashMap` probe. The previous implementation used
7//! `RwLock` reads per emit while *documenting itself as lock-free* — the
8//! kind of drift this framework's invariants exist to prevent.
9
10use std::collections::HashMap;
11use std::sync::Arc;
12
13use arc_swap::ArcSwap;
14use futures::future::{join_all, BoxFuture};
15
16pub trait Listener: Send + Sync + 'static {
17    fn on_event(&self, payload: &serde_json::Value) -> BoxFuture<'static, ()>;
18}
19
20type Listeners = HashMap<&'static str, Vec<Arc<dyn Listener>>>;
21
22#[derive(Default, Clone)]
23pub struct EventBus {
24    inner: Arc<ArcSwap<Listeners>>,
25}
26
27impl EventBus {
28    pub fn new() -> Self {
29        Self::default()
30    }
31
32    /// Register a listener. Control-plane operation (boot / plugin init):
33    /// clones the listener table and swaps the snapshot — emitters in
34    /// flight keep their consistent view, the next emit sees the new one.
35    pub fn subscribe(&self, event: &'static str, listener: Arc<dyn Listener>) {
36        self.inner.rcu(|cur| {
37            let mut next: Listeners = (**cur).clone();
38            next.entry(event).or_default().push(Arc::clone(&listener));
39            next
40        });
41    }
42
43    /// Fan an event out to every subscribed listener, concurrently.
44    /// Hot-path cost: one atomic load + one hash probe. No locks.
45    pub async fn emit(&self, event: &'static str, payload: serde_json::Value) {
46        let snapshot = self.inner.load();
47        let Some(listeners) = snapshot.get(event) else {
48            return;
49        };
50        let futs = listeners.iter().map(|l| l.on_event(&payload));
51        join_all(futs).await;
52    }
53}
54
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Listener that bumps a shared counter every time it is invoked.
    struct Count(&'static AtomicU32);

    impl Listener for Count {
        fn on_event(&self, _payload: &serde_json::Value) -> BoxFuture<'static, ()> {
            // The increment happens at call time; the returned future is a no-op.
            let Self(counter) = self;
            counter.fetch_add(1, Ordering::Relaxed);
            Box::pin(async {})
        }
    }

    #[tokio::test]
    async fn emit_reaches_all_subscribers_and_ignores_unknown_events() {
        static HITS: AtomicU32 = AtomicU32::new(0);
        let hits = || HITS.load(Ordering::Relaxed);
        let counting = || -> Arc<dyn Listener> { Arc::new(Count(&HITS)) };

        let bus = EventBus::new();
        for _ in 0..2 {
            bus.subscribe("user.created", counting());
        }

        // Both subscribers are reached by a single emit.
        bus.emit("user.created", serde_json::json!({"id": 1})).await;
        assert_eq!(hits(), 2);

        // An event nobody subscribed to is a no-op.
        bus.emit("other.event", serde_json::Value::Null).await;
        assert_eq!(hits(), 2);

        // A late subscriber sees subsequent emits (snapshot swap works):
        // three listeners now fire, taking the total from 2 to 5.
        bus.subscribe("user.created", counting());
        bus.emit("user.created", serde_json::Value::Null).await;
        assert_eq!(hits(), 5);
    }
}