use crate::prelude::{Command, Message};
use crate::responses::{ApplicationError, ApplicationResponse, BaseError};
use tokio::sync::{
mpsc::{channel, error::TryRecvError, Receiver, Sender},
RwLock,
};
use std::{
any::{Any, TypeId},
collections::HashMap,
pin::Pin,
sync::Arc,
};
/// Boxed, pinned, `Send` future that resolves to `Result<T, E>`.
///
/// Note that this alias shadows the name of the `Future` trait inside this module.
pub type Future<T, E> = Pin<Box<dyn futures::Future<Output = Result<T, E>> + Send>>;
/// Shared, lock-protected handle to a [`ContextManager`].
pub type AtomicContextManager = Arc<RwLock<ContextManager>>;
/// Event handler table: maps a `String` key to the list of boxed handlers registered for it.
///
/// `MessageBus::handle_event` looks the key up with `msg.metadata().topic`, and
/// `init_event_handler!` fills it with `stringify!($event)`.
pub type TEventHandler<R, E> = HashMap<String, Vec<Box<dyn Fn(Box<dyn Message>, AtomicContextManager) -> Future<R, E> + Send + Sync>>>;
/// Holds the sending half of the message channel created by `ContextManager::new`.
pub struct ContextManager {
/// Sending half of the channel; the matching `Receiver` is returned by `ContextManager::new`.
pub sender: Sender<Box<dyn Message>>,
}
impl ContextManager {
    /// Builds a fresh context manager together with the receiving half of its channel.
    ///
    /// The channel is bounded with a capacity of 20 messages. The manager (which owns
    /// the sender) is returned wrapped in `Arc<RwLock<_>>`; the receiver is handed back
    /// separately so the caller can drain the messages.
    pub fn new() -> (Arc<RwLock<Self>>, Receiver<Box<dyn Message>>) {
        let (tx, rx) = channel(20);
        let manager = Self { sender: tx };
        let shared = Arc::new(RwLock::new(manager));
        (shared, rx)
    }
}
pub struct MessageBus<R: ApplicationResponse, E: ApplicationError + std::convert::Into<crate::responses::BaseError> + std::convert::From<crate::responses::BaseError>> {
command_handler: &'static TCommandHandler<R, E>,
event_handler: &'static TEventHandler<R, E>,
}
impl<R: ApplicationResponse, E: ApplicationError + std::convert::Into<crate::responses::BaseError> + std::convert::From<crate::responses::BaseError>> MessageBus<R, E> {
pub fn new(command_handler: &'static TCommandHandler<R, E>, event_handler: &'static TEventHandler<R, E>) -> Arc<Self> {
Self { command_handler, event_handler }.into()
}
pub async fn handle<C>(&self, message: C) -> Result<R, E>
where
C: Command,
{
println!("Handle Command {:?}", message);
let (context_manager, mut event_receiver) = ContextManager::new();
let res = self.command_handler.get(&message.type_id()).ok_or_else(|| {
eprintln!("Unprocessable Command Given!");
BaseError::CommandNotFound
})?(Box::new(message), context_manager.clone())
.await?;
'event_handling_loop: loop {
match event_receiver.try_recv() {
Ok(msg) => {
tracing::info!("BreakPoint on OK");
if let Err(err) = self.handle_event(msg, context_manager.clone()).await {
if let "EventNotFound" = err.to_string().as_str() {
continue;
}
}
}
Err(TryRecvError::Empty) => {
tracing::info!("BreakPoint on Empty");
if Arc::strong_count(&context_manager) == 1 {
break 'event_handling_loop;
} else {
continue;
}
}
Err(TryRecvError::Disconnected) => {
tracing::error!("BreakPoint on Disconnected");
break 'event_handling_loop;
}
};
}
drop(context_manager);
Ok(res)
}
async fn handle_event(&self, msg: Box<dyn Message>, context_manager: AtomicContextManager) -> Result<(), E> {
let handlers = self.event_handler.get(&msg.metadata().topic).ok_or_else(|| {
eprintln!("Unprocessable Event Given! {:?}", msg);
BaseError::EventNotFound
})?;
println!("Handle Event : {:?}", msg);
for handler in handlers.iter() {
match handler(msg.message_clone(), context_manager.clone()).await {
Ok(_val) => {
eprintln!("Event Handling Succeeded!");
}
Err(err) => match err.into() {
BaseError::StopSentinel => {
eprintln!("Stop Sentinel Arrived!");
break;
}
BaseError::StopSentinelWithEvent(event) => {
eprintln!("Stop Sentinel With Event Arrived!");
context_manager.write().await.sender.send(event).await.expect("Event Collecting failed!");
break;
}
err => {
eprintln!("Error Occurred While Handling Event! Error:{:?}", err);
}
},
};
}
drop(context_manager);
Ok(())
}
}
/// Expands to a unit struct `Dependency` and a `dependency()` accessor that hands out
/// a `'static` reference to a single, lazily created instance of it.
#[macro_export]
macro_rules! create_dependency {
    () => {
        pub struct Dependency;

        pub fn dependency() -> &'static Dependency {
            use ::std::sync::OnceLock;

            // Created on first access, then shared for the rest of the program.
            static INSTANCE: OnceLock<Dependency> = OnceLock::new();
            INSTANCE.get_or_init(|| Dependency)
        }
    };
}
/// Command handler table: maps the `TypeId` of a command to a plain function pointer
/// that receives the boxed command (as `dyn Any`) and the shared context manager.
///
/// Because the values are `fn` pointers, only non-capturing closures can be stored here.
pub type TCommandHandler<R, E> = HashMap<TypeId, fn(Box<dyn Any + Send + Sync>, AtomicContextManager) -> Future<R, E>>;
/// Generates `command_handler()`, which lazily builds and returns the `'static`
/// command handler table.
///
/// Syntax: `init_command_handler!({ CommandType: handler_expr => (injectable, ...), ... })`
/// where the `=> (...)` part is optional. Each listed injectable is obtained by calling
/// the method of that name on `crate::dependencies::dependency()` and is passed to the
/// handler after the command and the context manager.
///
/// The call site must have `TCommandHandler`, `ServiceResponse`, `ServiceError`,
/// `AtomicContextManager`, `Future`, `HashMap`, `TypeId` and `Any` in scope.
#[macro_export]
macro_rules! init_command_handler {
    (
        {$($command:ty:$handler:expr $(=>($($injectable:ident),*))? ),* $(,)?}
    )
    => {
        pub fn command_handler() -> &'static TCommandHandler<ServiceResponse, ServiceError> {
            extern crate self as current_crate;
            // Fully qualified on both sides so the call site does not need to import `OnceLock`.
            static COMMAND_HANDLER: ::std::sync::OnceLock<TCommandHandler<ServiceResponse, ServiceError>> = ::std::sync::OnceLock::new();
            COMMAND_HANDLER.get_or_init(||{
                let mut _map: TCommandHandler<ServiceResponse,ServiceError>= HashMap::new();
                $(
                    _map.insert(
                        TypeId::of::<$command>(),
                        // The table stores `fn` pointers, so this closure must not capture
                        // anything: the dependency container is looked up inside the body
                        // instead of being captured from the enclosing scope.
                        |c:Box<dyn Any+Send+Sync>, context_manager: AtomicContextManager|->Future<ServiceResponse,ServiceError> {
                            Box::pin($handler(
                                // The entry is keyed by this very type's `TypeId`, so the
                                // downcast only fails if the bus dispatched to the wrong entry.
                                *c.downcast::<$command>().unwrap(),
                                context_manager,
                                $(
                                    $(current_crate::dependencies::dependency().$injectable(),)*
                                )?
                            ))
                        },
                    );
                )*
                _map
            })
        }
    };
}
/// Generates `event_handler()`, which lazily builds and returns the `'static`
/// event handler table.
///
/// Syntax: `init_event_handler!({ EventType: [handler_expr => (injectable, ...), ...], ... })`
/// where the `=> (...)` part is optional. The table key is `stringify!(EventType)`. Each
/// listed injectable is obtained by calling the method of that name on
/// `crate::dependencies::dependency()` and is passed to the handler after the event and
/// the context manager.
///
/// The call site must have `TEventHandler`, `ServiceResponse`, `ServiceError`,
/// `AtomicContextManager`, `Future`, `Message` and `HashMap` in scope.
#[macro_export]
macro_rules! init_event_handler {
    (
        {$($event:ty: [$($handler:expr $(=>($($injectable:ident),*))? ),* $(,)? ]),* $(,)?}
    ) =>{
        pub fn event_handler() -> &'static TEventHandler<ServiceResponse, ServiceError> {
            extern crate self as current_crate;
            // Fully qualified on both sides so the call site does not need to import `OnceLock`.
            static EVENT_HANDLER: ::std::sync::OnceLock<TEventHandler<ServiceResponse, ServiceError>> = ::std::sync::OnceLock::new();
            EVENT_HANDLER.get_or_init(||{
                let dependency= current_crate::dependencies::dependency();
                let mut _map : TEventHandler<ServiceResponse, ServiceError> = HashMap::new();
                $(
                    _map.insert(
                        stringify!($event).into(),
                        vec![
                            $(
                                Box::new(
                                    |e:Box<dyn Message>, context_manager:AtomicContextManager| -> Future<ServiceResponse,ServiceError>{
                                        Box::pin($handler(
                                            // Panics with "Not Convertible!" when the message is
                                            // not of the event type this entry was registered for.
                                            *e.downcast::<$event>().expect("Not Convertible!"), context_manager,
                                            $(
                                                $(dependency.$injectable(),)*
                                            )?
                                        ))
                                    }
                                ),
                            )*
                        ]
                    );
                )*
                _map
            })
        }
    }}