arcly_http/messaging/
event.rs1use std::collections::HashMap;
11use std::sync::Arc;
12
13use arc_swap::ArcSwap;
14use futures::future::{join_all, BoxFuture};
15
16pub trait Listener: Send + Sync + 'static {
17 fn on_event(&self, payload: &serde_json::Value) -> BoxFuture<'static, ()>;
18}
19
20type Listeners = HashMap<&'static str, Vec<Arc<dyn Listener>>>;
21
22#[derive(Default, Clone)]
23pub struct EventBus {
24 inner: Arc<ArcSwap<Listeners>>,
25}
26
27impl EventBus {
28 pub fn new() -> Self {
29 Self::default()
30 }
31
32 pub fn subscribe(&self, event: &'static str, listener: Arc<dyn Listener>) {
36 self.inner.rcu(|cur| {
37 let mut next: Listeners = (**cur).clone();
38 next.entry(event).or_default().push(Arc::clone(&listener));
39 next
40 });
41 }
42
43 pub async fn emit(&self, event: &'static str, payload: serde_json::Value) {
46 let snapshot = self.inner.load();
47 let Some(listeners) = snapshot.get(event) else {
48 return;
49 };
50 let futs = listeners.iter().map(|l| l.on_event(&payload));
51 join_all(futs).await;
52 }
53}
54
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Listener that bumps a shared counter every time it is notified.
    struct Count(&'static AtomicU32);

    impl Listener for Count {
        fn on_event(&self, _p: &serde_json::Value) -> BoxFuture<'static, ()> {
            // The increment happens at call time; the returned future has
            // nothing left to do.
            let Count(counter) = self;
            counter.fetch_add(1, Ordering::Relaxed);
            Box::pin(async {})
        }
    }

    #[tokio::test]
    async fn emit_reaches_all_subscribers_and_ignores_unknown_events() {
        static HITS: AtomicU32 = AtomicU32::new(0);

        let hits = || HITS.load(Ordering::Relaxed);
        let counting_listener = || -> Arc<dyn Listener> { Arc::new(Count(&HITS)) };

        let bus = EventBus::new();
        bus.subscribe("user.created", counting_listener());
        bus.subscribe("user.created", counting_listener());

        // Both subscribers are notified exactly once.
        bus.emit("user.created", serde_json::json!({"id": 1})).await;
        assert_eq!(hits(), 2);

        // An event without subscribers leaves the counter untouched.
        bus.emit("other.event", serde_json::Value::Null).await;
        assert_eq!(hits(), 2);

        // A late subscriber joins the two existing ones: 2 + 3 = 5.
        bus.subscribe("user.created", counting_listener());
        bus.emit("user.created", serde_json::Value::Null).await;
        assert_eq!(hits(), 5);
    }
}