Skip to main content

reifydb_core/event/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	any::{Any, TypeId},
6	collections::HashMap,
7	sync,
8	sync::Arc,
9};
10
11use reifydb_runtime::actor::{
12	context::Context,
13	mailbox::ActorRef,
14	system::ActorSystem,
15	traits::{Actor, Directive},
16};
17use sync::mpsc::Sender;
18
19pub mod flow;
20pub mod lifecycle;
21#[macro_use]
22pub mod r#macro;
23pub mod metric;
24pub mod procedure;
25pub mod store;
26pub mod transaction;
27
/// A value that can be emitted on the [`EventBus`].
///
/// Implementors must be cloneable, shareable across threads and free of
/// non-`'static` borrows, and must be able to expose themselves as
/// type-erased [`Any`] values so they can be routed by `TypeId`.
pub trait Event: Any + Clone + Send + Sync + 'static {
	/// Borrows the event as a type-erased [`Any`] reference.
	fn as_any(&self) -> &dyn Any;

	/// Consumes the event and returns it as a boxed, sendable [`Any`].
	fn into_any(self) -> Box<dyn Any + Send>;
}
32
33pub trait EventListener<E>: Send + Sync + 'static
34where
35	E: Event,
36{
37	fn on(&self, event: &E);
38}
39
/// Type-erased view of a listener collection, so that collections for
/// different event types can live side by side in one map.
trait EventListenerList: Any + Send + Sync {
	/// Exposes the concrete collection for mutable downcasting.
	fn as_any_mut(&mut self) -> &mut dyn Any;

	/// Hands a type-erased event to the collection for delivery.
	fn on_any(&self, event: Box<dyn Any + Send>);
}
44
45struct EventListenerListImpl<E> {
46	listeners: Vec<Arc<dyn EventListener<E>>>,
47}
48
49impl<E> EventListenerListImpl<E>
50where
51	E: Event,
52{
53	fn new() -> Self {
54		Self {
55			listeners: Vec::new(),
56		}
57	}
58
59	fn add(&mut self, listener: Arc<dyn EventListener<E>>) {
60		self.listeners.push(listener);
61	}
62}
63
64impl<E> EventListenerList for EventListenerListImpl<E>
65where
66	E: Event,
67{
68	fn on_any(&self, event: Box<dyn Any + Send>) {
69		if let Ok(event) = event.downcast::<E>() {
70			for listener in &self.listeners {
71				listener.on(&*event);
72			}
73		}
74	}
75
76	fn as_any_mut(&mut self) -> &mut dyn Any {
77		self
78	}
79}
80
/// A type-erased event paired with the `TypeId` used to route it.
struct EventEnvelope {
	/// `TypeId` of the concrete event type stored in `event`.
	type_id: TypeId,
	/// The event itself, boxed as a sendable [`Any`].
	event: Box<dyn Any + Send>,
}
85
86enum EventBusMsg {
87	Emit(EventEnvelope),
88	Register {
89		installer: Box<dyn FnOnce(&mut HashMap<TypeId, Box<dyn EventListenerList>>) + Send>,
90	},
91	WaitForCompletion(Sender<()>),
92}
93
94struct EventBusActor;
95
96impl Actor for EventBusActor {
97	type State = HashMap<TypeId, Box<dyn EventListenerList>>;
98	type Message = EventBusMsg;
99
100	fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
101		HashMap::new()
102	}
103
104	fn handle(&self, state: &mut Self::State, msg: Self::Message, _ctx: &Context<Self::Message>) -> Directive {
105		match msg {
106			EventBusMsg::Emit(envelope) => {
107				if let Some(list) = state.get(&envelope.type_id) {
108					list.on_any(envelope.event);
109				}
110			}
111			EventBusMsg::Register {
112				installer,
113			} => {
114				installer(state);
115			}
116			EventBusMsg::WaitForCompletion(tx) => {
117				let _ = tx.send(());
118			}
119		}
120		Directive::Continue
121	}
122}
123
124#[derive(Clone)]
125pub struct EventBus {
126	actor_ref: ActorRef<EventBusMsg>,
127	_actor_system: ActorSystem,
128}
129
130impl EventBus {
131	pub fn new(actor_system: &ActorSystem) -> Self {
132		let handle = actor_system.spawn("event-bus", EventBusActor);
133		Self {
134			actor_ref: handle.actor_ref().clone(),
135			_actor_system: actor_system.clone(),
136		}
137	}
138
139	pub fn register<E, L>(&self, listener: L)
140	where
141		E: Event,
142		L: EventListener<E>,
143	{
144		let type_id = TypeId::of::<E>();
145		let listener = Arc::new(listener);
146
147		let installer: Box<dyn FnOnce(&mut HashMap<TypeId, Box<dyn EventListenerList>>) + Send> =
148			Box::new(move |map| {
149				let list = map
150					.entry(type_id)
151					.or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()));
152				list.as_any_mut().downcast_mut::<EventListenerListImpl<E>>().unwrap().add(listener);
153			});
154
155		let _ = self.actor_ref.send(EventBusMsg::Register {
156			installer,
157		});
158	}
159
160	pub fn emit<E>(&self, event: E)
161	where
162		E: Event,
163	{
164		let type_id = TypeId::of::<E>();
165		let _ = self.actor_ref.send(EventBusMsg::Emit(EventEnvelope {
166			type_id,
167			event: event.into_any(),
168		}));
169	}
170
171	pub fn wait_for_completion(&self) {
172		let (tx, rx) = sync::mpsc::channel();
173		let _ = self.actor_ref.send(EventBusMsg::WaitForCompletion(tx));
174		let _ = rx.recv();
175	}
176}
177
#[cfg(test)]
pub mod tests {
	use std::{
		sync::{Arc, Mutex},
		thread,
	};

