reifydb_core/event/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::{
5	any::{Any, TypeId},
6	collections::HashMap,
7	sync::{Arc, RwLock},
8};
9
10use tracing::error;
11
12pub mod catalog;
13pub mod flow;
14pub mod lifecycle;
15pub mod transaction;
16
/// A value that can be published on the [`EventBus`].
///
/// Implementors expose themselves as [`Any`] so the bus can recover the
/// concrete type with a downcast when dispatching to typed listeners. The
/// [`impl_event!`](crate::impl_event) macro generates the implementation.
pub trait Event
where
	Self: Any + Send + Sync + 'static,
{
	/// Returns `self` as a type-erased [`Any`] reference.
	fn as_any(&self) -> &dyn Any;
}
20
21pub trait EventListener<E>: Send + Sync + 'static
22where
23	E: Event,
24{
25	fn on(&self, event: &E);
26}
27
/// Type-erased view of a listener list, so lists for different event types
/// can be stored together behind one trait object.
trait EventListenerList: Any + Send + Sync {
	/// Offers a type-erased event to the list.
	fn on_any(&self, payload: &dyn Any);

	/// Returns `self` as mutable [`Any`] so callers can downcast back to the
	/// concrete list type.
	fn as_any_mut(&mut self) -> &mut dyn Any;
}
32
33struct EventListenerListImpl<E> {
34	listeners: RwLock<Vec<Box<dyn EventListener<E>>>>,
35}
36
37impl<E> EventListenerListImpl<E>
38where
39	E: Event,
40{
41	fn new() -> Self {
42		Self {
43			listeners: RwLock::new(Vec::new()),
44		}
45	}
46
47	fn add(&mut self, listener: Box<dyn EventListener<E>>) {
48		self.listeners.write().unwrap().push(listener);
49	}
50}
51
52impl<E> EventListenerList for EventListenerListImpl<E>
53where
54	E: Event,
55{
56	fn on_any(&self, event: &dyn Any) {
57		if let Some(event) = event.downcast_ref::<E>() {
58			for listener in self.listeners.read().unwrap().iter() {
59				// Add panic safety - catch panics and continue
60				// with other listeners
61				let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
62					listener.on(event);
63				}));
64				if let Err(_) = result {
65					error!("Event listener panicked for event type {}", std::any::type_name::<E>());
66				}
67			}
68		}
69	}
70
71	fn as_any_mut(&mut self) -> &mut dyn Any {
72		self
73	}
74}
75
76#[derive(Clone)]
77pub struct EventBus {
78	listeners: Arc<RwLock<HashMap<TypeId, Box<dyn EventListenerList>>>>,
79}
80
81impl Default for EventBus {
82	fn default() -> Self {
83		Self::new()
84	}
85}
86
87impl EventBus {
88	pub fn new() -> Self {
89		Self {
90			listeners: Arc::new(RwLock::new(HashMap::new())),
91		}
92	}
93
94	pub fn register<E, L>(&self, listener: L)
95	where
96		E: Event,
97		L: EventListener<E>,
98	{
99		let type_id = TypeId::of::<E>();
100
101		self.listeners
102			.write()
103			.unwrap()
104			.entry(type_id)
105			.or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()))
106			.as_any_mut()
107			.downcast_mut::<EventListenerListImpl<E>>()
108			.unwrap()
109			.add(Box::new(listener));
110	}
111
112	pub fn emit<E>(&self, event: E)
113	where
114		E: Event,
115	{
116		// Infallible emit with panic safety
117		let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
118			let type_id = TypeId::of::<E>();
119			let listeners = self.listeners.read().unwrap();
120
121			if let Some(listener_list) = listeners.get(&type_id) {
122				listener_list.on_any(event.as_any());
123			}
124		}));
125
126		if let Err(_) = result {
127			error!("Event emission panicked for type {}", std::any::type_name::<E>());
128		}
129	}
130}
131
/// Implements [`Event`](crate::event::Event) for one or more types.
///
/// Accepts a comma-separated list of types with an optional trailing comma,
/// e.g. `impl_event!(Started);` or `impl_event!(Started, Stopped);`. Each
/// generated `as_any` simply returns `self`.
#[macro_export]
macro_rules! impl_event {
	($($ty:ty),+ $(,)?) => {
		$(
			impl $crate::event::Event for $ty {
				// Absolute path: a module named `std` at the call site
				// must not be able to shadow the standard library.
				fn as_any(&self) -> &dyn ::std::any::Any {
					self
				}
			}
		)+
	};
}
142
#[cfg(test)]
mod tests {
	use std::{
		sync::{Arc, Mutex},
		thread,
	};

	use crate::event::{Event, EventBus, EventListener};

	#[derive(Debug)]
	pub struct TestEvent {}

	impl_event!(TestEvent);

	#[derive(Debug)]
	pub struct AnotherEvent {}

	impl_event!(AnotherEvent);

	/// Listener whose clones all share one counter, so a test can keep a
	/// handle after moving a clone into the bus.
	#[derive(Default, Debug, Clone)]
	pub struct TestEventListener(Arc<TestHandlerInner>);

	#[derive(Default, Debug)]
	pub struct TestHandlerInner {
		// The outer `Arc` in `TestEventListener` already provides sharing.
		pub counter: Mutex<i32>,
	}

