ruva_core/bus_components/
messagebus.rs

1//! # Message Bus
2//! ### example
3//! ```rust,no_run
4//! impl ruva::TMessageBus<YourResponse,YourErrorError,YourCommand> for MessageBus{
5//!     fn command_handler(
6//!         &self,
7//!         context_manager: ruva::AtomicContextManager,
8//!         cmd: YourCommand,
9//!
10//!     ) -> impl ruva::TCommandService<YourResponse, YourError> {
11//!             LoggingAspect(
12//!                 UnitOfWorkHandler::new(
13//!                 ::ruva::SqlRepository::new(context_manager),
14//!                 cmd
15//!             )
16//!       )
17//!     }
18//! }
19//! ```
20
21use super::contexts::*;
22use super::executor::TConnection;
23use super::handler::EventHandlers;
24use crate::prelude::{TCommand, TEvent};
25use crate::responses::{self, ApplicationError, ApplicationResponse, BaseError};
26use async_recursion::async_recursion;
27use async_trait::async_trait;
28use std::sync::Arc;
29
30/// Event handlers `TEventBus` work on
31pub type TEventHandler<E> = hashbrown::HashMap<String, EventHandlers<E>>;
32
33#[async_trait]
34pub trait TEventBus<E> {
35	fn event_handler(&self) -> &'static TEventHandler<E>;
36}
37
38/// This function is used to handle event. It is called recursively until there is no event left in the queue.
39#[async_recursion]
40async fn handle_event<E>(msg: Arc<dyn TEvent>, context_manager: AtomicContextManager, event_handler: &'static TEventHandler<E>) -> Result<AtomicContextManager, E>
41where
42	E: ApplicationError + std::convert::From<crate::responses::BaseError> + std::convert::From<E>,
43	crate::responses::BaseError: std::convert::From<E>,
44{
45	// ! msg.topic returns the name of event. It is crucial that it corresponds to the key registered on Event Handler.
46	#[cfg(feature = "tracing")]
47	{
48		tracing::info!("Processing {}...", msg.metadata().topic);
49	}
50
51	let handlers = event_handler.get(&msg.metadata().topic).ok_or_else(|| {
52		tracing::error!("Unprocessable Event Given! {:?}", msg);
53		BaseError::NotFound
54	})?;
55
56	match handlers {
57		EventHandlers::Sync(h) => {
58			for (i, handler) in h.iter().enumerate() {
59				if let Err(err) = handler(msg.clone(), Arc::clone(&context_manager)).await {
60					// ! Safety:: BaseError Must Be Enforced To Be Accepted As Variant On ServiceError
61					match err.into() {
62						BaseError::StopSentinel => {
63							let error_msg = format!("Stop Sentinel Arrived In {i}th Event!");
64							crate::backtrace_error!("{}", error_msg);
65							break;
66						}
67						BaseError::StopSentinelWithEvent(event) => {
68							let error_msg = format!("Stop Sentinel With Event Arrived In {i}th Event!");
69							crate::backtrace_error!("{}", error_msg);
70							context_manager.get_mut().push_back(event);
71							break;
72						}
73						err => {
74							let error_msg = format!("Error Occurred While Handling Event In {i}th Event! Error:{:?}", err);
75							crate::backtrace_error!("{}", error_msg);
76						}
77					}
78				}
79			}
80		}
81		EventHandlers::Async(h) => {
82			let futures = h.iter().map(|handler| handler(msg.clone(), Arc::clone(&context_manager)));
83			if let Err(err) = futures::future::try_join_all(futures).await {
84				let error_msg = format!("Error Occurred While Handling Event! Error:{:?}", err);
85				crate::backtrace_error!("{}", error_msg);
86			}
87		}
88	}
89
90	// Resursive case
91	let incoming_event = context_manager.get_mut().pop_front();
92
93	if let Some(event) = incoming_event {
94		if let Err(err) = handle_event(event, Arc::clone(&context_manager), event_handler).await {
95			// ! Safety:: BaseError Must Be Enforced To Be Accepted As Variant On ServiceError
96			tracing::error!("{:?}", err);
97		}
98	}
99	Ok(context_manager)
100}
101
102/// Interface for messagebus to work on
103pub trait TCommandService<R, E>: Send + Sync {
104	fn execute(self) -> impl std::future::Future<Output = Result<R, E>> + Send;
105}
106
107#[async_trait]
108pub trait TMessageBus<R, E, C>: TEventBus<E>
109where
110	responses::BaseError: std::convert::From<E>,
111	R: ApplicationResponse,
112	E: ApplicationError + std::convert::From<crate::responses::BaseError>,
113	C: TCommand,
114{
115	fn command_handler(&self, context_manager: AtomicContextManager, cmd: C) -> impl TCommandService<R, E>;
116
117	/// This method is used to handle command and return result.
118	/// ## Example
119	/// ```rust,no_run
120	/// let res = service.execute_and_wait(message).await?;
121	/// ```
122
123	async fn execute_and_wait(&self, message: C, conn: &'static dyn TConnection) -> Result<R, E> {
124		#[cfg(feature = "tracing")]
125		{
126			tracing::info!("{}", std::any::type_name::<C>());
127		}
128
129		let context_manager = Arc::new(ContextManager::new(conn));
130		let res = self.command_handler(Arc::clone(&context_manager), message).execute().await?;
131
132		// Trigger event handler
133		if !context_manager.event_queue.is_empty() {
134			let event = context_manager.get_mut().pop_front();
135			handle_event(event.unwrap(), Arc::clone(&context_manager), self.event_handler()).await?;
136		}
137		Ok(res)
138	}
139
140	/// This method is used to handle command and return result proxy which holds the result and join handler.
141	/// ## Example
142	/// ```rust,no_run
143	/// let res = service.execute_and_forget(message).await?;
144	/// let res = res.wait_until_event_processing_done().await?;
145	/// let res = res.result();
146	/// ```
147	async fn execute_and_forget(&self, message: C, conn: &'static dyn TConnection) -> Result<CommandResponseWithEventFutures<R, E>, E> {
148		#[cfg(feature = "tracing")]
149		{
150			tracing::info!("{}", std::any::type_name::<C>());
151		}
152
153		let context_manager = Arc::new(ContextManager::new(conn));
154		let res = self.command_handler(Arc::clone(&context_manager), message).execute().await?;
155		let mut res = CommandResponseWithEventFutures { result: res, join_handler: None };
156
157		// Trigger event handler
158		if !context_manager.event_queue.is_empty() {
159			let event = context_manager.get_mut().pop_front().unwrap();
160
161			res.join_handler = Some(tokio::spawn(handle_event(event, context_manager, self.event_handler())));
162		}
163		Ok(res)
164	}
165}
166
167pub struct CommandResponseWithEventFutures<T, E> {
168	result: T,
169	join_handler: Option<tokio::task::JoinHandle<std::result::Result<AtomicContextManager, E>>>,
170}
171impl<T, E> CommandResponseWithEventFutures<T, E>
172where
173	responses::BaseError: std::convert::From<E>,
174	T: ApplicationResponse,
175	E: ApplicationError + std::convert::From<crate::responses::BaseError>,
176{
177	pub async fn wait_until_event_processing_done(mut self) -> Result<Self, E> {
178		if let Some(join_handler) = self.join_handler.take() {
179			join_handler.await.map_err(|err| {
180				tracing::error!("{:?}", err);
181				BaseError::ServiceError
182			})??;
183		}
184		Ok(self)
185	}
186	pub fn result(self) -> T {
187		self.result
188	}
189}
190
191/// This macro is used to create event handler for each event.
192/// ## Example
193/// ```rust,no_run
194///
195/// init_event_handler!(
196///     YourServiceError,
197///     |ctx| YourEventHandler(ApplicationRepository::new(ctx)),
198///     #[async]
199///     YourEvent:[handler1, handler2],
200///     YourEvent2:[handler3, handler4],
201/// );
202/// ```
203///
204
205#[macro_export]
206macro_rules! init_event_handler {
207    (
208		$E:ty,
209		$event_handler :expr,
210			$(
211				$(#[$asynchrony:ident])?
212				$event:ty:[$($handler:ident $(=>($($injectable:ident $(( $($arg:ident),* ))? ),*))?),* $(,)? ]
213			),*
214			$(,)?
215
216    ) =>{
217		pub(crate) static EVENT_HANDLERS: std::sync::LazyLock<ruva::TEventHandler<$E>> = std::sync::LazyLock::new(
218			||{
219				let mut _map : ::ruva::TEventHandler<$E> = ::ruva::HandlerMapper::new();
220				$(
221
222				let mut handlers = if stringify!($($asynchrony)?) == "async" {
223					::ruva::EventHandlers::Async(vec![])
224				} else {
225					::ruva::EventHandlers::Sync(vec![])
226				};
227				handlers.extend(vec![
228					$(
229						Box::new(
230							|e: ::std::sync::Arc<dyn ::ruva::TEvent>, context_manager: ruva::AtomicContextManager | -> ::ruva::Future<$E> {
231								let event_handler = $event_handler(context_manager);
232								Box::pin(event_handler.$handler(
233									// * Convert event so event handler accepts not Arc<dyn TEvent> but `event_happend` type of message.
234									// Safety:: client should access this vector of handlers by providing the corresponding event name
235									// So, when it is followed, it logically doesn't make sense to cause an error.
236									e.downcast_ref::<$event>().expect("Not Convertible!").clone(),
237								))
238							}
239						),
240					)*
241				]);
242                _map.insert(
243                    stringify!($event).into(),
244					handlers
245                );
246            )*
247            _map
248			}
249		);
250
251		impl ruva::TEventBus<$E> for ::ruva::MessageBus{
252			fn event_handler(&self) -> &'static ruva::TEventHandler<$E>{
253				&EVENT_HANDLERS
254			}
255		}
256
257	};
258
259}
260
261pub struct MessageBus;