reifydb_core/event/
mod.rs1use std::{
5 any::{Any, TypeId},
6 collections::HashMap,
7 sync::{Arc, RwLock},
8};
9
10use crate::log_error;
11
12pub mod catalog;
13pub mod cdc;
14pub mod flow;
15pub mod lifecycle;
16pub mod transaction;
17
/// A value that can be published on the [`EventBus`].
///
/// Every event must be `Send + Sync + 'static` and able to present itself as
/// [`Any`], which is what lets the bus recover the concrete event type after
/// passing it through a type-erased listener list. The `impl_event!` macro in
/// this module generates the implementation for a given type.
pub trait Event: Any + Send + Sync + 'static {
    /// Returns `self` as a type-erased [`Any`] reference.
    fn as_any(&self) -> &dyn Any;
}
21
22pub trait EventListener<E>: Send + Sync + 'static
23where
24 E: Event,
25{
26 fn on(&self, event: &E);
27}
28
/// Type-erased view of a per-event-type listener collection.
///
/// This is what allows collections for different event types to be stored
/// side by side as `Box<dyn EventListenerList>`.
trait EventListenerList: Any + Send + Sync {
    /// Offers a type-erased event to the collection.
    fn on_any(&self, event: &dyn Any);

    /// Returns `self` as a mutable [`Any`] so callers can downcast back to the
    /// concrete collection type.
    fn as_any_mut(&mut self) -> &mut dyn Any;
}
33
34struct EventListenerListImpl<E> {
35 listeners: RwLock<Vec<Box<dyn EventListener<E>>>>,
36}
37
38impl<E> EventListenerListImpl<E>
39where
40 E: Event,
41{
42 fn new() -> Self {
43 Self {
44 listeners: RwLock::new(Vec::new()),
45 }
46 }
47
48 fn add(&mut self, listener: Box<dyn EventListener<E>>) {
49 self.listeners.write().unwrap().push(listener);
50 }
51}
52
53impl<E> EventListenerList for EventListenerListImpl<E>
54where
55 E: Event,
56{
57 fn on_any(&self, event: &dyn Any) {
58 if let Some(event) = event.downcast_ref::<E>() {
59 for listener in self.listeners.read().unwrap().iter() {
60 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
63 listener.on(event);
64 }));
65 if let Err(_) = result {
66 log_error!(
67 "Event listener panicked for event type {}",
68 std::any::type_name::<E>()
69 );
70 }
71 }
72 }
73 }
74
75 fn as_any_mut(&mut self) -> &mut dyn Any {
76 self
77 }
78}
79
80#[derive(Clone)]
81pub struct EventBus {
82 listeners: Arc<RwLock<HashMap<TypeId, Box<dyn EventListenerList>>>>,
83}
84
85impl Default for EventBus {
86 fn default() -> Self {
87 Self::new()
88 }
89}
90
91impl EventBus {
92 pub fn new() -> Self {
93 Self {
94 listeners: Arc::new(RwLock::new(HashMap::new())),
95 }
96 }
97
98 pub fn register<E, L>(&self, listener: L)
99 where
100 E: Event,
101 L: EventListener<E>,
102 {
103 let type_id = TypeId::of::<E>();
104
105 self.listeners
106 .write()
107 .unwrap()
108 .entry(type_id)
109 .or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()))
110 .as_any_mut()
111 .downcast_mut::<EventListenerListImpl<E>>()
112 .unwrap()
113 .add(Box::new(listener));
114 }
115
116 pub fn emit<E>(&self, event: E)
117 where
118 E: Event,
119 {
120 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
122 let type_id = TypeId::of::<E>();
123 let listeners = self.listeners.read().unwrap();
124
125 if let Some(listener_list) = listeners.get(&type_id) {
126 listener_list.on_any(event.as_any());
127 }
128 }));
129
130 if let Err(_) = result {
131 log_error!("Event emission panicked for type {}", std::any::type_name::<E>());
132 }
133 }
134}
135
/// Implements the crate's `event::Event` trait for the given type.
///
/// The generated `as_any` simply returns `self`, so the type must satisfy the
/// trait's `Any + Send + Sync + 'static` supertraits.
#[macro_export]
macro_rules! impl_event {
    ($event_ty:ty) => {
        impl $crate::event::Event for $event_ty {
            fn as_any(&self) -> &dyn std::any::Any {
                self
            }
        }
    };
}
146
#[cfg(test)]
mod tests {
    use std::{
        sync::{Arc, Mutex},
        thread,
    };

    use crate::event::{Event, EventBus, EventListener};

    /// Primary test event; `TestEventListener` increments its counter on it.
    #[derive(Debug)]
    pub struct TestEvent {}

    impl_event!(TestEvent);

    /// Secondary test event; `TestEventListener` doubles its counter on it.
    #[derive(Debug)]
    pub struct AnotherEvent {}

    impl_event!(AnotherEvent);

    /// Cloneable listener whose clones all share one counter, so a test can
    /// keep a handle after moving a clone into the bus.
    #[derive(Default, Debug, Clone)]
    pub struct TestEventListener(Arc<TestHandlerInner>);

