use crate::prelude::{Command, Message};
use crate::responses::{ApplicationError, ApplicationResponse, BaseError};
use tokio::sync::{
mpsc::{channel, error::TryRecvError, Receiver, Sender},
RwLock,
};
use std::{
any::{Any, TypeId},
collections::HashMap,
pin::Pin,
sync::Arc,
};
/// Boxed, pinned, `Send` future resolving to `Result<T, E>`; this is what every handler
/// stored in the tables below returns.
pub type Future<T, E> = Pin<Box<dyn futures::Future<Output = Result<T, E>> + Send>>;
/// Event handler table: maps a `String` key to the ordered list of handlers for it.
/// Each handler takes the boxed message plus a context value of type `T`.
/// `MessageBus::handle_event` looks entries up by `msg.metadata().topic`.
pub type TEventHandler<T, R, E> = HashMap<String, Vec<Box<dyn Fn(Box<dyn Message>, T) -> Future<R, E> + Send + Sync>>>;
/// Command handler table: maps a `TypeId` to the single handler for it.
/// Each handler takes the command as `Box<dyn Any + Send + Sync>` plus a context value of type `T`.
/// `MessageBus::handle` looks entries up by the `type_id()` of the command it is given.
pub type TCommandHandler<T, R, E> = HashMap<TypeId, Box<dyn Fn(Box<dyn Any + Send + Sync>, T) -> Future<R, E> + Send + Sync>>;
/// A `ContextManager` shared behind `Arc` and guarded by a tokio `RwLock`.
pub type AtomicContextManager = Arc<RwLock<ContextManager>>;
/// Holds the sending half of a message channel.
///
/// `ContextManager::new` creates the channel and hands back the matching receiver;
/// `MessageBus::handle` creates one manager per call and dispatches whatever arrives
/// on that receiver.
pub struct ContextManager {
/// Sending half of the channel; carries boxed `Message` values.
pub sender: Sender<Box<dyn Message>>,
}
impl ContextManager {
    /// Creates a new manager together with the receiving half of its channel.
    ///
    /// Returns the manager already wrapped in `Arc<RwLock<_>>` and the `Receiver`
    /// paired with the manager's `sender`.
    pub fn new() -> (Arc<RwLock<Self>>, Receiver<Box<dyn Message>>) {
        // Buffer size passed to the bounded channel.
        const CHANNEL_BUFFER: usize = 20;

        let (sender, receiver) = channel(CHANNEL_BUFFER);
        let shared_manager = Arc::new(RwLock::new(Self { sender }));
        (shared_manager, receiver)
    }
}
/// Dispatcher that routes a command to its handler and then dispatches the messages
/// that arrive on the per-call context channel to the event handlers.
///
/// `R` is the success type and `E` the error type returned by all handlers; `E` must be
/// constructible from `BaseError` so lookup failures can be reported.
pub struct MessageBus<R: ApplicationResponse, E: ApplicationError + std::convert::From<crate::responses::BaseError>> {
// Table keyed by the command's `TypeId`; borrowed for `'static`.
command_handler: &'static TCommandHandler<AtomicContextManager, R, E>,
// Table keyed by a `String`, looked up with `metadata().topic`; borrowed for `'static`.
event_handler: &'static TEventHandler<AtomicContextManager, R, E>,
}
impl<R: ApplicationResponse, E: ApplicationError + std::convert::From<crate::responses::BaseError>> MessageBus<R, E> {
    /// Builds a bus over the two `'static` handler tables and returns it behind an `Arc`.
    ///
    /// * `command_handler` - table consulted by `handle`, keyed by the command's `TypeId`.
    /// * `event_handler` - table consulted for every message drained from the context
    ///   channel, keyed by `metadata().topic`.
    pub fn new(
        command_handler: &'static TCommandHandler<AtomicContextManager, R, E>,
        event_handler: &'static TEventHandler<AtomicContextManager, R, E>,
    ) -> Arc<Self> {
        Arc::new(Self {
            command_handler,
            event_handler,
        })
    }