	use reifydb_runtime::{SharedRuntimeConfig, actor::system::ActorSystem};

	use crate::event::{Event, EventBus, EventListener};

	/// Builds an actor system from the default shared runtime configuration.
	fn test_actor_system() -> ActorSystem {
		ActorSystem::new(SharedRuntimeConfig::default().actor_system_config())
	}

	define_event! {
		pub struct TestEvent{}
	}

	define_event! {
		pub struct AnotherEvent{}
	}

	/// Listener whose clones all share one counter.
	#[derive(Default, Debug, Clone)]
	pub struct TestEventListener(Arc<TestHandlerInner>);

	/// Shared state behind [`TestEventListener`].
	#[derive(Default, Debug)]
	pub struct TestHandlerInner {
		pub counter: Arc<Mutex<i32>>,
	}

	/// `TestEvent` increments the counter by one.
	impl EventListener<TestEvent> for TestEventListener {
		fn on(&self, _event: &TestEvent) {
			let mut x = self.0.counter.lock().unwrap();
			*x += 1;
		}
	}

	/// `AnotherEvent` doubles the counter.
	impl EventListener<AnotherEvent> for TestEventListener {
		fn on(&self, _event: &AnotherEvent) {
			let mut x = self.0.counter.lock().unwrap();
			*x *= 2;
		}
	}

	/// A fresh bus accepts an event even though nothing is registered.
	#[test]
	fn test_event_bus_new() {
		let actor_system = test_actor_system();
		let event_bus = EventBus::new(&actor_system);
		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
	}

	/// A single registered listener sees the emitted event exactly once.
	#[test]
	fn test_register_single_listener() {
		let actor_system = test_actor_system();
		let event_bus = EventBus::new(&actor_system);
		let listener = TestEventListener::default();

		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
	}

	/// Emitting an event type without listeners must not fail.
	#[test]
	fn test_emit_unregistered_event() {
		let actor_system = test_actor_system();
		let event_bus = EventBus::new(&actor_system);
		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
	}

	/// Every listener registered for an event type receives the event.
	#[test]
	fn test_multiple_listeners_same_event() {
		let actor_system = test_actor_system();
		let event_bus = EventBus::new(&actor_system);
		let listener1 = TestEventListener::default();
		let listener2 = TestEventListener::default();

		event_bus.register::<TestEvent, TestEventListener>(listener1.clone());
		event_bus.register::<TestEvent, TestEventListener>(listener2.clone());

		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
		assert_eq!(*listener1.0.counter.lock().unwrap(), 1);
		assert_eq!(*listener2.0.counter.lock().unwrap(), 1);
	}

	/// A cloned bus delivers to listeners registered through the original.
	#[test]
	fn test_event_bus_clone() {
		let actor_system = test_actor_system();
		let event_bus1 = EventBus::new(&actor_system);
		let listener = TestEventListener::default();
		event_bus1.register::<TestEvent, TestEventListener>(listener.clone());

		let event_bus2 = event_bus1.clone();
		event_bus2.emit(TestEvent::new());
		event_bus2.wait_for_completion();
		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
	}

	/// Registering from several threads at once must not panic.
	#[test]
	fn test_concurrent_registration() {
		let actor_system = test_actor_system();
		let event_bus = Arc::new(EventBus::new(&actor_system));
		let mut handles = Vec::new();

		for _ in 0..10 {
			let event_bus = event_bus.clone();
			handles.push(thread::spawn(move || {
				let listener = TestEventListener::default();
				event_bus.register::<TestEvent, TestEventListener>(listener);
			}));
		}

		for handle in handles {
			handle.join().unwrap();
		}

		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
	}

	/// Ten events emitted from ten threads are all delivered.
	#[test]
	fn test_concurrent_emitting() {
		let actor_system = test_actor_system();
		let event_bus = Arc::new(EventBus::new(&actor_system));
		let listener = TestEventListener::default();
		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
		event_bus.wait_for_completion();

		let mut handles = Vec::new();

		for _ in 0..10 {
			let event_bus = event_bus.clone();
			handles.push(thread::spawn(move || {
				event_bus.emit(TestEvent::new());
			}));
		}

		for handle in handles {
			handle.join().unwrap();
		}

		event_bus.wait_for_completion();
		assert_eq!(*listener.0.counter.lock().unwrap(), 10);
	}

	define_event! {
		pub struct MacroTestEvent {
			pub value: i32,
		}
	}

	/// An event produced by `define_event!` can be viewed as `Any` and
	/// downcast back, and exposes its field through the `value()` accessor.
	#[test]
	fn test_define_event_macro() {
		let event = MacroTestEvent::new(42);
		let any_ref = event.as_any();
		assert!(any_ref.downcast_ref::<MacroTestEvent>().is_some());
		assert_eq!(any_ref.downcast_ref::<MacroTestEvent>().unwrap().value(), &42);
	}

	/// One listener registered for two event types is driven by each type
	/// independently.
	#[test]
	fn test_multi_event_listener() {
		let actor_system = test_actor_system();
		let event_bus = EventBus::new(&actor_system);
		let listener = TestEventListener::default();

		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
		event_bus.register::<AnotherEvent, TestEventListener>(listener.clone());

		// Each event type triggers only its own listeners
		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
		assert_eq!(*listener.0.counter.lock().unwrap(), 1);

		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
		assert_eq!(*listener.0.counter.lock().unwrap(), 2);

		event_bus.emit(AnotherEvent::new());
		event_bus.wait_for_completion();
		assert_eq!(*listener.0.counter.lock().unwrap(), 4); // 2 * 2
	}
}