async_events_emitter/
lib.rs

1//! # Event Emitter Library
2//!
3//! `async-events-emitter` is a Rust library providing an implementation of an event handling system.
4//! It allows users to define custom events and handlers, and emit events that are processed asynchronously.
5//!
6//! ## Usage
//! ```
//! use async_events_emitter::*;
//! use async_trait::async_trait;
//!
//! // Define your events
//! #[derive(Debug, Clone)]
//! struct MyEvent;
//!
//! impl Event for MyEvent {}
//!
//! // Define a handler and implement the EventHandler trait for it
//! #[derive(Debug)]
//! struct MyHandler;
//!
//! #[async_trait]
//! impl EventHandler<MyEvent> for MyHandler {
//!     async fn handle_event(&mut self, event: &MyEvent) {
//!         // TODO: Process and handle your event async
//!     }
//! }
//!
//! let mut ee = EventEmitter::new();
//! // Init the handler
//! let handler = MyHandler;
//! // Attach the handler to your event
//! ee.on::<MyEvent>(handler);
//! // Now emit the event
//! ee.emit(MyEvent);
//! ```
31//! 
32//! ## Features
33//! - Define custom events.
34//! - Register asynchronous event handlers.
35//! - Emit events to be handled by registered handlers.
36
37extern crate tokio;
38
39mod event;
40mod listener;
41mod event_emitter;
42
43// Re-exporting the main structs and traits
44pub use self::{event::{Event, EventError, EventHandler}, event_emitter::EventEmitter};
45
46#[cfg(test)]
47mod test {
48    use std::{sync::{Arc, atomic::{AtomicBool, Ordering}}, time::Instant};
49
50    use async_trait::async_trait;
51    use tokio::sync::Mutex;
52
53    use super::*;
54
55
56    // Example of custom event
57    #[derive(Debug, Clone)]
58    struct CustomEvent1 {
59        data: String,
60    }
61
62    #[derive(Debug, Clone)]
63    struct CustomEvent2 {
64        data: usize
65    }
66
67    impl Event for CustomEvent1 {}
68    impl Event for CustomEvent2 {}
69
70
71    #[derive(Debug, Clone)]
72    struct CounterHandler {
73        handle_count: Arc<tokio::sync::Mutex<usize>>,
74    }
75
76    #[derive(Debug, Clone)]
77    struct SharedDataHandler1 {
78        data: Arc<Mutex<Vec<String>>>,
79    }
80
81    #[derive(Debug, Clone)]
82    struct SharedDataHandler2 {
83        data: Arc<Mutex<Vec<String>>>,
84    }
85
86    #[derive(Debug)]
87    struct UnsubscribeHandler {
88        received_flag: Arc<AtomicBool>,
89    }
90
91    #[derive(Debug, Clone)]
92    struct SimpleHandler;
93
94    #[derive(Debug, Clone)]
95    struct ComplexHandler {
96        data: Arc<tokio::sync::Mutex<String>>,
97    }
98
99    #[async_trait]
100    impl EventHandler<CustomEvent1> for SimpleHandler {
101        async fn handle_event(&mut self, event: &CustomEvent1) {
102            println!("[SimpleHandler] Handling CustomEvent1 with data: {}", event.data);
103        }
104    }
105
106    #[async_trait]
107    impl EventHandler<CustomEvent2> for SimpleHandler {
108        async fn handle_event(&mut self, event: &CustomEvent2) {
109            println!("[SimpleHandler] Handling CustomEvent2 with data: {}", event.data);
110        }
111    }
112
113    #[async_trait]
114    impl EventHandler<CustomEvent1> for SharedDataHandler1 {
115        async fn handle_event(&mut self, event: &CustomEvent1) {
116            let mut data = self.data.lock().await;
117            
118            data.push(event.data.clone() + " from handler1");
119        }
120    }
121
122    #[async_trait]
123    impl EventHandler<CustomEvent1> for SharedDataHandler2 {
124        async fn handle_event(&mut self, event: &CustomEvent1) {
125            let mut data = self.data.lock().await;
126            
127            data.push(event.data.clone() + " from handler2");
128        }
129    }
130
131    #[async_trait]
132    impl EventHandler<CustomEvent1> for UnsubscribeHandler {
133        async fn handle_event(&mut self, event: &CustomEvent1) {
134            println!("[UnsubscribeHandler] Handling CustomEvent1 with data: {}", event.data);
135            let flag = &self.received_flag;
136            flag.store(true, Ordering::SeqCst);
137        }
138    }
139
140    #[async_trait]
141    impl EventHandler<CustomEvent1> for CounterHandler {
142        async fn handle_event(&mut self, _event: &CustomEvent1) {
143            // println!("[CounterHandler] Handling CustomEvent1 with data: {}", event.data);
144            let mut counter = self.handle_count.lock().await;
145            *counter += 1;
146        }
147    }
148
149    #[async_trait]
150    impl EventHandler<CustomEvent2> for CounterHandler {
151        async fn handle_event(&mut self, _event: &CustomEvent2) {
152            // println!("[CounterHandler] Handling CustomEvent2 with data: {}", event.data);
153            let mut counter = self.handle_count.lock().await;
154            *counter += 1;
155        }
156    }
157
158    #[async_trait]
159    impl EventHandler<CustomEvent1> for ComplexHandler {
160        async fn handle_event(&mut self, event: &CustomEvent1) {
161            // Async handling logic
162            println!("[ComplexHandler] Handling CustomEvent1 with data: {}", event.data);
163            // Lock the mutex to get mutable access to the data
164            let mut data = self.data.lock().await;
165
166            // Perform your mutations here
167            // For example, appending some information from the event
168            *data = format!("{} - Event Handled", *data);
169
170        }
171    }
172
173    #[async_trait]
174    impl EventHandler<CustomEvent2> for ComplexHandler {
175        async fn handle_event(&mut self, event: &CustomEvent2) {
176            // Async handling logic
177            println!("[ComplexHandler] Handling CustomEvent2 with data: {}", event.data);
178        }
179    }
180
181    #[tokio::test]
182    async fn test_event_once() {
183        let mut event_emitter = EventEmitter::new();
184        let handler = CounterHandler {
185            handle_count: Arc::new(Mutex::new(0))
186        };
187        event_emitter.once::<CustomEvent1>(handler.clone());
188        let event = CustomEvent1 {
189            data: "Should be triggered once!".to_string()
190        };
191        event_emitter.emit(event.clone());
192        event_emitter.emit(event);
193        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
194        assert_eq!(event_emitter.listeners_count(), 0);
195        let handler_count =  handler.handle_count.lock().await;
196        assert_eq!(handler_count.to_owned(), 1);
197
198    }
199
200    #[tokio::test]
201    async fn test_unsubscribe_functionality() {
202        let mut event_emitter = EventEmitter::new();
203        let received_flag = Arc::new(AtomicBool::new(false));
204        let received_flag_clone = received_flag.clone();
205
206        let handler = UnsubscribeHandler {
207            received_flag: received_flag_clone
208        };
209
210        let listener_id = event_emitter.on::<CustomEvent1>(handler);
211        event_emitter.remove_listener(listener_id);
212
213        event_emitter.emit(CustomEvent1 { data: "Test".to_string() });
214        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; // Allow time for handlers to process
215
216        assert_eq!(received_flag.load(Ordering::SeqCst), false);
217    }
218
219    #[tokio::test]
220    async fn test_multiple_subscribers() {
221        let mut event_emitter = EventEmitter::new();
222        let shared_data = Arc::new(tokio::sync::Mutex::new(vec![]));
223
224        let handler1_data = shared_data.clone();
225        let handler1 = SharedDataHandler1 {
226            data: handler1_data
227        };
228
229        let handler2_data = shared_data.clone();
230        let handler2 = SharedDataHandler2 {
231            data: handler2_data
232        };
233
234        event_emitter.on::<CustomEvent1>(handler1);
235        event_emitter.on::<CustomEvent1>(handler2);
236
237        event_emitter.emit(CustomEvent1 { data: "Hello".to_string() });
238        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; // Allow time for handlers to process
239
240        let data = shared_data.lock().await;
241        assert_eq!(data.len(), 2);
242        assert!(data.contains(&"Hello from handler1".to_string()));
243        assert!(data.contains(&"Hello from handler2".to_string()));
244    }
245
246
247    #[tokio::test]
248    async fn test_high_load() {
249        let event_emitter = Arc::new(Mutex::new(EventEmitter::new()));
250        let event_count = 1000;
251        let handler_count = 100;
252        let counter = Arc::new(Mutex::new(0));
253        // Add a large number of handlers
254        for _ in 0..handler_count {
255            let emitter = event_emitter.clone();
256            let handler = CounterHandler {
257                handle_count: Arc::clone(&counter)
258            };
259            emitter.lock().await.on::<CustomEvent1>(handler);
260        }
261        let start_time = Instant::now();
262        // Emit a large number of events
263        let event = CustomEvent1 { data: "Some value".to_string() };
264        for _ in 0..event_count {
265            let cloned_ee = Arc::clone(&event_emitter);
266            let cloned_e = event.clone();
267            tokio::spawn(async move {
268                cloned_ee.lock().await.emit(cloned_e);
269            });
270        }
271        tokio::time::sleep(tokio::time::Duration::from_millis(335)).await;
272        let duration = start_time.elapsed();
273        let total_events = event_count * handler_count;
274        
275        // Ensure all events have been processed
276        let handled_events = counter.lock().await;
277        assert_eq!(*handled_events, total_events, "Not all events were handled correctly");
278        println!("Time taken to process events {}: {:?}", total_events, duration);
279    }
280
281    #[tokio::test]
282    async fn test_event_emitter() {
283        let mut event_emitter = EventEmitter::new();
284        let handler = ComplexHandler {
285            data: Arc::new(tokio::sync::Mutex::new(String::new())),
286        };
287
288        event_emitter.on::<CustomEvent1>(handler.clone());
289
290        let listener_id = event_emitter.on::<CustomEvent2>(handler.clone());
291
292        let event = CustomEvent1 { data: "Hello World".to_string() };
293        let event2 = CustomEvent2 { data: 1 };
294
295        event_emitter.emit(event.clone());
296        event_emitter.remove_listener(listener_id);
297        event_emitter.emit(event2);
298        
299        // let mut new_ee = event_emitter;
300
301        // new_ee.emit(event)
302        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
303        let data = handler.data.lock().await;
304        assert_eq!(data.to_owned(), " - Event Handled")
305
306    }
307
308    #[tokio::test]
309    async fn test_listeners() {
310        let mut ee = EventEmitter::with_capacity(1);
311        let handler = SimpleHandler;
312        let handler_1 = ComplexHandler {
313            data: Arc::new(Mutex::new("".to_string())),
314        };
315        ee.on::<CustomEvent1>(handler.clone());
316        ee.on::<CustomEvent1>(handler_1);
317        ee.on::<CustomEvent2>(handler);
318        let listeners = ee.get_event_listeners::<CustomEvent1>().unwrap();
319        assert_eq!(listeners.len() , 2);
320    }
321}