	impl TestEventListener {
		/// Current value of the shared counter.
		fn count(&self) -> i32 {
			*self.0.counter.lock().unwrap()
		}
	}

	/// `TestEvent` increments the counter.
	impl EventListener<TestEvent> for TestEventListener {
		fn on(&self, _event: &TestEvent) {
			let mut x = self.0.counter.lock().unwrap();
			*x += 1;
		}
	}

	/// `AnotherEvent` doubles the counter, so the two event types can be
	/// told apart by their effect.
	impl EventListener<AnotherEvent> for TestEventListener {
		fn on(&self, _event: &AnotherEvent) {
			let mut x = self.0.counter.lock().unwrap();
			*x *= 2;
		}
	}

	/// Listener that always panics; used to check panic isolation.
	#[derive(Debug)]
	pub struct PanickingListener;

	impl EventListener<TestEvent> for PanickingListener {
		fn on(&self, _event: &TestEvent) {
			panic!("listener failure (expected in test)");
		}
	}

	#[test]
	fn test_event_bus_new() {
		let event_bus = EventBus::new();
		event_bus.emit(TestEvent {});
	}

	#[test]
	fn test_event_bus_default() {
		let event_bus = EventBus::default();
		event_bus.emit(TestEvent {});
	}

	#[test]
	fn test_register_single_listener() {
		let event_bus = EventBus::new();
		let listener = TestEventListener::default();

		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
		event_bus.emit(TestEvent {});
		assert_eq!(listener.count(), 1);
	}

	#[test]
	fn test_emit_unregistered_event() {
		let event_bus = EventBus::new();
		event_bus.emit(TestEvent {});
	}

	#[test]
	fn test_multiple_listeners_same_event() {
		let event_bus = EventBus::new();
		let listener1 = TestEventListener::default();
		let listener2 = TestEventListener::default();

		event_bus.register::<TestEvent, TestEventListener>(listener1.clone());
		event_bus.register::<TestEvent, TestEventListener>(listener2.clone());

		event_bus.emit(TestEvent {});
		assert_eq!(listener1.count(), 1);
		assert_eq!(listener2.count(), 1);
	}

	#[test]
	fn test_event_bus_clone() {
		let event_bus1 = EventBus::new();
		let listener = TestEventListener::default();
		event_bus1.register::<TestEvent, TestEventListener>(listener.clone());

		let event_bus2 = event_bus1.clone();
		event_bus2.emit(TestEvent {});
		assert_eq!(listener.count(), 1);
	}

	#[test]
	fn test_concurrent_registration() {
		let event_bus = Arc::new(EventBus::new());
		// Every thread registers a clone sharing this counter, so the final
		// count reveals whether any registration was lost.
		let listener = TestEventListener::default();

		let handles: Vec<_> = (0..10)
			.map(|_| {
				let event_bus = event_bus.clone();
				let listener = listener.clone();
				thread::spawn(move || {
					event_bus.register::<TestEvent, TestEventListener>(listener);
				})
			})
			.collect();

		for handle in handles {
			handle.join().unwrap();
		}

		event_bus.emit(TestEvent {});
		assert_eq!(listener.count(), 10);
	}

	#[test]
	fn test_concurrent_emitting() {
		let event_bus = Arc::new(EventBus::new());
		let listener = TestEventListener::default();
		event_bus.register::<TestEvent, TestEventListener>(listener.clone());

		let handles: Vec<_> = (0..10)
			.map(|_| {
				let event_bus = event_bus.clone();
				thread::spawn(move || {
					event_bus.emit(TestEvent {});
				})
			})
			.collect();

		for handle in handles {
			handle.join().unwrap();
		}

		// Ten emits to one listener must yield exactly ten invocations.
		assert_eq!(listener.count(), 10);
	}

	#[test]
	fn test_panicking_listener_does_not_stop_dispatch() {
		let event_bus = EventBus::new();
		let listener = TestEventListener::default();

		// Registered first, so it runs (and panics) before the counter.
		event_bus.register::<TestEvent, PanickingListener>(PanickingListener);
		event_bus.register::<TestEvent, TestEventListener>(listener.clone());

		event_bus.emit(TestEvent {});
		assert_eq!(listener.count(), 1);

		// The bus stays usable after a listener has panicked.
		event_bus.emit(TestEvent {});
		assert_eq!(listener.count(), 2);
	}

	#[derive(Debug)]
	pub struct MacroTestEvent {
		pub value: i32,
	}

	impl_event!(MacroTestEvent);

	#[test]
	fn test_impl_event_macro() {
		let event = MacroTestEvent {
			value: 42,
		};
		let any_ref = event.as_any();
		assert!(any_ref.downcast_ref::<MacroTestEvent>().is_some());
		assert_eq!(any_ref.downcast_ref::<MacroTestEvent>().unwrap().value, 42);
	}

	#[test]
	fn test_multi_event_listener() {
		let event_bus = EventBus::default();
		let listener = TestEventListener::default();

		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
		event_bus.register::<AnotherEvent, TestEventListener>(listener.clone());

		// Each event type triggers only its own listeners
		event_bus.emit(TestEvent {});
		assert_eq!(listener.count(), 1);

		event_bus.emit(TestEvent {});
		assert_eq!(listener.count(), 2);

		event_bus.emit(AnotherEvent {});
		assert_eq!(listener.count(), 4); // 2 * 2
	}
}