async_event_emitter/
lib.rs

1/*!
2
3        an Async implementation of the  [`event-emitter-rs`](https://crates.io/crates/event-emitter-rs) crate
4
5        Allows you to subscribe to events with callbacks and also fire those events.
6        Events are in the form of (strings, value) and callbacks are in the form of closures that take in a value parameter;
7
8        ## Differences between this crate and [`event-emitter-rs`](https://crates.io/crates/event-emitter-rs)
9        - This is an async implementation that works for all common async runtimes (Tokio, async-std and smol)
10        - The listener methods ***(on and once)*** take a callback that returns a future instead of a merely a closure.
11        - The emit methods executes each callback on each event by running async intra-task instead of spawning a std::thread
12        - This emitter is thread safe and can  also be used lock-free (supports interior mutability).
13
14        ***Note***: To use strict return and event types, use [typed-emitter](https://crates.io/crates/typed-emitter), that crate solves [this issue](https://github.com/spencerjibz/async-event-emitter-rs/issues/31) too.
15
16        ## Getting Started
17
18        ```
19        use async_event_emitter::AsyncEventEmitter;
20        #[tokio::main]
21        async fn main() {
22        let event_emitter = AsyncEventEmitter::new();
23        // This will print <"Hello world!"> whenever the <"Say Hello"> event is emitted
24        event_emitter.on("Say Hello", |_:()|  async move { println!("Hello world!")});
25        event_emitter.emit("Say Hello", ()).await;
26        // >> "Hello world!"
27
28        }
29        ```
30        ## Basic Usage
31        We can emit and listen to values of any type so long as they implement serde's Serialize and Deserialize traits.
32        A single EventEmitter instance can have listeners to values of multiple types.
33
34        ```
35        use async_event_emitter::AsyncEventEmitter as EventEmitter;
36        use serde::{Deserialize, Serialize};
37        #[tokio::main]
38        async fn main () {
39        let event_emitter = EventEmitter::new();
40        event_emitter.on("Add three", |number: f32| async move  {println!("{}", number + 3.0)});
41        event_emitter.emit("Add three", 5.0 as f32).await;
42        event_emitter.emit("Add three", 4.0 as f32).await;
43
44        // >> "8.0"
45        // >> "7.0"
46
47        // Using a more advanced value type such as a struct by implementing the serde traits
48        #[derive(Serialize, Deserialize,Debug)]
49        struct Date {
50            month: String,
51            day: String,
52        }
53
54        event_emitter.on("LOG_DATE", |date: Date|  async move {
55            println!("Month: {} - Day: {}", date.month, date.day)
56        });
57        event_emitter.emit("LOG_DATE", Date {
58            month: "January".to_string(),
59            day: "Tuesday".to_string()
60        }).await;
61        // >> "Month: January - Day: Tuesday"
62        }
63        ```
64
65        Removing listeners is also easy
66
67        ```
68        use async_event_emitter::AsyncEventEmitter as EventEmitter;
69        let event_emitter = EventEmitter::new();
70
71        let listener_id = event_emitter.on("Hello", |_: ()|  async {println!("Hello World")});
72        match event_emitter.remove_listener(listener_id) {
73            Some(listener_id) => print!("Removed event listener!"),
74            None => print!("No event listener of that id exists")
75        }
76        ```
77
78        Listening to all emitted events with a single listener
79
80        ```rust
81        use async_event_emitter::AsyncEventEmitter as EventEmitter;
82         #[tokio::main]
83          async fn main() {
84         let mut event_emitter = EventEmitter::new();
85         // this will print Hello world two because of
86         event_emitter.on_all(|value: i32| async move { println!("Hello world! - {value}") });
87         // >> "Hello world! - 1"
88         // >> "Hello world! - 2"
89         event_emitter.emit("Some event", 1).await;
90         event_emitter.emit("next event", 2).await;
91      }
92
93        ```
94        ## Creating a Global EventEmitter
95
96        It's likely that you'll want to have a single EventEmitter instance that can be shared across files;
97
98        After all, one of the main points of using an EventEmitter is to avoid passing down a value through several nested functions/types and having a global subscription service.
99
100        ```
101        // global_event_emitter.rs
102        use lazy_static::lazy_static;
103        use async_event_emitter::AsyncEventEmitter;
104
105        // Use lazy_static! because the size of EventEmitter is not known at compile time
106        lazy_static! {
107            // Export the emitter with `pub` keyword
108            pub static ref EVENT_EMITTER: AsyncEventEmitter = AsyncEventEmitter::new();
109        }
110
111        #[tokio::main]
112        async fn main() {
113            EVENT_EMITTER.on("Hello", |_:()|  async {println!("hello there!")});
114            EVENT_EMITTER.emit("Hello", ()).await;
115        }
116
117        async fn random_function() {
118            // When the <"Hello"> event is emitted in main.rs then print <"Random stuff!">
119            EVENT_EMITTER.on("Hello", |_: ()| async { println!("Random stuff!")});
120        }
121        ```
122     ### Usage with other runtimes
123     Check out the examples from the [typed version of this crate](https://docs.rs/typed-emitter/0.1.2/typed_emitter/#getting-started), just replace the emntter type.
124
125     ### Testing
126       Run the tests on this crate with all-features enabled as follows:
127       ``` cargo test --all-features```
128
129
130        License: MIT
131*/
132
133use dashmap::DashMap;
134use futures::future::{BoxFuture, Future, FutureExt};
135use futures::stream::FuturesUnordered;
136use futures::StreamExt;
137use serde::{Deserialize, Serialize};
138use uuid::Uuid;
139pub type AsyncCB = dyn Fn(Vec<u8>) -> BoxFuture<'static, ()> + Send + Sync + 'static;
140use std::sync::{Arc, Mutex};
141#[derive(Clone)]
142pub struct AsyncListener {
143    pub callback: Arc<AsyncCB>,
144    pub limit: Option<u64>,
145    pub id: Uuid,
146}
147
148#[derive(Default, Clone)]
149pub struct AsyncEventEmitter {
150    listeners: DashMap<String, Vec<AsyncListener>>,
151    all_listener: Arc<Mutex<Option<AsyncListener>>>,
152}
153
154impl AsyncEventEmitter {
155    pub fn new() -> Self {
156        Self::default()
157    }
158    /// Returns the numbers of events
159    /// # Example
160    ///
161    /// ```rust
162    /// use async_event_emitter::AsyncEventEmitter;
163    /// let event_emitter = AsyncEventEmitter::new();
164    /// event_emitter.event_count(); // returns  0
165    /// ```
166    pub fn event_count(&self) -> usize {
167        self.listeners.len()
168    }
169
170    /// Returns all listeners on the specified event
171    /// # Example
172    /// ```rust
173    /// use async_event_emitter::AsyncEventEmitter;
174    /// #[tokio::main]
175    /// async fn main() {
176    ///     let emitter = AsyncEventEmitter::new();
177    ///     emitter.on("test", |value: ()| async { println!("Hello world!") });
178    ///     emitter.emit("test", ()).await;
179    ///     let listeners = emitter.listeners_by_event("test");
180    ///     println!("{listeners:?}");
181    /// }
182    /// ```
183    pub fn listeners_by_event(&self, event: &str) -> Vec<AsyncListener> {
184        if let Some(listeners) = self.listeners.get(event) {
185            let values = listeners.to_vec();
186            return values;
187        }
188        vec![]
189    }
190    /// Returns the numbers of listners per event
191    /// # Example
192    ///
193    /// ```rust
194    /// use async_event_emitter::AsyncEventEmitter;
195    /// #[tokio::main]
196    /// async fn main() {
197    ///     let emitter = AsyncEventEmitter::new();
198    ///     emitter.on("test", |value: ()| async { println!("Hello world!") });
199    ///     emitter.emit("test", ()).await;
200    ///     emitter.listener_count_by_event("test"); // returns  1
201    /// }
202    /// ```
203    pub fn listener_count_by_event(&self, event: &str) -> usize {
204        if let Some(listeners) = self.listeners.get(event) {
205            return listeners.len();
206        }
207        0
208    }
209
210    /// Emits an event of the given parameters and executes each callback that is listening to that event asynchronously by spawning a task for each callback.
211    ///
212    /// # Example
213    ///
214    /// ```rust
215    /// use async_event_emitter::AsyncEventEmitter;
216    /// #[tokio::main]
217    /// async fn main() -> anyhow::Result<()> {
218    ///     let event_emitter = AsyncEventEmitter::new();
219    ///
220    ///     // Emits the <"Some event"> event and a value <"Hello programmer">
221    ///     // The value can be of any type as long as it implements the serde Serialize trait
222    ///     event_emitter.emit("Some event", "Hello programmer!").await;
223    ///
224    ///     Ok(())
225    /// }
226    /// ```
227    pub async fn emit<T>(&self, event: &str, value: T) -> anyhow::Result<()>
228    where
229        T: Serialize,
230    {
231        let mut futures: FuturesUnordered<_> = FuturesUnordered::new();
232
233        if let Some(ref mut listeners) = self.listeners.get_mut(event) {
234            let mut listeners_to_remove: Vec<usize> = Vec::new();
235            for (index, listener) in listeners.iter_mut().enumerate() {
236                let bytes: Vec<u8> = bincode::serialize(&value)?;
237
238                let callback = Arc::clone(&listener.callback);
239
240                match listener.limit {
241                    None => {
242                        futures.push(callback(bytes));
243                    }
244                    Some(limit) => {
245                        if limit != 0 {
246                            futures.push(callback(bytes));
247
248                            listener.limit = Some(limit - 1);
249                        } else {
250                            listeners_to_remove.push(index);
251                        }
252                    }
253                }
254            }
255
256            // Reverse here so we don't mess up the ordering of the vector
257            for index in listeners_to_remove.into_iter().rev() {
258                listeners.remove(index);
259            }
260        }
261
262        if let Some(global_listener) = self.all_listener.lock().unwrap().as_ref() {
263            let callback = global_listener.callback.clone();
264            let bytes: Vec<u8> = bincode::serialize(&value)?;
265            futures.push(callback(bytes));
266        }
267
268        while futures.next().await.is_some() {}
269        Ok(())
270    }
271
272    /// Removes an event listener with the given id
273    ///
274    /// # Example
275    ///
276    /// ```
277    /// use async_event_emitter::AsyncEventEmitter;
278    /// let event_emitter = AsyncEventEmitter::new();
279    /// let listener_id =
280    ///     event_emitter.on("Some event", |value: ()| async { println!("Hello world!") });
281    /// // Removes the listener that we just added
282    /// event_emitter.remove_listener(listener_id);
283    /// ```
284    pub fn remove_listener(&self, id_to_delete: Uuid) -> Option<Uuid> {
285        for mut mut_ref in self.listeners.iter_mut() {
286            let event_listeners = mut_ref.value_mut();
287            if let Some(index) = event_listeners
288                .iter()
289                .position(|listener| listener.id == id_to_delete)
290            {
291                event_listeners.remove(index);
292                return Some(id_to_delete);
293            }
294        }
295        let mut all_listener = self.all_listener.lock().unwrap();
296        if let Some(listener) = all_listener.as_ref() {
297            if id_to_delete == listener.id {
298                all_listener.take();
299            }
300        }
301        None
302    }
303
304    /// Adds an event listener that will only execute the listener x amount of times - Then the listener will be deleted.
305    /// Returns the id of the newly added listener.
306    ///
307    /// # Example
308    ///
309    /// ```
310    /// use async_event_emitter::AsyncEventEmitter;
311    /// #[tokio::main]
312    /// async fn main() {
313    /// let event_emitter = AsyncEventEmitter::new();
314    /// // Listener will be executed 3 times. After the third time, the listener will be deleted.
315    /// event_emitter.on_limited("Some event", Some(3), |value: ()| async{ println!("Hello world!")});
316    /// event_emitter.emit("Some event", ()).await; // 1 >> "Hello world!"
317    /// event_emitter.emit("Some event", ()).await; // 2 >> "Hello world!"
318    /// event_emitter.emit("Some event", ()).await; // 3 >> "Hello world!"
319    /// event_emitter.emit("Some event", ()).await; // 4 >> <Nothing happens here because listener was deleted after the 3rd call>
320    /// }
321    /// ```
322    pub fn on_limited<F, T, C>(&self, event: &str, limit: Option<u64>, callback: C) -> Uuid
323    where
324        for<'de> T: Deserialize<'de>,
325        C: Fn(T) -> F + Send + Sync + 'static,
326        F: Future<Output = ()> + Send + Sync + 'static,
327    {
328        let id = Uuid::new_v4();
329        let parsed_callback = move |bytes: Vec<u8>| {
330            let value: T = bincode::deserialize(&bytes).unwrap_or_else(|_| {
331                panic!(
332                    " value can't be deserialized into type {}",
333                    std::any::type_name::<T>()
334                )
335            });
336
337            callback(value).boxed()
338        };
339
340        let listener = AsyncListener {
341            id,
342            limit,
343            callback: Arc::new(parsed_callback),
344        };
345
346        match self.listeners.get_mut(event) {
347            Some(ref mut callbacks) => {
348                callbacks.push(listener);
349            }
350            None => {
351                self.listeners.insert(event.to_string(), vec![listener]);
352            }
353        }
354
355        id
356    }
357
358    /// Adds an event listener that will only execute the callback once - Then the listener will be deleted.
359    /// Returns the id of the newly added listener.
360    ///
361    /// # Example
362    ///
363    /// ```rust
364    /// use async_event_emitter::AsyncEventEmitter;
365    /// let  event_emitter = AsyncEventEmitter::new();
366    ///
367    /// event_emitter.once("Some event", |value: ()| async {println!("Hello world!")});
368    /// event_emitter.emit("Some event", ()); // First event is emitted and the listener's callback is called once
369    /// // >> "Hello world!"
370    ///
371    /// event_emitter.emit("Some event", ());
372    /// // >> <Nothing happens here since listener was deleted>
373    /// ```
374    pub fn once<F, T, C>(&self, event: &str, callback: C) -> Uuid
375    where
376        for<'de> T: Deserialize<'de>,
377        C: Fn(T) -> F + Send + Sync + 'static,
378        F: Future<Output = ()> + Send + Sync + 'static,
379    {
380        self.on_limited(event, Some(1), callback)
381    }
382
383    /// Adds an event listener with a callback that will get called whenever the given event is emitted.
384    /// Returns the id of the newly added listener.
385    ///
386    /// # Example
387    ///
388    /// ```rust
389    /// use async_event_emitter::AsyncEventEmitter;
390    /// let  event_emitter = AsyncEventEmitter::new();
391    ///
392    /// // This will print <"Hello world!"> whenever the <"Some event"> event is emitted
393    /// // The type of the `value` parameter for the closure MUST be specified and, if you plan to use the `value`, the `value` type
394    /// // MUST also match the type that is being emitted (here we just use a throwaway `()` type since we don't care about using the `value`)
395    /// event_emitter.on("Some event", |value: ()| async { println!("Hello world!")});
396    /// ```
397    pub fn on<F, T, C>(&self, event: &str, callback: C) -> Uuid
398    where
399        for<'de> T: Deserialize<'de>,
400        C: Fn(T) -> F + Send + Sync + 'static,
401        F: Future<Output = ()> + Send + Sync + 'static,
402    {
403        self.on_limited(event, None, callback)
404    }
405    /// Adds an event listener called for whenever every event is called
406    /// Returns the id of the newly added listener.
407    ///
408    /// # Example
409    /// ```rust
410    /// use async_event_emitter::AsyncEventEmitter;
411    /// #[tokio::main]
412    /// async fn main() {
413    ///     let mut event_emitter = AsyncEventEmitter::new();
414    ///     // this will print Hello world two because of
415    ///     event_emitter.on_all(|value: ()| async { println!("Hello world!") });
416    ///     event_emitter.emit("Some event", ()).await;
417    ///     // >> "Hello world!"
418    ///
419    ///     event_emitter.emit("next event", ()).await;
420    ///     // >> <Nothing happens here since listener was deleted>
421    /// }
422    /// ```
423    pub fn on_all<F, T, C>(&self, callback: C) -> Uuid
424    where
425        for<'de> T: Deserialize<'de>,
426        C: Fn(T) -> F + Send + Sync + 'static,
427        F: Future<Output = ()> + Send + Sync + 'static,
428    {
429        assert!(
430            self.all_listener.lock().unwrap().is_none(),
431            "only one global listener is allowed"
432        );
433        let id = Uuid::new_v4();
434        let parsed_callback = move |bytes: Vec<u8>| {
435            let value: T = bincode::deserialize(&bytes).unwrap_or_else(|_| {
436                panic!(
437                    " value can't be deserialized into type {}",
438                    std::any::type_name::<T>()
439                )
440            });
441            callback(value).boxed()
442        };
443
444        let listener = AsyncListener {
445            id,
446            limit: None,
447            callback: Arc::new(parsed_callback),
448        };
449
450        self.all_listener.lock().unwrap().replace(listener);
451
452        id
453    }
454}
455
456// test the AsyncEventEmitter
457// implement fmt::Debug for AsyncEventListener
458use std::fmt;
459impl fmt::Debug for AsyncListener {
460    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
461        f.debug_struct("AsyncListener")
462            .field("id", &self.id)
463            .field("limit", &self.limit)
464            .finish()
465    }
466}
467
468// implement fmt::Debug   for AsyncEventEmitter
469impl fmt::Debug for AsyncEventEmitter {
470    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
471        f.debug_struct("AsyncEventEmitter")
472            .field("listeners", &self.listeners)
473            .finish()
474    }
475}