use crate::resolver::Container;
use async_trait::async_trait;
use futures::future::join_all;
use std::error::Error;
use std::sync::Arc;
/// A value that can be dispatched to its registered handlers.
///
/// Every method has a default body, so `impl Event for MyEvent {}` is a complete
/// implementation.
pub trait Event {
    /// Identifier of the event.
    ///
    /// Defaults to the implementing type's name as reported by [`std::any::type_name`].
    fn name(&self) -> String {
        String::from(std::any::type_name::<Self>())
    }

    /// Short description: the literal prefix `Event ` followed by [`Event::name`].
    fn describe(&self) -> String {
        let name = self.name();
        format!("Event {name}")
    }

    /// Strategy used when this event is dispatched.
    ///
    /// Defaults to [`DispatchStrategy::sequential_continue_on_error`].
    fn dispatch_strategy(&self) -> DispatchStrategy {
        DispatchStrategy::sequential_continue_on_error()
    }
}
/// Asynchronous handler for events of type `E`.
///
/// The event is passed behind an [`Arc`], so one event value can be shared by every handler
/// that receives it.
#[async_trait]
pub trait EventHandler<E>
where
    E: Event + Send + Sync,
{
    /// Processes `event`.
    ///
    /// # Errors
    ///
    /// Returns a boxed error when the handler fails.
    async fn handle(&self, event: Arc<E>) -> Result<(), Box<dyn Error + Send + Sync>>;
}
/// How the handlers of one event are scheduled relative to each other.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DispatcherMode {
    /// Handlers are awaited one after another.
    Sequential,
    /// Handler futures are awaited together.
    Parallel,
}

/// What a dispatch does when a handler returns an error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DispatchOnError {
    /// Report only the first error.
    StopOnError,
    /// Collect every error and report them together.
    ContinueOnError,
}

/// Pairing of a scheduling mode with an error policy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DispatchStrategy {
    /// Scheduling of the handlers.
    pub mode: DispatcherMode,
    /// Reaction to a failing handler.
    pub on_error: DispatchOnError,
}

impl DispatchStrategy {
    /// Builds a strategy from an explicit mode and error policy.
    pub fn new(mode: DispatcherMode, on_error: DispatchOnError) -> Self {
        Self { mode, on_error }
    }

    /// [`DispatcherMode::Sequential`] with [`DispatchOnError::StopOnError`].
    pub fn sequential_stop_on_error() -> Self {
        Self::new(DispatcherMode::Sequential, DispatchOnError::StopOnError)
    }

    /// [`DispatcherMode::Sequential`] with [`DispatchOnError::ContinueOnError`].
    pub fn sequential_continue_on_error() -> Self {
        Self::new(DispatcherMode::Sequential, DispatchOnError::ContinueOnError)
    }

    /// [`DispatcherMode::Parallel`] with [`DispatchOnError::StopOnError`].
    pub fn parallel_stop_on_error() -> Self {
        Self::new(DispatcherMode::Parallel, DispatchOnError::StopOnError)
    }

