reifydb_core/event/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::{
5	any::{Any, TypeId},
6	collections::HashMap,
7	sync::{Arc, RwLock},
8};
9
10use crate::log_error;
11
12pub mod catalog;
13pub mod cdc;
14pub mod flow;
15pub mod lifecycle;
16pub mod transaction;
17
18pub trait Event: Any + Send + Sync + 'static {
19	fn as_any(&self) -> &dyn Any;
20}
21
22pub trait EventListener<E>: Send + Sync + 'static
23where
24	E: Event,
25{
26	fn on(&self, event: &E);
27}
28
29trait EventListenerList: Any + Send + Sync {
30	fn on_any(&self, event: &dyn Any);
31	fn as_any_mut(&mut self) -> &mut dyn Any;
32}
33
34struct EventListenerListImpl<E> {
35	listeners: RwLock<Vec<Box<dyn EventListener<E>>>>,
36}
37
38impl<E> EventListenerListImpl<E>
39where
40	E: Event,
41{
42	fn new() -> Self {
43		Self {
44			listeners: RwLock::new(Vec::new()),
45		}
46	}
47
48	fn add(&mut self, listener: Box<dyn EventListener<E>>) {
49		self.listeners.write().unwrap().push(listener);
50	}
51}
52
53impl<E> EventListenerList for EventListenerListImpl<E>
54where
55	E: Event,
56{
57	fn on_any(&self, event: &dyn Any) {
58		if let Some(event) = event.downcast_ref::<E>() {
59			for listener in self.listeners.read().unwrap().iter() {
60				// Add panic safety - catch panics and continue
61				// with other listeners
62				let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
63					listener.on(event);
64				}));
65				if let Err(_) = result {
66					log_error!(
67						"Event listener panicked for event type {}",
68						std::any::type_name::<E>()
69					);
70				}
71			}
72		}
73	}
74
75	fn as_any_mut(&mut self) -> &mut dyn Any {
76		self
77	}
78}
79
80#[derive(Clone)]
81pub struct EventBus {
82	listeners: Arc<RwLock<HashMap<TypeId, Box<dyn EventListenerList>>>>,
83}
84
85impl Default for EventBus {
86	fn default() -> Self {
87		Self::new()
88	}
89}
90
91impl EventBus {
92	pub fn new() -> Self {
93		Self {
94			listeners: Arc::new(RwLock::new(HashMap::new())),
95		}
96	}
97
98	pub fn register<E, L>(&self, listener: L)
99	where
100		E: Event,
101		L: EventListener<E>,
102	{
103		let type_id = TypeId::of::<E>();
104
105		self.listeners
106			.write()
107			.unwrap()
108			.entry(type_id)
109			.or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()))
110			.as_any_mut()
111			.downcast_mut::<EventListenerListImpl<E>>()
112			.unwrap()
113			.add(Box::new(listener));
114	}
115
116	pub fn emit<E>(&self, event: E)
117	where
118		E: Event,
119	{
120		// Infallible emit with panic safety
121		let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
122			let type_id = TypeId::of::<E>();
123			let listeners = self.listeners.read().unwrap();
124
125			if let Some(listener_list) = listeners.get(&type_id) {
126				listener_list.on_any(event.as_any());
127			}
128		}));
129
130		if let Err(_) = result {
131			log_error!("Event emission panicked for type {}", std::any::type_name::<E>());
132		}
133	}
134}
135
136#[macro_export]
137macro_rules! impl_event {
138	($ty:ty) => {
139		impl $crate::event::Event for $ty {
140			fn as_any(&self) -> &dyn std::any::Any {
141				self
142			}
143		}
144	};
145}
146
147#[cfg(test)]
148mod tests {
149	use std::{
150		sync::{Arc, Mutex},
151		thread,
152	};
153
154	use crate::event::{Event, EventBus, EventListener};
155
156	#[derive(Debug)]
157	pub struct TestEvent {}
158
159	impl_event!(TestEvent);
160
161	#[derive(Debug)]
162	pub struct AnotherEvent {}
163
164	impl_event!(AnotherEvent);
165
166	#[derive(Default, Debug, Clone)]
167	pub struct TestEventListener(Arc<TestHandlerInner>);
168
169	#[derive(Default, Debug)]
170	pub struct TestHandlerInner {
171		pub counter: Arc<Mutex<i32>>,
172	}
173
174	impl EventListener<TestEvent> for TestEventListener {
175		fn on(&self, _event: &TestEvent) {
176			let mut x = self.0.counter.lock().unwrap();
177			*x += 1;
178		}
179	}
180
181	impl EventListener<AnotherEvent> for TestEventListener {
182		fn on(&self, _event: &AnotherEvent) {
183			let mut x = self.0.counter.lock().unwrap();
184			*x *= 2;
185		}
186	}
187
188	#[test]
189	fn test_event_bus_new() {
190		let event_bus = EventBus::new();
191		event_bus.emit(TestEvent {});
192	}
193
194	#[test]
195	fn test_event_bus_default() {
196		let event_bus = EventBus::default();
197		event_bus.emit(TestEvent {});
198	}
199
200	#[test]
201	fn test_register_single_listener() {
202		let event_bus = EventBus::new();
203		let listener = TestEventListener::default();
204
205		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
206		event_bus.emit(TestEvent {});
207		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
208	}
209
210	#[test]
211	fn test_emit_unregistered_event() {
212		let event_bus = EventBus::new();
213		event_bus.emit(TestEvent {});
214	}
215
216	#[test]
217	fn test_multiple_listeners_same_event() {
218		let event_bus = EventBus::new();
219		let listener1 = TestEventListener::default();
220		let listener2 = TestEventListener::default();
221
222		event_bus.register::<TestEvent, TestEventListener>(listener1.clone());
223		event_bus.register::<TestEvent, TestEventListener>(listener2.clone());
224
225		event_bus.emit(TestEvent {});
226		assert_eq!(*listener1.0.counter.lock().unwrap(), 1);
227		assert_eq!(*listener2.0.counter.lock().unwrap(), 1);
228	}
229
230	#[test]
231	fn test_event_bus_clone() {
232		let event_bus1 = EventBus::new();
233		let listener = TestEventListener::default();
234		event_bus1.register::<TestEvent, TestEventListener>(listener.clone());
235
236		let event_bus2 = event_bus1.clone();
237		event_bus2.emit(TestEvent {});
238		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
239	}
240
241	#[test]
242	fn test_concurrent_registration() {
243		let event_bus = Arc::new(EventBus::new());
244		let handles: Vec<_> = (0..10)
245			.map(|_| {
246				let event_bus = event_bus.clone();
247				thread::spawn(move || {
248					let listener = TestEventListener::default();
249					event_bus.register::<TestEvent, TestEventListener>(listener);
250				})
251			})
252			.collect();
253
254		for handle in handles {
255			handle.join().unwrap();
256		}
257
258		event_bus.emit(TestEvent {});
259	}
260
261	#[test]
262	fn test_concurrent_emitting() {
263		let event_bus = Arc::new(EventBus::new());
264		let listener = TestEventListener::default();
265		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
266
267		let handles: Vec<_> = (0..10)
268			.map(|_| {
269				let event_bus = event_bus.clone();
270				thread::spawn(move || {
271					event_bus.emit(TestEvent {});
272				})
273			})
274			.collect();
275
276		for handle in handles {
277			handle.join().unwrap();
278		}
279
280		assert!(*listener.0.counter.lock().unwrap() >= 10);
281	}
282
283	#[derive(Debug)]
284	pub struct MacroTestEvent {
285		pub value: i32,
286	}
287
288	impl_event!(MacroTestEvent);
289
290	#[test]
291	fn test_impl_event_macro() {
292		let event = MacroTestEvent {
293			value: 42,
294		};
295		let any_ref = event.as_any();
296		assert!(any_ref.downcast_ref::<MacroTestEvent>().is_some());
297		assert_eq!(any_ref.downcast_ref::<MacroTestEvent>().unwrap().value, 42);
298	}
299
300	#[test]
301	fn test_multi_event_listener() {
302		let event_bus = EventBus::default();
303		let listener = TestEventListener::default();
304
305		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
306		event_bus.register::<AnotherEvent, TestEventListener>(listener.clone());
307
308		// Each event type triggers only its own listeners
309		event_bus.emit(TestEvent {});
310		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
311
312		event_bus.emit(TestEvent {});
313		assert_eq!(*listener.0.counter.lock().unwrap(), 2);
314
315		event_bus.emit(AnotherEvent {});
316		assert_eq!(*listener.0.counter.lock().unwrap(), 4); // 2 * 2
317	}
318}