Skip to main content

reifydb_core/event/
mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	any::{Any, TypeId},
6	collections::HashMap,
7	sync::Arc,
8};
9
10use reifydb_runtime::actor::{
11	context::Context,
12	mailbox::ActorRef,
13	system::ActorSystem,
14	traits::{Actor, Directive},
15};
16
17pub mod flow;
18pub mod lifecycle;
19#[macro_use]
20pub mod r#macro;
21pub mod metric;
22pub mod store;
23pub mod transaction;
24
/// A value that can be published on the [`EventBus`].
///
/// Implementors must be cloneable, thread-safe and `'static`, and must be
/// able to expose themselves as [`Any`] so the bus can carry them in a
/// type-erased form and recover the concrete type later.
pub trait Event: Any + Clone + Send + Sync + 'static {
	/// Borrows this event as a type-erased [`Any`] reference.
	fn as_any(&self) -> &dyn Any;

	/// Consumes this event and returns it as an owned, sendable [`Any`] box.
	fn into_any(self) -> Box<dyn Any + Send>;
}
29
30pub trait EventListener<E>: Send + Sync + 'static
31where
32	E: Event,
33{
34	fn on(&self, event: &E);
35}
36
/// Type-erased view of a per-event-type listener collection.
///
/// Lets collections for different event types live behind the same
/// `Box<dyn EventListenerList>`.
trait EventListenerList: Any + Send + Sync {
	/// Delivers a type-erased event to the collection.
	fn on_any(&self, event: Box<dyn Any + Send>);

	/// Exposes the collection as `&mut dyn Any` so callers can downcast it
	/// back to its concrete type.
	fn as_any_mut(&mut self) -> &mut dyn Any;
}
41
42struct EventListenerListImpl<E> {
43	listeners: Vec<Arc<dyn EventListener<E>>>,
44}
45
46impl<E> EventListenerListImpl<E>
47where
48	E: Event,
49{
50	fn new() -> Self {
51		Self {
52			listeners: Vec::new(),
53		}
54	}
55
56	fn add(&mut self, listener: Arc<dyn EventListener<E>>) {
57		self.listeners.push(listener);
58	}
59}
60
61impl<E> EventListenerList for EventListenerListImpl<E>
62where
63	E: Event,
64{
65	fn on_any(&self, event: Box<dyn Any + Send>) {
66		if let Ok(event) = event.downcast::<E>() {
67			for listener in &self.listeners {
68				listener.on(&*event);
69			}
70		}
71	}
72
73	fn as_any_mut(&mut self) -> &mut dyn Any {
74		self
75	}
76}
77
78// --- Actor-based EventBus ---
79
/// A type-erased event together with the `TypeId` used to route it.
struct EventEnvelope {
	/// Lookup key for the listener collection that should receive `event`.
	type_id: TypeId,
	/// The event payload, boxed as `Any` so it can travel in a message.
	event: Box<dyn Any + Send>,
}
84
85enum EventBusMsg {
86	Emit(EventEnvelope),
87	Register {
88		installer: Box<dyn FnOnce(&mut HashMap<TypeId, Box<dyn EventListenerList>>) + Send>,
89	},
90	WaitForCompletion(std::sync::mpsc::Sender<()>),
91}
92
/// Stateless actor type; the listener map lives in the actor's `State`.
struct EventBusActor;
94
95impl Actor for EventBusActor {
96	type State = HashMap<TypeId, Box<dyn EventListenerList>>;
97	type Message = EventBusMsg;
98
99	fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
100		HashMap::new()
101	}
102
103	fn handle(&self, state: &mut Self::State, msg: Self::Message, _ctx: &Context<Self::Message>) -> Directive {
104		match msg {
105			EventBusMsg::Emit(envelope) => {
106				if let Some(list) = state.get(&envelope.type_id) {
107					list.on_any(envelope.event);
108				}
109			}
110			EventBusMsg::Register {
111				installer,
112			} => {
113				installer(state);
114			}
115			EventBusMsg::WaitForCompletion(tx) => {
116				let _ = tx.send(());
117			}
118		}
119		Directive::Continue
120	}
121}
122
123#[derive(Clone)]
124pub struct EventBus {
125	actor_ref: ActorRef<EventBusMsg>,
126	_actor_system: ActorSystem,
127}
128
129impl EventBus {
130	pub fn new(actor_system: &ActorSystem) -> Self {
131		let handle = actor_system.spawn("event-bus", EventBusActor);
132		Self {
133			actor_ref: handle.actor_ref().clone(),
134			_actor_system: actor_system.clone(),
135		}
136	}
137
138	pub fn register<E, L>(&self, listener: L)
139	where
140		E: Event,
141		L: EventListener<E>,
142	{
143		let type_id = TypeId::of::<E>();
144		let listener = Arc::new(listener);
145
146		let installer: Box<dyn FnOnce(&mut HashMap<TypeId, Box<dyn EventListenerList>>) + Send> =
147			Box::new(move |map| {
148				let list = map
149					.entry(type_id)
150					.or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()));
151				list.as_any_mut().downcast_mut::<EventListenerListImpl<E>>().unwrap().add(listener);
152			});
153
154		let _ = self.actor_ref.send(EventBusMsg::Register {
155			installer,
156		});
157	}
158
159	pub fn emit<E>(&self, event: E)
160	where
161		E: Event,
162	{
163		let type_id = TypeId::of::<E>();
164		let _ = self.actor_ref.send(EventBusMsg::Emit(EventEnvelope {
165			type_id,
166			event: event.into_any(),
167		}));
168	}
169
170	pub fn wait_for_completion(&self) {
171		let (tx, rx) = std::sync::mpsc::channel();
172		let _ = self.actor_ref.send(EventBusMsg::WaitForCompletion(tx));
173		let _ = rx.recv();
174	}
175}
176
/// Tests for the actor-backed `EventBus`: registration, emission, cloning,
/// concurrent use, and the `define_event!` macro.
#[cfg(test)]
pub mod tests {
	use std::sync::{Arc, Mutex};

