reifydb_core/event/
mod.rs1use std::{
5 any::{Any, TypeId},
6 collections::HashMap,
7 sync,
8 sync::Arc,
9};
10
11use reifydb_runtime::actor::{
12 context::Context,
13 mailbox::ActorRef,
14 system::ActorSystem,
15 traits::{Actor, Directive},
16};
17use sync::mpsc::Sender;
18
19pub mod flow;
20pub mod lifecycle;
21#[macro_use]
22pub mod r#macro;
23pub mod metric;
24pub mod procedure;
25pub mod store;
26pub mod transaction;
27
28type EventListenerInstaller = Box<dyn FnOnce(&mut HashMap<TypeId, Box<dyn EventListenerList>>) + Send>;
29
/// A value that can be emitted on the [`EventBus`].
///
/// Implementors must be `Send + Sync + Clone + 'static` and must expose
/// themselves as [`Any`] so the bus can move them around type-erased and
/// downcast them back to the concrete type before calling listeners.
pub trait Event: Any + Send + Sync + Clone + 'static {
    /// Borrows this event as a type-erased [`Any`] reference.
    fn as_any(&self) -> &dyn Any;

    /// Consumes this event and returns it as a boxed, sendable [`Any`].
    ///
    /// `EventBus::emit` uses this to build the payload it sends to the actor.
    fn into_any(self) -> Box<dyn Any + Send>;
}
34
35pub trait EventListener<E>: Send + Sync + 'static
36where
37 E: Event,
38{
39 fn on(&self, event: &E);
40}
41
/// Type-erased view of the listeners registered for a single event type.
///
/// The bus actor stores one boxed implementor per `TypeId`, so the trait only
/// speaks in terms of [`Any`]: events arrive boxed, and the concrete list is
/// recovered through [`EventListenerList::as_any_mut`] when a listener is added.
trait EventListenerList: Any + Send + Sync {
    /// Delivers a type-erased event to the listeners held by this list.
    fn on_any(&self, event: Box<dyn Any + Send>);

