use crate::core::*;
use async_trait::async_trait;
pub use noema_macros::event;
use std::{
pin::Pin,
sync::{OnceLock, RwLock},
};
/// A value that can be published through a `Dispatcher`.
///
/// Both methods have default implementations, so `impl Event for T {}` is a
/// complete implementation.
pub trait Event {
    /// Returns the event's name.
    ///
    /// By default this is whatever [`std::any::type_name`] reports for the
    /// implementing type.
    fn name(&self) -> String {
        String::from(std::any::type_name::<Self>())
    }

    /// Returns a human-readable description of the event.
    ///
    /// By default this is the literal `"Event "` followed by [`Event::name`].
    fn describe(&self) -> String {
        let mut text = String::from("Event ");
        text.push_str(&self.name());
        text
    }
}
/// Asynchronous handler for events of type `E`.
///
/// The trait is declared through `#[async_trait]` so that it can expose an
/// `async fn`.
#[async_trait]
pub trait EventHandler<E: Event + Send + Sync> {
/// Handles one event, received as a shared `Arc<E>`.
///
/// Returning `Err` signals a failure; the default `Dispatcher::dispatch`
/// forwards such errors to the registered `BackgroundErrorHandler`.
async fn handle(&self, event: Arc<E>) -> Result<(), BoxDynError>;
}
/// Executor abstraction the dispatcher uses to run handler futures.
///
/// Implementations must be `Send + Sync` because a single instance is stored
/// in a global and shared by every dispatch call.
pub trait BackgroundSpawner: Send + Sync {
/// Takes ownership of one boxed, `Send` future that yields `()`.
///
/// How and when the future is driven to completion is left to the
/// implementation.
fn spawn(&self, fut: Pin<Box<dyn Future<Output = ()> + Send>>);
}
/// Sink for errors returned by event handlers that ran in the background.
///
/// Implementations must be `Send + Sync` because a single instance is stored
/// in a global and cloned into every spawned handler task.
pub trait BackgroundErrorHandler: Send + Sync {
/// Called with the error an `EventHandler::handle` call returned.
fn handle(&self, error: BoxDynError);
}
/// Global spawner read by `Dispatcher::dispatch`.
///
/// Set once by the `dispatchers_init!` macro; `dispatch` panics while it is
/// unset. The `arc_dyn!` macro is defined elsewhere — presumably it expands to
/// an `Arc<dyn BackgroundSpawner ...>` type (confirm against `crate::core`).
pub static __NOEMA_DISPATCHER_SPAWNER: OnceLock<arc_dyn!(BackgroundSpawner)> = OnceLock::new();
/// Global error handler read by `Dispatcher::dispatch`.
///
/// Set once by the `dispatchers_init!` macro; `dispatch` panics while it is
/// unset.
pub static __NOEMA_DISPATCHER_ERROR_HANDLER: OnceLock<arc_dyn!(BackgroundErrorHandler)> =
OnceLock::new();
/// Publishes events of type `E` to every registered handler.
///
/// Implementors only provide [`Dispatcher::get_event_handlers`]; the provided
/// [`Dispatcher::dispatch`] performs the fan-out.
pub trait Dispatcher<E: Event + Send + Sync + 'static> {
    /// Returns the handler factories registered for `E`.
    ///
    /// Each factory returns a handler when called.
    fn get_event_handlers(
        &self,
    ) -> Arc<Vec<Arc<dyn Fn() -> Arc<dyn EventHandler<E> + Send + Sync> + Send + Sync>>>;

    /// Hands `event` to every registered handler through the global spawner.
    ///
    /// The event is wrapped in one `Arc` that all handlers share. One future
    /// per handler is passed to the spawner; an `Err` returned by a handler is
    /// forwarded to the global error handler.
    ///
    /// # Panics
    ///
    /// Panics if the global spawner or error handler has not been set (see
    /// the `dispatchers_init!` macro).
    fn dispatch(&self, event: E) {
        // `OnceLock::get` on a static yields `&'static` references, so the
        // globals are borrowed here instead of cloning their `Arc`s per call.
        let spawner = __NOEMA_DISPATCHER_SPAWNER
            .get()
            .expect("use dispatchers init macro for set spawner");
        let error_handler = __NOEMA_DISPATCHER_ERROR_HANDLER
            .get()
            .expect("use dispatchers init macro for set error handler");
        let event = Arc::new(event);

        // Every handler is built before the first task is spawned, so no
        // factory runs after a handler has already been handed to the spawner.
        let handlers: Vec<_> = self
            .get_event_handlers()
            .iter()
            .map(|factory| factory())
            .collect();

        for handler in handlers {
            // Each task owns its own references to the shared event and the
            // error handler because the future must be `'static`.
            let task_error_handler = error_handler.clone();
            let task_event = event.clone();
            spawner.spawn(Box::pin(async move {
                let outcome = handler.handle(task_event).await;
                if let Err(error) = outcome {
                    task_error_handler.handle(error);
                }
            }));
        }
    }
}
/// Registry of handler factories for events of type `E`.
///
/// Implementors expose the shared, lock-protected factory list through
/// [`EventListeners::handlers`]; the provided methods append to it and take
/// snapshots of it.
pub trait EventListeners<E>
where
    E: Event + Send + Sync,
{
    /// Returns the shared, lock-protected list of handler factories.
    fn handlers(
        &self,
    ) -> Arc<RwLock<Vec<Arc<dyn Fn() -> Arc<dyn EventHandler<E> + Send + Sync> + Send + Sync>>>>;

    /// Appends `factory` to the list.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    fn add_handler(
        &self,
        factory: Arc<dyn Fn() -> Arc<dyn EventHandler<E> + Send + Sync> + Send + Sync>,
    ) {
        let registry = self.handlers();
        let mut factories = registry.write().unwrap();
        factories.push(factory);
    }

    /// Returns a snapshot of the factories registered at the time of the call.
    ///
    /// The snapshot is a copy of the list (the factories themselves are shared
    /// `Arc`s), so later registrations do not affect it.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    fn get_handlers(
        &self,
    ) -> Arc<Vec<Arc<dyn Fn() -> Arc<dyn EventHandler<E> + Send + Sync> + Send + Sync>>> {
        let registry = self.handlers();
        // The read guard is a temporary and is released at the end of this
        // statement, before the snapshot is wrapped.
        let snapshot = registry.read().unwrap().to_vec();
        Arc::new(snapshot)
    }
}
/// Registers handler type `H` as a listener for events of type `E`.
///
/// A factory is added to the global `Container`'s listener list; every time
/// the factory is called it builds a new `H` via `H::inject(&Container)` and
/// returns it as a type-erased `EventHandler<E>`.
pub fn listen<E, H>()
where
    E: Event + Send + Sync + 'static,
    H: EventHandler<E> + Injectable<Container> + Send + Sync + 'static,
    Container: EventListeners<E>,
{
    // The explicit return type performs the unsizing to the trait object.
    let make_handler = || -> Arc<dyn EventHandler<E> + Send + Sync> {
        Arc::new(H::inject(&Container))
    };
    Container.add_handler(Arc::new(make_handler));
}
pub fn dispatch<E: Event + Send + Sync + 'static>(event: E)
where
Container: Dispatcher<E>,
{
<Container as Dispatcher<E>>::dispatch(&Container, event);
}
/// Installs the global background spawner and error handler used by
/// `Dispatcher::dispatch`.
///
/// Both arguments are type paths; each type is instantiated with
/// `<Type>::inject(&Container)` and stored behind an `Arc`.
///
/// `Arc`, `Container` and the trait providing `inject` are resolved at the
/// call site, so they must be in scope where the macro is invoked.
///
/// The expansion is a single block that refers to the globals by their full
/// `$crate` path, so it does not import anything into the caller's scope.
///
/// # Panics
///
/// Panics if the spawner or the error handler has already been set.
#[macro_export]
macro_rules! dispatchers_init {
    (
        spawner: $spawner:path,
        error_handler: $error_handler:path
    ) => {{
        if $crate::dispatcher::__NOEMA_DISPATCHER_SPAWNER
            .set(Arc::new(<$spawner>::inject(&Container)))
            .is_err()
        {
            panic!("Dispatcher spawner already set");
        }
        if $crate::dispatcher::__NOEMA_DISPATCHER_ERROR_HANDLER
            .set(Arc::new(<$error_handler>::inject(&Container)))
            .is_err()
        {
            panic!("Dispatcher error handler already set");
        }
    }};
}
// Smoke test for the dispatcher: installs a spawner and an error handler,
// registers handlers and dispatches one event through the global `Container`.
#[cfg(test)]
mod test {
use crate::core::*;
use crate::dispatcher::{
BackgroundErrorHandler, BackgroundSpawner, Dispatcher, Event, EventHandler, EventListeners,
event, listen,
};
// Test events. Name and description are passed to the `#[event]` attribute,
// whose expansion is not visible in this file.
#[event(name = "Papus", description = "A simple hola event")]
struct Hola {
value: i32,
}
// NOTE(review): the description below still says "hola" — it looks copy-pasted
// from the event above; confirm whether that is intended.
#[event(name = "Adios", description = "A simple hola event")]
struct Adios {
value: i32,
}
// Handler for `Hola` that only prints the event's description.
#[derive(Injectable)]
struct HolaHandler {}
#[async_trait::async_trait]
impl EventHandler<Hola> for HolaHandler {
async fn handle(&self, event: Arc<Hola>) -> Result<(), BoxDynError> {
println!("Handling event: {}", event.describe());
Ok(())
}
}
// Handler for `Adios` that only prints the event's description.
#[derive(Injectable)]
struct AdiosHandler {}
#[async_trait::async_trait]
impl EventHandler<Adios> for AdiosHandler {
// NOTE(review): returns `NoemaResult<()>` where the trait declares
// `Result<(), BoxDynError>` — presumably an alias for the same type; confirm.
async fn handle(&self, event: Arc<Adios>) -> NoemaResult<()> {
println!("Handling event: {}", event.describe());
Ok(())
}
}
#[test]
fn subscriber_test() {
// Spawner that drives each future to completion on the calling thread, so
// `spawn` returns only after the future has finished.
#[derive(Injectable)]
struct FuturesSpawner {}
impl BackgroundSpawner for FuturesSpawner {
fn spawn(&self, fut: std::pin::Pin<Box<dyn Future<Output = ()> + Send>>) {
futures::executor::block_on(fut);
}
}
// Error handler that writes handler errors to stderr.
#[derive(Injectable)]
struct StdErrorHandler {}
impl BackgroundErrorHandler for StdErrorHandler {
fn handle(&self, error: BoxDynError) {
eprintln!("Background task error: {}", error);
}
}
// NOTE(review): `hola` is never read afterwards; it appears to act only as a
// compile-time check that `Container` implements `Dispatcher<Hola>` — confirm.
let hola: Arc<dyn Dispatcher<Hola> + Send + Sync> = Arc::new(Container);
// Component that receives its dispatcher as a field when built via `inject`.
#[derive(Injectable)]
struct TestPapu {
dispatcher: Arc<dyn Dispatcher<Hola> + Send + Sync>,
}
impl TestPapu {
fn dispatch(&self, event: Hola) {
self.dispatcher.dispatch(event);
}
}
// The globals must be set before the first dispatch, otherwise `dispatch` panics.
dispatchers_init!(spawner: FuturesSpawner, error_handler: StdErrorHandler);
// `HolaHandler` is registered twice, so two factories are added for `Hola`.
listen::<Hola, HolaHandler>();
listen::<Hola, HolaHandler>();
listen::<Adios, AdiosHandler>();
let test_papu = TestPapu::inject(&Container);
// NOTE(review): nothing is asserted; the test only verifies that building the
// component and dispatching an event complete without panicking.
test_papu.dispatch(Hola { value: 42 });
}
}