Skip to main content

reifydb_core/event/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	any::{Any, TypeId},
6	collections::HashMap,
7	sync,
8	sync::Arc,
9};
10
11use reifydb_runtime::actor::{
12	context::Context,
13	mailbox::ActorRef,
14	system::ActorSystem,
15	traits::{Actor, Directive},
16};
17use sync::mpsc::Sender;
18
19pub mod flow;
20pub mod lifecycle;
21#[macro_use]
22pub mod r#macro;
23pub mod metric;
24pub mod procedure;
25pub mod store;
26pub mod transaction;
27
28type EventListenerInstaller = Box<dyn FnOnce(&mut HashMap<TypeId, Box<dyn EventListenerList>>) + Send>;
29
30pub trait Event: Any + Send + Sync + Clone + 'static {
31	fn as_any(&self) -> &dyn Any;
32	fn into_any(self) -> Box<dyn Any + Send>;
33}
34
35pub trait EventListener<E>: Send + Sync + 'static
36where
37	E: Event,
38{
39	fn on(&self, event: &E);
40}
41
42trait EventListenerList: Any + Send + Sync {
43	fn on_any(&self, event: Box<dyn Any + Send>);
44	fn as_any_mut(&mut self) -> &mut dyn Any;
45}
46
47struct EventListenerListImpl<E> {
48	listeners: Vec<Arc<dyn EventListener<E>>>,
49}
50
51impl<E> EventListenerListImpl<E>
52where
53	E: Event,
54{
55	fn new() -> Self {
56		Self {
57			listeners: Vec::new(),
58		}
59	}
60
61	fn add(&mut self, listener: Arc<dyn EventListener<E>>) {
62		self.listeners.push(listener);
63	}
64}
65
66impl<E> EventListenerList for EventListenerListImpl<E>
67where
68	E: Event,
69{
70	fn on_any(&self, event: Box<dyn Any + Send>) {
71		if let Ok(event) = event.downcast::<E>() {
72			for listener in &self.listeners {
73				listener.on(&*event);
74			}
75		}
76	}
77
78	fn as_any_mut(&mut self) -> &mut dyn Any {
79		self
80	}
81}
82
83struct EventEnvelope {
84	type_id: TypeId,
85	event: Box<dyn Any + Send>,
86}
87
88enum EventBusMsg {
89	Emit(EventEnvelope),
90	Register {
91		installer: EventListenerInstaller,
92	},
93	WaitForCompletion(Sender<()>),
94}
95
96struct EventBusActor;
97
98impl Actor for EventBusActor {
99	type State = HashMap<TypeId, Box<dyn EventListenerList>>;
100	type Message = EventBusMsg;
101
102	fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
103		HashMap::new()
104	}
105
106	fn handle(&self, state: &mut Self::State, msg: Self::Message, _ctx: &Context<Self::Message>) -> Directive {
107		match msg {
108			EventBusMsg::Emit(envelope) => {
109				if let Some(list) = state.get(&envelope.type_id) {
110					list.on_any(envelope.event);
111				}
112			}
113			EventBusMsg::Register {
114				installer,
115			} => {
116				installer(state);
117			}
118			EventBusMsg::WaitForCompletion(tx) => {
119				let _ = tx.send(());
120			}
121		}
122		Directive::Continue
123	}
124}
125
126#[derive(Clone)]
127pub struct EventBus {
128	actor_ref: ActorRef<EventBusMsg>,
129	_actor_system: ActorSystem,
130}
131
132impl EventBus {
133	pub fn new(actor_system: &ActorSystem) -> Self {
134		let handle = actor_system.spawn("event-bus", EventBusActor);
135		Self {
136			actor_ref: handle.actor_ref().clone(),
137			_actor_system: actor_system.clone(),
138		}
139	}
140
141	pub fn register<E, L>(&self, listener: L)
142	where
143		E: Event,
144		L: EventListener<E>,
145	{
146		let type_id = TypeId::of::<E>();
147		let listener = Arc::new(listener);
148
149		let installer: EventListenerInstaller = Box::new(move |map| {
150			let list = map.entry(type_id).or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()));
151			list.as_any_mut().downcast_mut::<EventListenerListImpl<E>>().unwrap().add(listener);
152		});
153
154		let _ = self.actor_ref.send(EventBusMsg::Register {
155			installer,
156		});
157	}
158
159	pub fn emit<E>(&self, event: E)
160	where
161		E: Event,
162	{
163		let type_id = TypeId::of::<E>();
164		let _ = self.actor_ref.send(EventBusMsg::Emit(EventEnvelope {
165			type_id,
166			event: event.into_any(),
167		}));
168	}
169
170	pub fn wait_for_completion(&self) {
171		let (tx, rx) = sync::mpsc::channel();
172		let _ = self.actor_ref.send(EventBusMsg::WaitForCompletion(tx));
173		let _ = rx.recv();
174	}
175}
176
177#[cfg(test)]
178pub mod tests {
179	use std::{
180		sync::{Arc, Mutex},
181		thread,
182	};
183
184	use reifydb_runtime::actor::system::ActorSystem;
185
186	use crate::event::{Event, EventBus, EventListener};
187
188	fn test_actor_system() -> ActorSystem {
189		ActorSystem::new(1)
190	}
191
192	define_event! {
193		pub struct TestEvent{}
194	}
195
196	define_event! {
197		pub struct AnotherEvent{}
198	}
199
200	#[derive(Default, Debug, Clone)]
201	pub struct TestEventListener(Arc<TestHandlerInner>);
202
203	#[derive(Default, Debug)]
204	pub struct TestHandlerInner {
205		pub counter: Arc<Mutex<i32>>,
206	}
207
208	impl EventListener<TestEvent> for TestEventListener {
209		fn on(&self, _event: &TestEvent) {
210			let mut x = self.0.counter.lock().unwrap();
211			*x += 1;
212		}
213	}
214
215	impl EventListener<AnotherEvent> for TestEventListener {
216		fn on(&self, _event: &AnotherEvent) {
217			let mut x = self.0.counter.lock().unwrap();
218			*x *= 2;
219		}
220	}
221
222	#[test]
223	fn test_event_bus_new() {
224		let actor_system = test_actor_system();
225		let event_bus = EventBus::new(&actor_system);
226		event_bus.emit(TestEvent::new());
227		event_bus.wait_for_completion();
228	}
229
230	#[test]
231	fn test_register_single_listener() {
232		let actor_system = test_actor_system();
233		let event_bus = EventBus::new(&actor_system);
234		let listener = TestEventListener::default();
235
236		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
237		event_bus.emit(TestEvent::new());
238		event_bus.wait_for_completion();
239		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
240	}
241
242	#[test]
243	fn test_emit_unregistered_event() {
244		let actor_system = test_actor_system();
245		let event_bus = EventBus::new(&actor_system);
246		event_bus.emit(TestEvent::new());
247		event_bus.wait_for_completion();
248	}
249
250	#[test]
251	fn test_multiple_listeners_same_event() {
252		let actor_system = test_actor_system();
253		let event_bus = EventBus::new(&actor_system);
254		let listener1 = TestEventListener::default();
255		let listener2 = TestEventListener::default();
256
257		event_bus.register::<TestEvent, TestEventListener>(listener1.clone());
258		event_bus.register::<TestEvent, TestEventListener>(listener2.clone());
259
260		event_bus.emit(TestEvent::new());
261		event_bus.wait_for_completion();
262		assert_eq!(*listener1.0.counter.lock().unwrap(), 1);
263		assert_eq!(*listener2.0.counter.lock().unwrap(), 1);
264	}
265
266	#[test]
267	fn test_event_bus_clone() {
268		let actor_system = test_actor_system();
269		let event_bus1 = EventBus::new(&actor_system);
270		let listener = TestEventListener::default();
271		event_bus1.register::<TestEvent, TestEventListener>(listener.clone());
272
273		let event_bus2 = event_bus1.clone();
274		event_bus2.emit(TestEvent::new());
275		event_bus2.wait_for_completion();
276		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
277	}
278
279	#[test]
280	fn test_concurrent_registration() {
281		let actor_system = test_actor_system();
282		let event_bus = Arc::new(EventBus::new(&actor_system));
283		let mut handles = Vec::new();
284
285		for _ in 0..10 {
286			let event_bus = event_bus.clone();
287			handles.push(thread::spawn(move || {
288				let listener = TestEventListener::default();
289				event_bus.register::<TestEvent, TestEventListener>(listener);
290			}));
291		}
292
293		for handle in handles {
294			handle.join().unwrap();
295		}
296
297		event_bus.emit(TestEvent::new());
298		event_bus.wait_for_completion();
299	}
300
301	#[test]
302	fn test_concurrent_emitting() {
303		let actor_system = test_actor_system();
304		let event_bus = Arc::new(EventBus::new(&actor_system));
305		let listener = TestEventListener::default();
306		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
307		event_bus.wait_for_completion();
308
309		let mut handles = Vec::new();
310
311		for _ in 0..10 {
312			let event_bus = event_bus.clone();
313			handles.push(thread::spawn(move || {
314				event_bus.emit(TestEvent::new());
315			}));
316		}
317
318		for handle in handles {
319			handle.join().unwrap();
320		}
321
322		event_bus.wait_for_completion();
323		assert_eq!(*listener.0.counter.lock().unwrap(), 10);
324	}
325
326	define_event! {
327		pub struct MacroTestEvent {
328			pub value: i32,
329		}
330	}
331
332	#[test]
333	fn testine_event_macro() {
334		let event = MacroTestEvent::new(42);
335		let any_ref = event.as_any();
336		assert!(any_ref.downcast_ref::<MacroTestEvent>().is_some());
337		assert_eq!(any_ref.downcast_ref::<MacroTestEvent>().unwrap().value(), &42);
338	}
339
340	#[test]
341	fn test_multi_event_listener() {
342		let actor_system = test_actor_system();
343		let event_bus = EventBus::new(&actor_system);
344		let listener = TestEventListener::default();
345
346		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
347		event_bus.register::<AnotherEvent, TestEventListener>(listener.clone());
348
349		// Each event type triggers only its own listeners
350		event_bus.emit(TestEvent::new());
351		event_bus.wait_for_completion();
352		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
353
354		event_bus.emit(TestEvent::new());
355		event_bus.wait_for_completion();
356		assert_eq!(*listener.0.counter.lock().unwrap(), 2);
357
358		event_bus.emit(AnotherEvent::new());
359		event_bus.wait_for_completion();
360		assert_eq!(*listener.0.counter.lock().unwrap(), 4); // 2 * 2
361	}
362}