use crate::prelude::{Command, Message, TCommandService};
use crate::responses::{self, ApplicationError, ApplicationResponse, BaseError};
use async_trait::async_trait;
use hashbrown::HashMap;
use std::collections::VecDeque;
use std::ops::{Deref, DerefMut};
use std::{
any::{Any, TypeId},
pin::Pin,
sync::Arc,
};
use tokio::sync::RwLock;
/// Heap-allocated, pinned, `Send` future that resolves to `Result<T, E>`.
///
/// This is the return type of every handler closure stored in [`TCommandHandler`] and [`TEventHandler`].
pub type Future<T, E> = Pin<Box<dyn futures::Future<Output = Result<T, E>> + Send>>;
/// A [`ContextManager`] shared behind an `Arc` and guarded by a `tokio::sync::RwLock`.
pub type AtomicContextManager = Arc<RwLock<ContextManager>>;
/// Command handler registry: maps the `TypeId` of a command type to the closure that handles it.
///
/// Each closure receives the type-erased command (`Box<dyn Any + Send + Sync>`) together with the
/// shared context manager and returns a boxed [`Future`].
pub type TCommandHandler<R, E> = HashMap<TypeId, Box<dyn Fn(Box<dyn Any + Send + Sync>, AtomicContextManager) -> Future<R, E> + Send + Sync>>;
/// Event handler registry: maps a `String` key to the ordered list of closures registered for it.
///
/// `TMessageBus::_handle_event` looks handlers up by `msg.metadata().topic`, and runs them in `Vec` order.
pub type TEventHandler<R, E> = HashMap<String, Vec<Box<dyn Fn(Box<dyn Message>, AtomicContextManager) -> Future<R, E> + Send + Sync>>>;
/// Per-dispatch context that carries the messages still waiting to be handled.
///
/// It is created through [`ContextManager::new`], which hands it out already wrapped as an
/// [`AtomicContextManager`] so that handlers can share it.
pub struct ContextManager {
/// FIFO queue of pending messages: new events are pushed to the back and the bus pops from the front.
pub event_queue: VecDeque<Box<dyn Message>>,
}
impl ContextManager {
    /// Builds a context manager with an empty event queue.
    ///
    /// Note that this does not return `Self`: the value comes back already wrapped in
    /// `Arc<RwLock<_>>` (an [`AtomicContextManager`]) so it can be cloned into handlers directly.
    pub fn new() -> AtomicContextManager {
        let empty_context = Self {
            event_queue: VecDeque::new(),
        };
        Arc::new(RwLock::new(empty_context))
    }
}
/// Read-only access to the underlying event queue, so queue methods can be called on the
/// context manager itself.
impl Deref for ContextManager {
    type Target = VecDeque<Box<dyn Message>>;

