// ruva_core/bus_components/messagebus.rs
use super::contexts::*;
22use super::executor::TConnection;
23use super::handler::EventHandlers;
24use crate::prelude::{TCommand, TEvent};
25use crate::responses::{self, ApplicationError, ApplicationResponse, BaseError};
26use async_recursion::async_recursion;
27use async_trait::async_trait;
28use std::sync::Arc;
29
30pub type TEventHandler<E> = hashbrown::HashMap<String, EventHandlers<E>>;
32
33#[async_trait]
34pub trait TEventBus<E> {
35 fn event_handler(&self) -> &'static TEventHandler<E>;
36}
37
38#[async_recursion]
40async fn handle_event<E>(msg: Arc<dyn TEvent>, context_manager: AtomicContextManager, event_handler: &'static TEventHandler<E>) -> Result<AtomicContextManager, E>
41where
42 E: ApplicationError + std::convert::From<crate::responses::BaseError> + std::convert::From<E>,
43 crate::responses::BaseError: std::convert::From<E>,
44{
45 #[cfg(feature = "tracing")]
47 {
48 tracing::info!("Processing {}...", msg.metadata().topic);
49 }
50
51 let handlers = event_handler.get(&msg.metadata().topic).ok_or_else(|| {
52 tracing::error!("Unprocessable Event Given! {:?}", msg);
53 BaseError::NotFound
54 })?;
55
56 match handlers {
57 EventHandlers::Sync(h) => {
58 for (i, handler) in h.iter().enumerate() {
59 if let Err(err) = handler(msg.clone(), Arc::clone(&context_manager)).await {
60 match err.into() {
62 BaseError::StopSentinel => {
63 let error_msg = format!("Stop Sentinel Arrived In {i}th Event!");
64 crate::backtrace_error!("{}", error_msg);
65 break;
66 }
67 BaseError::StopSentinelWithEvent(event) => {
68 let error_msg = format!("Stop Sentinel With Event Arrived In {i}th Event!");
69 crate::backtrace_error!("{}", error_msg);
70 context_manager.get_mut().push_back(event);
71 break;
72 }
73 err => {
74 let error_msg = format!("Error Occurred While Handling Event In {i}th Event! Error:{:?}", err);
75 crate::backtrace_error!("{}", error_msg);
76 }
77 }
78 }
79 }
80 }
81 EventHandlers::Async(h) => {
82 let futures = h.iter().map(|handler| handler(msg.clone(), Arc::clone(&context_manager)));
83 if let Err(err) = futures::future::try_join_all(futures).await {
84 let error_msg = format!("Error Occurred While Handling Event! Error:{:?}", err);
85 crate::backtrace_error!("{}", error_msg);
86 }
87 }
88 }
89
90 let incoming_event = context_manager.get_mut().pop_front();
92
93 if let Some(event) = incoming_event {
94 if let Err(err) = handle_event(event, Arc::clone(&context_manager), event_handler).await {
95 tracing::error!("{:?}", err);
97 }
98 }
99 Ok(context_manager)
100}
101
/// A unit of work produced for a single command.
///
/// `R` is the success value and `E` the error value of the execution.
pub trait TCommandService<R, E>: Send + Sync {
    /// Consumes the service and returns a `Send` future that resolves to the
    /// outcome of the command.
    fn execute(self) -> impl ::std::future::Future<Output = ::std::result::Result<R, E>> + Send;
}
106
107#[async_trait]
108pub trait TMessageBus<R, E, C>: TEventBus<E>
109where
110 responses::BaseError: std::convert::From<E>,
111 R: ApplicationResponse,
112 E: ApplicationError + std::convert::From<crate::responses::BaseError>,
113 C: TCommand,
114{
115 fn command_handler(&self, context_manager: AtomicContextManager, cmd: C) -> impl TCommandService<R, E>;
116
117 async fn execute_and_wait(&self, message: C, conn: &'static dyn TConnection) -> Result<R, E> {
124 #[cfg(feature = "tracing")]
125 {
126 tracing::info!("{}", std::any::type_name::<C>());
127 }
128
129 let context_manager = Arc::new(ContextManager::new(conn));
130 let res = self.command_handler(Arc::clone(&context_manager), message).execute().await?;
131
132 if !context_manager.event_queue.is_empty() {
134 let event = context_manager.get_mut().pop_front();
135 handle_event(event.unwrap(), Arc::clone(&context_manager), self.event_handler()).await?;
136 }
137 Ok(res)
138 }
139
140 async fn execute_and_forget(&self, message: C, conn: &'static dyn TConnection) -> Result<CommandResponseWithEventFutures<R, E>, E> {
148 #[cfg(feature = "tracing")]
149 {
150 tracing::info!("{}", std::any::type_name::<C>());
151 }
152
153 let context_manager = Arc::new(ContextManager::new(conn));
154 let res = self.command_handler(Arc::clone(&context_manager), message).execute().await?;
155 let mut res = CommandResponseWithEventFutures { result: res, join_handler: None };
156
157 if !context_manager.event_queue.is_empty() {
159 let event = context_manager.get_mut().pop_front().unwrap();
160
161 res.join_handler = Some(tokio::spawn(handle_event(event, context_manager, self.event_handler())));
162 }
163 Ok(res)
164 }
165}
166
167pub struct CommandResponseWithEventFutures<T, E> {
168 result: T,
169 join_handler: Option<tokio::task::JoinHandle<std::result::Result<AtomicContextManager, E>>>,
170}
171impl<T, E> CommandResponseWithEventFutures<T, E>
172where
173 responses::BaseError: std::convert::From<E>,
174 T: ApplicationResponse,
175 E: ApplicationError + std::convert::From<crate::responses::BaseError>,
176{
177 pub async fn wait_until_event_processing_done(mut self) -> Result<Self, E> {
178 if let Some(join_handler) = self.join_handler.take() {
179 join_handler.await.map_err(|err| {
180 tracing::error!("{:?}", err);
181 BaseError::ServiceError
182 })??;
183 }
184 Ok(self)
185 }
186 pub fn result(self) -> T {
187 self.result
188 }
189}
190
/// Builds the static event-handler registry and implements
/// `::ruva::TEventBus<$E>` for `::ruva::MessageBus`.
///
/// # Arguments
///
/// * `$E` – error type of the handlers.
/// * `$event_handler` – expression that is called with the
///   `AtomicContextManager` and yields the object whose methods are the
///   handlers.
/// * Then, per event: an optional `#[async]` marker, the event type, and a
///   bracketed list of handler method names.
///
/// # Expansion
///
/// * Defines `pub(crate) static EVENT_HANDLERS`, a lazily initialised
///   `::ruva::TEventHandler<$E>`.
/// * An event marked `#[async]` is stored as `EventHandlers::Async`; any
///   other (or no) marker yields `EventHandlers::Sync`.
/// * The registry key is `stringify!($event)`, so it has to match the topic
///   string the event reports at dispatch time.
/// * Each handler closure downcasts the incoming event to `$event`, clones
///   it, and passes the clone to `$event_handler(context_manager).$handler(..)`.
/// * The optional `=> (injectable(arg, ..), ..)` list after a handler name is
///   accepted by the matcher but not used in the expansion.
///
/// # Panics
///
/// A generated closure panics with `"Not Convertible!"` when the event it
/// receives is not a `$event`.
///
/// # Example
///
/// Illustrative only; all names are placeholders.
///
/// ```ignore
/// init_event_handler!(
///     MyError,
///     MyEventHandler::new,
///     #[async]
///     UserCreated: [send_welcome_mail, update_statistics],
///     OrderPlaced: [reserve_stock],
/// );
/// ```
#[macro_export]
macro_rules! init_event_handler {
    (
        $E:ty,
        $event_handler :expr,
        $(
            $(#[$asynchrony:ident])?
            $event:ty:[$($handler:ident $(=>($($injectable:ident $(( $($arg:ident),* ))? ),*))?),* $(,)? ]
        ),*
        $(,)?

    ) =>{
        // All crate paths are anchored with a leading `::` so that a local item
        // named `ruva` or `std` at the call site cannot shadow them.
        pub(crate) static EVENT_HANDLERS: ::std::sync::LazyLock<::ruva::TEventHandler<$E>> = ::std::sync::LazyLock::new(
            ||{
                let mut _map : ::ruva::TEventHandler<$E> = ::ruva::HandlerMapper::new();
                $(

                    // The marker is compared as text: only `#[async]` selects the concurrent variant.
                    let mut handlers = if stringify!($($asynchrony)?) == "async" {
                        ::ruva::EventHandlers::Async(vec![])
                    } else {
                        ::ruva::EventHandlers::Sync(vec![])
                    };
                    handlers.extend(vec![
                        $(
                            Box::new(
                                |e: ::std::sync::Arc<dyn ::ruva::TEvent>, context_manager: ::ruva::AtomicContextManager | -> ::ruva::Future<$E> {
                                    let event_handler = $event_handler(context_manager);
                                    Box::pin(event_handler.$handler(
                                        e.downcast_ref::<$event>().expect("Not Convertible!").clone(),
                                    ))
                                }
                            ),
                        )*
                    ]);
                    _map.insert(
                        stringify!($event).into(),
                        handlers
                    );
                )*
                _map
            }
        );

        impl ::ruva::TEventBus<$E> for ::ruva::MessageBus{
            fn event_handler(&self) -> &'static ::ruva::TEventHandler<$E>{
                &EVENT_HANDLERS
            }
        }

    };

}
260
/// Stateless marker type for the message bus.
///
/// `init_event_handler!` implements `TEventBus` for this type; it carries no
/// data of its own.
pub struct MessageBus;