    /// [`DispatcherMode::Parallel`] with [`DispatchOnError::ContinueOnError`].
    pub fn parallel_continue_on_error() -> Self {
        Self::new(DispatcherMode::Parallel, DispatchOnError::ContinueOnError)
    }
}
/// Routes events of type `E` to the handlers registered for them.
///
/// Implementors only supply [`Dispatcher::get_event_handlers`]; the dispatch logic is provided.
#[async_trait]
pub trait Dispatcher<E: Event + Send + Sync + 'static> {
    /// Returns the shared list of handler factories for `E`.
    ///
    /// Each factory is called once per dispatch to obtain a handler instance.
    fn get_event_handlers()
    -> Arc<Vec<Arc<dyn Fn() -> Arc<dyn EventHandler<E> + Send + Sync> + Send + Sync>>>;

    /// Delivers `event` to every registered handler, following the strategy returned by
    /// [`Event::dispatch_strategy`].
    ///
    /// * Sequential: handlers are awaited one by one, in factory order. With `StopOnError`
    ///   the first failure returns immediately and the remaining handlers are not invoked.
    /// * Parallel: all handler futures are awaited together through `join_all`, so every
    ///   handler runs to completion. With `StopOnError` only the first failure (in factory
    ///   order) is reported.
    ///
    /// # Errors
    ///
    /// Returns the collected handler errors: exactly one with `StopOnError`, all of them with
    /// `ContinueOnError`.
    async fn dispatch(event: E) -> Result<(), Vec<Box<dyn Error + Send + Sync>>> {
        let event = Arc::new(event);
        let handlers = Self::get_event_handlers()
            .iter()
            .map(|factory| factory())
            .collect::<Vec<_>>();

        let strategy = event.dispatch_strategy();
        let stop_on_error = matches!(strategy.on_error, DispatchOnError::StopOnError);

        // Single accumulator shared by both modes. It must not be re-declared inside a match
        // arm, otherwise the errors gathered there are dropped before the final check.
        let mut errors: Vec<Box<dyn Error + Send + Sync>> = Vec::new();

        match strategy.mode {
            DispatcherMode::Sequential => {
                for handler in &handlers {
                    if let Err(e) = handler.handle(Arc::clone(&event)).await {
                        if stop_on_error {
                            return Err(vec![e]);
                        }
                        errors.push(e);
                    }
                }
            }
            DispatcherMode::Parallel => {
                let pending = handlers
                    .iter()
                    .map(|handler| {
                        let event = Arc::clone(&event);
                        let handler = Arc::clone(handler);
                        async move { handler.handle(event).await }
                    })
                    .collect::<Vec<_>>();

                for outcome in join_all(pending).await {
                    if let Err(e) = outcome {
                        if stop_on_error {
                            return Err(vec![e]);
                        }
                        errors.push(e);
                    }
                }
            }
        }

        if errors.is_empty() {
            Ok(())
        } else {
            Err(errors)
        }
    }
}
pub async fn dispatch<E: Event + Send + Sync + 'static>(
event: E,
) -> Result<(), Vec<Box<dyn std::error::Error + Send + Sync>>>
where
Container: Dispatcher<E>,
{
<Container as Dispatcher<E>>::dispatch(event).await
}
/// Registers the handlers of one or more events.
///
/// All tokens are forwarded unchanged to `__events_handlers_internal!`, which accepts
/// comma-separated `Event: [HandlerA, HandlerB]` entries.
///
/// ```ignore
/// events_handlers!(
///     UserCreated: [SendWelcomeMail, WriteAuditLog]
/// );
/// ```
#[macro_export]
macro_rules! events_handlers {
    ($($tokens:tt)*) => {
        $crate::__events_handlers_internal!($($tokens)*);
    };
}
/// Implementation detail of `events_handlers!`.
///
/// Consumes comma-separated `Event: [Handler, ...]` entries one at a time. For each entry it
/// emits an anonymous `const` block holding a lazily built list of handler factories and an
/// `impl Dispatcher<Event> for Container` that returns that list.
#[doc(hidden)]
#[macro_export]
macro_rules! __events_handlers_internal {
    // Nothing left to register.
    () => {};
    // An entry followed by a comma: register it through the single-entry arm, then continue
    // with the remaining tokens. Delegating keeps the generated code in exactly one place.
    ($event:path: [$($handler:ty),* $(,)?], $($rest:tt)*) => {
        $crate::__events_handlers_internal!($event: [$($handler),*]);
        $crate::__events_handlers_internal!($($rest)*);
    };
    // A single entry without a trailing comma.
    ($event:path: [$($handler:ty),* $(,)?]) => {
        const _: () = {
            use $crate::dispatcher::{Dispatcher, EventHandler};
            use $crate::inject::Injectable;
            use std::sync::{Arc, LazyLock};

            // Built on first access; every later call clones the same `Arc`.
            static _EVENT_HANDLERS: LazyLock<
                Arc<Vec<Arc<dyn Fn() -> Arc<dyn EventHandler<$event> + Send + Sync> + Send + Sync>>>,
            > = LazyLock::new(|| {
                let mut handlers: Vec<
                    Arc<dyn Fn() -> Arc<dyn EventHandler<$event> + Send + Sync> + Send + Sync>,
                > = Vec::new();
                $(
                    let factory: Arc<
                        dyn Fn() -> Arc<dyn EventHandler<$event> + Send + Sync> + Send + Sync,
                    > = Arc::new(|| {
                        let handler = <$handler as Injectable>::inject();
                        Arc::new(handler) as Arc<dyn EventHandler<$event> + Send + Sync>
                    });
                    handlers.push(factory);
                )*
                Arc::new(handlers)
            });

            use $crate::resolver::Container;

            impl Dispatcher<$event> for Container {
                // No receiver: this must match the trait's associated-function signature.
                fn get_event_handlers()
                -> Arc<Vec<Arc<dyn Fn() -> Arc<dyn EventHandler<$event> + Send + Sync> + Send + Sync>>>
                {
                    _EVENT_HANDLERS.clone()
                }
            }
        };
    };
}
#[cfg(test)]
mod test {
    /// Smoke test: registers one handler for one event through `events_handlers!` and checks
    /// that dispatching the event succeeds.
    #[test]
    fn dispatcher_test() {
        use crate::dispatcher::{Event, EventHandler, dispatch};

        // Event carrying a single text payload.
        struct TestEvent {
            pub info: String,
        }

        impl Event for TestEvent {
            fn name(&self) -> String {
                String::from("TestEvent")
            }
        }

        // Handler that prints the payload and always succeeds.
        struct TestEventHandler;

        impl crate::inject::Injectable for TestEventHandler {
            fn inject() -> Self {
                Self
            }
        }

        #[async_trait::async_trait]
        impl EventHandler<TestEvent> for TestEventHandler {
            async fn handle(
                &self,
                event: std::sync::Arc<TestEvent>,
            ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
                println!("Handling event: {}", event.info);
                Ok(())
            }
        }

        events_handlers!(
            TestEvent: [TestEventHandler]
        );

        let payload = String::from("This is a test event");
        let outcome = futures::executor::block_on(dispatch(TestEvent { info: payload }));
        assert!(outcome.is_ok());
    }
}