    /// Exposes the list as `&mut dyn Any` so callers can downcast it to its
    /// concrete type.
    fn as_any_mut(&mut self) -> &mut dyn Any;
}
46
47struct EventListenerListImpl<E> {
48 listeners: Vec<Arc<dyn EventListener<E>>>,
49}
50
51impl<E> EventListenerListImpl<E>
52where
53 E: Event,
54{
55 fn new() -> Self {
56 Self {
57 listeners: Vec::new(),
58 }
59 }
60
61 fn add(&mut self, listener: Arc<dyn EventListener<E>>) {
62 self.listeners.push(listener);
63 }
64}
65
66impl<E> EventListenerList for EventListenerListImpl<E>
67where
68 E: Event,
69{
70 fn on_any(&self, event: Box<dyn Any + Send>) {
71 if let Ok(event) = event.downcast::<E>() {
72 for listener in &self.listeners {
73 listener.on(&*event);
74 }
75 }
76 }
77
78 fn as_any_mut(&mut self) -> &mut dyn Any {
79 self
80 }
81}
82
/// A type-erased event together with the `TypeId` of its concrete type.
///
/// Built by `EventBus::emit`; the actor uses `type_id` to pick the listener
/// list and hands `event` to it.
struct EventEnvelope {
    /// `TypeId` of the concrete event type, used as the lookup key.
    type_id: TypeId,
    /// The event itself, boxed as `Any` so it can travel in a message.
    event: Box<dyn Any + Send>,
}
87
88enum EventBusMsg {
89 Emit(EventEnvelope),
90 Register {
91 installer: EventListenerInstaller,
92 },
93 WaitForCompletion(Sender<()>),
94}
95
/// Stateless marker type for the event-bus actor.
///
/// All mutable data lives in the actor's `State` (the listener table), not in
/// this struct.
struct EventBusActor;
97
98impl Actor for EventBusActor {
99 type State = HashMap<TypeId, Box<dyn EventListenerList>>;
100 type Message = EventBusMsg;
101
102 fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
103 HashMap::new()
104 }
105
106 fn handle(&self, state: &mut Self::State, msg: Self::Message, _ctx: &Context<Self::Message>) -> Directive {
107 match msg {
108 EventBusMsg::Emit(envelope) => {
109 if let Some(list) = state.get(&envelope.type_id) {
110 list.on_any(envelope.event);
111 }
112 }
113 EventBusMsg::Register {
114 installer,
115 } => {
116 installer(state);
117 }
118 EventBusMsg::WaitForCompletion(tx) => {
119 let _ = tx.send(());
120 }
121 }
122 Directive::Continue
123 }
124}
125
126#[derive(Clone)]
127pub struct EventBus {
128 actor_ref: ActorRef<EventBusMsg>,
129 _actor_system: ActorSystem,
130}
131
132impl EventBus {
133 pub fn new(actor_system: &ActorSystem) -> Self {
134 let handle = actor_system.spawn("event-bus", EventBusActor);
135 Self {
136 actor_ref: handle.actor_ref().clone(),
137 _actor_system: actor_system.clone(),
138 }
139 }
140
141 pub fn register<E, L>(&self, listener: L)
142 where
143 E: Event,
144 L: EventListener<E>,
145 {
146 let type_id = TypeId::of::<E>();
147 let listener = Arc::new(listener);
148
149 let installer: EventListenerInstaller = Box::new(move |map| {
150 let list = map.entry(type_id).or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()));
151 list.as_any_mut().downcast_mut::<EventListenerListImpl<E>>().unwrap().add(listener);
152 });
153
154 let _ = self.actor_ref.send(EventBusMsg::Register {
155 installer,
156 });
157 }
158
159 pub fn emit<E>(&self, event: E)
160 where
161 E: Event,
162 {
163 let type_id = TypeId::of::<E>();
164 let _ = self.actor_ref.send(EventBusMsg::Emit(EventEnvelope {
165 type_id,
166 event: event.into_any(),
167 }));
168 }
169
170 pub fn wait_for_completion(&self) {
171 let (tx, rx) = sync::mpsc::channel();
172 let _ = self.actor_ref.send(EventBusMsg::WaitForCompletion(tx));
173 let _ = rx.recv();
174 }
175}
176
#[cfg(test)]
pub mod tests {
    use std::{
        sync::{Arc, Mutex},
        thread,
    };

    use reifydb_runtime::actor::system::ActorSystem;

    use crate::event::{Event, EventBus, EventListener};

    /// Builds the actor system shared by the tests in this module.
    fn test_actor_system() -> ActorSystem {
        ActorSystem::new(1)
    }

    /// Reads the current value of a listener's counter.
    fn count_of(listener: &TestEventListener) -> i32 {
        *listener.0.counter.lock().unwrap()
    }

    define_event! {
        pub struct TestEvent{}
    }

    define_event! {
        pub struct AnotherEvent{}
    }

    /// Listener whose clones share one counter.
    #[derive(Default, Debug, Clone)]
    pub struct TestEventListener(Arc<TestHandlerInner>);

    #[derive(Default, Debug)]
    pub struct TestHandlerInner {
        pub counter: Arc<Mutex<i32>>,
    }

    /// `TestEvent` adds one to the counter.
    impl EventListener<TestEvent> for TestEventListener {
        fn on(&self, _event: &TestEvent) {
            let mut guard = self.0.counter.lock().unwrap();
            *guard += 1;
        }
    }

    /// `AnotherEvent` doubles the counter.
    impl EventListener<AnotherEvent> for TestEventListener {
        fn on(&self, _event: &AnotherEvent) {
            let mut guard = self.0.counter.lock().unwrap();
            *guard *= 2;
        }
    }

    #[test]
    fn test_event_bus_new() {
        let actor_system = test_actor_system();
        let bus = EventBus::new(&actor_system);

        bus.emit(TestEvent::new());
        bus.wait_for_completion();
    }

