ruva_core/bus_components/
contexts.rs

1use super::executor::TConnection;
2use crate::{make_smart_pointer, prelude::TEvent};
3use std::{collections::VecDeque, sync::Arc};
4
5/// Request Context Manager
6/// it lives as long as the request lives
7
8pub struct ContextManager {
9	pub event_queue: VecDeque<Arc<dyn TEvent>>,
10	pub conn: &'static dyn TConnection,
11}
12
13pub type AtomicContextManager = Arc<ContextManager>;
14
15impl ContextManager {
16	/// Creation of context manager returns context manager AND event receiver
17	pub fn new(conn: &'static dyn TConnection) -> Self {
18		Self { event_queue: VecDeque::new(), conn }
19	}
20
21	/// SAFETY: This is safe because we are sure this method is used only in the context of command and event handling
22	pub(crate) fn get_mut<'a>(self: &Arc<Self>) -> &'a mut ContextManager {
23		unsafe { &mut *(Arc::as_ptr(self) as *mut ContextManager) }
24	}
25}
26
27make_smart_pointer!(ContextManager, VecDeque<Arc<dyn TEvent>>, event_queue);
28
29/// Local context
30/// it lasts only until logical unit of operation is done
31pub struct Context {
32	pub(crate) curr_events: VecDeque<std::sync::Arc<dyn TEvent>>,
33	pub(crate) super_ctx: AtomicContextManager,
34
35	#[cfg(feature = "sqlx-postgres")]
36	pub(crate) pg_transaction: Option<sqlx::Transaction<'static, sqlx::Postgres>>,
37}
38
39impl Context {
40	pub fn new(super_ctx: AtomicContextManager) -> Self {
41		Self {
42			curr_events: Default::default(),
43			super_ctx,
44			#[cfg(feature = "sqlx-postgres")]
45			pg_transaction: None,
46		}
47	}
48
49	pub fn event_hook(&mut self, aggregate: &mut impl crate::prelude::TAggregate) {
50		self.set_current_events(aggregate.take_events());
51	}
52
53	pub async fn send_internally_notifiable_messages(&mut self) {
54		// SAFETY: This is safe because we are sure that the context manager is not dropped
55
56		self.curr_events
57			.iter()
58			.filter(|e| e.internally_notifiable())
59			.for_each(|e| self.super_ctx.get_mut().push_back(e.clone()));
60	}
61}
62
63pub trait TSetCurrentEvents: Send + Sync {
64	fn set_current_events(&mut self, events: VecDeque<std::sync::Arc<dyn TEvent>>);
65}
66
67impl TSetCurrentEvents for Context {
68	fn set_current_events(&mut self, events: VecDeque<std::sync::Arc<dyn TEvent>>) {
69		self.curr_events.extend(events)
70	}
71}
72
73#[tokio::test]
74async fn test_context_managers() {
75	struct CustomConnection;
76	#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
77	struct CustomEvent(usize);
78	impl TEvent for CustomEvent {
79		fn externally_notifiable(&self) -> bool {
80			true
81		}
82		fn internally_notifiable(&self) -> bool {
83			true
84		}
85		fn state(&self) -> String {
86			"state".to_string()
87		}
88	}
89	impl TConnection for CustomConnection {}
90
91	async fn add_event_to_queue(context_manager: Arc<ContextManager>, order: usize) {
92		context_manager.get_mut().push_back(std::sync::Arc::new(CustomEvent(order)));
93	}
94
95	let context_manager = Arc::new(ContextManager::new(&CustomConnection));
96
97	let count = 10000000;
98	let futures = (0..count).map(|order| add_event_to_queue(Arc::clone(&context_manager), order));
99	futures::future::join_all(futures).await;
100
101	assert_eq!(context_manager.len(), count);
102	let events = context_manager.iter().map(|e| e.downcast_ref::<CustomEvent>().unwrap().0).collect::<Vec<_>>();
103	assert_eq!(events, (0..count).collect::<Vec<_>>());
104}