reifydb_core/event/
mod.rs1use std::{
5 any::{Any, TypeId},
6 collections::HashMap,
7 sync,
8 sync::Arc,
9};
10
11use reifydb_runtime::actor::{
12 context::Context,
13 mailbox::ActorRef,
14 system::ActorSystem,
15 traits::{Actor, Directive},
16};
17use sync::mpsc::Sender;
18
19pub mod flow;
20pub mod lifecycle;
21#[macro_use]
22pub mod r#macro;
23pub mod metric;
24pub mod procedure;
25pub mod store;
26pub mod transaction;
27
28pub trait Event: Any + Send + Sync + Clone + 'static {
29 fn as_any(&self) -> &dyn Any;
30 fn into_any(self) -> Box<dyn Any + Send>;
31}
32
33pub trait EventListener<E>: Send + Sync + 'static
34where
35 E: Event,
36{
37 fn on(&self, event: &E);
38}
39
40trait EventListenerList: Any + Send + Sync {
41 fn on_any(&self, event: Box<dyn Any + Send>);
42 fn as_any_mut(&mut self) -> &mut dyn Any;
43}
44
45struct EventListenerListImpl<E> {
46 listeners: Vec<Arc<dyn EventListener<E>>>,
47}
48
49impl<E> EventListenerListImpl<E>
50where
51 E: Event,
52{
53 fn new() -> Self {
54 Self {
55 listeners: Vec::new(),
56 }
57 }
58
59 fn add(&mut self, listener: Arc<dyn EventListener<E>>) {
60 self.listeners.push(listener);
61 }
62}
63
64impl<E> EventListenerList for EventListenerListImpl<E>
65where
66 E: Event,
67{
68 fn on_any(&self, event: Box<dyn Any + Send>) {
69 if let Ok(event) = event.downcast::<E>() {
70 for listener in &self.listeners {
71 listener.on(&*event);
72 }
73 }
74 }
75
76 fn as_any_mut(&mut self) -> &mut dyn Any {
77 self
78 }
79}
80
81struct EventEnvelope {
82 type_id: TypeId,
83 event: Box<dyn Any + Send>,
84}
85
86enum EventBusMsg {
87 Emit(EventEnvelope),
88 Register {
89 installer: Box<dyn FnOnce(&mut HashMap<TypeId, Box<dyn EventListenerList>>) + Send>,
90 },
91 WaitForCompletion(Sender<()>),
92}
93
94struct EventBusActor;
95
96impl Actor for EventBusActor {
97 type State = HashMap<TypeId, Box<dyn EventListenerList>>;
98 type Message = EventBusMsg;
99
100 fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
101 HashMap::new()
102 }
103
104 fn handle(&self, state: &mut Self::State, msg: Self::Message, _ctx: &Context<Self::Message>) -> Directive {
105 match msg {
106 EventBusMsg::Emit(envelope) => {
107 if let Some(list) = state.get(&envelope.type_id) {
108 list.on_any(envelope.event);
109 }
110 }
111 EventBusMsg::Register {
112 installer,
113 } => {
114 installer(state);
115 }
116 EventBusMsg::WaitForCompletion(tx) => {
117 let _ = tx.send(());
118 }
119 }
120 Directive::Continue
121 }
122}
123
124#[derive(Clone)]
125pub struct EventBus {
126 actor_ref: ActorRef<EventBusMsg>,
127 _actor_system: ActorSystem,
128}
129
130impl EventBus {
131 pub fn new(actor_system: &ActorSystem) -> Self {
132 let handle = actor_system.spawn("event-bus", EventBusActor);
133 Self {
134 actor_ref: handle.actor_ref().clone(),
135 _actor_system: actor_system.clone(),
136 }
137 }
138
139 pub fn register<E, L>(&self, listener: L)
140 where
141 E: Event,
142 L: EventListener<E>,
143 {
144 let type_id = TypeId::of::<E>();
145 let listener = Arc::new(listener);
146
147 let installer: Box<dyn FnOnce(&mut HashMap<TypeId, Box<dyn EventListenerList>>) + Send> =
148 Box::new(move |map| {
149 let list = map
150 .entry(type_id)
151 .or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()));
152 list.as_any_mut().downcast_mut::<EventListenerListImpl<E>>().unwrap().add(listener);
153 });
154
155 let _ = self.actor_ref.send(EventBusMsg::Register {
156 installer,
157 });
158 }
159
160 pub fn emit<E>(&self, event: E)
161 where
162 E: Event,
163 {
164 let type_id = TypeId::of::<E>();
165 let _ = self.actor_ref.send(EventBusMsg::Emit(EventEnvelope {
166 type_id,
167 event: event.into_any(),
168 }));
169 }
170
171 pub fn wait_for_completion(&self) {
172 let (tx, rx) = sync::mpsc::channel();
173 let _ = self.actor_ref.send(EventBusMsg::WaitForCompletion(tx));
174 let _ = rx.recv();
175 }
176}
177
178#[cfg(test)]
179pub mod tests {
180 use std::{
181 sync::{Arc, Mutex},
182 thread,
183 };
184
185 use reifydb_runtime::{SharedRuntimeConfig, actor::system::ActorSystem};
186
187 use crate::event::{Event, EventBus, EventListener};
188
189 fn test_actor_system() -> ActorSystem {
190 ActorSystem::new(SharedRuntimeConfig::default().actor_system_config())
191 }
192
193 define_event! {
194 pub struct TestEvent{}
195 }
196
197 define_event! {
198 pub struct AnotherEvent{}
199 }
200
201 #[derive(Default, Debug, Clone)]
202 pub struct TestEventListener(Arc<TestHandlerInner>);
203
204 #[derive(Default, Debug)]
205 pub struct TestHandlerInner {
206 pub counter: Arc<Mutex<i32>>,
207 }
208
209 impl EventListener<TestEvent> for TestEventListener {
210 fn on(&self, _event: &TestEvent) {
211 let mut x = self.0.counter.lock().unwrap();
212 *x += 1;
213 }
214 }
215
216 impl EventListener<AnotherEvent> for TestEventListener {
217 fn on(&self, _event: &AnotherEvent) {
218 let mut x = self.0.counter.lock().unwrap();
219 *x *= 2;
220 }
221 }
222
223 #[test]
224 fn test_event_bus_new() {
225 let actor_system = test_actor_system();
226 let event_bus = EventBus::new(&actor_system);
227 event_bus.emit(TestEvent::new());
228 event_bus.wait_for_completion();
229 }
230
231 #[test]
232 fn test_register_single_listener() {
233 let actor_system = test_actor_system();
234 let event_bus = EventBus::new(&actor_system);
235 let listener = TestEventListener::default();
236
237 event_bus.register::<TestEvent, TestEventListener>(listener.clone());
238 event_bus.emit(TestEvent::new());
239 event_bus.wait_for_completion();
240 assert_eq!(*listener.0.counter.lock().unwrap(), 1);
241 }
242
243 #[test]
244 fn test_emit_unregistered_event() {
245 let actor_system = test_actor_system();
246 let event_bus = EventBus::new(&actor_system);
247 event_bus.emit(TestEvent::new());
248 event_bus.wait_for_completion();
249 }
250
251 #[test]
252 fn test_multiple_listeners_same_event() {
253 let actor_system = test_actor_system();
254 let event_bus = EventBus::new(&actor_system);
255 let listener1 = TestEventListener::default();
256 let listener2 = TestEventListener::default();
257
258 event_bus.register::<TestEvent, TestEventListener>(listener1.clone());
259 event_bus.register::<TestEvent, TestEventListener>(listener2.clone());
260
261 event_bus.emit(TestEvent::new());
262 event_bus.wait_for_completion();
263 assert_eq!(*listener1.0.counter.lock().unwrap(), 1);
264 assert_eq!(*listener2.0.counter.lock().unwrap(), 1);
265 }
266
267 #[test]
268 fn test_event_bus_clone() {
269 let actor_system = test_actor_system();
270 let event_bus1 = EventBus::new(&actor_system);
271 let listener = TestEventListener::default();
272 event_bus1.register::<TestEvent, TestEventListener>(listener.clone());
273
274 let event_bus2 = event_bus1.clone();
275 event_bus2.emit(TestEvent::new());
276 event_bus2.wait_for_completion();
277 assert_eq!(*listener.0.counter.lock().unwrap(), 1);
278 }
279
280 #[test]
281 fn test_concurrent_registration() {
282 let actor_system = test_actor_system();
283 let event_bus = Arc::new(EventBus::new(&actor_system));
284 let mut handles = Vec::new();
285
286 for _ in 0..10 {
287 let event_bus = event_bus.clone();
288 handles.push(thread::spawn(move || {
289 let listener = TestEventListener::default();
290 event_bus.register::<TestEvent, TestEventListener>(listener);
291 }));
292 }
293
294 for handle in handles {
295 handle.join().unwrap();
296 }
297
298 event_bus.emit(TestEvent::new());
299 event_bus.wait_for_completion();
300 }
301
302 #[test]
303 fn test_concurrent_emitting() {
304 let actor_system = test_actor_system();
305 let event_bus = Arc::new(EventBus::new(&actor_system));
306 let listener = TestEventListener::default();
307 event_bus.register::<TestEvent, TestEventListener>(listener.clone());
308 event_bus.wait_for_completion();
309
310 let mut handles = Vec::new();
311
312 for _ in 0..10 {
313 let event_bus = event_bus.clone();
314 handles.push(thread::spawn(move || {
315 event_bus.emit(TestEvent::new());
316 }));
317 }
318
319 for handle in handles {
320 handle.join().unwrap();
321 }
322
323 event_bus.wait_for_completion();
324 assert_eq!(*listener.0.counter.lock().unwrap(), 10);
325 }
326
327 define_event! {
328 pub struct MacroTestEvent {
329 pub value: i32,
330 }
331 }
332
333 #[test]
334 fn testine_event_macro() {
335 let event = MacroTestEvent::new(42);
336 let any_ref = event.as_any();
337 assert!(any_ref.downcast_ref::<MacroTestEvent>().is_some());
338 assert_eq!(any_ref.downcast_ref::<MacroTestEvent>().unwrap().value(), &42);
339 }
340
341 #[test]
342 fn test_multi_event_listener() {
343 let actor_system = test_actor_system();
344 let event_bus = EventBus::new(&actor_system);
345 let listener = TestEventListener::default();
346
347 event_bus.register::<TestEvent, TestEventListener>(listener.clone());
348 event_bus.register::<AnotherEvent, TestEventListener>(listener.clone());
349
350 event_bus.emit(TestEvent::new());
352 event_bus.wait_for_completion();
353 assert_eq!(*listener.0.counter.lock().unwrap(), 1);
354
355 event_bus.emit(TestEvent::new());
356 event_bus.wait_for_completion();
357 assert_eq!(*listener.0.counter.lock().unwrap(), 2);
358
359 event_bus.emit(AnotherEvent::new());
360 event_bus.wait_for_completion();
361 assert_eq!(*listener.0.counter.lock().unwrap(), 4); }
363}