    #[derive(Default, Debug)]
    pub struct TestHandlerInner {
        pub counter: Arc<Mutex<i32>>,
    }

    impl TestEventListener {
        /// Current value of the shared counter.
        fn count(&self) -> i32 {
            *self.0.counter.lock().unwrap()
        }
    }

    impl EventListener<TestEvent> for TestEventListener {
        fn on(&self, _event: &TestEvent) {
            let mut counter = self.0.counter.lock().unwrap();
            *counter += 1;
        }
    }

    impl EventListener<AnotherEvent> for TestEventListener {
        fn on(&self, _event: &AnotherEvent) {
            let mut counter = self.0.counter.lock().unwrap();
            *counter *= 2;
        }
    }

    #[test]
    fn test_event_bus_new() {
        // Emitting on a freshly constructed bus must not panic.
        let event_bus = EventBus::new();
        event_bus.emit(TestEvent {});
    }

    #[test]
    fn test_event_bus_default() {
        // Same as above, via the `Default` implementation.
        let event_bus = EventBus::default();
        event_bus.emit(TestEvent {});
    }

    #[test]
    fn test_register_single_listener() {
        let event_bus = EventBus::new();
        let listener = TestEventListener::default();

        event_bus.register::<TestEvent, TestEventListener>(listener.clone());
        event_bus.emit(TestEvent {});

        assert_eq!(listener.count(), 1);
    }

    #[test]
    fn test_emit_unregistered_event() {
        // No listener is registered for `TestEvent`; emit must be a no-op.
        let event_bus = EventBus::new();
        event_bus.emit(TestEvent {});
    }

    #[test]
    fn test_multiple_listeners_same_event() {
        let event_bus = EventBus::new();
        let listener1 = TestEventListener::default();
        let listener2 = TestEventListener::default();

        event_bus.register::<TestEvent, TestEventListener>(listener1.clone());
        event_bus.register::<TestEvent, TestEventListener>(listener2.clone());

        event_bus.emit(TestEvent {});

        assert_eq!(listener1.count(), 1);
        assert_eq!(listener2.count(), 1);
    }

    #[test]
    fn test_event_bus_clone() {
        let event_bus1 = EventBus::new();
        let listener = TestEventListener::default();
        event_bus1.register::<TestEvent, TestEventListener>(listener.clone());

        // A clone must see listeners registered through the original handle.
        let event_bus2 = event_bus1.clone();
        event_bus2.emit(TestEvent {});

        assert_eq!(listener.count(), 1);
    }

    #[test]
    fn test_concurrent_registration() {
        let event_bus = Arc::new(EventBus::new());
        let handles: Vec<_> = (0..10)
            .map(|_| {
                let event_bus = Arc::clone(&event_bus);
                thread::spawn(move || {
                    let listener = TestEventListener::default();
                    event_bus.register::<TestEvent, TestEventListener>(listener.clone());
                    // Hand a clone back so the main thread can inspect the counter.
                    listener
                })
            })
            .collect();

        let listeners: Vec<TestEventListener> =
            handles.into_iter().map(|handle| handle.join().unwrap()).collect();

        event_bus.emit(TestEvent {});

        // Every concurrently registered listener must have been invoked exactly once.
        assert_eq!(listeners.len(), 10);
        for listener in &listeners {
            assert_eq!(listener.count(), 1);
        }
    }

    #[test]
    fn test_concurrent_emitting() {
        let event_bus = Arc::new(EventBus::new());
        let listener = TestEventListener::default();
        event_bus.register::<TestEvent, TestEventListener>(listener.clone());

        let handles: Vec<_> = (0..10)
            .map(|_| {
                let event_bus = Arc::clone(&event_bus);
                thread::spawn(move || {
                    event_bus.emit(TestEvent {});
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        // Ten emits against one listener: the counter is exactly ten.
        assert_eq!(listener.count(), 10);
    }

    #[derive(Debug)]
    pub struct MacroTestEvent {
        pub value: i32,
    }

    impl_event!(MacroTestEvent);

    #[test]
    fn test_impl_event_macro() {
        let event = MacroTestEvent {
            value: 42,
        };

        let any_ref = event.as_any();
        let recovered = any_ref.downcast_ref::<MacroTestEvent>();

        assert!(recovered.is_some());
        assert_eq!(recovered.unwrap().value, 42);
    }

    #[test]
    fn test_multi_event_listener() {
        let event_bus = EventBus::default();
        let listener = TestEventListener::default();

        event_bus.register::<TestEvent, TestEventListener>(listener.clone());
        event_bus.register::<AnotherEvent, TestEventListener>(listener.clone());

        event_bus.emit(TestEvent {});
        assert_eq!(listener.count(), 1);

        event_bus.emit(TestEvent {});
        assert_eq!(listener.count(), 2);

        // `AnotherEvent` doubles the shared counter: 2 * 2 == 4.
        event_bus.emit(AnotherEvent {});
        assert_eq!(listener.count(), 4);
    }
}