    #[test]
    fn test_register_single_listener() {
        let actor_system = test_actor_system();
        let bus = EventBus::new(&actor_system);
        let listener = TestEventListener::default();

        bus.register::<TestEvent, TestEventListener>(listener.clone());
        bus.emit(TestEvent::new());
        bus.wait_for_completion();

        assert_eq!(count_of(&listener), 1);
    }

    #[test]
    fn test_emit_unregistered_event() {
        let actor_system = test_actor_system();
        let bus = EventBus::new(&actor_system);

        // No listener is registered; emitting must simply complete.
        bus.emit(TestEvent::new());
        bus.wait_for_completion();
    }

    #[test]
    fn test_multiple_listeners_same_event() {
        let actor_system = test_actor_system();
        let bus = EventBus::new(&actor_system);
        let first = TestEventListener::default();
        let second = TestEventListener::default();

        bus.register::<TestEvent, TestEventListener>(first.clone());
        bus.register::<TestEvent, TestEventListener>(second.clone());

        bus.emit(TestEvent::new());
        bus.wait_for_completion();

        assert_eq!(count_of(&first), 1);
        assert_eq!(count_of(&second), 1);
    }

    #[test]
    fn test_event_bus_clone() {
        let actor_system = test_actor_system();
        let original = EventBus::new(&actor_system);
        let listener = TestEventListener::default();
        original.register::<TestEvent, TestEventListener>(listener.clone());

        // A clone must reach the listener registered through the original.
        let cloned = original.clone();
        cloned.emit(TestEvent::new());
        cloned.wait_for_completion();

        assert_eq!(count_of(&listener), 1);
    }

    #[test]
    fn test_concurrent_registration() {
        let actor_system = test_actor_system();
        let bus = Arc::new(EventBus::new(&actor_system));

        // Spawn every worker before joining any of them.
        let workers: Vec<_> = (0..10)
            .map(|_| {
                let bus = Arc::clone(&bus);
                thread::spawn(move || {
                    let listener = TestEventListener::default();
                    bus.register::<TestEvent, TestEventListener>(listener);
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        bus.emit(TestEvent::new());
        bus.wait_for_completion();
    }

    #[test]
    fn test_concurrent_emitting() {
        let actor_system = test_actor_system();
        let bus = Arc::new(EventBus::new(&actor_system));
        let listener = TestEventListener::default();
        bus.register::<TestEvent, TestEventListener>(listener.clone());
        bus.wait_for_completion();

        // Spawn every worker before joining any of them.
        let workers: Vec<_> = (0..10)
            .map(|_| {
                let bus = Arc::clone(&bus);
                thread::spawn(move || {
                    bus.emit(TestEvent::new());
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        bus.wait_for_completion();
        assert_eq!(count_of(&listener), 10);
    }

    define_event! {
        pub struct MacroTestEvent {
            pub value: i32,
        }
    }

    // NOTE(review): `testine_event_macro` looks like a mangled
    // `test_define_event_macro`; the name is left unchanged here.
    #[test]
    fn testine_event_macro() {
        let event = MacroTestEvent::new(42);
        let erased = event.as_any();

        assert!(erased.downcast_ref::<MacroTestEvent>().is_some());
        assert_eq!(erased.downcast_ref::<MacroTestEvent>().unwrap().value(), &42);
    }

    #[test]
    fn test_multi_event_listener() {
        let actor_system = test_actor_system();
        let bus = EventBus::new(&actor_system);
        let listener = TestEventListener::default();

        bus.register::<TestEvent, TestEventListener>(listener.clone());
        bus.register::<AnotherEvent, TestEventListener>(listener.clone());

        // 0 + 1 = 1
        bus.emit(TestEvent::new());
        bus.wait_for_completion();
        assert_eq!(count_of(&listener), 1);

        // 1 + 1 = 2
        bus.emit(TestEvent::new());
        bus.wait_for_completion();
        assert_eq!(count_of(&listener), 2);

        // 2 * 2 = 4
        bus.emit(AnotherEvent::new());
        bus.wait_for_completion();
        assert_eq!(count_of(&listener), 4);
    }
}