reifydb_core/event/
mod.rs1use std::{
16 any::{Any, TypeId},
17 collections::HashMap,
18 sync,
19 sync::Arc,
20};
21
22use reifydb_runtime::actor::{
23 context::Context,
24 mailbox::ActorRef,
25 system::ActorSystem,
26 traits::{Actor, Directive},
27};
28use sync::mpsc::Sender;
29
30pub mod flow;
31pub mod lifecycle;
32#[macro_use]
33pub mod r#macro;
34pub mod metric;
35pub mod procedure;
36pub mod row;
37pub mod store;
38pub mod transaction;
39
40type EventListenerInstaller = Box<dyn FnOnce(&mut HashMap<TypeId, Box<dyn EventListenerList>>) + Send>;
41
42pub trait Event: Any + Send + Sync + Clone + 'static {
43 fn as_any(&self) -> &dyn Any;
44 fn into_any(self) -> Box<dyn Any + Send>;
45}
46
47pub trait EventListener<E>: Send + Sync + 'static
48where
49 E: Event,
50{
51 fn on(&self, event: &E);
52}
53
54trait EventListenerList: Any + Send + Sync {
55 fn on_any(&self, event: Box<dyn Any + Send>);
56 fn as_any_mut(&mut self) -> &mut dyn Any;
57}
58
59struct EventListenerListImpl<E> {
60 listeners: Vec<Arc<dyn EventListener<E>>>,
61}
62
63impl<E> EventListenerListImpl<E>
64where
65 E: Event,
66{
67 fn new() -> Self {
68 Self {
69 listeners: Vec::new(),
70 }
71 }
72
73 fn add(&mut self, listener: Arc<dyn EventListener<E>>) {
74 self.listeners.push(listener);
75 }
76}
77
78impl<E> EventListenerList for EventListenerListImpl<E>
79where
80 E: Event,
81{
82 fn on_any(&self, event: Box<dyn Any + Send>) {
83 if let Ok(event) = event.downcast::<E>() {
84 for listener in &self.listeners {
85 listener.on(&*event);
86 }
87 }
88 }
89
90 fn as_any_mut(&mut self) -> &mut dyn Any {
91 self
92 }
93}
94
95struct EventEnvelope {
96 type_id: TypeId,
97 event: Box<dyn Any + Send>,
98}
99
100enum EventBusMessage {
101 Emit(EventEnvelope),
102 Register {
103 installer: EventListenerInstaller,
104 },
105 WaitForCompletion(Sender<()>),
106}
107
108struct EventBusActor;
109
110impl Actor for EventBusActor {
111 type State = HashMap<TypeId, Box<dyn EventListenerList>>;
112 type Message = EventBusMessage;
113
114 fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
115 HashMap::new()
116 }
117
118 fn handle(&self, state: &mut Self::State, msg: Self::Message, _ctx: &Context<Self::Message>) -> Directive {
119 match msg {
120 EventBusMessage::Emit(envelope) => {
121 if let Some(list) = state.get(&envelope.type_id) {
122 list.on_any(envelope.event);
123 }
124 }
125 EventBusMessage::Register {
126 installer,
127 } => {
128 installer(state);
129 }
130 EventBusMessage::WaitForCompletion(tx) => {
131 let _ = tx.send(());
132 }
133 }
134 Directive::Continue
135 }
136}
137
138#[derive(Clone)]
139pub struct EventBus {
140 actor_ref: ActorRef<EventBusMessage>,
141 _actor_system: ActorSystem,
142}
143
144impl EventBus {
145 pub fn new(actor_system: &ActorSystem) -> Self {
146 let handle = actor_system.spawn_system("event-bus", EventBusActor);
147 Self {
148 actor_ref: handle.actor_ref().clone(),
149 _actor_system: actor_system.clone(),
150 }
151 }
152
153 pub fn register<E, L>(&self, listener: L)
154 where
155 E: Event,
156 L: EventListener<E>,
157 {
158 let type_id = TypeId::of::<E>();
159 let listener = Arc::new(listener);
160
161 let installer: EventListenerInstaller = Box::new(move |map| {
162 let list = map.entry(type_id).or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()));
163 list.as_any_mut().downcast_mut::<EventListenerListImpl<E>>().unwrap().add(listener);
164 });
165
166 let _ = self.actor_ref.send(EventBusMessage::Register {
167 installer,
168 });
169 }
170
171 pub fn emit<E>(&self, event: E)
172 where
173 E: Event,
174 {
175 let type_id = TypeId::of::<E>();
176 let _ = self.actor_ref.send(EventBusMessage::Emit(EventEnvelope {
177 type_id,
178 event: event.into_any(),
179 }));
180 }
181
182 pub fn wait_for_completion(&self) {
183 let (tx, rx) = sync::mpsc::channel();
184 let _ = self.actor_ref.send(EventBusMessage::WaitForCompletion(tx));
185 let _ = rx.recv();
186 }
187}
188
189#[cfg(test)]
190pub mod tests {
191 use std::{
192 sync::{Arc, Mutex},
193 thread,
194 };
195
196 use reifydb_runtime::{
197 actor::system::ActorSystem,
198 context::clock::Clock,
199 pool::{PoolConfig, Pools},
200 };
201
202 use crate::event::{Event, EventBus, EventListener};
203
204 fn test_actor_system() -> ActorSystem {
205 let pools = Pools::new(PoolConfig::default());
206 ActorSystem::new(pools, Clock::Real)
207 }
208
209 define_event! {
210 pub struct TestEvent{}
211 }
212
213 define_event! {
214 pub struct AnotherEvent{}
215 }
216
217 #[derive(Default, Debug, Clone)]
218 pub struct TestEventListener(Arc<TestHandlerInner>);
219
220 #[derive(Default, Debug)]
221 pub struct TestHandlerInner {
222 pub counter: Arc<Mutex<i32>>,
223 }
224
225 impl EventListener<TestEvent> for TestEventListener {
226 fn on(&self, _event: &TestEvent) {
227 let mut x = self.0.counter.lock().unwrap();
228 *x += 1;
229 }
230 }
231
232 impl EventListener<AnotherEvent> for TestEventListener {
233 fn on(&self, _event: &AnotherEvent) {
234 let mut x = self.0.counter.lock().unwrap();
235 *x *= 2;
236 }
237 }
238
239 #[test]
240 fn test_event_bus_new() {
241 let actor_system = test_actor_system();
242 let event_bus = EventBus::new(&actor_system);
243 event_bus.emit(TestEvent::new());
244 event_bus.wait_for_completion();
245 }
246
247 #[test]
248 fn test_register_single_listener() {
249 let actor_system = test_actor_system();
250 let event_bus = EventBus::new(&actor_system);
251 let listener = TestEventListener::default();
252
253 event_bus.register::<TestEvent, TestEventListener>(listener.clone());
254 event_bus.emit(TestEvent::new());
255 event_bus.wait_for_completion();
256 assert_eq!(*listener.0.counter.lock().unwrap(), 1);
257 }
258
259 #[test]
260 fn test_emit_unregistered_event() {
261 let actor_system = test_actor_system();
262 let event_bus = EventBus::new(&actor_system);
263 event_bus.emit(TestEvent::new());
264 event_bus.wait_for_completion();
265 }
266
267 #[test]
268 fn test_multiple_listeners_same_event() {
269 let actor_system = test_actor_system();
270 let event_bus = EventBus::new(&actor_system);
271 let listener1 = TestEventListener::default();
272 let listener2 = TestEventListener::default();
273
274 event_bus.register::<TestEvent, TestEventListener>(listener1.clone());
275 event_bus.register::<TestEvent, TestEventListener>(listener2.clone());
276
277 event_bus.emit(TestEvent::new());
278 event_bus.wait_for_completion();
279 assert_eq!(*listener1.0.counter.lock().unwrap(), 1);
280 assert_eq!(*listener2.0.counter.lock().unwrap(), 1);
281 }
282
283 #[test]
284 fn test_event_bus_clone() {
285 let actor_system = test_actor_system();
286 let event_bus1 = EventBus::new(&actor_system);
287 let listener = TestEventListener::default();
288 event_bus1.register::<TestEvent, TestEventListener>(listener.clone());
289
290 let event_bus2 = event_bus1.clone();
291 event_bus2.emit(TestEvent::new());
292 event_bus2.wait_for_completion();
293 assert_eq!(*listener.0.counter.lock().unwrap(), 1);
294 }
295
296 #[test]
297 fn test_concurrent_registration() {
298 let actor_system = test_actor_system();
299 let event_bus = Arc::new(EventBus::new(&actor_system));
300 let mut handles = Vec::new();
301
302 for _ in 0..10 {
303 let event_bus = event_bus.clone();
304 handles.push(thread::spawn(move || {
305 let listener = TestEventListener::default();
306 event_bus.register::<TestEvent, TestEventListener>(listener);
307 }));
308 }
309
310 for handle in handles {
311 handle.join().unwrap();
312 }
313
314 event_bus.emit(TestEvent::new());
315 event_bus.wait_for_completion();
316 }
317
318 #[test]
319 fn test_concurrent_emitting() {
320 let actor_system = test_actor_system();
321 let event_bus = Arc::new(EventBus::new(&actor_system));
322 let listener = TestEventListener::default();
323 event_bus.register::<TestEvent, TestEventListener>(listener.clone());
324 event_bus.wait_for_completion();
325
326 let mut handles = Vec::new();
327
328 for _ in 0..10 {
329 let event_bus = event_bus.clone();
330 handles.push(thread::spawn(move || {
331 event_bus.emit(TestEvent::new());
332 }));
333 }
334
335 for handle in handles {
336 handle.join().unwrap();
337 }
338
339 event_bus.wait_for_completion();
340 assert_eq!(*listener.0.counter.lock().unwrap(), 10);
341 }
342
343 define_event! {
344 pub struct MacroTestEvent {
345 pub value: i32,
346 }
347 }
348
349 #[test]
350 fn testine_event_macro() {
351 let event = MacroTestEvent::new(42);
352 let any_ref = event.as_any();
353 assert!(any_ref.downcast_ref::<MacroTestEvent>().is_some());
354 assert_eq!(any_ref.downcast_ref::<MacroTestEvent>().unwrap().value(), &42);
355 }
356
357 #[test]
358 fn test_multi_event_listener() {
359 let actor_system = test_actor_system();
360 let event_bus = EventBus::new(&actor_system);
361 let listener = TestEventListener::default();
362
363 event_bus.register::<TestEvent, TestEventListener>(listener.clone());
364 event_bus.register::<AnotherEvent, TestEventListener>(listener.clone());
365
366 event_bus.emit(TestEvent::new());
368 event_bus.wait_for_completion();
369 assert_eq!(*listener.0.counter.lock().unwrap(), 1);
370
371 event_bus.emit(TestEvent::new());
372 event_bus.wait_for_completion();
373 assert_eq!(*listener.0.counter.lock().unwrap(), 2);
374
375 event_bus.emit(AnotherEvent::new());
376 event_bus.wait_for_completion();
377 assert_eq!(*listener.0.counter.lock().unwrap(), 4); }
379}