Skip to main content

reifydb_core/event/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	any::{Any, TypeId},
6	collections::HashMap,
7	sync,
8	sync::Arc,
9};
10
11use reifydb_runtime::actor::{
12	context::Context,
13	mailbox::ActorRef,
14	system::ActorSystem,
15	traits::{Actor, Directive},
16};
17use sync::mpsc::Sender;
18
19pub mod flow;
20pub mod lifecycle;
21#[macro_use]
22pub mod r#macro;
23pub mod metric;
24pub mod procedure;
25pub mod row;
26pub mod store;
27pub mod transaction;
28
29type EventListenerInstaller = Box<dyn FnOnce(&mut HashMap<TypeId, Box<dyn EventListenerList>>) + Send>;
30
/// A value that can be published through [`EventBus::emit`].
///
/// The supertraits make every event cloneable, thread-safe and `'static`, and
/// the two methods expose it as `dyn Any` so the bus can carry it type-erased
/// and downcast it again on delivery.
pub trait Event: Any + Send + Sync + Clone + 'static {
	/// Borrows the event as `&dyn Any` for downcasting by reference.
	fn as_any(&self) -> &dyn Any;

	/// Consumes the event and returns it boxed as `dyn Any + Send`.
	fn into_any(self) -> Box<dyn Any + Send>;
}
35
36pub trait EventListener<E>: Send + Sync + 'static
37where
38	E: Event,
39{
40	fn on(&self, event: &E);
41}
42
/// Type-erased view of the listeners registered for one event type.
///
/// It lets lists for different event types live in the same map as
/// `Box<dyn EventListenerList>`.
trait EventListenerList: Any + Send + Sync {
	/// Delivers a type-erased `event` to the listeners held by this list.
	fn on_any(&self, event: Box<dyn Any + Send>);

