reifydb_core/event/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::{
5	any::{Any, TypeId},
6	collections::HashMap,
7	sync::Arc,
8};
9
10use async_trait::async_trait;
11use tokio::sync::RwLock;
12
13pub mod catalog;
14pub mod flow;
15pub mod lifecycle;
16pub mod transaction;
17
/// Marker trait for values that can be published on the [`EventBus`].
///
/// Implementors must be cloneable, thread-safe and `'static`. The two methods
/// provide type-erased views of the event; the `impl_event!` macro generates
/// the canonical implementation.
pub trait Event: Any + Clone + Send + Sync + 'static {
	/// Returns a borrowed, type-erased view of the event.
	fn as_any(&self) -> &dyn Any;

	/// Consumes the event and returns it as a boxed, type-erased value.
	fn into_any(self) -> Box<dyn Any + Send>;
}
22
23#[async_trait]
24pub trait EventListener<E>: Send + Sync + 'static
25where
26	E: Event,
27{
28	async fn on(&self, event: &E);
29}
30
31#[async_trait]
32trait EventListenerList: Any + Send + Sync {
33	async fn on_any(&self, event: Box<dyn Any + Send>);
34	fn as_any_mut(&mut self) -> &mut dyn Any;
35}
36
37struct EventListenerListImpl<E> {
38	listeners: RwLock<Vec<Arc<dyn EventListener<E>>>>,
39}
40
41impl<E> EventListenerListImpl<E>
42where
43	E: Event,
44{
45	fn new() -> Self {
46		Self {
47			listeners: RwLock::new(Vec::new()),
48		}
49	}
50
51	async fn add(&self, listener: Arc<dyn EventListener<E>>) {
52		self.listeners.write().await.push(listener);
53	}
54}
55
56#[async_trait]
57impl<E> EventListenerList for EventListenerListImpl<E>
58where
59	E: Event,
60{
61	async fn on_any(&self, event: Box<dyn Any + Send>) {
62		if let Ok(event) = event.downcast::<E>() {
63			// Get a snapshot of listeners (hold lock briefly)
64			let listeners: Vec<_> = {
65				let guard = self.listeners.read().await;
66				guard.iter().cloned().collect()
67			};
68
69			// Now we can await without holding the lock
70			for listener in listeners {
71				listener.on(&*event).await;
72			}
73		}
74	}
75
76	fn as_any_mut(&mut self) -> &mut dyn Any {
77		self
78	}
79}
80
81#[derive(Clone)]
82pub struct EventBus {
83	listeners: Arc<RwLock<HashMap<TypeId, Box<dyn EventListenerList>>>>,
84}
85
86impl Default for EventBus {
87	fn default() -> Self {
88		Self::new()
89	}
90}
91
92impl EventBus {
93	pub fn new() -> Self {
94		Self {
95			listeners: Arc::new(RwLock::new(HashMap::new())),
96		}
97	}
98
99	pub async fn register<E, L>(&self, listener: L)
100	where
101		E: Event,
102		L: EventListener<E>,
103	{
104		let type_id = TypeId::of::<E>();
105
106		let mut listeners = self.listeners.write().await;
107		let list = listeners.entry(type_id).or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()));
108
109		list.as_any_mut().downcast_mut::<EventListenerListImpl<E>>().unwrap().add(Arc::new(listener)).await;
110	}
111
112	pub async fn emit<E>(&self, event: E)
113	where
114		E: Event,
115	{
116		let type_id = TypeId::of::<E>();
117
118		// Get a clone of the listener list pointer
119		let listener_list = {
120			let listeners = self.listeners.read().await;
121			listeners.get(&type_id).map(|l| l.as_ref() as *const dyn EventListenerList)
122		};
123
124		// Now call on_any without holding the lock
125		if let Some(listener_list_ptr) = listener_list {
126			// SAFETY: The listener_list is stored in an Arc inside self.listeners,
127			// so it remains valid as long as self exists
128			let listener_list = unsafe { &*listener_list_ptr };
129			listener_list.on_any(event.into_any()).await;
130		}
131	}
132}
133
/// Implements the crate's `Event` trait for one or more types.
///
/// Accepts a single type or a comma-separated list (a trailing comma is
/// allowed). Each type must already satisfy the supertraits of `Event`.
/// Standard-library items are referenced through absolute `::std` paths so the
/// expansion does not depend on what the call site has in scope.
#[macro_export]
macro_rules! impl_event {
	($($ty:ty),+ $(,)?) => {
		$(
			impl $crate::event::Event for $ty {
				fn as_any(&self) -> &dyn ::std::any::Any {
					self
				}

				fn into_any(self) -> ::std::boxed::Box<dyn ::std::any::Any + ::std::marker::Send> {
					::std::boxed::Box::new(self)
				}
			}
		)+
	};
}
148
#[cfg(test)]
mod tests {
	use std::sync::{Arc, Mutex};

	use async_trait::async_trait;

	use crate::event::{Event, EventBus, EventListener};

	#[derive(Debug, Clone)]
	pub struct TestEvent {}

	impl_event!(TestEvent);

	#[derive(Debug, Clone)]
	pub struct AnotherEvent {}

	impl_event!(AnotherEvent);

	/// Listener whose clones share one counter, so a test can keep a clone and
	/// observe what the registered instance did.
	#[derive(Default, Debug, Clone)]
	pub struct TestEventListener(Arc<TestHandlerInner>);

	#[derive(Default, Debug)]
	pub struct TestHandlerInner {
		pub counter: Arc<Mutex<i32>>,
	}