    fn deref(&self) -> &Self::Target {
        let Self { event_queue } = self;
        event_queue
    }
}
/// Mutable access to the underlying event queue (e.g. `push_back` / `pop_front` directly on
/// the context manager).
impl DerefMut for ContextManager {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { event_queue } = self;
        event_queue
    }
}
/// Message bus that executes a command and then dispatches every event queued as a result.
///
/// Implementors supply the two lookups ([`command_handler`](Self::command_handler) and
/// [`event_handler`](Self::event_handler)); the dispatch logic is provided by the default methods.
///
/// * `R` - response type produced by command and event handlers.
/// * `E` - error type; convertible to and from `BaseError`.
/// * `C` - command type accepted by [`handle`](Self::handle).
#[async_trait]
pub trait TMessageBus<R, E, C>
where
    responses::BaseError: std::convert::From<E>,
    R: ApplicationResponse,
    E: ApplicationError + std::convert::From<crate::responses::BaseError>,
    C: Command,
{
    /// Returns the service that executes commands of type `C`, bound to the given context manager.
    fn command_handler(&self, context_manager: AtomicContextManager) -> Box<dyn TCommandService<R, E, C>>;

    /// Returns the registry of event handlers, looked up by event topic.
    fn event_handler(&self) -> &'static TEventHandler<R, E>;

    /// Executes `message` and then dispatches the events it queued on a fresh context manager.
    ///
    /// # Errors
    ///
    /// Returns the error produced by command execution. The outcome of the subsequent event
    /// handling is deliberately ignored: the command's response is returned regardless.
    async fn handle(&self, message: C) -> Result<R, E>
    where
        C: Command,
    {
        let context_manager = ContextManager::new();
        let res = self.command_handler(context_manager.clone()).execute(message).await?;

        // Pop into a local so the write guard is released at the end of this statement;
        // `_handle_event` takes the same lock again and must not find it held.
        let first_event = context_manager.write().await.event_queue.pop_front();
        if let Some(event) = first_event {
            let _ = self._handle_event(event, context_manager).await;
        }
        Ok(res)
    }

    /// Dispatches a single event (and any events it triggers) on a fresh context manager.
    ///
    /// # Errors
    ///
    /// Returns `BaseError::NotFound` (converted into `E`) when no handler is registered for
    /// the topic of `msg`.
    async fn handle_event(&self, msg: Box<dyn Message>) -> Result<(), E> {
        self._handle_event(msg, ContextManager::new()).await
    }

    /// Runs every handler registered for `msg`, then keeps draining `context_manager`'s queue
    /// until it is empty.
    ///
    /// For each event the registered handlers run in order:
    /// * `BaseError::StopSentinel` stops the remaining handlers of that event;
    /// * `BaseError::StopSentinelWithEvent(event)` does the same and queues `event`;
    /// * any other handler error is logged and the next handler still runs.
    ///
    /// The queue is drained iteratively, so a long chain of follow-up events does not grow
    /// the nesting depth of the future.
    ///
    /// NOTE(review): a *queued* event without registered handlers is logged and ends the
    /// drain; events still behind it in the queue are left unhandled.
    ///
    /// # Errors
    ///
    /// Returns `BaseError::NotFound` (converted into `E`) only when `msg` itself has no
    /// registered handlers. Failures of follow-up events are logged, never returned.
    async fn _handle_event(&self, msg: Box<dyn Message>, context_manager: AtomicContextManager) -> Result<(), E> {
        let mut current = msg;
        // `false` for the event passed in by the caller, `true` for events popped from the queue.
        let mut is_follow_up = false;

        loop {
            println!("Handle Event : {:?}", current);

            let handlers = match self.event_handler().get(&current.metadata().topic) {
                Some(handlers) => handlers,
                None => {
                    eprintln!("Unprocessable Event Given! {:?}", current);
                    let err: E = BaseError::NotFound.into();
                    if is_follow_up {
                        // Follow-up failures are reported but never propagated.
                        eprintln!("{:?}", err);
                        return Ok(());
                    }
                    return Err(err);
                }
            };

            for handler in handlers.iter() {
                match handler(current.message_clone(), context_manager.clone()).await {
                    Ok(_val) => {
                        eprintln!("Event Handling Succeeded!");
                    }
                    Err(err) => match err.into() {
                        BaseError::StopSentinel => {
                            eprintln!("Stop Sentinel Arrived!");
                            break;
                        }
                        BaseError::StopSentinelWithEvent(event) => {
                            eprintln!("Stop Sentinel With Event Arrived!");
                            context_manager.write().await.push_back(event);
                            break;
                        }
                        err => {
                            eprintln!("Error Occurred While Handling Event! Error:{:?}", err);
                        }
                    },
                };
            }

            // The guard is a temporary of this statement, so the lock is free again before
            // the next round of handlers runs.
            let next_event = context_manager.write().await.event_queue.pop_front();
            match next_event {
                Some(event) => {
                    current = event;
                    is_follow_up = true;
                }
                None => return Ok(()),
            }
        }
    }
}
/// Generates `pub fn event_handler() -> &'static TEventHandler<R, E>` at the call site.
///
/// The generated function lazily builds (once, via `OnceLock`) a map from
/// `stringify!(EventType)` to the list of handler closures registered for that event.
///
/// Invocation shape (`R:` and `E:` may be given in either order):
///
/// ```text
/// init_event_handler!(
///     R: Response,
///     E: Error,
///     {
///         SomeEvent: [handler_a, handler_b => (dep_one, dep_two(context))],
///     }
/// );
/// ```
///
/// For every `handler => (dep(arg, ...), ...)` each `dep` is resolved as
/// `dependencies::dep(...)` in the calling crate, and every `arg` identifier (whatever its
/// name) expands to a clone of the context manager handed to the closure.
///
/// Requirements at the call site, because these names are used unqualified in the expansion:
/// `Message` must be in scope, the `futures` crate must be nameable, and the calling crate
/// must have a `dependencies` module at its root.
///
/// # Panics
///
/// A generated closure panics with `"Not Convertible!"` when the incoming message cannot be
/// downcast to the event type it was registered for.
#[macro_export]
macro_rules! init_event_handler {
    (
        R: $response:ty,
        E: $error:ty $(,)?
        {
            $(
                $event:ty: [$($handler:expr $(=>($($injectable:ident $(( $($arg:ident),* ))? ),*))?),* $(,)? ]
            ),*
            $(,)?
        }
    ) =>{
        pub fn event_handler() -> &'static ::ruva::prelude::TEventHandler<$response, $error> {
            extern crate self as current_crate;
            static EVENT_HANDLER: ::std::sync::OnceLock<::ruva::prelude::TEventHandler<$response, $error>> = ::std::sync::OnceLock::new();
            EVENT_HANDLER.get_or_init(||{
                use current_crate::dependencies;
                let mut _map : ::ruva::prelude::TEventHandler<$response, $error> = ::ruva::prelude::HandlerMapper::new();
                $(
                    _map.insert(
                        stringify!($event).into(),
                        vec![
                            $(
                                Box::new(
                                    |e:Box<dyn Message>, context_manager: ::ruva::prelude::AtomicContextManager| -> ::std::pin::Pin<Box<dyn futures::Future<Output = Result<$response, $error>> + Send>>{
                                        // Maps any injectable argument identifier to a clone of the context manager.
                                        #[allow(unused)]
                                        macro_rules! matcher{
                                            ($a:ident)=>{
                                                context_manager.clone()
                                            }
                                        }
                                        Box::pin($handler(
                                            *e.downcast::<$event>().expect("Not Convertible!"),
                                            $(
                                                $(dependencies::$injectable( $( $(matcher!($arg)),*)?),)*
                                            )?
                                        ))
                                    }
                                ),
                            )*
                        ]
                    );
                )*
                _map
            })
        }
    };
    // Same as above with `E:` given before `R:`; forwards to the canonical arm.
    // `$crate::` keeps the recursive call resolvable when the macro is invoked by path
    // without having been imported by name.
    (
        E: $error:ty,
        R: $response:ty $(,)?
        {
            $(
                $event:ty: [$($handler:expr $(=>($($injectable:ident $(( $($arg:ident),* ))? ),*))?),* $(,)? ]
            ),*
            $(,)?
        }
    ) =>{
        $crate::init_event_handler!(
            R:$response,E:$error,
            {
                $(
                    $event: [$($handler $(=>($($injectable $(( $($arg),* ))? ),*))?),* ]
                ),*
            }
        )
    }
}
/// Generates `pub fn command_handler() -> &'static TCommandHandler<R, E>` at the call site.
///
/// The generated function lazily builds (once, via `OnceLock`) a map from the `TypeId` of
/// each command type to the closure that downcasts the type-erased command and calls its handler.
///
/// Invocation shape (`R:` and `E:` may be given in either order):
///
/// ```text
/// init_command!(
///     R: Response,
///     E: Error,
///     {
///         SomeCommand: handler_fn => (dep_one, dep_two(context)),
///     }
/// );
/// ```
///
/// For every `handler => (dep(arg, ...), ...)` each `dep` is resolved as
/// `dependencies::dep(...)` in the calling crate, and every `arg` identifier (whatever its
/// name) expands to a clone of the context manager handed to the closure.
///
/// Requirements at the call site, because these names are used unqualified in the expansion:
/// the `futures` crate must be nameable and the calling crate must have a `dependencies`
/// module at its root.
///
/// # Panics
///
/// A generated closure panics when the boxed command is not of the type it was registered under.
#[macro_export]
macro_rules! init_command {
    (
        R: $response:ty,
        E: $error:ty $(,)?
        {
            $(
                $command:ty:$handler:expr $(=>($($injectable:ident $(( $($arg:ident),* ))? ),*))?
            ),*
            $(,)?
        }
    )
    => {
        pub fn command_handler() -> &'static ::ruva::prelude::TCommandHandler<$response, $error> {
            extern crate self as current_crate;
            static COMMAND_HANDLER: ::std::sync::OnceLock<::ruva::prelude::TCommandHandler<$response, $error>> = ::std::sync::OnceLock::new();
            COMMAND_HANDLER.get_or_init(||{
                use current_crate::dependencies;
                let mut _map: ::ruva::prelude::TCommandHandler<$response,$error>= ::ruva::prelude::TCommandHandler::new();
                $(
                    _map.insert(
                        ::std::any::TypeId::of::<$command>(),
                        Box::new( |c:Box<dyn ::std::any::Any+Send+Sync>, context_manager: ::ruva::prelude::AtomicContextManager|-> ::std::pin::Pin<Box<dyn futures::Future<Output = Result<$response, $error>> + Send>> {
                            // Maps any injectable argument identifier to a clone of the context manager.
                            #[allow(unused)]
                            macro_rules! matcher{
                                ($a:ident)=>{
                                    context_manager.clone()
                                }
                            }
                            Box::pin($handler(
                                *c.downcast::<$command>().expect("command does not match the type its handler was registered under"),
                                $(
                                    $(dependencies::$injectable( $( $(matcher!($arg)),*)?),)*
                                )?
                            ))
                        }
                        ),
                    );
                )*
                _map
            })
        }
    };
    // Same as above with `E:` given before `R:`; forwards to the canonical arm.
    // `$crate::` keeps the recursive call resolvable when the macro is invoked by path
    // without having been imported by name.
    (
        E: $error:ty,
        R: $response:ty $(,)?
        {
            $(
                $command:ty:$handler:expr $(=>($($injectable:ident $(( $($arg:ident),* ))? ),*))?
            ),*
            $(,)?
        }
    ) =>{
        $crate::init_command!(
            R:$response,E:$error
            {
                $(
                    $command:$handler $(=>($($injectable $(( $($arg),* ))? ),*))?
                ),*
            }
        )
    }
}