	/// Exposes the list as `&mut dyn Any` so it can be downcast to its
	/// concrete type, e.g. to add another listener.
	fn as_any_mut(&mut self) -> &mut dyn Any;
}
47
48struct EventListenerListImpl<E> {
49	listeners: Vec<Arc<dyn EventListener<E>>>,
50}
51
52impl<E> EventListenerListImpl<E>
53where
54	E: Event,
55{
56	fn new() -> Self {
57		Self {
58			listeners: Vec::new(),
59		}
60	}
61
62	fn add(&mut self, listener: Arc<dyn EventListener<E>>) {
63		self.listeners.push(listener);
64	}
65}
66
67impl<E> EventListenerList for EventListenerListImpl<E>
68where
69	E: Event,
70{
71	fn on_any(&self, event: Box<dyn Any + Send>) {
72		if let Ok(event) = event.downcast::<E>() {
73			for listener in &self.listeners {
74				listener.on(&*event);
75			}
76		}
77	}
78
79	fn as_any_mut(&mut self) -> &mut dyn Any {
80		self
81	}
82}
83
/// A type-erased event together with the `TypeId` used to route it.
struct EventEnvelope {
	/// `TypeId` of the concrete event type; the key into the listener table.
	type_id: TypeId,
	/// The event itself, boxed as `dyn Any + Send`.
	event: Box<dyn Any + Send>,
}
88
89enum EventBusMessage {
90	Emit(EventEnvelope),
91	Register {
92		installer: EventListenerInstaller,
93	},
94	WaitForCompletion(Sender<()>),
95}
96
97struct EventBusActor;
98
99impl Actor for EventBusActor {
100	type State = HashMap<TypeId, Box<dyn EventListenerList>>;
101	type Message = EventBusMessage;
102
103	fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
104		HashMap::new()
105	}
106
107	fn handle(&self, state: &mut Self::State, msg: Self::Message, _ctx: &Context<Self::Message>) -> Directive {
108		match msg {
109			EventBusMessage::Emit(envelope) => {
110				if let Some(list) = state.get(&envelope.type_id) {
111					list.on_any(envelope.event);
112				}
113			}
114			EventBusMessage::Register {
115				installer,
116			} => {
117				installer(state);
118			}
119			EventBusMessage::WaitForCompletion(tx) => {
120				let _ = tx.send(());
121			}
122		}
123		Directive::Continue
124	}
125}
126
127#[derive(Clone)]
128pub struct EventBus {
129	actor_ref: ActorRef<EventBusMessage>,
130	_actor_system: ActorSystem,
131}
132
133impl EventBus {
134	pub fn new(actor_system: &ActorSystem) -> Self {
135		let handle = actor_system.spawn("event-bus", EventBusActor);
136		Self {
137			actor_ref: handle.actor_ref().clone(),
138			_actor_system: actor_system.clone(),
139		}
140	}
141
142	pub fn register<E, L>(&self, listener: L)
143	where
144		E: Event,
145		L: EventListener<E>,
146	{
147		let type_id = TypeId::of::<E>();
148		let listener = Arc::new(listener);
149
150		let installer: EventListenerInstaller = Box::new(move |map| {
151			let list = map.entry(type_id).or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()));
152			list.as_any_mut().downcast_mut::<EventListenerListImpl<E>>().unwrap().add(listener);
153		});
154
155		let _ = self.actor_ref.send(EventBusMessage::Register {
156			installer,
157		});
158	}
159
160	pub fn emit<E>(&self, event: E)
161	where
162		E: Event,
163	{
164		let type_id = TypeId::of::<E>();
165		let _ = self.actor_ref.send(EventBusMessage::Emit(EventEnvelope {
166			type_id,
167			event: event.into_any(),
168		}));
169	}
170
171	pub fn wait_for_completion(&self) {
172		let (tx, rx) = sync::mpsc::channel();
173		let _ = self.actor_ref.send(EventBusMessage::WaitForCompletion(tx));
174		let _ = rx.recv();
175	}
176}
177
/// Tests for `EventBus`, plus listener/event fixtures used by those tests.
#[cfg(test)]
pub mod tests {
	use std::{
		sync::{Arc, Mutex},
		thread,
	};

	use reifydb_runtime::{
		actor::system::ActorSystem,
		context::clock::Clock,
		pool::{PoolConfig, Pools},
	};

	use crate::event::{Event, EventBus, EventListener};

	/// Builds an actor system with default pools and the real clock.
	fn test_actor_system() -> ActorSystem {
		let pools = Pools::new(PoolConfig::default());
		ActorSystem::new(pools, Clock::Real)
	}

	define_event! {
		pub struct TestEvent{}
	}

	define_event! {
		pub struct AnotherEvent{}
	}

	/// Listener whose clones share one counter: `TestEvent` adds one to it,
	/// `AnotherEvent` doubles it.
	#[derive(Default, Debug, Clone)]
	pub struct TestEventListener(Arc<TestHandlerInner>);

	/// State shared by all clones of a `TestEventListener`.
	#[derive(Default, Debug)]
	pub struct TestHandlerInner {
		pub counter: Arc<Mutex<i32>>,
	}

	impl EventListener<TestEvent> for TestEventListener {
		fn on(&self, _event: &TestEvent) {
			let mut x = self.0.counter.lock().unwrap();
			*x += 1;
		}
	}

	impl EventListener<AnotherEvent> for TestEventListener {
		fn on(&self, _event: &AnotherEvent) {
			let mut x = self.0.counter.lock().unwrap();
			*x *= 2;
		}
	}

	#[test]
	fn test_event_bus_new() {
		let actor_system = test_actor_system();
		let event_bus = EventBus::new(&actor_system);
		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
	}

	#[test]
	fn test_register_single_listener() {
		let actor_system = test_actor_system();
		let event_bus = EventBus::new(&actor_system);
		let listener = TestEventListener::default();

		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
	}

