Skip to main content

reifydb_core/event/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Typed in-process event bus connecting the workspace's actors and subsystems.
5//!
6//! The `Event` trait marks any `Send + Sync + Clone + 'static` value that can be published; `EventListener<E>`
7//! registers a callback for one event type. Listener lists are keyed on Rust `TypeId`, so dispatch is purely by static
8//! type identity. Submodules group the standard event families: lifecycle (startup and shutdown), transaction, store,
9//! row, flow, procedure, and metric.
10//!
11//! Invariant: dispatch keys on Rust `TypeId`, not on type name. Two distinct types with the same name in different
12//! crates dispatch to disjoint listener lists; renaming an event type in one crate without renaming it in publishers
13//! and subscribers silently breaks delivery without a compile error at the bus call sites.
14
15use std::{
16	any::{Any, TypeId},
17	collections::HashMap,
18	sync,
19	sync::Arc,
20};
21
22use reifydb_runtime::actor::{
23	context::Context,
24	mailbox::ActorRef,
25	system::ActorSystem,
26	traits::{Actor, Directive},
27};
28use sync::mpsc::Sender;
29
30pub mod flow;
31pub mod lifecycle;
32#[macro_use]
33pub mod r#macro;
34pub mod metric;
35pub mod procedure;
36pub mod row;
37pub mod store;
38pub mod transaction;
39
40type EventListenerInstaller = Box<dyn FnOnce(&mut HashMap<TypeId, Box<dyn EventListenerList>>) + Send>;
41
42pub trait Event: Any + Send + Sync + Clone + 'static {
43	fn as_any(&self) -> &dyn Any;
44	fn into_any(self) -> Box<dyn Any + Send>;
45}
46
47pub trait EventListener<E>: Send + Sync + 'static
48where
49	E: Event,
50{
51	fn on(&self, event: &E);
52}
53
54trait EventListenerList: Any + Send + Sync {
55	fn on_any(&self, event: Box<dyn Any + Send>);
56	fn as_any_mut(&mut self) -> &mut dyn Any;
57}
58
59struct EventListenerListImpl<E> {
60	listeners: Vec<Arc<dyn EventListener<E>>>,
61}
62
63impl<E> EventListenerListImpl<E>
64where
65	E: Event,
66{
67	fn new() -> Self {
68		Self {
69			listeners: Vec::new(),
70		}
71	}
72
73	fn add(&mut self, listener: Arc<dyn EventListener<E>>) {
74		self.listeners.push(listener);
75	}
76}
77
78impl<E> EventListenerList for EventListenerListImpl<E>
79where
80	E: Event,
81{
82	fn on_any(&self, event: Box<dyn Any + Send>) {
83		if let Ok(event) = event.downcast::<E>() {
84			for listener in &self.listeners {
85				listener.on(&*event);
86			}
87		}
88	}
89
90	fn as_any_mut(&mut self) -> &mut dyn Any {
91		self
92	}
93}
94
95struct EventEnvelope {
96	type_id: TypeId,
97	event: Box<dyn Any + Send>,
98}
99
100enum EventBusMessage {
101	Emit(EventEnvelope),
102	Register {
103		installer: EventListenerInstaller,
104	},
105	WaitForCompletion(Sender<()>),
106}
107
108struct EventBusActor;
109
110impl Actor for EventBusActor {
111	type State = HashMap<TypeId, Box<dyn EventListenerList>>;
112	type Message = EventBusMessage;
113
114	fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
115		HashMap::new()
116	}
117
118	fn handle(&self, state: &mut Self::State, msg: Self::Message, _ctx: &Context<Self::Message>) -> Directive {
119		match msg {
120			EventBusMessage::Emit(envelope) => {
121				if let Some(list) = state.get(&envelope.type_id) {
122					list.on_any(envelope.event);
123				}
124			}
125			EventBusMessage::Register {
126				installer,
127			} => {
128				installer(state);
129			}
130			EventBusMessage::WaitForCompletion(tx) => {
131				let _ = tx.send(());
132			}
133		}
134		Directive::Continue
135	}
136}
137
138#[derive(Clone)]
139pub struct EventBus {
140	actor_ref: ActorRef<EventBusMessage>,
141	_actor_system: ActorSystem,
142}
143
144impl EventBus {
145	pub fn new(actor_system: &ActorSystem) -> Self {
146		let handle = actor_system.spawn_system("event-bus", EventBusActor);
147		Self {
148			actor_ref: handle.actor_ref().clone(),
149			_actor_system: actor_system.clone(),
150		}
151	}
152
153	pub fn register<E, L>(&self, listener: L)
154	where
155		E: Event,
156		L: EventListener<E>,
157	{
158		let type_id = TypeId::of::<E>();
159		let listener = Arc::new(listener);
160
161		let installer: EventListenerInstaller = Box::new(move |map| {
162			let list = map.entry(type_id).or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()));
163			list.as_any_mut().downcast_mut::<EventListenerListImpl<E>>().unwrap().add(listener);
164		});
165
166		let _ = self.actor_ref.send(EventBusMessage::Register {
167			installer,
168		});
169	}
170
171	pub fn emit<E>(&self, event: E)
172	where
173		E: Event,
174	{
175		let type_id = TypeId::of::<E>();
176		let _ = self.actor_ref.send(EventBusMessage::Emit(EventEnvelope {
177			type_id,
178			event: event.into_any(),
179		}));
180	}
181
182	pub fn wait_for_completion(&self) {
183		let (tx, rx) = sync::mpsc::channel();
184		let _ = self.actor_ref.send(EventBusMessage::WaitForCompletion(tx));
185		let _ = rx.recv();
186	}
187}
188
189#[cfg(test)]
190pub mod tests {
191	use std::{
192		sync::{Arc, Mutex},
193		thread,
194	};
195
196	use reifydb_runtime::{
197		actor::system::ActorSystem,
198		context::clock::Clock,
199		pool::{PoolConfig, Pools},
200	};
201
202	use crate::event::{Event, EventBus, EventListener};
203
204	fn test_actor_system() -> ActorSystem {
205		let pools = Pools::new(PoolConfig::default());
206		ActorSystem::new(pools, Clock::Real)
207	}
208
209	define_event! {
210		pub struct TestEvent{}
211	}
212
213	define_event! {
214		pub struct AnotherEvent{}
215	}
216
217	#[derive(Default, Debug, Clone)]
218	pub struct TestEventListener(Arc<TestHandlerInner>);
219
220	#[derive(Default, Debug)]
221	pub struct TestHandlerInner {
222		pub counter: Arc<Mutex<i32>>,
223	}
224
225	impl EventListener<TestEvent> for TestEventListener {
226		fn on(&self, _event: &TestEvent) {
227			let mut x = self.0.counter.lock().unwrap();
228			*x += 1;
229		}
230	}
231
232	impl EventListener<AnotherEvent> for TestEventListener {
233		fn on(&self, _event: &AnotherEvent) {
234			let mut x = self.0.counter.lock().unwrap();
235			*x *= 2;
236		}
237	}
238
239	#[test]
240	fn test_event_bus_new() {
241		let actor_system = test_actor_system();
242		let event_bus = EventBus::new(&actor_system);
243		event_bus.emit(TestEvent::new());
244		event_bus.wait_for_completion();
245	}
246
247	#[test]
248	fn test_register_single_listener() {
249		let actor_system = test_actor_system();
250		let event_bus = EventBus::new(&actor_system);
251		let listener = TestEventListener::default();
252
253		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
254		event_bus.emit(TestEvent::new());
255		event_bus.wait_for_completion();
256		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
257	}
258
259	#[test]
260	fn test_emit_unregistered_event() {
261		let actor_system = test_actor_system();
262		let event_bus = EventBus::new(&actor_system);
263		event_bus.emit(TestEvent::new());
264		event_bus.wait_for_completion();
265	}
266
267	#[test]
268	fn test_multiple_listeners_same_event() {
269		let actor_system = test_actor_system();
270		let event_bus = EventBus::new(&actor_system);
271		let listener1 = TestEventListener::default();
272		let listener2 = TestEventListener::default();
273
274		event_bus.register::<TestEvent, TestEventListener>(listener1.clone());
275		event_bus.register::<TestEvent, TestEventListener>(listener2.clone());
276
277		event_bus.emit(TestEvent::new());
278		event_bus.wait_for_completion();
279		assert_eq!(*listener1.0.counter.lock().unwrap(), 1);
280		assert_eq!(*listener2.0.counter.lock().unwrap(), 1);
281	}
282
283	#[test]
284	fn test_event_bus_clone() {
285		let actor_system = test_actor_system();
286		let event_bus1 = EventBus::new(&actor_system);
287		let listener = TestEventListener::default();
288		event_bus1.register::<TestEvent, TestEventListener>(listener.clone());
289
290		let event_bus2 = event_bus1.clone();
291		event_bus2.emit(TestEvent::new());
292		event_bus2.wait_for_completion();
293		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
294	}
295
296	#[test]
297	fn test_concurrent_registration() {
298		let actor_system = test_actor_system();
299		let event_bus = Arc::new(EventBus::new(&actor_system));
300		let mut handles = Vec::new();
301
302		for _ in 0..10 {
303			let event_bus = event_bus.clone();
304			handles.push(thread::spawn(move || {
305				let listener = TestEventListener::default();
306				event_bus.register::<TestEvent, TestEventListener>(listener);
307			}));
308		}
309
310		for handle in handles {
311			handle.join().unwrap();
312		}
313
314		event_bus.emit(TestEvent::new());
315		event_bus.wait_for_completion();
316	}
317
318	#[test]
319	fn test_concurrent_emitting() {
320		let actor_system = test_actor_system();
321		let event_bus = Arc::new(EventBus::new(&actor_system));
322		let listener = TestEventListener::default();
323		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
324		event_bus.wait_for_completion();
325
326		let mut handles = Vec::new();
327
328		for _ in 0..10 {
329			let event_bus = event_bus.clone();
330			handles.push(thread::spawn(move || {
331				event_bus.emit(TestEvent::new());
332			}));
333		}
334
335		for handle in handles {
336			handle.join().unwrap();
337		}
338
339		event_bus.wait_for_completion();
340		assert_eq!(*listener.0.counter.lock().unwrap(), 10);
341	}
342
343	define_event! {
344		pub struct MacroTestEvent {
345			pub value: i32,
346		}
347	}
348
349	#[test]
350	fn testine_event_macro() {
351		let event = MacroTestEvent::new(42);
352		let any_ref = event.as_any();
353		assert!(any_ref.downcast_ref::<MacroTestEvent>().is_some());
354		assert_eq!(any_ref.downcast_ref::<MacroTestEvent>().unwrap().value(), &42);
355	}
356
357	#[test]
358	fn test_multi_event_listener() {
359		let actor_system = test_actor_system();
360		let event_bus = EventBus::new(&actor_system);
361		let listener = TestEventListener::default();
362
363		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
364		event_bus.register::<AnotherEvent, TestEventListener>(listener.clone());
365
366		// Each event type triggers only its own listeners
367		event_bus.emit(TestEvent::new());
368		event_bus.wait_for_completion();
369		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
370
371		event_bus.emit(TestEvent::new());
372		event_bus.wait_for_completion();
373		assert_eq!(*listener.0.counter.lock().unwrap(), 2);
374
375		event_bus.emit(AnotherEvent::new());
376		event_bus.wait_for_completion();
377		assert_eq!(*listener.0.counter.lock().unwrap(), 4); // 2 * 2
378	}
379}