use std::any::TypeId;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use crate::error::EventBusError;
use crate::handler::{HandlerPolicy, IntoHandler, RegisteredHandler, SyncEventHandler, into_listener_policy};
use crate::middleware::{Middleware, SyncMiddleware, TypedMiddleware, TypedSyncMiddleware};
use crate::registry::{ErasedMiddleware, EventType, ListenerPolicy, TypedMiddlewareEntry, TypedMiddlewareSlot};
use crate::subscription::Subscription;
use crate::types::{DeadLetter, Event, SyncSubscriptionPolicy};
use super::EventBus;
impl EventBus {
/// Subscribes `handler` to events of type `E` using the handler's default policy.
///
/// The policy is produced by `H::default_policy`, which is given this bus's
/// `subscription_defaults`. The handler is registered with `once = false`.
///
/// # Errors
///
/// Propagates whatever error `subscribe_internal` returns.
pub async fn subscribe<E, H, M>(&self, handler: H) -> Result<Subscription, EventBusError>
where
    E: Event,
    H: IntoHandler<E, M>,
    H::Policy: HandlerPolicy,
{
    // Convert the handler first, then resolve the policy (same order as the call arguments).
    let registered = handler.into_handler();
    let default_policy = H::default_policy(&self.inner.subscription_defaults);
    let outcome = self
        .subscribe_internal::<E, H::Policy>(registered, default_policy, false)
        .await;
    outcome
}
/// Subscribes `handler` to events of type `E` with an explicitly supplied `policy`.
///
/// The handler is converted with `into_handler` and registered with `once = false`.
///
/// # Errors
///
/// Propagates whatever error `subscribe_internal` returns.
pub async fn subscribe_with_policy<E, H, M>(&self, handler: H, policy: H::Policy) -> Result<Subscription, EventBusError>
where
    E: Event,
    H: IntoHandler<E, M>,
    H::Policy: HandlerPolicy,
{
    let registered = handler.into_handler();
    let outcome = self
        .subscribe_internal::<E, H::Policy>(registered, policy, false)
        .await;
    outcome
}
/// Shared implementation behind the public `subscribe*` methods.
///
/// Allocates a subscription id, builds the listener from `registered` and the
/// converted `subscription_policy`, stores it in the registry under `E`'s
/// `TypeId`, refreshes the snapshot, and returns a `Subscription` for the id.
///
/// `once` is forwarded unchanged to the handler's `register` callback.
///
/// # Errors
///
/// Returns `EventBusError::Stopped` if `self.inner.is_stopped()` is true,
/// either on entry or after the registry lock has been acquired.
///
/// # Panics
///
/// Panics if the handler kind and the policy kind disagree (an async handler
/// paired with a sync policy, or the reverse).
async fn subscribe_internal<E: Event, P>(
    &self,
    registered: RegisteredHandler,
    subscription_policy: P,
    once: bool,
) -> Result<Subscription, EventBusError>
where
    P: HandlerPolicy,
{
    if self.inner.is_stopped() {
        return Err(EventBusError::Stopped);
    }
    #[cfg(feature = "trace")]
    tracing::trace!("event_bus.subscribe {:?}", registered.name());
    let id = self.next_subscription_id();
    let subscription_policy = into_listener_policy(subscription_policy);
    let (mut listener, name) = match (registered, subscription_policy) {
        (RegisteredHandler::Async { register, name }, ListenerPolicy::Async(policy)) => {
            (register(id, policy, once, self.clone()), name)
        }
        (RegisteredHandler::Sync { register, name }, ListenerPolicy::Sync(policy)) => {
            (register(id, policy, once, self.clone()), name)
        }
        (RegisteredHandler::Async { .. }, ListenerPolicy::Sync(_)) => {
            panic!("async handler registered with sync policy")
        }
        (RegisteredHandler::Sync { .. }, ListenerPolicy::Async(_)) => {
            panic!("sync handler registered with async policy")
        }
    };
    listener.registration_order = id.as_u64();
    let mut registry = self.inner.registry.lock().await;
    // The stop flag is checked again once the lock is held, presumably so a
    // stop that happened while waiting for the lock is still observed.
    if self.inner.is_stopped() {
        return Err(EventBusError::Stopped);
    }
    registry.add_listener(TypeId::of::<E>(), std::any::type_name::<E>(), listener);
    self.refresh_snapshot_locked(&registry).await;
    Ok(Subscription::new(id, name, self.clone()))
}
/// Subscribes a synchronous handler to `DeadLetter` events.
///
/// The subscription uses `SyncSubscriptionPolicy::default()` with
/// `with_dead_letter(false)` applied, and is registered with `once = false`.
// NOTE(review): `with_dead_letter(false)` presumably keeps failures of this
// handler from producing further dead letters; confirm against the policy type.
///
/// # Errors
///
/// Propagates whatever error `subscribe_internal` returns.
pub async fn subscribe_dead_letters<H>(&self, handler: H) -> Result<Subscription, EventBusError>
where
    H: SyncEventHandler<DeadLetter>,
{
    let dead_letter_policy = SyncSubscriptionPolicy::default().with_dead_letter(false);
    let registered = handler.into_handler();
    let outcome = self
        .subscribe_internal::<DeadLetter, SyncSubscriptionPolicy>(registered, dead_letter_policy, false)
        .await;
    outcome
}
/// Registers an asynchronous middleware as a global middleware.
///
/// The middleware is wrapped in an `Arc` and type-erased into an
/// `ErasedMiddleware::Async` closure. Each invocation clones the `Arc` and
/// awaits `process(event_name, event.as_ref())` inside a boxed future.
///
/// The returned `Subscription` carries the allocated id and no name (`None`).
///
/// # Errors
///
/// Returns `EventBusError::Stopped` if `self.inner.is_stopped()` is true,
/// either on entry or after the registry lock has been acquired.
pub async fn add_middleware<M: Middleware>(&self, middleware: M) -> Result<Subscription, EventBusError> {
    if self.inner.is_stopped() {
        return Err(EventBusError::Stopped);
    }
    let mw = Arc::new(middleware);
    let erased = ErasedMiddleware::Async(Arc::new(move |event_name: &'static str, event: EventType| {
        // Each call gets its own handle so the future can own it.
        let mw = Arc::clone(&mw);
        Box::pin(async move { mw.process(event_name, event.as_ref()).await }) as Pin<Box<dyn Future<Output = _> + Send>>
    }));
    let id = self.next_subscription_id();
    let mut registry = self.inner.registry.lock().await;
    // Checked again once the lock is held, presumably so a stop that happened
    // while waiting for the lock is still observed.
    if self.inner.is_stopped() {
        return Err(EventBusError::Stopped);
    }
    registry.add_global_middleware(id, erased);
    self.refresh_snapshot_locked(&registry).await;
    Ok(Subscription::new(id, None, self.clone()))
}
/// Registers a synchronous middleware as a global middleware.
///
/// The middleware is wrapped in an `Arc` and type-erased into an
/// `ErasedMiddleware::Sync` closure that forwards `event_name` and the
/// `dyn Any` event reference to `process`.
///
/// The returned `Subscription` carries the allocated id and no name (`None`).
///
/// # Errors
///
/// Returns `EventBusError::Stopped` if `self.inner.is_stopped()` is true,
/// either on entry or after the registry lock has been acquired.
pub async fn add_sync_middleware<M: SyncMiddleware>(&self, middleware: M) -> Result<Subscription, EventBusError> {
    if self.inner.is_stopped() {
        return Err(EventBusError::Stopped);
    }
    let mw = Arc::new(middleware);
    let erased = ErasedMiddleware::Sync(Arc::new(move |event_name: &'static str, event: &(dyn std::any::Any + Send + Sync)| {
        mw.process(event_name, event)
    }));
    let id = self.next_subscription_id();
    let mut registry = self.inner.registry.lock().await;
    // Checked again once the lock is held, presumably so a stop that happened
    // while waiting for the lock is still observed.
    if self.inner.is_stopped() {
        return Err(EventBusError::Stopped);
    }
    registry.add_global_middleware(id, erased);
    self.refresh_snapshot_locked(&registry).await;
    Ok(Subscription::new(id, None, self.clone()))
}
/// Registers an asynchronous middleware that applies only to events of type `E`.
///
/// The middleware is wrapped in an `Arc` and stored as a
/// `TypedMiddlewareEntry::Async` closure. On each call the closure downcasts
/// the erased event to `E`; on success it awaits
/// `process(event_name, &E)`, and on failure it yields
/// `MiddlewareDecision::Reject("event type mismatch")`.
///
/// The slot is stored in the registry under `E`'s `TypeId`. The returned
/// `Subscription` carries the slot's id and no name (`None`).
///
/// # Errors
///
/// Returns `EventBusError::Stopped` if `self.inner.is_stopped()` is true,
/// either on entry or after the registry lock has been acquired.
pub async fn add_typed_middleware<E, M>(&self, middleware: M) -> Result<Subscription, EventBusError>
where
    E: Event,
    M: TypedMiddleware<E>,
{
    if self.inner.is_stopped() {
        return Err(EventBusError::Stopped);
    }
    let mw = Arc::new(middleware);
    // Keep a copy of the id so the slot can be moved into the registry
    // without cloning the whole slot.
    let id = self.next_subscription_id();
    let slot = TypedMiddlewareSlot {
        id,
        middleware: TypedMiddlewareEntry::Async(Arc::new(move |event_name, event: EventType| {
            let mw = Arc::clone(&mw);
            // Downcast eagerly; the outcome is moved into the future below.
            let event = event.downcast::<E>();
            Box::pin(async move {
                match event {
                    Ok(event) => mw.process(event_name, event.as_ref()).await,
                    Err(_) => crate::middleware::MiddlewareDecision::Reject("event type mismatch".to_string()),
                }
            })
        })),
    };
    let mut registry = self.inner.registry.lock().await;
    // Checked again once the lock is held, presumably so a stop that happened
    // while waiting for the lock is still observed.
    if self.inner.is_stopped() {
        return Err(EventBusError::Stopped);
    }
    registry.add_typed_middleware(TypeId::of::<E>(), std::any::type_name::<E>(), slot);
    self.refresh_snapshot_locked(&registry).await;
    Ok(Subscription::new(id, None, self.clone()))
}
/// Registers a synchronous middleware that applies only to events of type `E`.
///
/// The middleware is wrapped in an `Arc` and stored as a
/// `TypedMiddlewareEntry::Sync` closure. On each call the closure tries
/// `downcast_ref::<E>()` on the erased event; on success it calls
/// `process(event_name, &E)`, and on failure it returns
/// `MiddlewareDecision::Reject("event type mismatch")`.
///
/// The slot is stored in the registry under `E`'s `TypeId`. The returned
/// `Subscription` carries the slot's id and no name (`None`).
///
/// # Errors
///
/// Returns `EventBusError::Stopped` if `self.inner.is_stopped()` is true,
/// either on entry or after the registry lock has been acquired.
pub async fn add_typed_sync_middleware<E, M>(&self, middleware: M) -> Result<Subscription, EventBusError>
where
    E: Event,
    M: TypedSyncMiddleware<E>,
{
    if self.inner.is_stopped() {
        return Err(EventBusError::Stopped);
    }
    let mw = Arc::new(middleware);
    // Keep a copy of the id so the slot can be moved into the registry
    // without cloning the whole slot.
    let id = self.next_subscription_id();
    let slot = TypedMiddlewareSlot {
        id,
        middleware: TypedMiddlewareEntry::Sync(Arc::new(move |event_name, event: &(dyn std::any::Any + Send + Sync)| {
            let Some(event) = event.downcast_ref::<E>() else {
                return crate::middleware::MiddlewareDecision::Reject("event type mismatch".to_string());
            };
            mw.process(event_name, event)
        })),
    };
    let mut registry = self.inner.registry.lock().await;
    // Checked again once the lock is held, presumably so a stop that happened
    // while waiting for the lock is still observed.
    if self.inner.is_stopped() {
        return Err(EventBusError::Stopped);
    }
    registry.add_typed_middleware(TypeId::of::<E>(), std::any::type_name::<E>(), slot);
    self.refresh_snapshot_locked(&registry).await;
    Ok(Subscription::new(id, None, self.clone()))
}
}