use crate::prelude::{Command, Message};
use crate::responses::{ApplicationError, ApplicationResponse, BaseError};
use async_recursion::async_recursion;
use hashbrown::HashMap;
use std::collections::VecDeque;
use std::ops::{Deref, DerefMut};
use std::{
any::{Any, TypeId},
pin::Pin,
sync::Arc,
};
use tokio::sync::RwLock;
/// Heap-allocated, pinned, `Send` future that resolves to `Result<T, E>`.
///
/// This is the return type of every command and event handler stored in
/// [`TCommandHandler`] and [`TEventHandler`].
pub type Future<T, E> = Pin<Box<dyn futures::Future<Output = Result<T, E>> + Send>>;
/// A [`ContextManager`] shared through an `Arc` and guarded by a tokio `RwLock`.
pub type AtomicContextManager = Arc<RwLock<ContextManager>>;
/// Command handler table: maps the `TypeId` of a command to a boxed closure that
/// receives the type-erased command plus the shared context manager.
pub type TCommandHandler<R, E> = HashMap<TypeId, Box<dyn Fn(Box<dyn Any + Send + Sync>, AtomicContextManager) -> Future<R, E> + Send + Sync>>;
/// Event handler table: maps a `String` key to the list of boxed closures to run
/// for that key. `MessageBus` looks entries up by the event's `metadata().topic`.
pub type TEventHandler<R, E> = HashMap<String, Vec<Box<dyn Fn(Box<dyn Message>, AtomicContextManager) -> Future<R, E> + Send + Sync>>>;
/// Per-dispatch state handed to every handler.
///
/// Holds the queue of follow-up events; `MessageBus` appends to the back and
/// consumes from the front.
pub struct ContextManager {
/// Pending events waiting to be dispatched, in FIFO order.
pub event_queue: VecDeque<Box<dyn Message>>,
}
impl ContextManager {
    /// Creates a context manager with an empty event queue, already wrapped in
    /// `Arc<RwLock<_>>` so it can be cloned and handed to handlers.
    pub fn new() -> AtomicContextManager {
        let manager = Self {
            event_queue: VecDeque::new(),
        };
        let guarded = RwLock::new(manager);
        Arc::new(guarded)
    }
}
impl Deref for ContextManager {
    type Target = VecDeque<Box<dyn Message>>;

