Skip to main content

reifydb_core/event/
mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	any::{Any, TypeId},
6	collections::HashMap,
7	sync,
8	sync::Arc,
9};
10
11use reifydb_runtime::actor::{
12	context::Context,
13	mailbox::ActorRef,
14	system::ActorSystem,
15	traits::{Actor, Directive},
16};
17use sync::mpsc::Sender;
18
19pub mod flow;
20pub mod lifecycle;
21#[macro_use]
22pub mod r#macro;
23pub mod metric;
24pub mod procedure;
25pub mod store;
26pub mod transaction;
27
/// A value that can be published on the [`EventBus`].
///
/// Implementors must be cloneable and safe to send and share across threads.
pub trait Event: Any + Clone + Send + Sync + 'static {
	/// Borrows the event as a type-erased [`Any`] reference.
	fn as_any(&self) -> &dyn Any;

	/// Consumes the event and returns it boxed as a type-erased, sendable value.
	fn into_any(self) -> Box<dyn Any + Send>;
}
32
33pub trait EventListener<E>: Send + Sync + 'static
34where
35	E: Event,
36{
37	fn on(&self, event: &E);
38}
39
/// Type-erased interface over a list of listeners, so lists for different
/// event types can live in one map.
trait EventListenerList: Any + Send + Sync {
	/// Hands a boxed, type-erased event to the list for delivery.
	fn on_any(&self, event: Box<dyn Any + Send>);

