use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use crate::AsyncSubscriptionPolicy;
use crate::SyncSubscriptionPolicy;
use crate::deps::Deps;
use crate::error::{EventBusError, HandlerResult};
use crate::registry::{ErasedAsyncHandlerFn, ErasedSyncHandlerFn, EventType, ListenerEntry, ListenerKind, ListenerPolicy};
use crate::subscription::Subscription;
use crate::types::Event;
pub trait EventHandler<E: Event>: Send + Sync + 'static {
fn handle(&self, event: &E, bus: &crate::bus::EventBus) -> impl Future<Output = HandlerResult> + Send;
fn name(&self) -> Option<&'static str> {
None
}
}
pub trait SyncEventHandler<E: Event>: Send + Sync + 'static {
fn handle(&self, event: &E, bus: &crate::bus::EventBus) -> HandlerResult;
fn name(&self) -> Option<&'static str> {
None
}
}
pub struct AsyncMode;
pub struct SyncMode;
pub struct AsyncFnMode;
pub struct SyncFnMode;
pub(crate) type RegisterAsyncFn =
Box<dyn FnOnce(crate::types::SubscriptionId, AsyncSubscriptionPolicy, bool, crate::bus::EventBus) -> ListenerEntry + Send>;
pub(crate) type RegisterSyncFn =
Box<dyn FnOnce(crate::types::SubscriptionId, SyncSubscriptionPolicy, bool, crate::bus::EventBus) -> ListenerEntry + Send>;
pub(crate) enum RegisteredHandler {
Async { register: RegisterAsyncFn, name: Option<&'static str> },
Sync { register: RegisterSyncFn, name: Option<&'static str> },
}
impl RegisteredHandler {
#[cfg(feature = "trace")]
pub(crate) const fn name(&self) -> Option<&'static str> {
match self {
Self::Async { name, .. } | Self::Sync { name, .. } => *name,
}
}
}
mod private {
use crate::registry::ListenerPolicy;
pub trait SealedHandlerPolicy {
#[allow(private_interfaces)]
fn into_listener_policy(self) -> ListenerPolicy;
}
}
pub trait HandlerPolicy: Copy + 'static + private::SealedHandlerPolicy {}
impl private::SealedHandlerPolicy for AsyncSubscriptionPolicy {
#[allow(private_interfaces)]
fn into_listener_policy(self) -> ListenerPolicy {
ListenerPolicy::Async(self)
}
}
impl private::SealedHandlerPolicy for SyncSubscriptionPolicy {
#[allow(private_interfaces)]
fn into_listener_policy(self) -> ListenerPolicy {
ListenerPolicy::Sync(self)
}
}
impl HandlerPolicy for AsyncSubscriptionPolicy {}
impl HandlerPolicy for SyncSubscriptionPolicy {}
pub(crate) fn into_listener_policy<P: HandlerPolicy>(policy: P) -> ListenerPolicy {
policy.into_listener_policy()
}
#[allow(private_interfaces)]
pub trait IntoHandler<E: Event, Mode> {
#[doc(hidden)]
type Policy;
#[doc(hidden)]
fn default_policy(defaults: &crate::types::SubscriptionDefaults) -> Self::Policy;
#[doc(hidden)]
fn into_handler(self) -> RegisteredHandler;
}
#[allow(private_interfaces)]
impl<E, H> IntoHandler<E, AsyncMode> for H
where
E: Event,
H: EventHandler<E>,
{
type Policy = AsyncSubscriptionPolicy;
fn default_policy(defaults: &crate::types::SubscriptionDefaults) -> Self::Policy {
defaults.policy
}
fn into_handler(self) -> RegisteredHandler {
let name = self.name();
let handler = Arc::new(self);
let register: RegisterAsyncFn = Box::new(move |id, subscription_policy, once, bus| {
let typed_fn: ErasedAsyncHandlerFn = Arc::new(move |event: EventType| {
let handler = Arc::clone(&handler);
let bus = bus.clone();
let event = event.downcast::<E>();
Box::pin(async move {
let event = event.map_err(|_| "event type mismatch")?;
handler.handle(&event, &bus).await
})
});
ListenerEntry {
id,
registration_order: 0,
kind: ListenerKind::Async(typed_fn),
subscription_policy: ListenerPolicy::Async(subscription_policy),
name,
once,
fired: once.then(|| Arc::new(AtomicBool::new(false))),
}
});
RegisteredHandler::Async { register, name }
}
}
#[allow(private_interfaces)]
impl<E, H> IntoHandler<E, SyncMode> for H
where
E: Event,
H: SyncEventHandler<E>,
{
type Policy = SyncSubscriptionPolicy;
fn default_policy(defaults: &crate::types::SubscriptionDefaults) -> Self::Policy {
defaults.sync_policy
}
fn into_handler(self) -> RegisteredHandler {
let name = self.name();
let handler = Arc::new(self);
let register: RegisterSyncFn = Box::new(move |id, subscription_policy, once, bus| {
let typed_fn: ErasedSyncHandlerFn = Arc::new(move |event: &(dyn std::any::Any + Send + Sync)| {
let Some(event) = event.downcast_ref::<E>() else {
return Err("event type mismatch".into());
};
handler.handle(event, &bus)
});
ListenerEntry {
id,
registration_order: 0,
kind: ListenerKind::Sync(typed_fn),
subscription_policy: ListenerPolicy::Sync(subscription_policy),
name,
once,
fired: once.then(|| Arc::new(AtomicBool::new(false))),
}
});
RegisteredHandler::Sync { register, name }
}
}
#[allow(private_interfaces)]
impl<E, F> IntoHandler<E, SyncFnMode> for F
where
E: Event,
F: Fn(&E, &crate::bus::EventBus) -> HandlerResult + Send + Sync + 'static,
{
type Policy = SyncSubscriptionPolicy;
fn default_policy(defaults: &crate::types::SubscriptionDefaults) -> Self::Policy {
defaults.sync_policy
}
fn into_handler(self) -> RegisteredHandler {
let handler = Arc::new(self);
let register: RegisterSyncFn = Box::new(move |id, subscription_policy, once, bus| {
let typed_fn: ErasedSyncHandlerFn = Arc::new(move |event: &(dyn std::any::Any + Send + Sync)| {
let Some(event) = event.downcast_ref::<E>() else {
return Err("event type mismatch".into());
};
handler(event, &bus)
});
ListenerEntry {
id,
registration_order: 0,
kind: ListenerKind::Sync(typed_fn),
subscription_policy: ListenerPolicy::Sync(subscription_policy),
name: None,
once,
fired: once.then(|| Arc::new(AtomicBool::new(false))),
}
});
RegisteredHandler::Sync { register, name: None }
}
}
#[allow(private_interfaces)]
impl<E, F, Fut> IntoHandler<E, AsyncFnMode> for F
where
E: Event + Clone,
F: Fn(E, crate::bus::EventBus) -> Fut + Send + Sync + 'static,
Fut: Future<Output = HandlerResult> + Send + 'static,
{
type Policy = AsyncSubscriptionPolicy;
fn default_policy(defaults: &crate::types::SubscriptionDefaults) -> Self::Policy {
defaults.policy
}
fn into_handler(self) -> RegisteredHandler {
let handler = Arc::new(self);
let register: RegisterAsyncFn = Box::new(move |id, subscription_policy, once, bus| {
let typed_fn: ErasedAsyncHandlerFn = Arc::new(move |event: EventType| {
let handler = Arc::clone(&handler);
let bus = bus.clone();
let event = event.downcast::<E>();
Box::pin(async move {
let event = event.map_err(|_| "event type mismatch")?;
let event = (*event).clone();
handler(event, bus).await
})
});
ListenerEntry {
id,
registration_order: 0,
kind: ListenerKind::Async(typed_fn),
subscription_policy: ListenerPolicy::Async(subscription_policy),
name: None,
once,
fired: once.then(|| Arc::new(AtomicBool::new(false))),
}
});
RegisteredHandler::Async { register, name: None }
}
}
pub trait HandlerDescriptor: Send + Sync + 'static {
fn register<'a>(
&'a self,
bus: &'a crate::bus::EventBus,
deps: &'a Deps,
) -> Pin<Box<dyn Future<Output = Result<Subscription, EventBusError>> + Send + 'a>>;
}
pub trait DeadLetterDescriptor: Send + Sync + 'static {
fn register_dead_letter<'a>(
&'a self,
bus: &'a crate::bus::EventBus,
deps: &'a Deps,
) -> Pin<Box<dyn Future<Output = Result<Subscription, EventBusError>> + Send + 'a>>;
}