	impl TestEventListener {
		/// Current value of the shared counter.
		fn count(&self) -> i32 {
			*self.0.counter.lock().unwrap()
		}
	}

	// `TestEvent` increments the counter.
	#[async_trait]
	impl EventListener<TestEvent> for TestEventListener {
		async fn on(&self, _event: &TestEvent) {
			let mut x = self.0.counter.lock().unwrap();
			*x += 1;
		}
	}

	// `AnotherEvent` doubles the counter, so the two handlers are distinguishable.
	#[async_trait]
	impl EventListener<AnotherEvent> for TestEventListener {
		async fn on(&self, _event: &AnotherEvent) {
			let mut x = self.0.counter.lock().unwrap();
			*x *= 2;
		}
	}

	#[tokio::test]
	async fn test_event_bus_new() {
		// Emitting on an empty bus must be a no-op.
		let event_bus = EventBus::new();
		event_bus.emit(TestEvent {}).await;
	}

	#[tokio::test]
	async fn test_event_bus_default() {
		let event_bus = EventBus::default();
		event_bus.emit(TestEvent {}).await;
	}

	#[tokio::test]
	async fn test_register_single_listener() {
		let event_bus = EventBus::new();
		let listener = TestEventListener::default();

		event_bus.register::<TestEvent, TestEventListener>(listener.clone()).await;
		event_bus.emit(TestEvent {}).await;
		assert_eq!(listener.count(), 1);
	}

	#[tokio::test]
	async fn test_emit_unregistered_event() {
		let event_bus = EventBus::new();
		let listener = TestEventListener::default();
		event_bus.register::<TestEvent, TestEventListener>(listener.clone()).await;

		// No listener is registered for `AnotherEvent`; the `TestEvent`
		// listener must not be invoked by it.
		event_bus.emit(AnotherEvent {}).await;
		assert_eq!(listener.count(), 0);
	}

	#[tokio::test]
	async fn test_multiple_listeners_same_event() {
		let event_bus = EventBus::new();
		let listener1 = TestEventListener::default();
		let listener2 = TestEventListener::default();

		event_bus.register::<TestEvent, TestEventListener>(listener1.clone()).await;
		event_bus.register::<TestEvent, TestEventListener>(listener2.clone()).await;

		event_bus.emit(TestEvent {}).await;
		assert_eq!(listener1.count(), 1);
		assert_eq!(listener2.count(), 1);
	}

	#[tokio::test]
	async fn test_event_bus_clone() {
		let event_bus1 = EventBus::new();
		let listener = TestEventListener::default();
		event_bus1.register::<TestEvent, TestEventListener>(listener.clone()).await;

		// A clone shares the registry of the bus it was cloned from.
		let event_bus2 = event_bus1.clone();
		event_bus2.emit(TestEvent {}).await;
		assert_eq!(listener.count(), 1);
	}

	#[tokio::test]
	async fn test_concurrent_registration() {
		let event_bus = Arc::new(EventBus::new());
		let listeners: Vec<TestEventListener> = (0..10).map(|_| TestEventListener::default()).collect();

		let handles: Vec<_> = listeners
			.iter()
			.cloned()
			.map(|listener| {
				let event_bus = event_bus.clone();
				tokio::spawn(async move {
					event_bus.register::<TestEvent, TestEventListener>(listener).await;
				})
			})
			.collect();

		for handle in handles {
			handle.await.unwrap();
		}

		event_bus.emit(TestEvent {}).await;

		// Every concurrently registered listener must have seen the event once.
		for listener in &listeners {
			assert_eq!(listener.count(), 1);
		}
	}

	#[tokio::test]
	async fn test_concurrent_emitting() {
		let event_bus = Arc::new(EventBus::new());
		let listener = TestEventListener::default();
		event_bus.register::<TestEvent, TestEventListener>(listener.clone()).await;

		let mut handles = Vec::new();

		for _ in 0..10 {
			let event_bus = event_bus.clone();
			handles.push(tokio::spawn(async move {
				event_bus.emit(TestEvent {}).await;
			}));
		}

		for handle in handles {
			handle.await.unwrap();
		}

		// Ten emits, one listener: exactly ten invocations.
		assert_eq!(listener.count(), 10);
	}

	#[derive(Debug, Clone)]
	pub struct MacroTestEvent {
		pub value: i32,
	}

	impl_event!(MacroTestEvent);

	#[test]
	fn test_impl_event_macro() {
		let event = MacroTestEvent {
			value: 42,
		};
		let any_ref = event.as_any();
		assert!(any_ref.downcast_ref::<MacroTestEvent>().is_some());
		assert_eq!(any_ref.downcast_ref::<MacroTestEvent>().unwrap().value, 42);

		// The owned conversion must round-trip as well.
		let boxed = event.into_any();
		assert_eq!(boxed.downcast::<MacroTestEvent>().unwrap().value, 42);
	}

	#[tokio::test]
	async fn test_multi_event_listener() {
		let event_bus = EventBus::default();
		let listener = TestEventListener::default();

		event_bus.register::<TestEvent, TestEventListener>(listener.clone()).await;
		event_bus.register::<AnotherEvent, TestEventListener>(listener.clone()).await;

		// Each event type triggers only its own listeners
		event_bus.emit(TestEvent {}).await;
		assert_eq!(listener.count(), 1);

		event_bus.emit(TestEvent {}).await;
		assert_eq!(listener.count(), 2);

		event_bus.emit(AnotherEvent {}).await;
		assert_eq!(listener.count(), 4); // 2 * 2
	}
}