    /// Runs the handler registered for command `C`, then drains the context channel.
    ///
    /// A fresh `ContextManager` is created per call; the command handler and every event
    /// handler receive a clone of it. Once the command handler has succeeded, messages are
    /// pulled from the channel and passed to `handle_event` until either the channel is
    /// empty and this call holds the only remaining reference to the context manager, or
    /// the channel reports `Disconnected`.
    ///
    /// # Errors
    ///
    /// * `BaseError::CommandNotFound` (converted into `E`) when no handler is registered
    ///   for `C`.
    /// * Any error returned by the command handler itself.
    ///
    /// Failures while handling events are never returned from here; `handle_event` only
    /// prints them.
    pub async fn handle<C>(&self, message: C) -> Result<R, E>
    where
        C: Command,
    {
        /// Future that returns `Pending` exactly once, after waking itself, so the
        /// executor gets a chance to poll other tasks before this one resumes.
        struct YieldOnce(bool);

        impl std::future::Future for YieldOnce {
            type Output = ();

            fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<()> {
                if self.0 {
                    return std::task::Poll::Ready(());
                }
                self.0 = true;
                cx.waker().wake_by_ref();
                std::task::Poll::Pending
            }
        }

        let (context_manager, mut event_receiver) = ContextManager::new();

        // Look the handler up before `message` is moved into the box below.
        let command_handler = self.command_handler.get(&message.type_id()).ok_or_else(|| {
            eprintln!("Unprocessable Command Given!");
            BaseError::CommandNotFound
        })?;
        let res = command_handler(Box::new(message), context_manager.clone()).await?;

        loop {
            // `try_recv` rather than `recv`: this function keeps a sender alive through
            // `context_manager`, so a blocking receive could never observe a closed channel.
            match event_receiver.try_recv() {
                Ok(msg) => {
                    tracing::info!("BreakPoint on OK");
                    // Deliberately discarded: `handle_event` only fails when no handler is
                    // registered for the topic, it has already printed that, and an event
                    // failure must not replace the command's own result.
                    let _ = self.handle_event(msg, context_manager.clone()).await;
                }
                Err(TryRecvError::Empty) => {
                    tracing::info!("BreakPoint on Empty");
                    // Only our own reference is left, so nobody can send anything further.
                    if Arc::strong_count(&context_manager) == 1 {
                        break;
                    }
                    // Another clone is still alive somewhere. Yield instead of spinning so
                    // its owner can make progress even when it shares this executor thread.
                    YieldOnce(false).await;
                }
                Err(TryRecvError::Disconnected) => {
                    tracing::error!("BreakPoint on Disconnected");
                    break;
                }
            }
        }
        Ok(res)
    }

