arcly-http 0.3.0

Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface
Documentation
//! In-process async event bus — **actually** lock-free on the emit path.
//!
//! Listeners live in an `ArcSwap` snapshot (the same proven pattern as
//! tenants / secrets / masking / dynamic routes): `subscribe` is a
//! clone-and-swap on the control plane, `emit` is ONE atomic pointer load
//! plus a frozen `HashMap` probe. The previous implementation used
//! `RwLock` reads per emit while *documenting itself as lock-free* — the
//! kind of drift this framework's invariants exist to prevent.

use std::collections::HashMap;
use std::sync::Arc;

use arc_swap::ArcSwap;
use futures::future::{join_all, BoxFuture};

pub trait Listener: Send + Sync + 'static {
    fn on_event(&self, payload: &serde_json::Value) -> BoxFuture<'static, ()>;
}

type Listeners = HashMap<&'static str, Vec<Arc<dyn Listener>>>;

#[derive(Default, Clone)]
pub struct EventBus {
    inner: Arc<ArcSwap<Listeners>>,
}

impl EventBus {
    pub fn new() -> Self {
        Self::default()
    }

    /// Register a listener. Control-plane operation (boot / plugin init):
    /// clones the listener table and swaps the snapshot — emitters in
    /// flight keep their consistent view, the next emit sees the new one.
    pub fn subscribe(&self, event: &'static str, listener: Arc<dyn Listener>) {
        self.inner.rcu(|cur| {
            let mut next: Listeners = (**cur).clone();
            next.entry(event).or_default().push(Arc::clone(&listener));
            next
        });
    }

    /// Fan an event out to every subscribed listener, concurrently.
    /// Hot-path cost: one atomic load + one hash probe. No locks.
    pub async fn emit(&self, event: &'static str, payload: serde_json::Value) {
        let snapshot = self.inner.load();
        let Some(listeners) = snapshot.get(event) else {
            return;
        };
        let futs = listeners.iter().map(|l| l.on_event(&payload));
        join_all(futs).await;
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    struct Count(&'static AtomicU32);
    impl Listener for Count {
        fn on_event(&self, _p: &serde_json::Value) -> BoxFuture<'static, ()> {
            self.0.fetch_add(1, Ordering::Relaxed);
            Box::pin(async {})
        }
    }

    #[tokio::test]
    async fn emit_reaches_all_subscribers_and_ignores_unknown_events() {
        static HITS: AtomicU32 = AtomicU32::new(0);
        let bus = EventBus::new();
        bus.subscribe("user.created", Arc::new(Count(&HITS)));
        bus.subscribe("user.created", Arc::new(Count(&HITS)));

        bus.emit("user.created", serde_json::json!({"id": 1})).await;
        assert_eq!(HITS.load(Ordering::Relaxed), 2);

        bus.emit("other.event", serde_json::Value::Null).await; // no-op
        assert_eq!(HITS.load(Ordering::Relaxed), 2);

        // Late subscriber sees subsequent emits (snapshot swap works).
        bus.subscribe("user.created", Arc::new(Count(&HITS)));
        bus.emit("user.created", serde_json::Value::Null).await;
        assert_eq!(HITS.load(Ordering::Relaxed), 5);
    }
}