async_events_emitter/
lib.rs1extern crate tokio;
38
39mod event;
40mod listener;
41mod event_emitter;
42
43pub use self::{event::{Event, EventError, EventHandler}, event_emitter::EventEmitter};
45
46#[cfg(test)]
47mod test {
48 use std::{sync::{Arc, atomic::{AtomicBool, Ordering}}, time::Instant};
49
50 use async_trait::async_trait;
51 use tokio::sync::Mutex;
52
53 use super::*;
54
55
56 #[derive(Debug, Clone)]
58 struct CustomEvent1 {
59 data: String,
60 }
61
62 #[derive(Debug, Clone)]
63 struct CustomEvent2 {
64 data: usize
65 }
66
67 impl Event for CustomEvent1 {}
68 impl Event for CustomEvent2 {}
69
70
71 #[derive(Debug, Clone)]
72 struct CounterHandler {
73 handle_count: Arc<tokio::sync::Mutex<usize>>,
74 }
75
76 #[derive(Debug, Clone)]
77 struct SharedDataHandler1 {
78 data: Arc<Mutex<Vec<String>>>,
79 }
80
81 #[derive(Debug, Clone)]
82 struct SharedDataHandler2 {
83 data: Arc<Mutex<Vec<String>>>,
84 }
85
86 #[derive(Debug)]
87 struct UnsubscribeHandler {
88 received_flag: Arc<AtomicBool>,
89 }
90
91 #[derive(Debug, Clone)]
92 struct SimpleHandler;
93
94 #[derive(Debug, Clone)]
95 struct ComplexHandler {
96 data: Arc<tokio::sync::Mutex<String>>,
97 }
98
99 #[async_trait]
100 impl EventHandler<CustomEvent1> for SimpleHandler {
101 async fn handle_event(&mut self, event: &CustomEvent1) {
102 println!("[SimpleHandler] Handling CustomEvent1 with data: {}", event.data);
103 }
104 }
105
106 #[async_trait]
107 impl EventHandler<CustomEvent2> for SimpleHandler {
108 async fn handle_event(&mut self, event: &CustomEvent2) {
109 println!("[SimpleHandler] Handling CustomEvent2 with data: {}", event.data);
110 }
111 }
112
113 #[async_trait]
114 impl EventHandler<CustomEvent1> for SharedDataHandler1 {
115 async fn handle_event(&mut self, event: &CustomEvent1) {
116 let mut data = self.data.lock().await;
117
118 data.push(event.data.clone() + " from handler1");
119 }
120 }
121
122 #[async_trait]
123 impl EventHandler<CustomEvent1> for SharedDataHandler2 {
124 async fn handle_event(&mut self, event: &CustomEvent1) {
125 let mut data = self.data.lock().await;
126
127 data.push(event.data.clone() + " from handler2");
128 }
129 }
130
131 #[async_trait]
132 impl EventHandler<CustomEvent1> for UnsubscribeHandler {
133 async fn handle_event(&mut self, event: &CustomEvent1) {
134 println!("[UnsubscribeHandler] Handling CustomEvent1 with data: {}", event.data);
135 let flag = &self.received_flag;
136 flag.store(true, Ordering::SeqCst);
137 }
138 }
139
140 #[async_trait]
141 impl EventHandler<CustomEvent1> for CounterHandler {
142 async fn handle_event(&mut self, _event: &CustomEvent1) {
143 let mut counter = self.handle_count.lock().await;
145 *counter += 1;
146 }
147 }
148
149 #[async_trait]
150 impl EventHandler<CustomEvent2> for CounterHandler {
151 async fn handle_event(&mut self, _event: &CustomEvent2) {
152 let mut counter = self.handle_count.lock().await;
154 *counter += 1;
155 }
156 }
157
158 #[async_trait]
159 impl EventHandler<CustomEvent1> for ComplexHandler {
160 async fn handle_event(&mut self, event: &CustomEvent1) {
161 println!("[ComplexHandler] Handling CustomEvent1 with data: {}", event.data);
163 let mut data = self.data.lock().await;
165
166 *data = format!("{} - Event Handled", *data);
169
170 }
171 }
172
173 #[async_trait]
174 impl EventHandler<CustomEvent2> for ComplexHandler {
175 async fn handle_event(&mut self, event: &CustomEvent2) {
176 println!("[ComplexHandler] Handling CustomEvent2 with data: {}", event.data);
178 }
179 }
180
181 #[tokio::test]
182 async fn test_event_once() {
183 let mut event_emitter = EventEmitter::new();
184 let handler = CounterHandler {
185 handle_count: Arc::new(Mutex::new(0))
186 };
187 event_emitter.once::<CustomEvent1>(handler.clone());
188 let event = CustomEvent1 {
189 data: "Should be triggered once!".to_string()
190 };
191 event_emitter.emit(event.clone());
192 event_emitter.emit(event);
193 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
194 assert_eq!(event_emitter.listeners_count(), 0);
195 let handler_count = handler.handle_count.lock().await;
196 assert_eq!(handler_count.to_owned(), 1);
197
198 }
199
200 #[tokio::test]
201 async fn test_unsubscribe_functionality() {
202 let mut event_emitter = EventEmitter::new();
203 let received_flag = Arc::new(AtomicBool::new(false));
204 let received_flag_clone = received_flag.clone();
205
206 let handler = UnsubscribeHandler {
207 received_flag: received_flag_clone
208 };
209
210 let listener_id = event_emitter.on::<CustomEvent1>(handler);
211 event_emitter.remove_listener(listener_id);
212
213 event_emitter.emit(CustomEvent1 { data: "Test".to_string() });
214 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; assert_eq!(received_flag.load(Ordering::SeqCst), false);
217 }
218
219 #[tokio::test]
220 async fn test_multiple_subscribers() {
221 let mut event_emitter = EventEmitter::new();
222 let shared_data = Arc::new(tokio::sync::Mutex::new(vec![]));
223
224 let handler1_data = shared_data.clone();
225 let handler1 = SharedDataHandler1 {
226 data: handler1_data
227 };
228
229 let handler2_data = shared_data.clone();
230 let handler2 = SharedDataHandler2 {
231 data: handler2_data
232 };
233
234 event_emitter.on::<CustomEvent1>(handler1);
235 event_emitter.on::<CustomEvent1>(handler2);
236
237 event_emitter.emit(CustomEvent1 { data: "Hello".to_string() });
238 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; let data = shared_data.lock().await;
241 assert_eq!(data.len(), 2);
242 assert!(data.contains(&"Hello from handler1".to_string()));
243 assert!(data.contains(&"Hello from handler2".to_string()));
244 }
245
246
247 #[tokio::test]
248 async fn test_high_load() {
249 let event_emitter = Arc::new(Mutex::new(EventEmitter::new()));
250 let event_count = 1000;
251 let handler_count = 100;
252 let counter = Arc::new(Mutex::new(0));
253 for _ in 0..handler_count {
255 let emitter = event_emitter.clone();
256 let handler = CounterHandler {
257 handle_count: Arc::clone(&counter)
258 };
259 emitter.lock().await.on::<CustomEvent1>(handler);
260 }
261 let start_time = Instant::now();
262 let event = CustomEvent1 { data: "Some value".to_string() };
264 for _ in 0..event_count {
265 let cloned_ee = Arc::clone(&event_emitter);
266 let cloned_e = event.clone();
267 tokio::spawn(async move {
268 cloned_ee.lock().await.emit(cloned_e);
269 });
270 }
271 tokio::time::sleep(tokio::time::Duration::from_millis(335)).await;
272 let duration = start_time.elapsed();
273 let total_events = event_count * handler_count;
274
275 let handled_events = counter.lock().await;
277 assert_eq!(*handled_events, total_events, "Not all events were handled correctly");
278 println!("Time taken to process events {}: {:?}", total_events, duration);
279 }
280
281 #[tokio::test]
282 async fn test_event_emitter() {
283 let mut event_emitter = EventEmitter::new();
284 let handler = ComplexHandler {
285 data: Arc::new(tokio::sync::Mutex::new(String::new())),
286 };
287
288 event_emitter.on::<CustomEvent1>(handler.clone());
289
290 let listener_id = event_emitter.on::<CustomEvent2>(handler.clone());
291
292 let event = CustomEvent1 { data: "Hello World".to_string() };
293 let event2 = CustomEvent2 { data: 1 };
294
295 event_emitter.emit(event.clone());
296 event_emitter.remove_listener(listener_id);
297 event_emitter.emit(event2);
298
299 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
303 let data = handler.data.lock().await;
304 assert_eq!(data.to_owned(), " - Event Handled")
305
306 }
307
308 #[tokio::test]
309 async fn test_listeners() {
310 let mut ee = EventEmitter::with_capacity(1);
311 let handler = SimpleHandler;
312 let handler_1 = ComplexHandler {
313 data: Arc::new(Mutex::new("".to_string())),
314 };
315 ee.on::<CustomEvent1>(handler.clone());
316 ee.on::<CustomEvent1>(handler_1);
317 ee.on::<CustomEvent2>(handler);
318 let listeners = ee.get_event_listeners::<CustomEvent1>().unwrap();
319 assert_eq!(listeners.len() , 2);
320 }
321}