reifydb_core/event/
macro.rs1#[macro_export]
5macro_rules! define_event {
6 (
8 $(#[$meta:meta])*
9 $vis:vis struct $name:ident {}
10 ) => {
11 $(#[$meta])*
12 #[derive(Debug, Clone)]
13 $vis struct $name {}
14
15 impl $name {
16 #[allow(clippy::new_without_default)]
17 pub fn new() -> Self {
18 Self {}
19 }
20 }
21
22 impl $crate::event::Event for $name {
23 fn as_any(&self) -> &dyn std::any::Any {
24 self
25 }
26
27 fn into_any(self) -> Box<dyn std::any::Any + Send> {
28 Box::new(self)
29 }
30 }
31 };
32
33 (
35 $(#[$meta:meta])*
36 $vis:vis struct $name:ident {
37 $(
38 $(#[$field_meta:meta])*
39 $field_vis:vis $field:ident: $field_ty:ty
40 ),* $(,)?
41 }
42 ) => {
43 ::paste::paste! {
45 #[doc(hidden)]
47 #[allow(non_snake_case)]
48 mod [<__inner_ $name:snake>] {
49 #[allow(unused_imports)]
50 use super::*;
51
52 #[derive(Debug)]
53 #[allow(dead_code)]
54 pub(super) struct Inner {
55 $(
56 $(#[$field_meta])*
57 pub(super) $field: $field_ty,
58 )*
59 }
60 }
61
62 $(#[$meta])*
64 #[derive(Debug)]
65 $vis struct $name {
66 inner: std::sync::Arc<[<__inner_ $name:snake>]::Inner>,
67 }
68
69 impl Clone for $name {
71 fn clone(&self) -> Self {
72 Self {
73 inner: std::sync::Arc::clone(&self.inner),
74 }
75 }
76 }
77
78 impl $name {
80 #[allow(clippy::too_many_arguments)]
81 #[allow(clippy::new_without_default)]
82 pub fn new($($field: $field_ty),*) -> Self {
83 Self {
84 inner: std::sync::Arc::new([<__inner_ $name:snake>]::Inner {
85 $($field),*
86 }),
87 }
88 }
89
90 $(
91 #[allow(dead_code)]
92 pub fn $field(&self) -> &$field_ty {
93 &self.inner.$field
94 }
95 )*
96 }
97
98 impl $crate::event::Event for $name {
100 fn as_any(&self) -> &dyn std::any::Any {
101 self
102 }
103
104 fn into_any(self) -> Box<dyn std::any::Any + Send> {
105 Box::new(self)
106 }
107 }
108 }
109 };
110}
111
112#[cfg(test)]
113mod tests {
114 use std::{
115 sync::{Arc, Mutex},
116 thread,
117 };
118
119 use reifydb_runtime::{
120 actor::system::ActorSystem,
121 context::clock::Clock,
122 pool::{PoolConfig, Pools},
123 };
124
125 use crate::event::{Event, EventBus, EventListener};
126
127 define_event! {
128 pub struct DefineTestEvent {
129 pub data: Vec<i32>,
130 pub name: String,
131 }
132 }
133
134 define_event! {
135 pub struct EmptyDefineEvent {}
136 }
137
138 #[test]
139 fn testine_event_cheap_clone() {
140 let large_vec = vec![0; 10_000];
141 let event = DefineTestEvent::new(large_vec, "test".to_string());
142
143 let clone1 = event.clone();
145 let clone2 = event.clone();
146
147 assert!(Arc::ptr_eq(&event.inner, &clone1.inner));
149 assert!(Arc::ptr_eq(&event.inner, &clone2.inner));
150
151 assert_eq!(event.data().len(), 10_000);
153 assert_eq!(clone1.data().len(), 10_000);
154 assert_eq!(clone2.data().len(), 10_000);
155 }
156
157 #[test]
158 fn testine_event_field_access() {
159 let event = DefineTestEvent::new(vec![1, 2, 3], "my_event".to_string());
160
161 assert_eq!(event.data(), &vec![1, 2, 3]);
162 assert_eq!(event.name(), "my_event");
163
164 let _data_ref: &Vec<i32> = event.data();
166 let _name_ref: &String = event.name();
167 }
168
169 #[test]
170 fn testine_event_empty_struct() {
171 let event = EmptyDefineEvent::new();
172 let clone = event.clone();
173
174 drop(event);
176 drop(clone);
177 }
178
179 #[test]
180 fn testine_event_implements_event_trait() {
181 let event = DefineTestEvent::new(vec![42], "test".to_string());
182
183 let any_ref = event.as_any();
185 assert!(any_ref.downcast_ref::<DefineTestEvent>().is_some());
186
187 let event2 = DefineTestEvent::new(vec![99], "test2".to_string());
188 let any_box = event2.into_any();
189 assert!(any_box.downcast::<DefineTestEvent>().is_ok());
190 }
191
192 #[test]
193 fn testine_event_send_sync() {
194 fn assert_send<T: Send>() {}
196 fn assert_sync<T: Sync>() {}
197
198 assert_send::<DefineTestEvent>();
199 assert_sync::<DefineTestEvent>();
200
201 let event = DefineTestEvent::new(vec![1, 2, 3], "thread_test".to_string());
203 let handle = thread::spawn(move || {
204 assert_eq!(event.data(), &vec![1, 2, 3]);
205 });
206 handle.join().unwrap();
207 }
208
209 #[test]
210 fn testine_event_with_event_bus() {
211 let pools = Pools::new(PoolConfig::default());
212 let actor_system = ActorSystem::new(pools, Clock::Real);
213 let event_bus = EventBus::new(&actor_system);
214
215 #[derive(Clone)]
217 struct DefineTestListener {
218 counter: Arc<Mutex<i32>>,
219 }
220
221 impl EventListener<DefineTestEvent> for DefineTestListener {
222 fn on(&self, event: &DefineTestEvent) {
223 let mut c = self.counter.lock().unwrap();
224 *c += event.data().len() as i32;
225 }
226 }
227
228 let listener = DefineTestListener {
229 counter: Arc::new(Mutex::new(0)),
230 };
231
232 event_bus.register::<DefineTestEvent, DefineTestListener>(listener.clone());
233
234 event_bus.emit(DefineTestEvent::new(vec![1, 2, 3], "test".to_string()));
236 event_bus.wait_for_completion();
237 assert_eq!(*listener.counter.lock().unwrap(), 3);
238
239 event_bus.emit(DefineTestEvent::new(vec![1, 2, 3, 4, 5], "test2".to_string()));
241 event_bus.wait_for_completion();
242 assert_eq!(*listener.counter.lock().unwrap(), 8);
243 }
244}