reifydb_core/event/
mod.rs1use std::{
5 any::{Any, TypeId},
6 collections::HashMap,
7 sync,
8 sync::Arc,
9};
10
11use reifydb_runtime::actor::{
12 context::Context,
13 mailbox::ActorRef,
14 system::ActorSystem,
15 traits::{Actor, Directive},
16};
17use sync::mpsc::Sender;
18
19pub mod flow;
20pub mod lifecycle;
21#[macro_use]
22pub mod r#macro;
23pub mod metric;
24pub mod procedure;
25pub mod store;
26pub mod transaction;
27
28pub trait Event: Any + Send + Sync + Clone + 'static {
29 fn as_any(&self) -> &dyn Any;
30 fn into_any(self) -> Box<dyn Any + Send>;
31}
32
33pub trait EventListener<E>: Send + Sync + 'static
34where
35 E: Event,
36{
37 fn on(&self, event: &E);
38}
39
40trait EventListenerList: Any + Send + Sync {
41 fn on_any(&self, event: Box<dyn Any + Send>);
42 fn as_any_mut(&mut self) -> &mut dyn Any;
43}
44
45struct EventListenerListImpl<E> {
46 listeners: Vec<Arc<dyn EventListener<E>>>,
47}
48
49impl<E> EventListenerListImpl<E>
50where
51 E: Event,
52{
53 fn new() -> Self {
54 Self {
55 listeners: Vec::new(),
56 }
57 }
58
59 fn add(&mut self, listener: Arc<dyn EventListener<E>>) {
60 self.listeners.push(listener);
61 }
62}
63
64impl<E> EventListenerList for EventListenerListImpl<E>
65where
66 E: Event,
67{
68 fn on_any(&self, event: Box<dyn Any + Send>) {
69 if let Ok(event) = event.downcast::<E>() {
70 for listener in &self.listeners {
71 listener.on(&*event);
72 }
73 }
74 }
75
76 fn as_any_mut(&mut self) -> &mut dyn Any {
77 self
78 }
79}
80
81struct EventEnvelope {
84 type_id: TypeId,
85 event: Box<dyn Any + Send>,
86}
87
88enum EventBusMsg {
89 Emit(EventEnvelope),
90 Register {
91 installer: Box<dyn FnOnce(&mut HashMap<TypeId, Box<dyn EventListenerList>>) + Send>,
92 },
93 WaitForCompletion(Sender<()>),
94}
95
96struct EventBusActor;
97
98impl Actor for EventBusActor {
99 type State = HashMap<TypeId, Box<dyn EventListenerList>>;
100 type Message = EventBusMsg;
101
102 fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
103 HashMap::new()
104 }
105
106 fn handle(&self, state: &mut Self::State, msg: Self::Message, _ctx: &Context<Self::Message>) -> Directive {
107 match msg {
108 EventBusMsg::Emit(envelope) => {
109 if let Some(list) = state.get(&envelope.type_id) {
110 list.on_any(envelope.event);
111 }
112 }
113 EventBusMsg::Register {
114 installer,
115 } => {
116 installer(state);
117 }
118 EventBusMsg::WaitForCompletion(tx) => {
119 let _ = tx.send(());
120 }
121 }
122 Directive::Continue
123 }
124}
125
126#[derive(Clone)]
127pub struct EventBus {
128 actor_ref: ActorRef<EventBusMsg>,
129 _actor_system: ActorSystem,
130}
131
132impl EventBus {
133 pub fn new(actor_system: &ActorSystem) -> Self {
134 let handle = actor_system.spawn("event-bus", EventBusActor);
135 Self {
136 actor_ref: handle.actor_ref().clone(),
137 _actor_system: actor_system.clone(),
138 }
139 }
140
141 pub fn register<E, L>(&self, listener: L)
142 where
143 E: Event,
144 L: EventListener<E>,
145 {
146 let type_id = TypeId::of::<E>();
147 let listener = Arc::new(listener);
148
149 let installer: Box<dyn FnOnce(&mut HashMap<TypeId, Box<dyn EventListenerList>>) + Send> =
150 Box::new(move |map| {
151 let list = map
152 .entry(type_id)
153 .or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()));
154 list.as_any_mut().downcast_mut::<EventListenerListImpl<E>>().unwrap().add(listener);
155 });
156
157 let _ = self.actor_ref.send(EventBusMsg::Register {
158 installer,
159 });
160 }
161
162 pub fn emit<E>(&self, event: E)
163 where
164 E: Event,
165 {
166 let type_id = TypeId::of::<E>();
167 let _ = self.actor_ref.send(EventBusMsg::Emit(EventEnvelope {
168 type_id,
169 event: event.into_any(),
170 }));
171 }
172
173 pub fn wait_for_completion(&self) {
174 let (tx, rx) = sync::mpsc::channel();
175 let _ = self.actor_ref.send(EventBusMsg::WaitForCompletion(tx));
176 let _ = rx.recv();
177 }
178}
179
180#[cfg(test)]
181pub mod tests {
182 use std::{
183 sync::{Arc, Mutex},
184 thread,
185 };
186
187 use reifydb_runtime::{SharedRuntimeConfig, actor::system::ActorSystem};
188
189 use crate::event::{Event, EventBus, EventListener};
190
191 fn test_actor_system() -> ActorSystem {
192 ActorSystem::new(SharedRuntimeConfig::default().actor_system_config())
193 }
194
195 define_event! {
196 pub struct TestEvent{}
197 }
198
199 define_event! {
200 pub struct AnotherEvent{}
201 }
202
203 #[derive(Default, Debug, Clone)]
204 pub struct TestEventListener(Arc<TestHandlerInner>);
205
206 #[derive(Default, Debug)]
207 pub struct TestHandlerInner {
208 pub counter: Arc<Mutex<i32>>,
209 }
210
211 impl EventListener<TestEvent> for TestEventListener {
212 fn on(&self, _event: &TestEvent) {
213 let mut x = self.0.counter.lock().unwrap();
214 *x += 1;
215 }
216 }
217
218 impl EventListener<AnotherEvent> for TestEventListener {
219 fn on(&self, _event: &AnotherEvent) {
220 let mut x = self.0.counter.lock().unwrap();
221 *x *= 2;
222 }
223 }
224
225 #[test]
226 fn test_event_bus_new() {
227 let actor_system = test_actor_system();
228 let event_bus = EventBus::new(&actor_system);
229 event_bus.emit(TestEvent::new());
230 event_bus.wait_for_completion();
231 }
232
233 #[test]
234 fn test_register_single_listener() {
235 let actor_system = test_actor_system();
236 let event_bus = EventBus::new(&actor_system);
237 let listener = TestEventListener::default();
238
239 event_bus.register::<TestEvent, TestEventListener>(listener.clone());
240 event_bus.emit(TestEvent::new());
241 event_bus.wait_for_completion();
242 assert_eq!(*listener.0.counter.lock().unwrap(), 1);
243 }
244
245 #[test]
246 fn test_emit_unregistered_event() {
247 let actor_system = test_actor_system();
248 let event_bus = EventBus::new(&actor_system);
249 event_bus.emit(TestEvent::new());
250 event_bus.wait_for_completion();
251 }
252
253 #[test]
254 fn test_multiple_listeners_same_event() {
255 let actor_system = test_actor_system();
256 let event_bus = EventBus::new(&actor_system);
257 let listener1 = TestEventListener::default();
258 let listener2 = TestEventListener::default();
259
260 event_bus.register::<TestEvent, TestEventListener>(listener1.clone());
261 event_bus.register::<TestEvent, TestEventListener>(listener2.clone());
262
263 event_bus.emit(TestEvent::new());
264 event_bus.wait_for_completion();
265 assert_eq!(*listener1.0.counter.lock().unwrap(), 1);
266 assert_eq!(*listener2.0.counter.lock().unwrap(), 1);
267 }
268
269 #[test]
270 fn test_event_bus_clone() {
271 let actor_system = test_actor_system();
272 let event_bus1 = EventBus::new(&actor_system);
273 let listener = TestEventListener::default();
274 event_bus1.register::<TestEvent, TestEventListener>(listener.clone());
275
276 let event_bus2 = event_bus1.clone();
277 event_bus2.emit(TestEvent::new());
278 event_bus2.wait_for_completion();
279 assert_eq!(*listener.0.counter.lock().unwrap(), 1);
280 }
281
282 #[test]
283 fn test_concurrent_registration() {
284 let actor_system = test_actor_system();
285 let event_bus = Arc::new(EventBus::new(&actor_system));
286 let mut handles = Vec::new();
287
288 for _ in 0..10 {
289 let event_bus = event_bus.clone();
290 handles.push(thread::spawn(move || {
291 let listener = TestEventListener::default();
292 event_bus.register::<TestEvent, TestEventListener>(listener);
293 }));
294 }
295
296 for handle in handles {
297 handle.join().unwrap();
298 }
299
300 event_bus.emit(TestEvent::new());
301 event_bus.wait_for_completion();
302 }
303
304 #[test]
305 fn test_concurrent_emitting() {
306 let actor_system = test_actor_system();
307 let event_bus = Arc::new(EventBus::new(&actor_system));
308 let listener = TestEventListener::default();
309 event_bus.register::<TestEvent, TestEventListener>(listener.clone());
310 event_bus.wait_for_completion();
311
312 let mut handles = Vec::new();
313
314 for _ in 0..10 {
315 let event_bus = event_bus.clone();
316 handles.push(thread::spawn(move || {
317 event_bus.emit(TestEvent::new());
318 }));
319 }
320
321 for handle in handles {
322 handle.join().unwrap();
323 }
324
325 event_bus.wait_for_completion();
326 assert_eq!(*listener.0.counter.lock().unwrap(), 10);
327 }
328
329 define_event! {
330 pub struct MacroTestEvent {
331 pub value: i32,
332 }
333 }
334
335 #[test]
336 fn test_define_event_macro() {
337 let event = MacroTestEvent::new(42);
338 let any_ref = event.as_any();
339 assert!(any_ref.downcast_ref::<MacroTestEvent>().is_some());
340 assert_eq!(any_ref.downcast_ref::<MacroTestEvent>().unwrap().value(), &42);
341 }
342
343 #[test]
344 fn test_multi_event_listener() {
345 let actor_system = test_actor_system();
346 let event_bus = EventBus::new(&actor_system);
347 let listener = TestEventListener::default();
348
349 event_bus.register::<TestEvent, TestEventListener>(listener.clone());
350 event_bus.register::<AnotherEvent, TestEventListener>(listener.clone());
351
352 event_bus.emit(TestEvent::new());
354 event_bus.wait_for_completion();
355 assert_eq!(*listener.0.counter.lock().unwrap(), 1);
356
357 event_bus.emit(TestEvent::new());
358 event_bus.wait_for_completion();
359 assert_eq!(*listener.0.counter.lock().unwrap(), 2);
360
361 event_bus.emit(AnotherEvent::new());
362 event_bus.wait_for_completion();
363 assert_eq!(*listener.0.counter.lock().unwrap(), 4); }
365}