reifydb_core/event/
mod.rs1use std::{
5 any::{Any, TypeId},
6 collections::HashMap,
7 sync::Arc,
8};
9
10use reifydb_runtime::actor::{
11 context::Context,
12 mailbox::ActorRef,
13 system::ActorSystem,
14 traits::{Actor, Directive},
15};
16
17pub mod flow;
18pub mod lifecycle;
19#[macro_use]
20pub mod r#macro;
21pub mod metric;
22pub mod store;
23pub mod transaction;
24
/// A value that can be published on the event bus.
///
/// Implementors expose themselves through [`Any`] so that the concrete type
/// can be erased for transport and recovered again by downcasting.
pub trait Event: Any + Send + Sync + Clone + 'static {
    /// Borrows this event as a type-erased [`Any`] reference.
    fn as_any(&self) -> &dyn Any;

    /// Consumes this event and hands it back as a boxed, sendable [`Any`].
    fn into_any(self) -> Box<dyn Any + Send>;
}
29
30pub trait EventListener<E>: Send + Sync + 'static
31where
32 E: Event,
33{
34 fn on(&self, event: &E);
35}
36
/// Type-erased view of a listener list for one concrete event type.
///
/// Allows lists for different event types to be stored side by side as
/// trait objects.
trait EventListenerList: Any + Send + Sync {
    /// Offers a type-erased event to the list.
    fn on_any(&self, event: Box<dyn Any + Send>);

    /// Exposes the list as `&mut dyn Any` so it can be downcast to its concrete type.
    fn as_any_mut(&mut self) -> &mut dyn Any;
}
41
42struct EventListenerListImpl<E> {
43 listeners: Vec<Arc<dyn EventListener<E>>>,
44}
45
46impl<E> EventListenerListImpl<E>
47where
48 E: Event,
49{
50 fn new() -> Self {
51 Self {
52 listeners: Vec::new(),
53 }
54 }
55
56 fn add(&mut self, listener: Arc<dyn EventListener<E>>) {
57 self.listeners.push(listener);
58 }
59}
60
61impl<E> EventListenerList for EventListenerListImpl<E>
62where
63 E: Event,
64{
65 fn on_any(&self, event: Box<dyn Any + Send>) {
66 if let Ok(event) = event.downcast::<E>() {
67 for listener in &self.listeners {
68 listener.on(&*event);
69 }
70 }
71 }
72
73 fn as_any_mut(&mut self) -> &mut dyn Any {
74 self
75 }
76}
77
/// A type-erased event together with the `TypeId` used to route it.
struct EventEnvelope {
    /// `TypeId` of the concrete event type; used as the lookup key for listeners.
    type_id: TypeId,
    /// The event itself, boxed as `Any` so it can be sent without naming its type.
    event: Box<dyn Any + Send>,
}
84
85enum EventBusMsg {
86 Emit(EventEnvelope),
87 Register {
88 installer: Box<dyn FnOnce(&mut HashMap<TypeId, Box<dyn EventListenerList>>) + Send>,
89 },
90 WaitForCompletion(std::sync::mpsc::Sender<()>),
91}
92
/// Stateless marker type whose `Actor` implementation drives the event bus.
struct EventBusActor;
94
95impl Actor for EventBusActor {
96 type State = HashMap<TypeId, Box<dyn EventListenerList>>;
97 type Message = EventBusMsg;
98
99 fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
100 HashMap::new()
101 }
102
103 fn handle(&self, state: &mut Self::State, msg: Self::Message, _ctx: &Context<Self::Message>) -> Directive {
104 match msg {
105 EventBusMsg::Emit(envelope) => {
106 if let Some(list) = state.get(&envelope.type_id) {
107 list.on_any(envelope.event);
108 }
109 }
110 EventBusMsg::Register {
111 installer,
112 } => {
113 installer(state);
114 }
115 EventBusMsg::WaitForCompletion(tx) => {
116 let _ = tx.send(());
117 }
118 }
119 Directive::Continue
120 }
121}
122
123#[derive(Clone)]
124pub struct EventBus {
125 actor_ref: ActorRef<EventBusMsg>,
126 _actor_system: ActorSystem,
127}
128
129impl EventBus {
130 pub fn new(actor_system: &ActorSystem) -> Self {
131 let handle = actor_system.spawn("event-bus", EventBusActor);
132 Self {
133 actor_ref: handle.actor_ref().clone(),
134 _actor_system: actor_system.clone(),
135 }
136 }
137
138 pub fn register<E, L>(&self, listener: L)
139 where
140 E: Event,
141 L: EventListener<E>,
142 {
143 let type_id = TypeId::of::<E>();
144 let listener = Arc::new(listener);
145
146 let installer: Box<dyn FnOnce(&mut HashMap<TypeId, Box<dyn EventListenerList>>) + Send> =
147 Box::new(move |map| {
148 let list = map
149 .entry(type_id)
150 .or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()));
151 list.as_any_mut().downcast_mut::<EventListenerListImpl<E>>().unwrap().add(listener);
152 });
153
154 let _ = self.actor_ref.send(EventBusMsg::Register {
155 installer,
156 });
157 }
158
159 pub fn emit<E>(&self, event: E)
160 where
161 E: Event,
162 {
163 let type_id = TypeId::of::<E>();
164 let _ = self.actor_ref.send(EventBusMsg::Emit(EventEnvelope {
165 type_id,
166 event: event.into_any(),
167 }));
168 }
169
170 pub fn wait_for_completion(&self) {
171 let (tx, rx) = std::sync::mpsc::channel();
172 let _ = self.actor_ref.send(EventBusMsg::WaitForCompletion(tx));
173 let _ = rx.recv();
174 }
175}
176
#[cfg(test)]
pub mod tests {
    use std::sync::{Arc, Mutex};

