event_driven_core/
messagebus.rs1use crate::prelude::{Command, Message};
2use crate::responses::{ApplicationError, ApplicationResponse, BaseError};
3use async_recursion::async_recursion;
4use hashbrown::HashMap;
5use std::collections::VecDeque;
6use std::ops::{Deref, DerefMut};
7use std::{
8 any::{Any, TypeId},
9 pin::Pin,
10 sync::Arc,
11};
12use tokio::sync::RwLock;
13pub type Future<T, E> = Pin<Box<dyn futures::Future<Output = Result<T, E>> + Send>>;
14pub type AtomicContextManager = Arc<RwLock<ContextManager>>;
15pub type TCommandHandler<R, E> = HashMap<TypeId, Box<dyn Fn(Box<dyn Any + Send + Sync>, AtomicContextManager) -> Future<R, E> + Send + Sync>>;
16pub type TEventHandler<R, E> = HashMap<String, Vec<Box<dyn Fn(Box<dyn Message>, AtomicContextManager) -> Future<R, E> + Send + Sync>>>;
17
18pub struct ContextManager {
22 pub event_queue: VecDeque<Box<dyn Message>>,
23}
24
25impl ContextManager {
26 pub fn new() -> AtomicContextManager {
28 Arc::new(RwLock::new(Self { event_queue: VecDeque::new() }))
29 }
30}
31
32impl Deref for ContextManager {
33 type Target = VecDeque<Box<dyn Message>>;
34 fn deref(&self) -> &Self::Target {
35 &self.event_queue
36 }
37}
38impl DerefMut for ContextManager {
39 fn deref_mut(&mut self) -> &mut Self::Target {
40 &mut self.event_queue
41 }
42}
43
44pub struct MessageBus<R: ApplicationResponse, E: ApplicationError> {
45 command_handler: &'static TCommandHandler<R, E>,
46 event_handler: &'static TEventHandler<R, E>,
47}
48
49impl<R, E> MessageBus<R, E>
50where
51 R: ApplicationResponse + std::marker::Send,
52 E: ApplicationError + std::convert::From<crate::responses::BaseError> + std::convert::Into<crate::responses::BaseError> + std::marker::Send,
53{
54 pub fn new(command_handler: &'static TCommandHandler<R, E>, event_handler: &'static TEventHandler<R, E>) -> Arc<Self> {
55 Self { command_handler, event_handler }.into()
56 }
57
58 pub async fn handle<C>(&self, message: C) -> Result<R, E>
59 where
60 C: Command,
61 {
62 println!("Handle Command {:?}", message);
63 let context_manager = ContextManager::new();
64
65 let res = self.command_handler.get(&message.type_id()).ok_or_else(|| {
66 eprintln!("Unprocessable Command Given!");
67 BaseError::NotFound
68 })?(Box::new(message), context_manager.clone())
69 .await?;
70
71 if !context_manager.read().await.event_queue.is_empty() {
73 let event = context_manager.write().await.event_queue.pop_front();
74 let _ = self._handle_event(event.unwrap(), context_manager.clone()).await;
75 }
76
77 Ok(res)
78 }
79
80 pub async fn handle_event(&self, msg: Box<dyn Message>) -> Result<(), E> {
82 let context_manager = ContextManager::new();
83 self._handle_event(msg, context_manager.clone()).await
84 }
85
86 #[async_recursion]
87 async fn _handle_event(&self, msg: Box<dyn Message>, context_manager: AtomicContextManager) -> Result<(), E> {
88 println!("Handle Event : {:?}", msg);
91
92 let handlers = self.event_handler.get(&msg.metadata().topic).ok_or_else(|| {
93 eprintln!("Unprocessable Event Given! {:?}", msg);
94 BaseError::NotFound
95 })?;
96
97 for handler in handlers.iter() {
98 match handler(msg.message_clone(), context_manager.clone()).await {
99 Ok(_val) => {
100 eprintln!("Event Handling Succeeded!");
101 }
102
103 Err(err) => match err.into() {
105 BaseError::StopSentinel => {
106 eprintln!("Stop Sentinel Arrived!");
107
108 break;
109 }
110 BaseError::StopSentinelWithEvent(event) => {
111 eprintln!("Stop Sentinel With Event Arrived!");
112 context_manager.write().await.push_back(event);
113 break;
114 }
115 err => {
116 eprintln!("Error Occurred While Handling Event! Error:{:?}", err);
117 }
118 },
119 };
120 }
121
122 let event = context_manager.write().await.event_queue.pop_front();
124 if event.is_some() {
125 if let Err(err) = self._handle_event(event.unwrap(), context_manager.clone()).await {
126 eprintln!("{:?}", err);
128 }
129 }
130
131 Ok(())
132 }
133}
134
135#[macro_export]
138macro_rules! init_command_handler {
139 (
140 {$($command:ty:$handler:expr ),* $(,)?}
141 )
142 => {
143
144 pub fn command_handler() -> &'static TCommandHandler<ServiceResponse, ServiceError> {
145 extern crate self as current_crate;
146 static COMMAND_HANDLER: ::std::sync::OnceLock<TCommandHandler<ServiceResponse, ServiceError>> = OnceLock::new();
147
148 COMMAND_HANDLER.get_or_init(||{
149 let mut _map: TCommandHandler<ServiceResponse,ServiceError>= event_driven_library::prelude::HandlerMapper::new();
150 $(
151 _map.insert(
152 TypeId::of::<$command>(),
154
155 Box::new(|c:Box<dyn Any+Send+Sync>, context_manager: event_driven_library::prelude::AtomicContextManager|->Future<ServiceResponse,ServiceError> {
156 Box::pin($handler(
159 *c.downcast::<$command>().unwrap(),
160 context_manager,
161
162 ))
163 },
164 ));
165 )*
166 _map
167 })
168
169 }
170 };
171}
172
173#[macro_export]
176macro_rules! init_event_handler {
177 (
178 {$($event:ty: [$($handler:expr),* $(,)? ]),* $(,)?}
179 ) =>{
180 pub fn event_handler() -> &'static TEventHandler<ServiceResponse, ServiceError> {
181 extern crate self as current_crate;
182 static EVENT_HANDLER: ::std::sync::OnceLock<TEventHandler<ServiceResponse, ServiceError>> = OnceLock::new();
183 EVENT_HANDLER.get_or_init(||{
184 let mut _map : TEventHandler<ServiceResponse, ServiceError> = event_driven_library::prelude::HandlerMapper::new();
185 $(
186 _map.insert(
187 stringify!($event).into(),
188 vec![
189 $(
190 Box::new(
191 |e:Box<dyn Message>, context_manager:event_driven_library::prelude::AtomicContextManager| -> Future<ServiceResponse,ServiceError>{
192 Box::pin($handler(
193 *e.downcast::<$event>().expect("Not Convertible!"), context_manager,
197 ))
198 }
199 ),
200 )*
201 ]
202 );
203 )*
204 _map
205 })
206 }
207}}