use crate::dispatcher::EventDispatcher;
use crate::registry::EventRegistry;
use crate::subscription::{EventHandler, SubscriptionHandle, SubscriptionManager};
use crate::{Error, Event, EventEnvelope, EventMetadata, Result};
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{debug, error, info, trace, warn};
use uuid::Uuid;
pub mod builder;
pub mod config;
pub use builder::EventBusBuilder;
pub use config::EventBusConfig;
/// Boxed shutdown callback stored by the bus.
///
/// Each invocation returns a boxed `'static` future resolving to `Result<()>`.
/// The `Send + Sync` bounds allow the hook list to live behind a shared mutex.
type ShutdownHook = Box<dyn Fn() -> futures::future::BoxFuture<'static, Result<()>> + Send + Sync>;
/// Handle to the event bus.
///
/// The type derives `Clone`; every field except `config` is wrapped in an
/// `Arc`, so clones share the same registry, dispatcher slot, hook list,
/// shutdown flag, and DLQ receiver slot.
#[derive(Clone)]
// `Debug` is not derived for this type, so the lint is silenced explicitly.
#[allow(missing_debug_implementations)]
pub struct EventBus {
    /// Bus configuration; `shutdown` reads `shutdown_timeout` from it.
    pub(crate) config: EventBusConfig,
    /// Registry queried by `stats` for subscription and event-type counts.
    pub(crate) registry: Arc<dyn EventRegistry>,
    /// Manager that all subscribe/unsubscribe calls are delegated to.
    pub(crate) subscription_manager: Arc<SubscriptionManager>,
    /// Dispatcher slot; it is emptied (`take`n) by the shutdown methods, after
    /// which publishing reports `Error::ShuttingDown`.
    pub(crate) dispatcher: Arc<Mutex<Option<Box<dyn EventDispatcher>>>>,
    /// Callbacks invoked at the start of either shutdown method.
    pub(crate) shutdown_hooks: Arc<Mutex<Vec<ShutdownHook>>>,
    /// Set to `true` as soon as a shutdown method is entered.
    pub(crate) is_shutting_down: Arc<AtomicBool>,
    /// Receiver slot handed out at most once by `take_dlq_receiver`.
    pub(crate) dlq_rx: Arc<Mutex<Option<tokio::sync::mpsc::Receiver<Arc<EventEnvelope>>>>>,
}
impl EventBus {
/// Returns a fresh [`EventBusBuilder`] (created with `EventBusBuilder::new()`).
pub fn builder() -> EventBusBuilder {
EventBusBuilder::new()
}
/// Removes the DLQ receiver from the bus and returns it.
///
/// The slot is emptied by this call, so only the first caller gets `Some`;
/// every later call (on this handle or any clone) returns `None`.
// NOTE(review): "DLQ" presumably stands for dead-letter queue — inferred from
// the field name only; confirm against the dispatcher.
pub async fn take_dlq_receiver(&self) -> Option<tokio::sync::mpsc::Receiver<Arc<EventEnvelope>>> {
    let mut slot = self.dlq_rx.lock().await;
    slot.take()
}
/// Publishes `event` with freshly created metadata (`EventMetadata::new()`).
///
/// Delegates to [`EventBus::publish_with_metadata`] and returns its result.
pub async fn publish<T: Event>(&self, event: T) -> Result<Uuid> {
    let fresh_metadata = EventMetadata::new();
    self.publish_with_metadata(event, fresh_metadata).await
}
pub async fn publish_with_metadata<T: Event>(
&self,
event: T,
metadata: EventMetadata,
) -> Result<Uuid> {
if self.is_shutting_down.load(Ordering::SeqCst) {
return Err(Error::ShuttingDown);
}
let event_id = metadata.event_id;
trace!(
event_id = %event_id,
event_type = T::event_type(),
"Publishing event"
);
let envelope = EventEnvelope::with_metadata(event, metadata);
{
let guard = self.dispatcher.lock().await;
let dispatcher = guard.as_ref().ok_or(Error::ShuttingDown)?;
dispatcher.dispatch(envelope).await?;
}
debug!(
event_id = %event_id,
event_type = T::event_type(),
"Event published successfully"
);
Ok(event_id)
}
pub async fn subscribe<T, F, Fut>(&self, handler: F) -> Result<SubscriptionHandle>
where
T: Event,
F: Fn(T) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
if self.is_shutting_down.load(Ordering::SeqCst) {
return Err(Error::ShuttingDown);
}
self.subscription_manager.subscribe_fn(handler).await
}
/// Registers an async closure whose future yields `Result<()>` as a handler
/// for events of type `T`.
///
/// The closure is wrapped in a `FallibleFunctionHandler` before being handed
/// to the subscription manager.
///
/// # Errors
/// `Error::ShuttingDown` if shutdown has begun; otherwise whatever the
/// subscription manager returns.
pub async fn subscribe_fallible<T, F, Fut>(&self, handler: F) -> Result<SubscriptionHandle>
where
    T: Event,
    F: Fn(T) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = Result<()>> + Send + 'static,
{
    use crate::subscription::handler::FallibleFunctionHandler;

    if self.is_shutting_down() {
        return Err(Error::ShuttingDown);
    }

    let wrapped = FallibleFunctionHandler::new(handler);
    self.subscription_manager.subscribe::<T, _>(wrapped).await
}
pub async fn subscribe_handler<T, H>(&self, handler: H) -> Result<SubscriptionHandle>
where
T: Event,
H: EventHandler,
{
if self.is_shutting_down.load(Ordering::SeqCst) {
return Err(Error::ShuttingDown);
}
self.subscription_manager.subscribe::<T, H>(handler).await
}
/// Cancels the subscription identified by `handle`.
///
/// Delegates to the subscription manager; unlike the subscribe methods, this
/// does not check the shutdown flag.
pub async fn unsubscribe(&self, handle: SubscriptionHandle) -> Result<()> {
    let manager = &self.subscription_manager;
    manager.unsubscribe(handle).await
}
pub async fn replay_pending(&self) -> Result<()> {
let guard = self.dispatcher.lock().await;
if let Some(dispatcher) = guard.as_ref() {
dispatcher.replay_pending().await
} else {
Err(Error::ShuttingDown)
}
}
/// Collects a snapshot of bus statistics without awaiting.
///
/// The dispatcher mutex is only probed with `try_lock`: if it is currently
/// held elsewhere, or the dispatcher slot is empty, `dispatcher_stats` is the
/// type's `Default` value rather than live numbers.
pub fn stats(&self) -> EventBusStats {
    let dispatcher_stats = match self.dispatcher.try_lock() {
        Ok(slot) => slot
            .as_ref()
            .map(|active| active.stats())
            .unwrap_or_default(),
        Err(_) => Default::default(),
    };

    let total_subscriptions = self.registry.total_subscriptions();
    let event_types = self.registry.event_types().len();
    let subscription_stats = self.subscription_manager.stats();

    EventBusStats {
        total_subscriptions,
        event_types,
        dispatcher_stats,
        subscription_stats,
    }
}
pub async fn register_shutdown_hook<F, Fut>(&self, hook: F) -> Result<()>
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<()>> + Send + 'static,
{
let hook = Box::new(move || -> futures::future::BoxFuture<'static, Result<()>> {
Box::pin(hook())
});
self.shutdown_hooks.lock().await.push(hook);
Ok(())
}
pub fn is_shutting_down(&self) -> bool {
self.is_shutting_down.load(Ordering::SeqCst)
}
pub async fn shutdown(&self) -> Result<()> {
info!("Shutting down EventBus (abruptly)");
self.is_shutting_down.store(true, Ordering::SeqCst);
let hooks = self.shutdown_hooks.lock().await;
for hook in hooks.iter() {
if let Err(e) = hook().await {
error!("Shutdown hook failed: {}", e);
}
}
drop(hooks);
let dispatcher_shutdown = tokio::time::timeout(self.config.shutdown_timeout, async {
let mut guard = self.dispatcher.lock().await;
if let Some(mut dispatcher) = guard.take() {
dispatcher.stop().await
} else {
Ok(())
}
});
if dispatcher_shutdown.await.is_err() {
warn!("Dispatcher shutdown timed out");
}
self.subscription_manager.shutdown().await?;
info!("EventBus abrupt shutdown complete");
Ok(())
}
pub async fn shutdown_gracefully(&self) -> Result<()> {
info!("Shutting down EventBus gracefully");
self.is_shutting_down.store(true, Ordering::SeqCst);
let hooks = self.shutdown_hooks.lock().await;
for hook in hooks.iter() {
if let Err(e) = hook().await {
error!("Shutdown hook failed: {}", e);
}
}
drop(hooks);
{
let mut guard = self.dispatcher.lock().await;
if let Some(mut dispatcher) = guard.take() {
if let Err(e) = dispatcher.shutdown_gracefully().await {
error!("Dispatcher graceful shutdown failed: {}", e);
}
}
}
self.subscription_manager.shutdown_gracefully().await;
info!("EventBus graceful shutdown complete");
Ok(())
}
}
/// Snapshot of bus statistics as assembled by `EventBus::stats`.
#[derive(Debug, Clone)]
pub struct EventBusStats {
/// Value reported by the registry's `total_subscriptions()`.
pub total_subscriptions: usize,
/// Number of entries returned by the registry's `event_types()`.
pub event_types: usize,
/// Dispatcher statistics; the `Default` value when the dispatcher could not
/// be inspected at snapshot time.
pub dispatcher_stats: crate::dispatcher::DispatcherStats,
/// Statistics reported by the subscription manager.
pub subscription_stats: crate::subscription::SubscriptionStats,
}
/// One-line human-readable summary: subscription count, event-type count, and
/// the dispatcher's `events_dispatched` counter.
impl std::fmt::Display for EventBusStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let dispatched = &self.dispatcher_stats.events_dispatched;
        f.write_fmt(format_args!(
            "EventBus Stats: {} subscriptions, {} event types, {} events dispatched",
            self.total_subscriptions, self.event_types, dispatched
        ))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    /// Minimal serializable event used by the tests in this module.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    struct TestEvent {
        value: String,
    }

    impl Event for TestEvent {
        fn event_type() -> &'static str {
            "TestEvent"
        }
    }

    /// Publishes two events and checks that a subscribed closure saw both.
    #[tokio::test]
    async fn test_event_bus_basic() {
        let bus = EventBus::builder()
            .configure(|cfg| cfg.enable_tracing(false))
            .build()
            .await
            .unwrap();

        let seen: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&seen);
        let subscription = bus
            .subscribe(move |evt: TestEvent| {
                let sink = Arc::clone(&sink);
                async move {
                    let mut log = sink.lock().await;
                    log.push(evt.value);
                }
            })
            .await
            .unwrap();

        for payload in ["first", "second"].iter() {
            bus.publish(TestEvent {
                value: (*payload).to_owned(),
            })
            .await
            .unwrap();
        }

        // Give the handlers time to run before inspecting the results.
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        let delivered = seen.lock().await;
        assert_eq!(delivered.len(), 2);
        assert!(delivered.iter().any(|v| v == "first"));
        assert!(delivered.iter().any(|v| v == "second"));

        bus.unsubscribe(subscription).await.unwrap();
        bus.shutdown().await.unwrap();
    }

    /// Two subscriptions to the same event type show up as 2 subscriptions
    /// and 1 event type in the stats snapshot.
    #[tokio::test]
    async fn test_event_bus_stats() {
        let bus = EventBus::builder().build().await.unwrap();
        let _first = bus.subscribe(|_: TestEvent| async {}).await.unwrap();
        let _second = bus.subscribe(|_: TestEvent| async {}).await.unwrap();

        let EventBusStats {
            total_subscriptions,
            event_types,
            ..
        } = bus.stats();
        assert_eq!(total_subscriptions, 2);
        assert_eq!(event_types, 1);

        bus.shutdown().await.unwrap();
    }
}