1use async_trait::async_trait;
7use serde::Serialize;
8use std::collections::HashMap;
9use std::future::Future;
10use std::pin::Pin;
11use std::sync::{Arc, RwLock};
12
13type EventHandler =
14 Arc<dyn Fn(serde_json::Value) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
15
16#[derive(Default)]
18pub struct EventBus {
19 handlers: RwLock<HashMap<String, Vec<EventHandler>>>,
20}
21
22impl EventBus {
23 pub fn new() -> Self {
24 Self::default()
25 }
26
27 pub fn subscribe<F, Fut>(&self, pattern: impl Into<String>, handler: F)
28 where
29 F: Fn(serde_json::Value) -> Fut + Send + Sync + 'static,
30 Fut: Future<Output = ()> + Send + 'static,
31 {
32 let mut guard = self.handlers.write().expect("event bus lock poisoned");
33 let entry = guard.entry(pattern.into()).or_default();
34 entry.push(Arc::new(move |payload| Box::pin(handler(payload))));
35 }
36
37 pub async fn emit_json(&self, pattern: &str, payload: serde_json::Value) {
38 let handlers = {
39 let guard = self.handlers.read().expect("event bus lock poisoned");
40 guard.get(pattern).cloned().unwrap_or_default()
41 };
42 for handler in handlers {
43 handler(payload.clone()).await;
44 }
45 }
46
47 pub async fn emit<T>(&self, pattern: &str, payload: &T)
48 where
49 T: Serialize + Send + Sync,
50 {
51 let json = serde_json::to_value(payload).unwrap_or_else(|e| {
52 panic!("EventBus emit serialize failed for pattern `{pattern}`: {e}")
53 });
54 self.emit_json(pattern, json).await;
55 }
56}
57
58#[async_trait]
59impl nestrs_core::Injectable for EventBus {
60 fn construct(_registry: &nestrs_core::ProviderRegistry) -> Arc<Self> {
61 Arc::new(Self::new())
62 }
63}