	/// Exposes the list as `&mut dyn Any` so it can be downcast to its concrete type.
	fn as_any_mut(&mut self) -> &mut dyn Any;
}
44
45struct EventListenerListImpl<E> {
46	listeners: Vec<Arc<dyn EventListener<E>>>,
47}
48
49impl<E> EventListenerListImpl<E>
50where
51	E: Event,
52{
53	fn new() -> Self {
54		Self {
55			listeners: Vec::new(),
56		}
57	}
58
59	fn add(&mut self, listener: Arc<dyn EventListener<E>>) {
60		self.listeners.push(listener);
61	}
62}
63
64impl<E> EventListenerList for EventListenerListImpl<E>
65where
66	E: Event,
67{
68	fn on_any(&self, event: Box<dyn Any + Send>) {
69		if let Ok(event) = event.downcast::<E>() {
70			for listener in &self.listeners {
71				listener.on(&*event);
72			}
73		}
74	}
75
76	fn as_any_mut(&mut self) -> &mut dyn Any {
77		self
78	}
79}
80
81// --- Actor-based EventBus ---
82
/// A type-erased event together with the [`TypeId`] of its concrete type.
struct EventEnvelope {
	/// `TypeId` of the concrete event; used as the key into the listener map.
	type_id: TypeId,
	/// The event itself, boxed as `dyn Any + Send`.
	event: Box<dyn Any + Send>,
}
87
88enum EventBusMsg {
89	Emit(EventEnvelope),
90	Register {
91		installer: Box<dyn FnOnce(&mut HashMap<TypeId, Box<dyn EventListenerList>>) + Send>,
92	},
93	WaitForCompletion(Sender<()>),
94}
95
96struct EventBusActor;
97
98impl Actor for EventBusActor {
99	type State = HashMap<TypeId, Box<dyn EventListenerList>>;
100	type Message = EventBusMsg;
101
102	fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
103		HashMap::new()
104	}
105
106	fn handle(&self, state: &mut Self::State, msg: Self::Message, _ctx: &Context<Self::Message>) -> Directive {
107		match msg {
108			EventBusMsg::Emit(envelope) => {
109				if let Some(list) = state.get(&envelope.type_id) {
110					list.on_any(envelope.event);
111				}
112			}
113			EventBusMsg::Register {
114				installer,
115			} => {
116				installer(state);
117			}
118			EventBusMsg::WaitForCompletion(tx) => {
119				let _ = tx.send(());
120			}
121		}
122		Directive::Continue
123	}
124}
125
126#[derive(Clone)]
127pub struct EventBus {
128	actor_ref: ActorRef<EventBusMsg>,
129	_actor_system: ActorSystem,
130}
131
132impl EventBus {
133	pub fn new(actor_system: &ActorSystem) -> Self {
134		let handle = actor_system.spawn("event-bus", EventBusActor);
135		Self {
136			actor_ref: handle.actor_ref().clone(),
137			_actor_system: actor_system.clone(),
138		}
139	}
140
141	pub fn register<E, L>(&self, listener: L)
142	where
143		E: Event,
144		L: EventListener<E>,
145	{
146		let type_id = TypeId::of::<E>();
147		let listener = Arc::new(listener);
148
149		let installer: Box<dyn FnOnce(&mut HashMap<TypeId, Box<dyn EventListenerList>>) + Send> =
150			Box::new(move |map| {
151				let list = map
152					.entry(type_id)
153					.or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()));
154				list.as_any_mut().downcast_mut::<EventListenerListImpl<E>>().unwrap().add(listener);
155			});
156
157		let _ = self.actor_ref.send(EventBusMsg::Register {
158			installer,
159		});
160	}
161
162	pub fn emit<E>(&self, event: E)
163	where
164		E: Event,
165	{
166		let type_id = TypeId::of::<E>();
167		let _ = self.actor_ref.send(EventBusMsg::Emit(EventEnvelope {
168			type_id,
169			event: event.into_any(),
170		}));
171	}
172
173	pub fn wait_for_completion(&self) {
174		let (tx, rx) = sync::mpsc::channel();
175		let _ = self.actor_ref.send(EventBusMsg::WaitForCompletion(tx));
176		let _ = rx.recv();
177	}
178}
179
180#[cfg(test)]
181pub mod tests {
182	use std::{
183		sync::{Arc, Mutex},
184		thread,
185	};
186
187	use reifydb_runtime::{SharedRuntimeConfig, actor::system::ActorSystem};
188
189	use crate::event::{Event, EventBus, EventListener};
190
191	fn test_actor_system() -> ActorSystem {
192		ActorSystem::new(SharedRuntimeConfig::default().actor_system_config())
193	}
194
	// Field-less event declared through `define_event!`; the tests below build it
	// with `TestEvent::new()`.
	define_event! {
		pub struct TestEvent{}
	}
198
	// Second field-less event type, used to check that listeners are dispatched
	// per event type.
	define_event! {
		pub struct AnotherEvent{}
	}
202
203	#[derive(Default, Debug, Clone)]
204	pub struct TestEventListener(Arc<TestHandlerInner>);
205
206	#[derive(Default, Debug)]
207	pub struct TestHandlerInner {
208		pub counter: Arc<Mutex<i32>>,
209	}
210
211	impl EventListener<TestEvent> for TestEventListener {
212		fn on(&self, _event: &TestEvent) {
213			let mut x = self.0.counter.lock().unwrap();
214			*x += 1;
215		}
216	}
217
218	impl EventListener<AnotherEvent> for TestEventListener {
219		fn on(&self, _event: &AnotherEvent) {
220			let mut x = self.0.counter.lock().unwrap();
221			*x *= 2;
222		}
223	}
224
225	#[test]
226	fn test_event_bus_new() {
227		let actor_system = test_actor_system();
228		let event_bus = EventBus::new(&actor_system);
229		event_bus.emit(TestEvent::new());
230		event_bus.wait_for_completion();
231	}
232
233	#[test]
234	fn test_register_single_listener() {
235		let actor_system = test_actor_system();
236		let event_bus = EventBus::new(&actor_system);
237		let listener = TestEventListener::default();
238
239		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
240		event_bus.emit(TestEvent::new());
241		event_bus.wait_for_completion();
242		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
243	}
244
245	#[test]
246	fn test_emit_unregistered_event() {
247		let actor_system = test_actor_system();
248		let event_bus = EventBus::new(&actor_system);
249		event_bus.emit(TestEvent::new());
250		event_bus.wait_for_completion();
251	}
252
253	#[test]
254	fn test_multiple_listeners_same_event() {
255		let actor_system = test_actor_system();
256		let event_bus = EventBus::new(&actor_system);
257		let listener1 = TestEventListener::default();
258		let listener2 = TestEventListener::default();
259
260		event_bus.register::<TestEvent, TestEventListener>(listener1.clone());
261		event_bus.register::<TestEvent, TestEventListener>(listener2.clone());
262
263		event_bus.emit(TestEvent::new());
264		event_bus.wait_for_completion();
265		assert_eq!(*listener1.0.counter.lock().unwrap(), 1);
266		assert_eq!(*listener2.0.counter.lock().unwrap(), 1);
267	}
268
269	#[test]
270	fn test_event_bus_clone() {
271		let actor_system = test_actor_system();
272		let event_bus1 = EventBus::new(&actor_system);
273		let listener = TestEventListener::default();
274		event_bus1.register::<TestEvent, TestEventListener>(listener.clone());
275
276		let event_bus2 = event_bus1.clone();
277		event_bus2.emit(TestEvent::new());
278		event_bus2.wait_for_completion();
279		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
280	}
281
282	#[test]
283	fn test_concurrent_registration() {
284		let actor_system = test_actor_system();
285		let event_bus = Arc::new(EventBus::new(&actor_system));
286		let mut handles = Vec::new();
287
288		for _ in 0..10 {
289			let event_bus = event_bus.clone();
290			handles.push(thread::spawn(move || {
291				let listener = TestEventListener::default();
292				event_bus.register::<TestEvent, TestEventListener>(listener);
293			}));
294		}
295
296		for handle in handles {
297			handle.join().unwrap();
298		}
299
300		event_bus.emit(TestEvent::new());
301		event_bus.wait_for_completion();
302	}
303
304	#[test]
305	fn test_concurrent_emitting() {
306		let actor_system = test_actor_system();
307		let event_bus = Arc::new(EventBus::new(&actor_system));
308		let listener = TestEventListener::default();
309		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
310		event_bus.wait_for_completion();
311
312		let mut handles = Vec::new();
313
314		for _ in 0..10 {
315			let event_bus = event_bus.clone();
316			handles.push(thread::spawn(move || {
317				event_bus.emit(TestEvent::new());
318			}));
319		}
320
321		for handle in handles {
322			handle.join().unwrap();
323		}
324
325		event_bus.wait_for_completion();
326		assert_eq!(*listener.0.counter.lock().unwrap(), 10);
327	}
328
	// Event with a single `i32` field, used to exercise what `define_event!`
	// generates (the test below calls `new(42)`, `as_any()` and `value()`).
	define_event! {
		pub struct MacroTestEvent {
			pub value: i32,
		}
	}
334
335	#[test]
336	fn test_define_event_macro() {
337		let event = MacroTestEvent::new(42);
338		let any_ref = event.as_any();
339		assert!(any_ref.downcast_ref::<MacroTestEvent>().is_some());
340		assert_eq!(any_ref.downcast_ref::<MacroTestEvent>().unwrap().value(), &42);
341	}
342
343	#[test]
344	fn test_multi_event_listener() {
345		let actor_system = test_actor_system();
346		let event_bus = EventBus::new(&actor_system);
347		let listener = TestEventListener::default();
348
349		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
350		event_bus.register::<AnotherEvent, TestEventListener>(listener.clone());
351
352		// Each event type triggers only its own listeners
353		event_bus.emit(TestEvent::new());
354		event_bus.wait_for_completion();
355		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
356
357		event_bus.emit(TestEvent::new());
358		event_bus.wait_for_completion();
359		assert_eq!(*listener.0.counter.lock().unwrap(), 2);
360
361		event_bus.emit(AnotherEvent::new());
362		event_bus.wait_for_completion();
363		assert_eq!(*listener.0.counter.lock().unwrap(), 4); // 2 * 2
364	}
365}