ruva_core/bus_components/
contexts.rs1use super::executor::TConnection;
2use crate::{make_smart_pointer, prelude::TEvent};
3use std::{collections::VecDeque, sync::Arc};
4
5pub struct ContextManager {
9 pub event_queue: VecDeque<Arc<dyn TEvent>>,
10 pub conn: &'static dyn TConnection,
11}
12
13pub type AtomicContextManager = Arc<ContextManager>;
14
15impl ContextManager {
16 pub fn new(conn: &'static dyn TConnection) -> Self {
18 Self { event_queue: VecDeque::new(), conn }
19 }
20
21 pub(crate) fn get_mut<'a>(self: &Arc<Self>) -> &'a mut ContextManager {
23 unsafe { &mut *(Arc::as_ptr(self) as *mut ContextManager) }
24 }
25}
26
27make_smart_pointer!(ContextManager, VecDeque<Arc<dyn TEvent>>, event_queue);
28
29pub struct Context {
32 pub(crate) curr_events: VecDeque<std::sync::Arc<dyn TEvent>>,
33 pub(crate) super_ctx: AtomicContextManager,
34
35 #[cfg(feature = "sqlx-postgres")]
36 pub(crate) pg_transaction: Option<sqlx::Transaction<'static, sqlx::Postgres>>,
37}
38
39impl Context {
40 pub fn new(super_ctx: AtomicContextManager) -> Self {
41 Self {
42 curr_events: Default::default(),
43 super_ctx,
44 #[cfg(feature = "sqlx-postgres")]
45 pg_transaction: None,
46 }
47 }
48
49 pub fn event_hook(&mut self, aggregate: &mut impl crate::prelude::TAggregate) {
50 self.set_current_events(aggregate.take_events());
51 }
52
53 pub async fn send_internally_notifiable_messages(&mut self) {
54 self.curr_events
57 .iter()
58 .filter(|e| e.internally_notifiable())
59 .for_each(|e| self.super_ctx.get_mut().push_back(e.clone()));
60 }
61}
62
63pub trait TSetCurrentEvents: Send + Sync {
64 fn set_current_events(&mut self, events: VecDeque<std::sync::Arc<dyn TEvent>>);
65}
66
67impl TSetCurrentEvents for Context {
68 fn set_current_events(&mut self, events: VecDeque<std::sync::Arc<dyn TEvent>>) {
69 self.curr_events.extend(events)
70 }
71}
72
73#[tokio::test]
74async fn test_context_managers() {
75 struct CustomConnection;
76 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
77 struct CustomEvent(usize);
78 impl TEvent for CustomEvent {
79 fn externally_notifiable(&self) -> bool {
80 true
81 }
82 fn internally_notifiable(&self) -> bool {
83 true
84 }
85 fn state(&self) -> String {
86 "state".to_string()
87 }
88 }
89 impl TConnection for CustomConnection {}
90
91 async fn add_event_to_queue(context_manager: Arc<ContextManager>, order: usize) {
92 context_manager.get_mut().push_back(std::sync::Arc::new(CustomEvent(order)));
93 }
94
95 let context_manager = Arc::new(ContextManager::new(&CustomConnection));
96
97 let count = 10000000;
98 let futures = (0..count).map(|order| add_event_to_queue(Arc::clone(&context_manager), order));
99 futures::future::join_all(futures).await;
100
101 assert_eq!(context_manager.len(), count);
102 let events = context_manager.iter().map(|e| e.downcast_ref::<CustomEvent>().unwrap().0).collect::<Vec<_>>();
103 assert_eq!(events, (0..count).collect::<Vec<_>>());
104}