Skip to main content

nestrs_events/
lib.rs

1//! In-process async event bus for domain and integration events (`#[on_event]` / Nest `EventEmitter2`).
2//!
3//! This crate is separate from `nestrs-microservices` so HTTP-only apps can depend on events
4//! without pulling transport adapters.
5
6use async_trait::async_trait;
7use serde::Serialize;
8use std::collections::HashMap;
9use std::future::Future;
10use std::pin::Pin;
11use std::sync::{Arc, RwLock};
12
13type EventHandler =
14    Arc<dyn Fn(serde_json::Value) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
15
16/// In-process async event bus for integration/domain events.
17#[derive(Default)]
18pub struct EventBus {
19    handlers: RwLock<HashMap<String, Vec<EventHandler>>>,
20}
21
22impl EventBus {
23    pub fn new() -> Self {
24        Self::default()
25    }
26
27    pub fn subscribe<F, Fut>(&self, pattern: impl Into<String>, handler: F)
28    where
29        F: Fn(serde_json::Value) -> Fut + Send + Sync + 'static,
30        Fut: Future<Output = ()> + Send + 'static,
31    {
32        let mut guard = self.handlers.write().expect("event bus lock poisoned");
33        let entry = guard.entry(pattern.into()).or_default();
34        entry.push(Arc::new(move |payload| Box::pin(handler(payload))));
35    }
36
37    pub async fn emit_json(&self, pattern: &str, payload: serde_json::Value) {
38        let handlers = {
39            let guard = self.handlers.read().expect("event bus lock poisoned");
40            guard.get(pattern).cloned().unwrap_or_default()
41        };
42        for handler in handlers {
43            handler(payload.clone()).await;
44        }
45    }
46
47    pub async fn emit<T>(&self, pattern: &str, payload: &T)
48    where
49        T: Serialize + Send + Sync,
50    {
51        let json = serde_json::to_value(payload).unwrap_or_else(|e| {
52            panic!("EventBus emit serialize failed for pattern `{pattern}`: {e}")
53        });
54        self.emit_json(pattern, json).await;
55    }
56}
57
58#[async_trait]
59impl nestrs_core::Injectable for EventBus {
60    fn construct(_registry: &nestrs_core::ProviderRegistry) -> Arc<Self> {
61        Arc::new(Self::new())
62    }
63}