reifydb_core/event/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::{
5	any::{Any, TypeId},
6	collections::HashMap,
7	sync::{Arc, RwLock},
8};
9
10use crate::log_error;
11
12pub mod catalog;
13pub mod flow;
14pub mod lifecycle;
15pub mod transaction;
16
17pub trait Event: Any + Send + Sync + 'static {
18	fn as_any(&self) -> &dyn Any;
19}
20
21pub trait EventListener<E>: Send + Sync + 'static
22where
23	E: Event,
24{
25	fn on(&self, event: &E);
26}
27
28trait EventListenerList: Any + Send + Sync {
29	fn on_any(&self, event: &dyn Any);
30	fn as_any_mut(&mut self) -> &mut dyn Any;
31}
32
33struct EventListenerListImpl<E> {
34	listeners: RwLock<Vec<Box<dyn EventListener<E>>>>,
35}
36
37impl<E> EventListenerListImpl<E>
38where
39	E: Event,
40{
41	fn new() -> Self {
42		Self {
43			listeners: RwLock::new(Vec::new()),
44		}
45	}
46
47	fn add(&mut self, listener: Box<dyn EventListener<E>>) {
48		self.listeners.write().unwrap().push(listener);
49	}
50}
51
52impl<E> EventListenerList for EventListenerListImpl<E>
53where
54	E: Event,
55{
56	fn on_any(&self, event: &dyn Any) {
57		if let Some(event) = event.downcast_ref::<E>() {
58			for listener in self.listeners.read().unwrap().iter() {
59				// Add panic safety - catch panics and continue
60				// with other listeners
61				let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
62					listener.on(event);
63				}));
64				if let Err(_) = result {
65					log_error!(
66						"Event listener panicked for event type {}",
67						std::any::type_name::<E>()
68					);
69				}
70			}
71		}
72	}
73
74	fn as_any_mut(&mut self) -> &mut dyn Any {
75		self
76	}
77}
78
79#[derive(Clone)]
80pub struct EventBus {
81	listeners: Arc<RwLock<HashMap<TypeId, Box<dyn EventListenerList>>>>,
82}
83
84impl Default for EventBus {
85	fn default() -> Self {
86		Self::new()
87	}
88}
89
90impl EventBus {
91	pub fn new() -> Self {
92		Self {
93			listeners: Arc::new(RwLock::new(HashMap::new())),
94		}
95	}
96
97	pub fn register<E, L>(&self, listener: L)
98	where
99		E: Event,
100		L: EventListener<E>,
101	{
102		let type_id = TypeId::of::<E>();
103
104		self.listeners
105			.write()
106			.unwrap()
107			.entry(type_id)
108			.or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()))
109			.as_any_mut()
110			.downcast_mut::<EventListenerListImpl<E>>()
111			.unwrap()
112			.add(Box::new(listener));
113	}
114
115	pub fn emit<E>(&self, event: E)
116	where
117		E: Event,
118	{
119		// Infallible emit with panic safety
120		let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
121			let type_id = TypeId::of::<E>();
122			let listeners = self.listeners.read().unwrap();
123
124			if let Some(listener_list) = listeners.get(&type_id) {
125				listener_list.on_any(event.as_any());
126			}
127		}));
128
129		if let Err(_) = result {
130			log_error!("Event emission panicked for type {}", std::any::type_name::<E>());
131		}
132	}
133}
134
135#[macro_export]
136macro_rules! impl_event {
137	($ty:ty) => {
138		impl $crate::event::Event for $ty {
139			fn as_any(&self) -> &dyn std::any::Any {
140				self
141			}
142		}
143	};
144}
145
146#[cfg(test)]
147mod tests {
148	use std::{
149		sync::{Arc, Mutex},
150		thread,
151	};
152
153	use crate::event::{Event, EventBus, EventListener};
154
155	#[derive(Debug)]
156	pub struct TestEvent {}
157
158	impl_event!(TestEvent);
159
160	#[derive(Debug)]
161	pub struct AnotherEvent {}
162
163	impl_event!(AnotherEvent);
164
165	#[derive(Default, Debug, Clone)]
166	pub struct TestEventListener(Arc<TestHandlerInner>);
167
168	#[derive(Default, Debug)]
169	pub struct TestHandlerInner {
170		pub counter: Arc<Mutex<i32>>,
171	}
172
173	impl EventListener<TestEvent> for TestEventListener {
174		fn on(&self, _event: &TestEvent) {
175			let mut x = self.0.counter.lock().unwrap();
176			*x += 1;
177		}
178	}
179
180	impl EventListener<AnotherEvent> for TestEventListener {
181		fn on(&self, _event: &AnotherEvent) {
182			let mut x = self.0.counter.lock().unwrap();
183			*x *= 2;
184		}
185	}
186
187	#[test]
188	fn test_event_bus_new() {
189		let event_bus = EventBus::new();
190		event_bus.emit(TestEvent {});
191	}
192
193	#[test]
194	fn test_event_bus_default() {
195		let event_bus = EventBus::default();
196		event_bus.emit(TestEvent {});
197	}
198
199	#[test]
200	fn test_register_single_listener() {
201		let event_bus = EventBus::new();
202		let listener = TestEventListener::default();
203
204		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
205		event_bus.emit(TestEvent {});
206		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
207	}
208
209	#[test]
210	fn test_emit_unregistered_event() {
211		let event_bus = EventBus::new();
212		event_bus.emit(TestEvent {});
213	}
214
215	#[test]
216	fn test_multiple_listeners_same_event() {
217		let event_bus = EventBus::new();
218		let listener1 = TestEventListener::default();
219		let listener2 = TestEventListener::default();
220
221		event_bus.register::<TestEvent, TestEventListener>(listener1.clone());
222		event_bus.register::<TestEvent, TestEventListener>(listener2.clone());
223
224		event_bus.emit(TestEvent {});
225		assert_eq!(*listener1.0.counter.lock().unwrap(), 1);
226		assert_eq!(*listener2.0.counter.lock().unwrap(), 1);
227	}
228
229	#[test]
230	fn test_event_bus_clone() {
231		let event_bus1 = EventBus::new();
232		let listener = TestEventListener::default();
233		event_bus1.register::<TestEvent, TestEventListener>(listener.clone());
234
235		let event_bus2 = event_bus1.clone();
236		event_bus2.emit(TestEvent {});
237		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
238	}
239
240	#[test]
241	fn test_concurrent_registration() {
242		let event_bus = Arc::new(EventBus::new());
243		let handles: Vec<_> = (0..10)
244			.map(|_| {
245				let event_bus = event_bus.clone();
246				thread::spawn(move || {
247					let listener = TestEventListener::default();
248					event_bus.register::<TestEvent, TestEventListener>(listener);
249				})
250			})
251			.collect();
252
253		for handle in handles {
254			handle.join().unwrap();
255		}
256
257		event_bus.emit(TestEvent {});
258	}
259
260	#[test]
261	fn test_concurrent_emitting() {
262		let event_bus = Arc::new(EventBus::new());
263		let listener = TestEventListener::default();
264		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
265
266		let handles: Vec<_> = (0..10)
267			.map(|_| {
268				let event_bus = event_bus.clone();
269				thread::spawn(move || {
270					event_bus.emit(TestEvent {});
271				})
272			})
273			.collect();
274
275		for handle in handles {
276			handle.join().unwrap();
277		}
278
279		assert!(*listener.0.counter.lock().unwrap() >= 10);
280	}
281
282	#[derive(Debug)]
283	pub struct MacroTestEvent {
284		pub value: i32,
285	}
286
287	impl_event!(MacroTestEvent);
288
289	#[test]
290	fn test_impl_event_macro() {
291		let event = MacroTestEvent {
292			value: 42,
293		};
294		let any_ref = event.as_any();
295		assert!(any_ref.downcast_ref::<MacroTestEvent>().is_some());
296		assert_eq!(any_ref.downcast_ref::<MacroTestEvent>().unwrap().value, 42);
297	}
298
299	#[test]
300	fn test_multi_event_listener() {
301		let event_bus = EventBus::default();
302		let listener = TestEventListener::default();
303
304		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
305		event_bus.register::<AnotherEvent, TestEventListener>(listener.clone());
306
307		// Each event type triggers only its own listeners
308		event_bus.emit(TestEvent {});
309		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
310
311		event_bus.emit(TestEvent {});
312		assert_eq!(*listener.0.counter.lock().unwrap(), 2);
313
314		event_bus.emit(AnotherEvent {});
315		assert_eq!(*listener.0.counter.lock().unwrap(), 4); // 2 * 2
316	}
317}