    use reifydb_runtime::{SharedRuntimeConfig, actor::system::ActorSystem};

    use crate::event::{Event, EventBus, EventListener};

    /// Builds an actor system from the default shared runtime configuration.
    fn test_actor_system() -> ActorSystem {
        ActorSystem::new(SharedRuntimeConfig::default().actor_system_config())
    }

    define_event! {
        pub struct TestEvent{}
    }

    define_event! {
        pub struct AnotherEvent{}
    }

    /// Listener whose clones share one counter, so tests can observe deliveries.
    #[derive(Default, Debug, Clone)]
    pub struct TestEventListener(Arc<TestHandlerInner>);

    /// Shared state behind [`TestEventListener`].
    #[derive(Default, Debug)]
    pub struct TestHandlerInner {
        pub counter: Arc<Mutex<i32>>,
    }

    impl EventListener<TestEvent> for TestEventListener {
        /// Each `TestEvent` increments the counter by one.
        fn on(&self, _event: &TestEvent) {
            let mut x = self.0.counter.lock().unwrap();
            *x += 1;
        }
    }

    impl EventListener<AnotherEvent> for TestEventListener {
        /// Each `AnotherEvent` doubles the counter.
        fn on(&self, _event: &AnotherEvent) {
            let mut x = self.0.counter.lock().unwrap();
            *x *= 2;
        }
    }

    #[test]
    fn test_event_bus_new() {
        let actor_system = test_actor_system();
        let event_bus = EventBus::new(&actor_system);
        event_bus.emit(TestEvent::new());
        event_bus.wait_for_completion();
    }

    #[test]
    fn test_register_single_listener() {
        let actor_system = test_actor_system();
        let event_bus = EventBus::new(&actor_system);
        let listener = TestEventListener::default();

        event_bus.register::<TestEvent, TestEventListener>(listener.clone());
        event_bus.emit(TestEvent::new());
        event_bus.wait_for_completion();
        assert_eq!(*listener.0.counter.lock().unwrap(), 1);
    }

    #[test]
    fn test_emit_unregistered_event() {
        let actor_system = test_actor_system();
        let event_bus = EventBus::new(&actor_system);
        event_bus.emit(TestEvent::new());
        event_bus.wait_for_completion();
    }

