reifydb_core/event/
mod.rs1use std::{
5 any::{Any, TypeId},
6 collections::HashMap,
7 sync::Arc,
8};
9
10use async_trait::async_trait;
11use tokio::sync::RwLock;
12
13pub mod catalog;
14pub mod flow;
15pub mod lifecycle;
16pub mod transaction;
17
/// A value that can be published on the [`EventBus`].
///
/// Implementors must be cloneable, thread-safe and `'static`, and must be able
/// to expose themselves as [`Any`] so a type-erased event can be downcast back
/// to its concrete type.
pub trait Event
where
    Self: Any + Send + Sync + Clone + 'static,
{
    /// Borrows the event as a type-erased [`Any`] reference.
    fn as_any(&self) -> &dyn Any;

    /// Consumes the event and returns it as a boxed, type-erased [`Any`].
    fn into_any(self) -> Box<dyn Any + Send>;
}
22
23#[async_trait]
24pub trait EventListener<E>: Send + Sync + 'static
25where
26 E: Event,
27{
28 async fn on(&self, event: &E);
29}
30
31#[async_trait]
32trait EventListenerList: Any + Send + Sync {
33 async fn on_any(&self, event: Box<dyn Any + Send>);
34 fn as_any_mut(&mut self) -> &mut dyn Any;
35}
36
37struct EventListenerListImpl<E> {
38 listeners: RwLock<Vec<Arc<dyn EventListener<E>>>>,
39}
40
41impl<E> EventListenerListImpl<E>
42where
43 E: Event,
44{
45 fn new() -> Self {
46 Self {
47 listeners: RwLock::new(Vec::new()),
48 }
49 }
50
51 async fn add(&self, listener: Arc<dyn EventListener<E>>) {
52 self.listeners.write().await.push(listener);
53 }
54}
55
56#[async_trait]
57impl<E> EventListenerList for EventListenerListImpl<E>
58where
59 E: Event,
60{
61 async fn on_any(&self, event: Box<dyn Any + Send>) {
62 if let Ok(event) = event.downcast::<E>() {
63 let listeners: Vec<_> = {
65 let guard = self.listeners.read().await;
66 guard.iter().cloned().collect()
67 };
68
69 for listener in listeners {
71 listener.on(&*event).await;
72 }
73 }
74 }
75
76 fn as_any_mut(&mut self) -> &mut dyn Any {
77 self
78 }
79}
80
81#[derive(Clone)]
82pub struct EventBus {
83 listeners: Arc<RwLock<HashMap<TypeId, Box<dyn EventListenerList>>>>,
84}
85
86impl Default for EventBus {
87 fn default() -> Self {
88 Self::new()
89 }
90}
91
92impl EventBus {
93 pub fn new() -> Self {
94 Self {
95 listeners: Arc::new(RwLock::new(HashMap::new())),
96 }
97 }
98
99 pub async fn register<E, L>(&self, listener: L)
100 where
101 E: Event,
102 L: EventListener<E>,
103 {
104 let type_id = TypeId::of::<E>();
105
106 let mut listeners = self.listeners.write().await;
107 let list = listeners.entry(type_id).or_insert_with(|| Box::new(EventListenerListImpl::<E>::new()));
108
109 list.as_any_mut().downcast_mut::<EventListenerListImpl<E>>().unwrap().add(Arc::new(listener)).await;
110 }
111
112 pub async fn emit<E>(&self, event: E)
113 where
114 E: Event,
115 {
116 let type_id = TypeId::of::<E>();
117
118 let listener_list = {
120 let listeners = self.listeners.read().await;
121 listeners.get(&type_id).map(|l| l.as_ref() as *const dyn EventListenerList)
122 };
123
124 if let Some(listener_list_ptr) = listener_list {
126 let listener_list = unsafe { &*listener_list_ptr };
129 listener_list.on_any(event.into_any()).await;
130 }
131 }
132}
133
/// Implements `$crate::event::Event` for the given type.
///
/// The generated `as_any` returns `self` and `into_any` boxes `self`, so the
/// type must satisfy the supertrait bounds of `Event` on its own
/// (for example by deriving `Clone`).
#[macro_export]
macro_rules! impl_event {
    ($event:ty) => {
        impl $crate::event::Event for $event {
            fn as_any(&self) -> &dyn std::any::Any {
                self
            }

            fn into_any(self) -> Box<dyn std::any::Any + Send> {
                Box::new(self)
            }
        }
    };
}
148
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use async_trait::async_trait;

    use crate::event::{Event, EventBus, EventListener};

    #[derive(Debug, Clone)]
    pub struct TestEvent {}

    impl_event!(TestEvent);

    #[derive(Debug, Clone)]
    pub struct AnotherEvent {}

    impl_event!(AnotherEvent);

    /// Listener whose clones all share one counter, so a test can keep a
    /// handle while the bus owns another.
    #[derive(Default, Debug, Clone)]
    pub struct TestEventListener(Arc<TestHandlerInner>);

    #[derive(Default, Debug)]
    pub struct TestHandlerInner {
        pub counter: Arc<Mutex<i32>>,
    }

    impl TestEventListener {
        /// Current value of the shared counter.
        fn count(&self) -> i32 {
            *self.0.counter.lock().unwrap()
        }
    }

    // `TestEvent` increments the counter.
    #[async_trait]
    impl EventListener<TestEvent> for TestEventListener {
        async fn on(&self, _event: &TestEvent) {
            let mut x = self.0.counter.lock().unwrap();
            *x += 1;
        }
    }

    // `AnotherEvent` doubles the counter, which makes the two event types
    // distinguishable in assertions.
    #[async_trait]
    impl EventListener<AnotherEvent> for TestEventListener {
        async fn on(&self, _event: &AnotherEvent) {
            let mut x = self.0.counter.lock().unwrap();
            *x *= 2;
        }
    }

    #[tokio::test]
    async fn test_event_bus_new() {
        let event_bus = EventBus::new();
        event_bus.emit(TestEvent {}).await;
    }

    #[tokio::test]
    async fn test_event_bus_default() {
        let event_bus = EventBus::default();
        event_bus.emit(TestEvent {}).await;
    }

    #[tokio::test]
    async fn test_register_single_listener() {
        let event_bus = EventBus::new();
        let listener = TestEventListener::default();

        event_bus.register::<TestEvent, TestEventListener>(listener.clone()).await;
        event_bus.emit(TestEvent {}).await;
        assert_eq!(listener.count(), 1);
    }

    #[tokio::test]
    async fn test_emit_unregistered_event() {
        let event_bus = EventBus::new();
        event_bus.emit(TestEvent {}).await;
    }

    #[tokio::test]
    async fn test_multiple_listeners_same_event() {
        let event_bus = EventBus::new();
        let listener1 = TestEventListener::default();
        let listener2 = TestEventListener::default();

        event_bus.register::<TestEvent, TestEventListener>(listener1.clone()).await;
        event_bus.register::<TestEvent, TestEventListener>(listener2.clone()).await;

        event_bus.emit(TestEvent {}).await;
        assert_eq!(listener1.count(), 1);
        assert_eq!(listener2.count(), 1);
    }

    #[tokio::test]
    async fn test_event_bus_clone() {
        let event_bus1 = EventBus::new();
        let listener = TestEventListener::default();
        event_bus1.register::<TestEvent, TestEventListener>(listener.clone()).await;

        let event_bus2 = event_bus1.clone();
        event_bus2.emit(TestEvent {}).await;
        assert_eq!(listener.count(), 1);
    }

    #[tokio::test]
    async fn test_concurrent_registration() {
        let event_bus = Arc::new(EventBus::new());
        // Every registered clone shares this counter, so one emit must bump
        // it once per successful registration.
        let listener = TestEventListener::default();
        let mut handles = Vec::new();

        for _ in 0..10 {
            let event_bus = event_bus.clone();
            let listener = listener.clone();
            handles.push(tokio::spawn(async move {
                event_bus.register::<TestEvent, TestEventListener>(listener).await;
            }));
        }

        for handle in handles {
            handle.await.unwrap();
        }

        event_bus.emit(TestEvent {}).await;
        assert_eq!(listener.count(), 10);
    }

    #[tokio::test]
    async fn test_concurrent_emitting() {
        let event_bus = Arc::new(EventBus::new());
        let listener = TestEventListener::default();
        event_bus.register::<TestEvent, TestEventListener>(listener.clone()).await;

        let mut handles = Vec::new();

        for _ in 0..10 {
            let event_bus = event_bus.clone();
            handles.push(tokio::spawn(async move {
                event_bus.emit(TestEvent {}).await;
            }));
        }

        for handle in handles {
            handle.await.unwrap();
        }

        // Exactly one delivery per emit: neither lost nor duplicated.
        assert_eq!(listener.count(), 10);
    }

    #[derive(Debug, Clone)]
    pub struct MacroTestEvent {
        pub value: i32,
    }

    impl_event!(MacroTestEvent);

    #[test]
    fn test_impl_event_macro() {
        let event = MacroTestEvent {
            value: 42,
        };
        let any_ref = event.as_any();
        assert!(any_ref.downcast_ref::<MacroTestEvent>().is_some());
        assert_eq!(any_ref.downcast_ref::<MacroTestEvent>().unwrap().value, 42);
    }

    #[tokio::test]
    async fn test_multi_event_listener() {
        let event_bus = EventBus::default();
        let listener = TestEventListener::default();

        event_bus.register::<TestEvent, TestEventListener>(listener.clone()).await;
        event_bus.register::<AnotherEvent, TestEventListener>(listener.clone()).await;

        // 0 + 1 = 1
        event_bus.emit(TestEvent {}).await;
        assert_eq!(listener.count(), 1);

        // 1 + 1 = 2
        event_bus.emit(TestEvent {}).await;
        assert_eq!(listener.count(), 2);

        // 2 * 2 = 4
        event_bus.emit(AnotherEvent {}).await;
        assert_eq!(listener.count(), 4);
    }
}