reifydb_core/event/
mod.rs1use std::{
5 any::{Any, TypeId},
6 collections::HashMap,
7 sync::{Arc, RwLock},
8};
9
10use tracing::error;
11
12pub mod catalog;
13pub mod flow;
14pub mod lifecycle;
15pub mod transaction;
16
17pub trait Event: Any + Send + Sync + 'static {
18 fn as_any(&self) -> &dyn Any;
19}
20
21pub trait EventListener<E>: Send + Sync + 'static
22where
23 E: Event,
24{
25 fn on(&self, event: &E);
26}
27
28trait EventListenerList: Any + Send + Sync {
29 fn on_any(&self, event: &dyn Any);
30 fn as_any_mut(&mut self) -> &mut dyn Any;
31}
32
33struct EventListenerListImpl<E> {
34 listeners: RwLock<Vec<Box<dyn EventListener<E>>>>,
35}
36
37impl<E> EventListenerListImpl<E>
38where
39 E: Event,
40{
41 fn new() -> Self {
42 Self {
43 listeners: RwLock::new(Vec::new()),
44 }
45 }
46
47 fn add(&mut self, listener: Box<dyn EventListener<E>>) {
48 self.listeners.write().unwrap().push(listener);
49 }
50}
51
52impl<E> EventListenerList for EventListenerListImpl<E>
53where
54 E: Event,
55{
56 fn on_any(&self, event: &dyn Any) {
57 if let Some(event) = event.downcast_ref::<E>() {
58 for listener in self.listeners.read().unwrap().iter() {
59 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
62 listener.on(event);
63 }));
64 if let Err(_) = result {
65 error!("Event listener panicked for event type {}", std::any::type_name::<E>());
66 }
67 }
68 }
69 }
70
71 fn as_any_mut(&mut self) -> &mut dyn Any {
72 self
73 }
74}
75
76#[derive(Clone)]
77pub struct EventBus {
78 listeners: Arc<RwLock<HashMap<TypeId, Box<dyn EventListenerList>>>>,
79}
80
81impl Default for EventBus {
82 fn default() -> Self {
83 Self::new()
84 }
85}
86
87impl EventBus {
88 pub fn new() -> Self {
89 Self {
90 listeners: Arc::new(RwLock::new(HashMap::new())),
91 }
92 }
93
94 pub fn register<E, L>(&self, listener: L)
95 where
96 E: Event,
97 L: EventListener<E>,
98 {
99 let type_id = TypeId::of::<E>();
100
101 self.listeners
102 .write()
103 .unwrap()
104 .entry(type_id)
105 .or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()))
106 .as_any_mut()
107 .downcast_mut::<EventListenerListImpl<E>>()
108 .unwrap()
109 .add(Box::new(listener));
110 }
111
112 pub fn emit<E>(&self, event: E)
113 where
114 E: Event,
115 {
116 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
118 let type_id = TypeId::of::<E>();
119 let listeners = self.listeners.read().unwrap();
120
121 if let Some(listener_list) = listeners.get(&type_id) {
122 listener_list.on_any(event.as_any());
123 }
124 }));
125
126 if let Err(_) = result {
127 error!("Event emission panicked for type {}", std::any::type_name::<E>());
128 }
129 }
130}
131
132#[macro_export]
133macro_rules! impl_event {
134 ($ty:ty) => {
135 impl $crate::event::Event for $ty {
136 fn as_any(&self) -> &dyn std::any::Any {
137 self
138 }
139 }
140 };
141}
142
143#[cfg(test)]
144mod tests {
145 use std::{
146 sync::{Arc, Mutex},
147 thread,
148 };
149
150 use crate::event::{Event, EventBus, EventListener};
151
152 #[derive(Debug)]
153 pub struct TestEvent {}
154
155 impl_event!(TestEvent);
156
157 #[derive(Debug)]
158 pub struct AnotherEvent {}
159
160 impl_event!(AnotherEvent);
161
162 #[derive(Default, Debug, Clone)]
163 pub struct TestEventListener(Arc<TestHandlerInner>);
164
165 #[derive(Default, Debug)]
166 pub struct TestHandlerInner {
167 pub counter: Arc<Mutex<i32>>,
168 }
169
170 impl EventListener<TestEvent> for TestEventListener {
171 fn on(&self, _event: &TestEvent) {
172 let mut x = self.0.counter.lock().unwrap();
173 *x += 1;
174 }
175 }
176
177 impl EventListener<AnotherEvent> for TestEventListener {
178 fn on(&self, _event: &AnotherEvent) {
179 let mut x = self.0.counter.lock().unwrap();
180 *x *= 2;
181 }
182 }
183
184 #[test]
185 fn test_event_bus_new() {
186 let event_bus = EventBus::new();
187 event_bus.emit(TestEvent {});
188 }
189
190 #[test]
191 fn test_event_bus_default() {
192 let event_bus = EventBus::default();
193 event_bus.emit(TestEvent {});
194 }
195
196 #[test]
197 fn test_register_single_listener() {
198 let event_bus = EventBus::new();
199 let listener = TestEventListener::default();
200
201 event_bus.register::<TestEvent, TestEventListener>(listener.clone());
202 event_bus.emit(TestEvent {});
203 assert_eq!(*listener.0.counter.lock().unwrap(), 1);
204 }
205
206 #[test]
207 fn test_emit_unregistered_event() {
208 let event_bus = EventBus::new();
209 event_bus.emit(TestEvent {});
210 }
211
212 #[test]
213 fn test_multiple_listeners_same_event() {
214 let event_bus = EventBus::new();
215 let listener1 = TestEventListener::default();
216 let listener2 = TestEventListener::default();
217
218 event_bus.register::<TestEvent, TestEventListener>(listener1.clone());
219 event_bus.register::<TestEvent, TestEventListener>(listener2.clone());
220
221 event_bus.emit(TestEvent {});
222 assert_eq!(*listener1.0.counter.lock().unwrap(), 1);
223 assert_eq!(*listener2.0.counter.lock().unwrap(), 1);
224 }
225
226 #[test]
227 fn test_event_bus_clone() {
228 let event_bus1 = EventBus::new();
229 let listener = TestEventListener::default();
230 event_bus1.register::<TestEvent, TestEventListener>(listener.clone());
231
232 let event_bus2 = event_bus1.clone();
233 event_bus2.emit(TestEvent {});
234 assert_eq!(*listener.0.counter.lock().unwrap(), 1);
235 }
236
237 #[test]
238 fn test_concurrent_registration() {
239 let event_bus = Arc::new(EventBus::new());
240 let handles: Vec<_> = (0..10)
241 .map(|_| {
242 let event_bus = event_bus.clone();
243 thread::spawn(move || {
244 let listener = TestEventListener::default();
245 event_bus.register::<TestEvent, TestEventListener>(listener);
246 })
247 })
248 .collect();
249
250 for handle in handles {
251 handle.join().unwrap();
252 }
253
254 event_bus.emit(TestEvent {});
255 }
256
257 #[test]
258 fn test_concurrent_emitting() {
259 let event_bus = Arc::new(EventBus::new());
260 let listener = TestEventListener::default();
261 event_bus.register::<TestEvent, TestEventListener>(listener.clone());
262
263 let handles: Vec<_> = (0..10)
264 .map(|_| {
265 let event_bus = event_bus.clone();
266 thread::spawn(move || {
267 event_bus.emit(TestEvent {});
268 })
269 })
270 .collect();
271
272 for handle in handles {
273 handle.join().unwrap();
274 }
275
276 assert!(*listener.0.counter.lock().unwrap() >= 10);
277 }
278
279 #[derive(Debug)]
280 pub struct MacroTestEvent {
281 pub value: i32,
282 }
283
284 impl_event!(MacroTestEvent);
285
286 #[test]
287 fn test_impl_event_macro() {
288 let event = MacroTestEvent {
289 value: 42,
290 };
291 let any_ref = event.as_any();
292 assert!(any_ref.downcast_ref::<MacroTestEvent>().is_some());
293 assert_eq!(any_ref.downcast_ref::<MacroTestEvent>().unwrap().value, 42);
294 }
295
296 #[test]
297 fn test_multi_event_listener() {
298 let event_bus = EventBus::default();
299 let listener = TestEventListener::default();
300
301 event_bus.register::<TestEvent, TestEventListener>(listener.clone());
302 event_bus.register::<AnotherEvent, TestEventListener>(listener.clone());
303
304 event_bus.emit(TestEvent {});
306 assert_eq!(*listener.0.counter.lock().unwrap(), 1);
307
308 event_bus.emit(TestEvent {});
309 assert_eq!(*listener.0.counter.lock().unwrap(), 2);
310
311 event_bus.emit(AnotherEvent {});
312 assert_eq!(*listener.0.counter.lock().unwrap(), 4); }
314}