reifydb_core/event/
mod.rs1use std::{
5 any::{Any, TypeId},
6 collections::HashMap,
7 sync::{Arc, RwLock},
8};
9
10use crate::log_error;
11
12pub mod catalog;
13pub mod flow;
14pub mod lifecycle;
15pub mod transaction;
16
17pub trait Event: Any + Send + Sync + 'static {
18 fn as_any(&self) -> &dyn Any;
19}
20
21pub trait EventListener<E>: Send + Sync + 'static
22where
23 E: Event,
24{
25 fn on(&self, event: &E);
26}
27
28trait EventListenerList: Any + Send + Sync {
29 fn on_any(&self, event: &dyn Any);
30 fn as_any_mut(&mut self) -> &mut dyn Any;
31}
32
33struct EventListenerListImpl<E> {
34 listeners: RwLock<Vec<Box<dyn EventListener<E>>>>,
35}
36
37impl<E> EventListenerListImpl<E>
38where
39 E: Event,
40{
41 fn new() -> Self {
42 Self {
43 listeners: RwLock::new(Vec::new()),
44 }
45 }
46
47 fn add(&mut self, listener: Box<dyn EventListener<E>>) {
48 self.listeners.write().unwrap().push(listener);
49 }
50}
51
52impl<E> EventListenerList for EventListenerListImpl<E>
53where
54 E: Event,
55{
56 fn on_any(&self, event: &dyn Any) {
57 if let Some(event) = event.downcast_ref::<E>() {
58 for listener in self.listeners.read().unwrap().iter() {
59 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
62 listener.on(event);
63 }));
64 if let Err(_) = result {
65 log_error!(
66 "Event listener panicked for event type {}",
67 std::any::type_name::<E>()
68 );
69 }
70 }
71 }
72 }
73
74 fn as_any_mut(&mut self) -> &mut dyn Any {
75 self
76 }
77}
78
79#[derive(Clone)]
80pub struct EventBus {
81 listeners: Arc<RwLock<HashMap<TypeId, Box<dyn EventListenerList>>>>,
82}
83
84impl Default for EventBus {
85 fn default() -> Self {
86 Self::new()
87 }
88}
89
90impl EventBus {
91 pub fn new() -> Self {
92 Self {
93 listeners: Arc::new(RwLock::new(HashMap::new())),
94 }
95 }
96
97 pub fn register<E, L>(&self, listener: L)
98 where
99 E: Event,
100 L: EventListener<E>,
101 {
102 let type_id = TypeId::of::<E>();
103
104 self.listeners
105 .write()
106 .unwrap()
107 .entry(type_id)
108 .or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()))
109 .as_any_mut()
110 .downcast_mut::<EventListenerListImpl<E>>()
111 .unwrap()
112 .add(Box::new(listener));
113 }
114
115 pub fn emit<E>(&self, event: E)
116 where
117 E: Event,
118 {
119 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
121 let type_id = TypeId::of::<E>();
122 let listeners = self.listeners.read().unwrap();
123
124 if let Some(listener_list) = listeners.get(&type_id) {
125 listener_list.on_any(event.as_any());
126 }
127 }));
128
129 if let Err(_) = result {
130 log_error!("Event emission panicked for type {}", std::any::type_name::<E>());
131 }
132 }
133}
134
135#[macro_export]
136macro_rules! impl_event {
137 ($ty:ty) => {
138 impl $crate::event::Event for $ty {
139 fn as_any(&self) -> &dyn std::any::Any {
140 self
141 }
142 }
143 };
144}
145
146#[cfg(test)]
147mod tests {
148 use std::{
149 sync::{Arc, Mutex},
150 thread,
151 };
152
153 use crate::event::{Event, EventBus, EventListener};
154
155 #[derive(Debug)]
156 pub struct TestEvent {}
157
158 impl_event!(TestEvent);
159
160 #[derive(Debug)]
161 pub struct AnotherEvent {}
162
163 impl_event!(AnotherEvent);
164
165 #[derive(Default, Debug, Clone)]
166 pub struct TestEventListener(Arc<TestHandlerInner>);
167
168 #[derive(Default, Debug)]
169 pub struct TestHandlerInner {
170 pub counter: Arc<Mutex<i32>>,
171 }
172
173 impl EventListener<TestEvent> for TestEventListener {
174 fn on(&self, _event: &TestEvent) {
175 let mut x = self.0.counter.lock().unwrap();
176 *x += 1;
177 }
178 }
179
180 impl EventListener<AnotherEvent> for TestEventListener {
181 fn on(&self, _event: &AnotherEvent) {
182 let mut x = self.0.counter.lock().unwrap();
183 *x *= 2;
184 }
185 }
186
187 #[test]
188 fn test_event_bus_new() {
189 let event_bus = EventBus::new();
190 event_bus.emit(TestEvent {});
191 }
192
193 #[test]
194 fn test_event_bus_default() {
195 let event_bus = EventBus::default();
196 event_bus.emit(TestEvent {});
197 }
198
199 #[test]
200 fn test_register_single_listener() {
201 let event_bus = EventBus::new();
202 let listener = TestEventListener::default();
203
204 event_bus.register::<TestEvent, TestEventListener>(listener.clone());
205 event_bus.emit(TestEvent {});
206 assert_eq!(*listener.0.counter.lock().unwrap(), 1);
207 }
208
209 #[test]
210 fn test_emit_unregistered_event() {
211 let event_bus = EventBus::new();
212 event_bus.emit(TestEvent {});
213 }
214
215 #[test]
216 fn test_multiple_listeners_same_event() {
217 let event_bus = EventBus::new();
218 let listener1 = TestEventListener::default();
219 let listener2 = TestEventListener::default();
220
221 event_bus.register::<TestEvent, TestEventListener>(listener1.clone());
222 event_bus.register::<TestEvent, TestEventListener>(listener2.clone());
223
224 event_bus.emit(TestEvent {});
225 assert_eq!(*listener1.0.counter.lock().unwrap(), 1);
226 assert_eq!(*listener2.0.counter.lock().unwrap(), 1);
227 }
228
229 #[test]
230 fn test_event_bus_clone() {
231 let event_bus1 = EventBus::new();
232 let listener = TestEventListener::default();
233 event_bus1.register::<TestEvent, TestEventListener>(listener.clone());
234
235 let event_bus2 = event_bus1.clone();
236 event_bus2.emit(TestEvent {});
237 assert_eq!(*listener.0.counter.lock().unwrap(), 1);
238 }
239
240 #[test]
241 fn test_concurrent_registration() {
242 let event_bus = Arc::new(EventBus::new());
243 let handles: Vec<_> = (0..10)
244 .map(|_| {
245 let event_bus = event_bus.clone();
246 thread::spawn(move || {
247 let listener = TestEventListener::default();
248 event_bus.register::<TestEvent, TestEventListener>(listener);
249 })
250 })
251 .collect();
252
253 for handle in handles {
254 handle.join().unwrap();
255 }
256
257 event_bus.emit(TestEvent {});
258 }
259
260 #[test]
261 fn test_concurrent_emitting() {
262 let event_bus = Arc::new(EventBus::new());
263 let listener = TestEventListener::default();
264 event_bus.register::<TestEvent, TestEventListener>(listener.clone());
265
266 let handles: Vec<_> = (0..10)
267 .map(|_| {
268 let event_bus = event_bus.clone();
269 thread::spawn(move || {
270 event_bus.emit(TestEvent {});
271 })
272 })
273 .collect();
274
275 for handle in handles {
276 handle.join().unwrap();
277 }
278
279 assert!(*listener.0.counter.lock().unwrap() >= 10);
280 }
281
282 #[derive(Debug)]
283 pub struct MacroTestEvent {
284 pub value: i32,
285 }
286
287 impl_event!(MacroTestEvent);
288
289 #[test]
290 fn test_impl_event_macro() {
291 let event = MacroTestEvent {
292 value: 42,
293 };
294 let any_ref = event.as_any();
295 assert!(any_ref.downcast_ref::<MacroTestEvent>().is_some());
296 assert_eq!(any_ref.downcast_ref::<MacroTestEvent>().unwrap().value, 42);
297 }
298
299 #[test]
300 fn test_multi_event_listener() {
301 let event_bus = EventBus::default();
302 let listener = TestEventListener::default();
303
304 event_bus.register::<TestEvent, TestEventListener>(listener.clone());
305 event_bus.register::<AnotherEvent, TestEventListener>(listener.clone());
306
307 event_bus.emit(TestEvent {});
309 assert_eq!(*listener.0.counter.lock().unwrap(), 1);
310
311 event_bus.emit(TestEvent {});
312 assert_eq!(*listener.0.counter.lock().unwrap(), 2);
313
314 event_bus.emit(AnotherEvent {});
315 assert_eq!(*listener.0.counter.lock().unwrap(), 4); }
317}