	use reifydb_runtime::{SharedRuntimeConfig, actor::system::ActorSystem};

	use crate::event::{Event, EventBus, EventListener};

	/// Builds an actor system from the default shared runtime configuration.
	fn test_actor_system() -> ActorSystem {
		ActorSystem::new(SharedRuntimeConfig::default().actor_system_config())
	}

	define_event! {
		pub struct TestEvent{}
	}

	define_event! {
		pub struct AnotherEvent{}
	}

	/// Listener whose clones all share one counter.
	#[derive(Default, Debug, Clone)]
	pub struct TestEventListener(Arc<TestHandlerInner>);

	#[derive(Default, Debug)]
	pub struct TestHandlerInner {
		pub counter: Arc<Mutex<i32>>,
	}

	/// `TestEvent` increments the shared counter by one.
	impl EventListener<TestEvent> for TestEventListener {
		fn on(&self, _event: &TestEvent) {
			let mut x = self.0.counter.lock().unwrap();
			*x += 1;
		}
	}

	/// `AnotherEvent` doubles the shared counter.
	impl EventListener<AnotherEvent> for TestEventListener {
		fn on(&self, _event: &AnotherEvent) {
			let mut x = self.0.counter.lock().unwrap();
			*x *= 2;
		}
	}

	#[test]
	fn test_event_bus_new() {
		let actor_system = test_actor_system();
		let event_bus = EventBus::new(&actor_system);
		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
	}

	#[test]
	fn test_register_single_listener() {
		let actor_system = test_actor_system();
		let event_bus = EventBus::new(&actor_system);
		let listener = TestEventListener::default();

		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
	}

	#[test]
	fn test_emit_unregistered_event() {
		let actor_system = test_actor_system();
		let event_bus = EventBus::new(&actor_system);
		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
	}

	#[test]
	fn test_multiple_listeners_same_event() {
		let actor_system = test_actor_system();
		let event_bus = EventBus::new(&actor_system);
		let listener1 = TestEventListener::default();
		let listener2 = TestEventListener::default();

		event_bus.register::<TestEvent, TestEventListener>(listener1.clone());
		event_bus.register::<TestEvent, TestEventListener>(listener2.clone());

		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
		assert_eq!(*listener1.0.counter.lock().unwrap(), 1);
		assert_eq!(*listener2.0.counter.lock().unwrap(), 1);
	}

	#[test]
	fn test_event_bus_clone() {
		let actor_system = test_actor_system();
		let event_bus1 = EventBus::new(&actor_system);
		let listener = TestEventListener::default();
		event_bus1.register::<TestEvent, TestEventListener>(listener.clone());

		let event_bus2 = event_bus1.clone();
		event_bus2.emit(TestEvent::new());
		event_bus2.wait_for_completion();
		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
	}

	#[test]
	fn test_concurrent_registration() {
		let actor_system = test_actor_system();
		let event_bus = Arc::new(EventBus::new(&actor_system));
		// Every thread registers a clone of this listener; clones share the
		// same counter, so the registrations can be observed afterwards.
		let listener = TestEventListener::default();
		let mut handles = Vec::new();

		for _ in 0..10 {
			let event_bus = event_bus.clone();
			let listener = listener.clone();
			handles.push(std::thread::spawn(move || {
				event_bus.register::<TestEvent, TestEventListener>(listener);
			}));
		}

		for handle in handles {
			handle.join().unwrap();
		}

		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
		// One emit must reach all 10 registrations made from the threads.
		assert_eq!(*listener.0.counter.lock().unwrap(), 10);
	}

	#[test]
	fn test_concurrent_emitting() {
		let actor_system = test_actor_system();
		let event_bus = Arc::new(EventBus::new(&actor_system));
		let listener = TestEventListener::default();
		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
		event_bus.wait_for_completion();

		let mut handles = Vec::new();

		for _ in 0..10 {
			let event_bus = event_bus.clone();
			handles.push(std::thread::spawn(move || {
				event_bus.emit(TestEvent::new());
			}));
		}

		for handle in handles {
			handle.join().unwrap();
		}

		event_bus.wait_for_completion();
		assert_eq!(*listener.0.counter.lock().unwrap(), 10);
	}

	define_event! {
		pub struct MacroTestEvent {
			pub value: i32,
		}
	}

	#[test]
	fn test_define_event_macro() {
		let event = MacroTestEvent::new(42);
		let any_ref = event.as_any();
		assert!(any_ref.downcast_ref::<MacroTestEvent>().is_some());
		assert_eq!(any_ref.downcast_ref::<MacroTestEvent>().unwrap().value(), &42);
	}

	#[test]
	fn test_multi_event_listener() {
		let actor_system = test_actor_system();
		let event_bus = EventBus::new(&actor_system);
		let listener = TestEventListener::default();

		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
		event_bus.register::<AnotherEvent, TestEventListener>(listener.clone());

		// Each event type triggers only its own listeners
		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
		assert_eq!(*listener.0.counter.lock().unwrap(), 1);

		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
		assert_eq!(*listener.0.counter.lock().unwrap(), 2);

		event_bus.emit(AnotherEvent::new());
		event_bus.wait_for_completion();
		assert_eq!(*listener.0.counter.lock().unwrap(), 4); // 2 * 2
	}
}