	#[test]
	fn test_emit_unregistered_event() {
		let actor_system = test_actor_system();
		let event_bus = EventBus::new(&actor_system);
		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
	}

	#[test]
	fn test_multiple_listeners_same_event() {
		let actor_system = test_actor_system();
		let event_bus = EventBus::new(&actor_system);
		let listener1 = TestEventListener::default();
		let listener2 = TestEventListener::default();

		event_bus.register::<TestEvent, TestEventListener>(listener1.clone());
		event_bus.register::<TestEvent, TestEventListener>(listener2.clone());

		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
		assert_eq!(*listener1.0.counter.lock().unwrap(), 1);
		assert_eq!(*listener2.0.counter.lock().unwrap(), 1);
	}

	#[test]
	fn test_event_bus_clone() {
		let actor_system = test_actor_system();
		let event_bus1 = EventBus::new(&actor_system);
		let listener = TestEventListener::default();
		event_bus1.register::<TestEvent, TestEventListener>(listener.clone());

		let event_bus2 = event_bus1.clone();
		event_bus2.emit(TestEvent::new());
		event_bus2.wait_for_completion();
		assert_eq!(*listener.0.counter.lock().unwrap(), 1);
	}

	#[test]
	fn test_concurrent_registration() {
		let actor_system = test_actor_system();
		let event_bus = Arc::new(EventBus::new(&actor_system));
		// Every thread registers a clone that shares this counter, so a single
		// emitted event must increment it once per registration.
		let listener = TestEventListener::default();
		let mut handles = Vec::new();

		for _ in 0..10 {
			let event_bus = event_bus.clone();
			let listener = listener.clone();
			handles.push(thread::spawn(move || {
				event_bus.register::<TestEvent, TestEventListener>(listener);
			}));
		}

		for handle in handles {
			handle.join().unwrap();
		}

		// All registrations were sent before the joins returned; as in
		// `test_concurrent_emitting`, this relies on the actor handling them
		// before the messages sent below.
		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
		assert_eq!(*listener.0.counter.lock().unwrap(), 10);
	}

	#[test]
	fn test_concurrent_emitting() {
		let actor_system = test_actor_system();
		let event_bus = Arc::new(EventBus::new(&actor_system));
		let listener = TestEventListener::default();
		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
		event_bus.wait_for_completion();

		let mut handles = Vec::new();

		for _ in 0..10 {
			let event_bus = event_bus.clone();
			handles.push(thread::spawn(move || {
				event_bus.emit(TestEvent::new());
			}));
		}

		for handle in handles {
			handle.join().unwrap();
		}

		event_bus.wait_for_completion();
		assert_eq!(*listener.0.counter.lock().unwrap(), 10);
	}

	define_event! {
		pub struct MacroTestEvent {
			pub value: i32,
		}
	}

	#[test]
	fn test_define_event_macro() {
		let event = MacroTestEvent::new(42);
		let any_ref = event.as_any();
		assert!(any_ref.downcast_ref::<MacroTestEvent>().is_some());
		assert_eq!(any_ref.downcast_ref::<MacroTestEvent>().unwrap().value(), &42);
	}

	#[test]
	fn test_multi_event_listener() {
		let actor_system = test_actor_system();
		let event_bus = EventBus::new(&actor_system);
		let listener = TestEventListener::default();

		event_bus.register::<TestEvent, TestEventListener>(listener.clone());
		event_bus.register::<AnotherEvent, TestEventListener>(listener.clone());

		// Each event type reaches only the handler registered for that type.
		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
		assert_eq!(*listener.0.counter.lock().unwrap(), 1);

		event_bus.emit(TestEvent::new());
		event_bus.wait_for_completion();
		assert_eq!(*listener.0.counter.lock().unwrap(), 2);

		event_bus.emit(AnotherEvent::new());
		event_bus.wait_for_completion();
		assert_eq!(*listener.0.counter.lock().unwrap(), 4); // 2 * 2
	}
}