reifydb_core/event/
mod.rs1use std::{
5 any::{Any, TypeId},
6 collections::HashMap,
7 sync,
8 sync::Arc,
9};
10
11use reifydb_runtime::actor::{
12 context::Context,
13 mailbox::ActorRef,
14 system::ActorSystem,
15 traits::{Actor, Directive},
16};
17use sync::mpsc::Sender;
18
19pub mod flow;
20pub mod lifecycle;
21#[macro_use]
22pub mod r#macro;
23pub mod metric;
24pub mod procedure;
25pub mod row;
26pub mod store;
27pub mod transaction;
28
29type EventListenerInstaller = Box<dyn FnOnce(&mut HashMap<TypeId, Box<dyn EventListenerList>>) + Send>;
30
31pub trait Event: Any + Send + Sync + Clone + 'static {
32 fn as_any(&self) -> &dyn Any;
33 fn into_any(self) -> Box<dyn Any + Send>;
34}
35
36pub trait EventListener<E>: Send + Sync + 'static
37where
38 E: Event,
39{
40 fn on(&self, event: &E);
41}
42
43trait EventListenerList: Any + Send + Sync {
44 fn on_any(&self, event: Box<dyn Any + Send>);
45 fn as_any_mut(&mut self) -> &mut dyn Any;
46}
47
48struct EventListenerListImpl<E> {
49 listeners: Vec<Arc<dyn EventListener<E>>>,
50}
51
52impl<E> EventListenerListImpl<E>
53where
54 E: Event,
55{
56 fn new() -> Self {
57 Self {
58 listeners: Vec::new(),
59 }
60 }
61
62 fn add(&mut self, listener: Arc<dyn EventListener<E>>) {
63 self.listeners.push(listener);
64 }
65}
66
67impl<E> EventListenerList for EventListenerListImpl<E>
68where
69 E: Event,
70{
71 fn on_any(&self, event: Box<dyn Any + Send>) {
72 if let Ok(event) = event.downcast::<E>() {
73 for listener in &self.listeners {
74 listener.on(&*event);
75 }
76 }
77 }
78
79 fn as_any_mut(&mut self) -> &mut dyn Any {
80 self
81 }
82}
83
84struct EventEnvelope {
85 type_id: TypeId,
86 event: Box<dyn Any + Send>,
87}
88
89enum EventBusMessage {
90 Emit(EventEnvelope),
91 Register {
92 installer: EventListenerInstaller,
93 },
94 WaitForCompletion(Sender<()>),
95}
96
97struct EventBusActor;
98
99impl Actor for EventBusActor {
100 type State = HashMap<TypeId, Box<dyn EventListenerList>>;
101 type Message = EventBusMessage;
102
103 fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
104 HashMap::new()
105 }
106
107 fn handle(&self, state: &mut Self::State, msg: Self::Message, _ctx: &Context<Self::Message>) -> Directive {
108 match msg {
109 EventBusMessage::Emit(envelope) => {
110 if let Some(list) = state.get(&envelope.type_id) {
111 list.on_any(envelope.event);
112 }
113 }
114 EventBusMessage::Register {
115 installer,
116 } => {
117 installer(state);
118 }
119 EventBusMessage::WaitForCompletion(tx) => {
120 let _ = tx.send(());
121 }
122 }
123 Directive::Continue
124 }
125}
126
127#[derive(Clone)]
128pub struct EventBus {
129 actor_ref: ActorRef<EventBusMessage>,
130 _actor_system: ActorSystem,
131}
132
133impl EventBus {
134 pub fn new(actor_system: &ActorSystem) -> Self {
135 let handle = actor_system.spawn("event-bus", EventBusActor);
136 Self {
137 actor_ref: handle.actor_ref().clone(),
138 _actor_system: actor_system.clone(),
139 }
140 }
141
142 pub fn register<E, L>(&self, listener: L)
143 where
144 E: Event,
145 L: EventListener<E>,
146 {
147 let type_id = TypeId::of::<E>();
148 let listener = Arc::new(listener);
149
150 let installer: EventListenerInstaller = Box::new(move |map| {
151 let list = map.entry(type_id).or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()));
152 list.as_any_mut().downcast_mut::<EventListenerListImpl<E>>().unwrap().add(listener);
153 });
154
155 let _ = self.actor_ref.send(EventBusMessage::Register {
156 installer,
157 });
158 }
159
160 pub fn emit<E>(&self, event: E)
161 where
162 E: Event,
163 {
164 let type_id = TypeId::of::<E>();
165 let _ = self.actor_ref.send(EventBusMessage::Emit(EventEnvelope {
166 type_id,
167 event: event.into_any(),
168 }));
169 }
170
171 pub fn wait_for_completion(&self) {
172 let (tx, rx) = sync::mpsc::channel();
173 let _ = self.actor_ref.send(EventBusMessage::WaitForCompletion(tx));
174 let _ = rx.recv();
175 }
176}
177
178#[cfg(test)]
179pub mod tests {
180 use std::{
181 sync::{Arc, Mutex},
182 thread,
183 };
184
185 use reifydb_runtime::{
186 actor::system::ActorSystem,
187 context::clock::Clock,
188 pool::{PoolConfig, Pools},
189 };
190
191 use crate::event::{Event, EventBus, EventListener};
192
193 fn test_actor_system() -> ActorSystem {
194 let pools = Pools::new(PoolConfig::default());
195 ActorSystem::new(pools, Clock::Real)
196 }
197
198 define_event! {
199 pub struct TestEvent{}
200 }
201
202 define_event! {
203 pub struct AnotherEvent{}
204 }
205
206 #[derive(Default, Debug, Clone)]
207 pub struct TestEventListener(Arc<TestHandlerInner>);
208
209 #[derive(Default, Debug)]
210 pub struct TestHandlerInner {
211 pub counter: Arc<Mutex<i32>>,
212 }
213
214 impl EventListener<TestEvent> for TestEventListener {
215 fn on(&self, _event: &TestEvent) {
216 let mut x = self.0.counter.lock().unwrap();
217 *x += 1;
218 }
219 }
220
221 impl EventListener<AnotherEvent> for TestEventListener {
222 fn on(&self, _event: &AnotherEvent) {
223 let mut x = self.0.counter.lock().unwrap();
224 *x *= 2;
225 }
226 }
227
228 #[test]
229 fn test_event_bus_new() {
230 let actor_system = test_actor_system();
231 let event_bus = EventBus::new(&actor_system);
232 event_bus.emit(TestEvent::new());
233 event_bus.wait_for_completion();
234 }
235
236 #[test]
237 fn test_register_single_listener() {
238 let actor_system = test_actor_system();
239 let event_bus = EventBus::new(&actor_system);
240 let listener = TestEventListener::default();
241
242 event_bus.register::<TestEvent, TestEventListener>(listener.clone());
243 event_bus.emit(TestEvent::new());
244 event_bus.wait_for_completion();
245 assert_eq!(*listener.0.counter.lock().unwrap(), 1);
246 }
247
248 #[test]
249 fn test_emit_unregistered_event() {
250 let actor_system = test_actor_system();
251 let event_bus = EventBus::new(&actor_system);
252 event_bus.emit(TestEvent::new());
253 event_bus.wait_for_completion();
254 }
255
256 #[test]
257 fn test_multiple_listeners_same_event() {
258 let actor_system = test_actor_system();
259 let event_bus = EventBus::new(&actor_system);
260 let listener1 = TestEventListener::default();
261 let listener2 = TestEventListener::default();
262
263 event_bus.register::<TestEvent, TestEventListener>(listener1.clone());
264 event_bus.register::<TestEvent, TestEventListener>(listener2.clone());
265
266 event_bus.emit(TestEvent::new());
267 event_bus.wait_for_completion();
268 assert_eq!(*listener1.0.counter.lock().unwrap(), 1);
269 assert_eq!(*listener2.0.counter.lock().unwrap(), 1);
270 }
271
272 #[test]
273 fn test_event_bus_clone() {
274 let actor_system = test_actor_system();
275 let event_bus1 = EventBus::new(&actor_system);
276 let listener = TestEventListener::default();
277 event_bus1.register::<TestEvent, TestEventListener>(listener.clone());
278
279 let event_bus2 = event_bus1.clone();
280 event_bus2.emit(TestEvent::new());
281 event_bus2.wait_for_completion();
282 assert_eq!(*listener.0.counter.lock().unwrap(), 1);
283 }
284
285 #[test]
286 fn test_concurrent_registration() {
287 let actor_system = test_actor_system();
288 let event_bus = Arc::new(EventBus::new(&actor_system));
289 let mut handles = Vec::new();
290
291 for _ in 0..10 {
292 let event_bus = event_bus.clone();
293 handles.push(thread::spawn(move || {
294 let listener = TestEventListener::default();
295 event_bus.register::<TestEvent, TestEventListener>(listener);
296 }));
297 }
298
299 for handle in handles {
300 handle.join().unwrap();
301 }
302
303 event_bus.emit(TestEvent::new());
304 event_bus.wait_for_completion();
305 }
306
307 #[test]
308 fn test_concurrent_emitting() {
309 let actor_system = test_actor_system();
310 let event_bus = Arc::new(EventBus::new(&actor_system));
311 let listener = TestEventListener::default();
312 event_bus.register::<TestEvent, TestEventListener>(listener.clone());
313 event_bus.wait_for_completion();
314
315 let mut handles = Vec::new();
316
317 for _ in 0..10 {
318 let event_bus = event_bus.clone();
319 handles.push(thread::spawn(move || {
320 event_bus.emit(TestEvent::new());
321 }));
322 }
323
324 for handle in handles {
325 handle.join().unwrap();
326 }
327
328 event_bus.wait_for_completion();
329 assert_eq!(*listener.0.counter.lock().unwrap(), 10);
330 }
331
332 define_event! {
333 pub struct MacroTestEvent {
334 pub value: i32,
335 }
336 }
337
338 #[test]
339 fn testine_event_macro() {
340 let event = MacroTestEvent::new(42);
341 let any_ref = event.as_any();
342 assert!(any_ref.downcast_ref::<MacroTestEvent>().is_some());
343 assert_eq!(any_ref.downcast_ref::<MacroTestEvent>().unwrap().value(), &42);
344 }
345
346 #[test]
347 fn test_multi_event_listener() {
348 let actor_system = test_actor_system();
349 let event_bus = EventBus::new(&actor_system);
350 let listener = TestEventListener::default();
351
352 event_bus.register::<TestEvent, TestEventListener>(listener.clone());
353 event_bus.register::<AnotherEvent, TestEventListener>(listener.clone());
354
355 event_bus.emit(TestEvent::new());
357 event_bus.wait_for_completion();
358 assert_eq!(*listener.0.counter.lock().unwrap(), 1);
359
360 event_bus.emit(TestEvent::new());
361 event_bus.wait_for_completion();
362 assert_eq!(*listener.0.counter.lock().unwrap(), 2);
363
364 event_bus.emit(AnotherEvent::new());
365 event_bus.wait_for_completion();
366 assert_eq!(*listener.0.counter.lock().unwrap(), 4); }
368}