    /// Passes `msg` to every handler registered under `msg.metadata().topic`, in list order.
    ///
    /// Each handler gets its own copy of the message (`message_clone`) and a clone of
    /// `context_manager`. A handler error whose display text is exactly `"StopSentinel"`
    /// stops the remaining handlers; any other handler error is printed and the next
    /// handler still runs.
    ///
    /// # Errors
    ///
    /// Returns `BaseError::EventNotFound` (converted into `E`) when the topic has no entry
    /// in the event handler table. Handler errors are not propagated.
    async fn handle_event(&self, msg: Box<dyn Message>, context_manager: AtomicContextManager) -> Result<(), E> {
        let handlers = self.event_handler.get(&msg.metadata().topic).ok_or_else(|| {
            eprintln!("Unprocessable Event Given! {:?}", msg);
            BaseError::EventNotFound
        })?;

        for handler in handlers {
            println!("Handle Event : {:?}", msg);
            match handler(msg.message_clone(), context_manager.clone()).await {
                Ok(_) => println!("Event Handling Succeeded!"),
                Err(err) if err.to_string() == "StopSentinel" => {
                    eprintln!("Stop Sentinel Arrived!");
                    break;
                }
                Err(err) => eprintln!("Error Occurred While Handling Event! Error:{}", err),
            }
        }
        Ok(())
    }
}
/// Generates `pub async fn init_command_handler()`, which builds the `TypeId`-keyed
/// command handler table.
///
/// Input shape: `{ CommandType: handler_expr => (injectable_a, injectable_b), ... }`.
/// The `=> (...)` list is optional. Every identifier in it is called as a method on the
/// value returned by `dependency()`, and the results are passed to the handler after the
/// command and the context manager.
///
/// The expansion refers to `dependency`, `HashMap`, `TypeId`, `Any`,
/// `AtomicContextManager`, `Future`, `ServiceResponse` and `ServiceError` by bare name,
/// so all of them must be in scope where the macro is invoked.
///
/// Listing the same command type twice keeps only the last handler, because entries are
/// inserted into a map keyed by `TypeId`.
#[macro_export]
macro_rules! init_command_handler {
    (
        {$($command:ty:$handler:expr $(=>($($injectable:ident),*))? ),* $(,)?}
    )
    => {
        pub async fn init_command_handler() -> HashMap::<TypeId,Box<dyn Fn(Box<dyn Any + Send + Sync>, AtomicContextManager) -> Future<ServiceResponse,ServiceError> + Send + Sync>>{
            // This binding has to be named exactly `dependency`: the injectable calls
            // below expand to `dependency.$injectable()`. With any other name they would
            // resolve to the `dependency` function item and fail to compile.
            // It is unused when no entry lists injectables, hence the `allow`.
            #[allow(unused_variables)]
            let dependency = dependency();
            let mut _map: HashMap::<_,Box<dyn Fn(_, _ ) -> Future<_,_> + Send + Sync>> = HashMap::new();
            $(
                _map.insert(
                    TypeId::of::<$command>(),
                    Box::new(
                        |c:Box<dyn Any+Send+Sync>, context_manager: AtomicContextManager|->Future<ServiceResponse,ServiceError>{
                            Box::pin($handler(
                                // The entry is keyed by `TypeId::of::<$command>()`, so a value
                                // routed here by that key downcasts to `$command`.
                                *c.downcast::<$command>().unwrap(),
                                context_manager,
                                $(
                                    $(dependency.$injectable(),)*
                                )?
                            ))
                        },
                    )
                );
            )*
            _map
        }
    };
}
/// Generates `pub async fn init_event_handler()`, which builds the `String`-keyed event
/// handler table.
///
/// Input shape: `{ EventType: [handler_expr => (injectable_a, ...), ...], ... }`.
/// The `=> (...)` list after each handler is optional. Every identifier in it is called
/// as a method on the value returned by `dependency()`, and the results are passed to the
/// handler after the event and the context manager.
///
/// Each entry is keyed by `stringify!(EventType)`, i.e. the type exactly as written in
/// the invocation. NOTE(review): lookups elsewhere use a message's `metadata().topic`, so
/// the two presumably have to match; confirm against the `Message` implementations.
///
/// The expansion refers to `dependency`, `HashMap`, `Message`, `AtomicContextManager`,
/// `Future`, `ServiceResponse` and `ServiceError` by bare name, so all of them must be in
/// scope where the macro is invoked.
#[macro_export]
macro_rules! init_event_handler {
    (
        {$($event:ty: [$($handler:expr $(=>($($injectable:ident),*))? ),* $(,)? ]),* $(,)?}
    ) => {
        pub async fn init_event_handler() -> HashMap<String, Vec<Box<dyn Fn(Box<dyn Message>, AtomicContextManager) -> Future<ServiceResponse,ServiceError> + Send + Sync>>>{
            let dependency = dependency();
            let mut _handlers_by_topic: HashMap<String, Vec<Box<dyn Fn(_, _) -> Future<_,_> + Send + Sync>>> = HashMap::new();
            $(
                _handlers_by_topic.insert(
                    String::from(stringify!($event)),
                    vec![
                        $(
                            Box::new(
                                |boxed_event: Box<dyn Message>, context_manager: AtomicContextManager| -> Future<ServiceResponse,ServiceError> {
                                    // Recover the concrete event type the handler expects;
                                    // panics with "Not Convertible!" if the message is of another type.
                                    let concrete_event = *boxed_event.downcast::<$event>().expect("Not Convertible!");
                                    Box::pin($handler(
                                        concrete_event,
                                        context_manager,
                                        $(
                                            $(dependency.$injectable(),)*
                                        )?
                                    ))
                                }
                            ),
                        )*
                    ]
                );
            )*
            _handlers_by_topic
        }
    };
}