    #[test]
    fn test_multiple_listeners_same_event() {
        let actor_system = test_actor_system();
        let event_bus = EventBus::new(&actor_system);
        let listener1 = TestEventListener::default();
        let listener2 = TestEventListener::default();

        event_bus.register::<TestEvent, TestEventListener>(listener1.clone());
        event_bus.register::<TestEvent, TestEventListener>(listener2.clone());

        event_bus.emit(TestEvent::new());
        event_bus.wait_for_completion();
        assert_eq!(*listener1.0.counter.lock().unwrap(), 1);
        assert_eq!(*listener2.0.counter.lock().unwrap(), 1);
    }

    #[test]
    fn test_event_bus_clone() {
        let actor_system = test_actor_system();
        let event_bus1 = EventBus::new(&actor_system);
        let listener = TestEventListener::default();
        event_bus1.register::<TestEvent, TestEventListener>(listener.clone());

        let event_bus2 = event_bus1.clone();
        event_bus2.emit(TestEvent::new());
        event_bus2.wait_for_completion();
        assert_eq!(*listener.0.counter.lock().unwrap(), 1);
    }

    #[test]
    fn test_concurrent_registration() {
        let actor_system = test_actor_system();
        let event_bus = Arc::new(EventBus::new(&actor_system));

        // Keep a clone of every listener so the test can verify that each
        // concurrently registered listener really receives the event.
        let listeners: Vec<TestEventListener> = (0..10).map(|_| TestEventListener::default()).collect();

        let handles: Vec<_> = listeners
            .iter()
            .cloned()
            .map(|listener| {
                let event_bus = event_bus.clone();
                std::thread::spawn(move || {
                    event_bus.register::<TestEvent, TestEventListener>(listener);
                })
            })
            .collect();

        // All registrations have been sent once every thread is joined.
        for handle in handles {
            handle.join().unwrap();
        }

        event_bus.emit(TestEvent::new());
        event_bus.wait_for_completion();

        for listener in &listeners {
            assert_eq!(*listener.0.counter.lock().unwrap(), 1);
        }
    }

    #[test]
    fn test_concurrent_emitting() {
        let actor_system = test_actor_system();
        let event_bus = Arc::new(EventBus::new(&actor_system));
        let listener = TestEventListener::default();
        event_bus.register::<TestEvent, TestEventListener>(listener.clone());
        event_bus.wait_for_completion();

        let mut handles = Vec::new();

        for _ in 0..10 {
            let event_bus = event_bus.clone();
            handles.push(std::thread::spawn(move || {
                event_bus.emit(TestEvent::new());
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        event_bus.wait_for_completion();
        assert_eq!(*listener.0.counter.lock().unwrap(), 10);
    }

    define_event! {
        pub struct MacroTestEvent {
            pub value: i32,
        }
    }

    #[test]
    fn test_define_event_macro() {
        let event = MacroTestEvent::new(42);
        let any_ref = event.as_any();
        assert!(any_ref.downcast_ref::<MacroTestEvent>().is_some());
        assert_eq!(any_ref.downcast_ref::<MacroTestEvent>().unwrap().value(), &42);
    }

    #[test]
    fn test_multi_event_listener() {
        let actor_system = test_actor_system();
        let event_bus = EventBus::new(&actor_system);
        let listener = TestEventListener::default();

        event_bus.register::<TestEvent, TestEventListener>(listener.clone());
        event_bus.register::<AnotherEvent, TestEventListener>(listener.clone());

        // Two increments followed by one doubling: 0 -> 1 -> 2 -> 4.
        event_bus.emit(TestEvent::new());
        event_bus.wait_for_completion();
        assert_eq!(*listener.0.counter.lock().unwrap(), 1);

        event_bus.emit(TestEvent::new());
        event_bus.wait_for_completion();
        assert_eq!(*listener.0.counter.lock().unwrap(), 2);

        event_bus.emit(AnotherEvent::new());
        event_bus.wait_for_completion();
        assert_eq!(*listener.0.counter.lock().unwrap(), 4);
    }
}