event_driven_core/
messagebus.rs

1use crate::prelude::{Command, Message};
2use crate::responses::{ApplicationError, ApplicationResponse, BaseError};
3use async_recursion::async_recursion;
4use hashbrown::HashMap;
5use std::collections::VecDeque;
6use std::ops::{Deref, DerefMut};
7use std::{
8	any::{Any, TypeId},
9	pin::Pin,
10	sync::Arc,
11};
12use tokio::sync::RwLock;
13pub type Future<T, E> = Pin<Box<dyn futures::Future<Output = Result<T, E>> + Send>>;
14pub type AtomicContextManager = Arc<RwLock<ContextManager>>;
15pub type TCommandHandler<R, E> = HashMap<TypeId, Box<dyn Fn(Box<dyn Any + Send + Sync>, AtomicContextManager) -> Future<R, E> + Send + Sync>>;
16pub type TEventHandler<R, E> = HashMap<String, Vec<Box<dyn Fn(Box<dyn Message>, AtomicContextManager) -> Future<R, E> + Send + Sync>>>;
17
18/// Task Local Context Manager
19/// This is called for every time Messagebus.handle is invoked within which it manages events raised in service.
20/// It spawns out Executor that manages transaction.
21pub struct ContextManager {
22	pub event_queue: VecDeque<Box<dyn Message>>,
23}
24
25impl ContextManager {
26	/// Creation of context manager returns context manager AND event receiver
27	pub fn new() -> AtomicContextManager {
28		Arc::new(RwLock::new(Self { event_queue: VecDeque::new() }))
29	}
30}
31
32impl Deref for ContextManager {
33	type Target = VecDeque<Box<dyn Message>>;
34	fn deref(&self) -> &Self::Target {
35		&self.event_queue
36	}
37}
38impl DerefMut for ContextManager {
39	fn deref_mut(&mut self) -> &mut Self::Target {
40		&mut self.event_queue
41	}
42}
43
44pub struct MessageBus<R: ApplicationResponse, E: ApplicationError> {
45	command_handler: &'static TCommandHandler<R, E>,
46	event_handler: &'static TEventHandler<R, E>,
47}
48
49impl<R, E> MessageBus<R, E>
50where
51	R: ApplicationResponse + std::marker::Send,
52	E: ApplicationError + std::convert::From<crate::responses::BaseError> + std::convert::Into<crate::responses::BaseError> + std::marker::Send,
53{
54	pub fn new(command_handler: &'static TCommandHandler<R, E>, event_handler: &'static TEventHandler<R, E>) -> Arc<Self> {
55		Self { command_handler, event_handler }.into()
56	}
57
58	pub async fn handle<C>(&self, message: C) -> Result<R, E>
59	where
60		C: Command,
61	{
62		println!("Handle Command {:?}", message);
63		let context_manager = ContextManager::new();
64
65		let res = self.command_handler.get(&message.type_id()).ok_or_else(|| {
66			eprintln!("Unprocessable Command Given!");
67			BaseError::NotFound
68		})?(Box::new(message), context_manager.clone())
69		.await?;
70
71		// Trigger event
72		if !context_manager.read().await.event_queue.is_empty() {
73			let event = context_manager.write().await.event_queue.pop_front();
74			let _ = self._handle_event(event.unwrap(), context_manager.clone()).await;
75		}
76
77		Ok(res)
78	}
79
80	// Thin Wrapper
81	pub async fn handle_event(&self, msg: Box<dyn Message>) -> Result<(), E> {
82		let context_manager = ContextManager::new();
83		self._handle_event(msg, context_manager.clone()).await
84	}
85
86	#[async_recursion]
87	async fn _handle_event(&self, msg: Box<dyn Message>, context_manager: AtomicContextManager) -> Result<(), E> {
88		// ! msg.topic() returns the name of event. It is crucial that it corresponds to the key registered on Event Handler.
89
90		println!("Handle Event : {:?}", msg);
91
92		let handlers = self.event_handler.get(&msg.metadata().topic).ok_or_else(|| {
93			eprintln!("Unprocessable Event Given! {:?}", msg);
94			BaseError::NotFound
95		})?;
96
97		for handler in handlers.iter() {
98			match handler(msg.message_clone(), context_manager.clone()).await {
99				Ok(_val) => {
100					eprintln!("Event Handling Succeeded!");
101				}
102
103				// ! Safety:: BaseError Must Be Enforced To Be Accepted As Variant On ServiceError
104				Err(err) => match err.into() {
105					BaseError::StopSentinel => {
106						eprintln!("Stop Sentinel Arrived!");
107
108						break;
109					}
110					BaseError::StopSentinelWithEvent(event) => {
111						eprintln!("Stop Sentinel With Event Arrived!");
112						context_manager.write().await.push_back(event);
113						break;
114					}
115					err => {
116						eprintln!("Error Occurred While Handling Event! Error:{:?}", err);
117					}
118				},
119			};
120		}
121
122		// Resursive case
123		let event = context_manager.write().await.event_queue.pop_front();
124		if event.is_some() {
125			if let Err(err) = self._handle_event(event.unwrap(), context_manager.clone()).await {
126				// ! Safety:: BaseError Must Be Enforced To Be Accepted As Variant On ServiceError
127				eprintln!("{:?}", err);
128			}
129		}
130
131		Ok(())
132	}
133}
134
/// Generates `pub fn command_handler()`, which lazily builds (once) and returns the
/// `'static` command registry mapping each listed command type to its handler.
///
/// Usage shape: `init_command_handler!({ CommandType: handler_fn, ... });`
///
/// The expansion refers to `TCommandHandler`, `ServiceResponse`, `ServiceError`, `TypeId`,
/// `Any` and `Future` by bare name, so those must be in scope at the call site, and the
/// `event_driven_library` crate must be reachable.
///
/// NOTE(review): earlier docs said the calling crate must have a `Dependency` struct with
/// its own implementation; nothing in this expansion references it — confirm.
#[macro_export]
macro_rules! init_command_handler {
	(
		{$($command:ty:$handler:expr ),* $(,)?}
	)
		=> {

		pub fn command_handler() -> &'static TCommandHandler<ServiceResponse, ServiceError> {
			extern crate self as current_crate;
			// Both the type and the constructor are fully qualified so the call site does not
			// need its own `use std::sync::OnceLock;`.
			static COMMAND_HANDLER: ::std::sync::OnceLock<TCommandHandler<ServiceResponse, ServiceError>> = ::std::sync::OnceLock::new();

			COMMAND_HANDLER.get_or_init(|| {
				let mut _map: TCommandHandler<ServiceResponse, ServiceError> = event_driven_library::prelude::HandlerMapper::new();
				$(
					_map.insert(
						// ! Only one command per one handler is acceptable, so the later insertion override preceding one.
						TypeId::of::<$command>(),
						Box::new(|c: Box<dyn Any + Send + Sync>, context_manager: event_driven_library::prelude::AtomicContextManager| -> Future<ServiceResponse, ServiceError> {
							// * Recover the concrete command type so the handler receives `$command`, not `Box<dyn Any>`.
							// ! The entry is keyed by the TypeId of the same command type, so the downcast is not expected to fail.
							Box::pin($handler(
								*c.downcast::<$command>().unwrap(),
								context_manager,
							))
						}),
					);
				)*
				_map
			})
		}
	};
}
172
/// Generates `pub fn event_handler()`, which lazily builds (once) and returns the
/// `'static` event registry mapping each listed event type to its list of handlers.
///
/// Usage shape: `init_event_handler!({ EventType: [handler_a, handler_b], ... });`
///
/// Each registry key is `stringify!(EventType)`, i.e. the event type exactly as written at
/// the call site; the topic reported by an event must equal that string for its handlers
/// to be found.
///
/// The expansion refers to `TEventHandler`, `ServiceResponse`, `ServiceError`, `Message`
/// and `Future` by bare name, so those must be in scope at the call site, and the
/// `event_driven_library` crate must be reachable.
///
/// NOTE(review): earlier docs said the calling crate must have a `Dependency` struct with
/// its own implementation; nothing in this expansion references it — confirm.
#[macro_export]
macro_rules! init_event_handler {
	(
		{$($event:ty: [$($handler:expr),* $(,)? ]),* $(,)?}
	) => {
		pub fn event_handler() -> &'static TEventHandler<ServiceResponse, ServiceError> {
			extern crate self as current_crate;
			// Both the type and the constructor are fully qualified so the call site does not
			// need its own `use std::sync::OnceLock;`.
			static EVENT_HANDLER: ::std::sync::OnceLock<TEventHandler<ServiceResponse, ServiceError>> = ::std::sync::OnceLock::new();
			EVENT_HANDLER.get_or_init(|| {
				let mut _map: TEventHandler<ServiceResponse, ServiceError> = event_driven_library::prelude::HandlerMapper::new();
				$(
					_map.insert(
						stringify!($event).into(),
						vec![
							$(
								Box::new(
									|e: Box<dyn Message>, context_manager: event_driven_library::prelude::AtomicContextManager| -> Future<ServiceResponse, ServiceError> {
										Box::pin($handler(
											// * Convert event so event handler accepts not Box<dyn Message> but the concrete event type.
											// Safety:: client should access this vector of handlers by providing the corresponding event name
											// So, when it is followed, it logically doesn't make sense to cause an error.
											*e.downcast::<$event>().expect("Not Convertible!"),
											context_manager,
										))
									}
								),
							)*
						]
					);
				)*
				_map
			})
		}
	}
}