    /// Exposes the event queue so that read-only `VecDeque` methods can be
    /// called directly on a `ContextManager`.
    fn deref(&self) -> &Self::Target {
        let Self { event_queue } = self;
        event_queue
    }
}
impl DerefMut for ContextManager {
    /// Exposes the event queue mutably so that calls such as `push_back` can be
    /// made directly on a `ContextManager`.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { event_queue } = self;
        event_queue
    }
}
/// Dispatches commands and events to the handlers registered in two static
/// lookup tables.
///
/// `R` is the response type produced by handlers and `E` is their error type.
pub struct MessageBus<R: ApplicationResponse, E: ApplicationError> {
// Command handlers, keyed by the `TypeId` of the command.
command_handler: &'static TCommandHandler<R, E>,
// Event handlers, keyed by a `String`; looked up by the event's `metadata().topic`.
event_handler: &'static TEventHandler<R, E>,
}
impl<R, E> MessageBus<R, E>
where
    R: ApplicationResponse + std::marker::Send,
    E: ApplicationError + std::convert::From<crate::responses::BaseError> + std::convert::Into<crate::responses::BaseError> + std::marker::Send,
{
    /// Builds a bus over the given handler tables and returns it inside an `Arc`.
    pub fn new(command_handler: &'static TCommandHandler<R, E>, event_handler: &'static TEventHandler<R, E>) -> Arc<Self> {
        Arc::new(Self { command_handler, event_handler })
    }

    /// Dispatches `message` to the command handler registered for its `TypeId`,
    /// then processes any events the handler queued on the context manager.
    ///
    /// A fresh context manager is created for every call.
    ///
    /// # Errors
    ///
    /// * `BaseError::NotFound` (converted into `E`) when no handler is registered
    ///   for the command's type.
    /// * Whatever error the command handler itself returns.
    ///
    /// Failures while processing queued events are deliberately discarded; they
    /// never change the command's result.
    pub async fn handle<C>(&self, message: C) -> Result<R, E>
    where
        C: Command,
    {
        println!("Handle Command {:?}", message);
        let context_manager = ContextManager::new();
        let res = self.command_handler.get(&message.type_id()).ok_or_else(|| {
            eprintln!("Unprocessable Command Given!");
            BaseError::NotFound
        })?(Box::new(message), context_manager.clone())
        .await?;

        // Pop under a single write lock instead of "check under a read lock, then
        // pop and unwrap": handlers hold clones of the context manager, so the
        // queue could be drained between the two acquisitions and the unwrap
        // would panic.
        //
        // The pop lives in its own statement so the write guard is dropped before
        // `_handle_event` needs the lock again.
        let next_event = context_manager.write().await.event_queue.pop_front();
        if let Some(event) = next_event {
            let _ = self._handle_event(event, context_manager.clone()).await;
        }
        Ok(res)
    }

    /// Dispatches a single event (and every event queued while handling it)
    /// using a fresh context manager.
    ///
    /// # Errors
    ///
    /// `BaseError::NotFound` (converted into `E`) when no handlers are registered
    /// for the topic of `msg`.
    pub async fn handle_event(&self, msg: Box<dyn Message>) -> Result<(), E> {
        let context_manager = ContextManager::new();
        self._handle_event(msg, context_manager.clone()).await
    }

    /// Runs every handler registered for the topic of `msg`, in registration
    /// order, then recurses into the next queued event, if any.
    ///
    /// Handler outcomes:
    /// * `Ok(_)` - logged, the next handler runs.
    /// * `BaseError::StopSentinel` - the remaining handlers for this event are skipped.
    /// * `BaseError::StopSentinelWithEvent(event)` - `event` is appended to the
    ///   queue and the remaining handlers for this event are skipped.
    /// * any other error - logged, the next handler still runs.
    ///
    /// # Errors
    ///
    /// `BaseError::NotFound` (converted into `E`) when no handlers are registered
    /// for the topic of `msg`. An error returned while processing a *queued*
    /// event is only logged here and is not propagated.
    #[async_recursion]
    async fn _handle_event(&self, msg: Box<dyn Message>, context_manager: AtomicContextManager) -> Result<(), E> {
        println!("Handle Event : {:?}", msg);
        let handlers = self.event_handler.get(&msg.metadata().topic).ok_or_else(|| {
            eprintln!("Unprocessable Event Given! {:?}", msg);
            BaseError::NotFound
        })?;

        for handler in handlers.iter() {
            match handler(msg.message_clone(), context_manager.clone()).await {
                Ok(_val) => {
                    eprintln!("Event Handling Succeeded!");
                }
                Err(err) => match err.into() {
                    BaseError::StopSentinel => {
                        eprintln!("Stop Sentinel Arrived!");
                        break;
                    }
                    BaseError::StopSentinelWithEvent(event) => {
                        eprintln!("Stop Sentinel With Event Arrived!");
                        context_manager.write().await.push_back(event);
                        break;
                    }
                    err => {
                        eprintln!("Error Occurred While Handling Event! Error:{:?}", err);
                    }
                },
            };
        }

        // Keep the pop in its own statement: binding it first releases the write
        // guard before the recursive call tries to lock the queue again.
        let next_event = context_manager.write().await.event_queue.pop_front();
        if let Some(event) = next_event {
            if let Err(err) = self._handle_event(event, context_manager.clone()).await {
                eprintln!("{:?}", err);
            }
        }
        Ok(())
    }
}
/// Generates `pub fn command_handler()`, which returns a lazily-built, process-wide
/// command handler table.
///
/// Each `Command: handler` pair becomes one entry keyed by `TypeId::of::<Command>()`.
/// The stored closure downcasts the type-erased command back to `Command`, calls
/// `handler(command, context_manager)` and pins the returned future in a box.
/// The table is built on the first call and reused afterwards.
///
/// The expansion refers to `TCommandHandler`, `ServiceResponse`, `ServiceError`,
/// `TypeId`, `Any` and `Future` by their bare names, and to the
/// `event_driven_library` crate by path, so all of these must be resolvable at the
/// call site.
///
/// # Panics
///
/// A generated closure panics if it is invoked with a boxed value that is not of
/// the command type it was registered for.
///
/// # Example
///
/// ```ignore
/// init_command_handler!({
///     CreateUser: handle_create_user,
///     DeleteUser: handle_delete_user,
/// });
/// ```
#[macro_export]
macro_rules! init_command_handler {
    (
        {$($command:ty:$handler:expr ),* $(,)?}
    )
    => {
        pub fn command_handler() -> &'static TCommandHandler<ServiceResponse, ServiceError> {
            extern crate self as current_crate;
            // The constructor is fully qualified, like the type, so the expansion
            // does not depend on `OnceLock` being imported at the call site.
            static COMMAND_HANDLER: ::std::sync::OnceLock<TCommandHandler<ServiceResponse, ServiceError>> = ::std::sync::OnceLock::new();
            COMMAND_HANDLER.get_or_init(||{
                let mut _map: TCommandHandler<ServiceResponse,ServiceError>= event_driven_library::prelude::HandlerMapper::new();
                $(
                    _map.insert(
                        TypeId::of::<$command>(),
                        Box::new(|c:Box<dyn Any+Send+Sync>, context_manager: event_driven_library::prelude::AtomicContextManager|->Future<ServiceResponse,ServiceError> {
                            Box::pin($handler(
                                *c.downcast::<$command>().unwrap(),
                                context_manager,
                            ))
                        },
                    ));
                )*
                _map
            })
        }
    };
}
/// Generates `pub fn event_handler()`, which returns a lazily-built, process-wide
/// event handler table.
///
/// Each `Event: [handler, ...]` entry becomes one map entry whose key is
/// `stringify!(Event)` (the event type's tokens as written at the call site) and
/// whose value is the list of handlers in the order given. Every stored closure
/// downcasts the boxed message to `Event`, calls `handler(event, context_manager)`
/// and pins the returned future in a box. The table is built on the first call and
/// reused afterwards.
///
/// `MessageBus` looks handlers up by the event's `metadata().topic`, so that topic
/// has to equal the stringified type used here.
///
/// The expansion refers to `TEventHandler`, `ServiceResponse`, `ServiceError`,
/// `Message` and `Future` by their bare names, and to the `event_driven_library`
/// crate by path, so all of these must be resolvable at the call site.
///
/// # Panics
///
/// A generated closure panics with `"Not Convertible!"` if the message it receives
/// cannot be downcast to the event type it was registered for.
///
/// # Example
///
/// ```ignore
/// init_event_handler!({
///     UserCreated: [send_welcome_mail, update_statistics],
/// });
/// ```
#[macro_export]
macro_rules! init_event_handler {
    (
        {$($event:ty: [$($handler:expr),* $(,)? ]),* $(,)?}
    ) =>{
        pub fn event_handler() -> &'static TEventHandler<ServiceResponse, ServiceError> {
            extern crate self as current_crate;
            // The constructor is fully qualified, like the type, so the expansion
            // does not depend on `OnceLock` being imported at the call site.
            static EVENT_HANDLER: ::std::sync::OnceLock<TEventHandler<ServiceResponse, ServiceError>> = ::std::sync::OnceLock::new();
            EVENT_HANDLER.get_or_init(||{
                let mut _map : TEventHandler<ServiceResponse, ServiceError> = event_driven_library::prelude::HandlerMapper::new();
                $(
                    _map.insert(
                        stringify!($event).into(),
                        vec![
                            $(
                                Box::new(
                                    |e:Box<dyn Message>, context_manager:event_driven_library::prelude::AtomicContextManager| -> Future<ServiceResponse,ServiceError>{
                                        Box::pin($handler(
                                            *e.downcast::<$event>().expect("Not Convertible!"), context_manager,
                                        ))
                                    }
                                ),
                            )*
                        ]
                    );
                )*
                _map
            })
        }
    }}