async_event_emitter/
lib.rs

/*!

An async implementation of the [`event-emitter-rs`](https://crates.io/crates/event-emitter-rs) crate.

Allows you to subscribe to events with callbacks and also fire those events.
Events take the form of a (string, value) pair, and callbacks are closures that take a value parameter and return a future.

## Differences between this crate and [`event-emitter-rs`](https://crates.io/crates/event-emitter-rs)
- This is an async implementation that works with all common async runtimes (Tokio, async-std and smol).
- The listener methods ***(on and once)*** take a callback that returns a future instead of merely a closure.
- The emit method runs the callbacks for an event concurrently as futures and awaits them all, instead of spawning a `std::thread` for each callback.
- This emitter is thread-safe and needs no external lock: its methods take `&self` (interior mutability).


***Note***: To use strict return and event types, use [typed-emitter](https://crates.io/crates/typed-emitter); that crate also solves [this issue](https://github.com/spencerjibz/async-event-emitter-rs/issues/31).

## Getting Started

```
use async_event_emitter::AsyncEventEmitter;
#[tokio::main]
async fn main() {
    let event_emitter = AsyncEventEmitter::new();
    // This will print <"Hello world!"> whenever the <"Say Hello"> event is emitted
    event_emitter.on("Say Hello", |_: ()| async move { println!("Hello world!") });
    event_emitter.emit("Say Hello", ()).await;
    // >> "Hello world!"
}
```
## Basic Usage
We can emit and listen to values of any type, as long as they implement serde's `Serialize` and `Deserialize` traits.
A single `AsyncEventEmitter` instance can have listeners for values of multiple types.

```
use async_event_emitter::AsyncEventEmitter as EventEmitter;
use serde::{Deserialize, Serialize};
#[tokio::main]
async fn main() {
    let event_emitter = EventEmitter::new();
    event_emitter.on("Add three", |number: f32| async move { println!("{}", number + 3.0) });
    event_emitter.emit("Add three", 5.0_f32).await;
    event_emitter.emit("Add three", 4.0_f32).await;

    // `{}` prints whole-number floats without a fractional part
    // >> "8"
    // >> "7"

    // Using a more advanced value type such as a struct by implementing the serde traits
    #[derive(Serialize, Deserialize, Debug)]
    struct Date {
        month: String,
        day: String,
    }

    event_emitter.on("LOG_DATE", |date: Date| async move {
        println!("Month: {} - Day: {}", date.month, date.day)
    });
    event_emitter.emit("LOG_DATE", Date {
        month: "January".to_string(),
        day: "Tuesday".to_string(),
    }).await;
    // >> "Month: January - Day: Tuesday"
}
```

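Values travel from `emit` to the listeners as serialized bytes, which is why the closure's parameter type has to match the emitted type. The sketch below illustrates that round trip using only the standard library. `encode`, `decode` and `add_three_listener` are hypothetical stand-ins for the serde/bincode step and for a listener registered with `|number: f32|`; they are not part of this crate's API.

```rust
use std::convert::TryFrom;

// Hypothetical stand-in for serialization: an `f32` becomes 4 little-endian bytes.
fn encode(value: f32) -> Vec<u8> {
    value.to_le_bytes().to_vec()
}

// Hypothetical stand-in for deserialization: succeeds only if the bytes fit an `f32`.
fn decode(bytes: &[u8]) -> Option<f32> {
    let array = <[u8; 4]>::try_from(bytes).ok()?;
    Some(f32::from_le_bytes(array))
}

// Mimics a listener declared as `|number: f32|`: decode the payload, then add three.
fn add_three_listener(bytes: &[u8]) -> Option<f32> {
    decode(bytes).map(|number| number + 3.0)
}
```

In this sketch a payload of the wrong shape yields `None`; in the crate itself, a value that cannot be deserialized into the listener's type causes a panic instead.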
Removing listeners is also easy:

```
use async_event_emitter::AsyncEventEmitter as EventEmitter;
let event_emitter = EventEmitter::new();

let listener_id = event_emitter.on("Hello", |_: ()| async { println!("Hello World") });
match event_emitter.remove_listener(&listener_id) {
    Some(_removed_id) => print!("Removed event listener!"),
    None => print!("No event listener of that id exists"),
}
```
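Removal works by listener id rather than by event name, so the emitter has to look through the listeners of every event until it finds a match. A minimal standard-library sketch of that lookup follows; `Registry` and `Listener` are hypothetical stand-ins for the crate's internal storage, and ids are supplied by the caller here rather than generated.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for a stored listener; only the id matters here.
struct Listener {
    id: String,
}

// Hypothetical stand-in for the emitter's storage: event name -> listeners.
#[derive(Default)]
struct Registry {
    listeners: HashMap<String, Vec<Listener>>,
}

impl Registry {
    // Registers a listener under `event` with a caller-supplied id.
    fn add(&mut self, event: &str, id: &str) {
        self.listeners
            .entry(event.to_string())
            .or_default()
            .push(Listener { id: id.to_string() });
    }

    // Scans every event's listeners and removes the first one whose id matches.
    fn remove(&mut self, id: &str) -> Option<String> {
        for event_listeners in self.listeners.values_mut() {
            if let Some(index) = event_listeners.iter().position(|l| l.id == id) {
                event_listeners.remove(index);
                return Some(id.to_string());
            }
        }
        None
    }
}
```

Because the id is the only key, the cost of a removal grows with the total number of registered listeners, and removing the last listener of an event leaves an empty list behind for that event.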
## Creating a Global EventEmitter

You will likely want a single EventEmitter instance that can be shared across files.

After all, one of the main reasons to use an EventEmitter is to avoid passing a value down through several nested functions/types by having a global subscription service instead.

```
// global_event_emitter.rs
use lazy_static::lazy_static;
use async_event_emitter::AsyncEventEmitter;

// Use lazy_static! because `AsyncEventEmitter::new()` is not a `const fn`,
// so the static has to be initialized lazily at runtime
lazy_static! {
    // Export the emitter with `pub` keyword
    pub static ref EVENT_EMITTER: AsyncEventEmitter = AsyncEventEmitter::new();
}

#[tokio::main]
async fn main() {
    EVENT_EMITTER.on("Hello", |_: ()| async { println!("hello there!") });
    EVENT_EMITTER.emit("Hello", ()).await;
}

async fn random_function() {
    // When the <"Hello"> event is emitted in main.rs then print <"Random stuff!">
    EVENT_EMITTER.on("Hello", |_: ()| async { println!("Random stuff!") });
}
```
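If you would rather not depend on `lazy_static`, the standard library's `std::sync::OnceLock` (Rust 1.70 or later) offers the same lazy, one-time initialization. The sketch below shows the pattern with a hypothetical `Registry` type standing in for the emitter so that it compiles without external crates. It is an alternative technique, not something this crate prescribes.

```rust
use std::sync::{Mutex, OnceLock};

// Hypothetical stand-in for a shared emitter: it only records event names.
#[derive(Default)]
struct Registry {
    events: Mutex<Vec<String>>,
}

impl Registry {
    // Takes `&self`, like the emitter's methods, so a shared reference is enough.
    fn record(&self, event: &str) {
        self.events.lock().unwrap().push(event.to_string());
    }

    fn count(&self) -> usize {
        self.events.lock().unwrap().len()
    }
}

// Returns the process-wide instance, creating it on first access.
fn global_registry() -> &'static Registry {
    static REGISTRY: OnceLock<Registry> = OnceLock::new();
    REGISTRY.get_or_init(Registry::default)
}
```

Every call to `global_registry()` hands back a reference to the same instance. With the real crate, the same shape should work with an `OnceLock<AsyncEventEmitter>` initialized by `AsyncEventEmitter::new`, though that combination is an assumption here rather than something the crate documents.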
### Usage with other runtimes
Check out the examples from the [typed version of this crate](https://docs.rs/typed-emitter/0.1.2/typed_emitter/#getting-started) and just replace the emitter type.

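As a rough illustration of why the emitter is not tied to one runtime: a listener is stored as a type-erased callback that returns a boxed future, and any executor can drive such a future to completion. The sketch below uses only the standard library. `block_on`, `ThreadWaker`, `BoxedFuture`, `Callback` and `make_listener` are hypothetical names for this illustration, not part of the crate; real code would use the `block_on` (or equivalent) of whichever runtime it runs on.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
// Type-erased listener: takes raw bytes and returns a boxed future.
type Callback = Arc<dyn Fn(Vec<u8>) -> BoxedFuture + Send + Sync>;

// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Minimal executor: polls one future to completion on the current thread.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

// Builds a listener that adds the payload length to a shared counter when polled.
fn make_listener(total: Arc<AtomicU32>) -> Callback {
    Arc::new(move |bytes: Vec<u8>| -> BoxedFuture {
        let total = Arc::clone(&total);
        Box::pin(async move {
            total.fetch_add(bytes.len() as u32, Ordering::SeqCst);
        })
    })
}
```

Calling the listener only creates the future; the counter changes once an executor polls it, which is the same reason the future returned by `emit` has to be awaited.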
### Testing
Run the tests on this crate with all features enabled as follows:
`cargo test --all-features`


License: MIT
*/

use dashmap::DashMap;
use futures::future::{BoxFuture, Future, FutureExt};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use uuid::Uuid;

/// Type-erased listener callback: receives the serialized value and returns a boxed future.
pub type AsyncCB = dyn Fn(Vec<u8>) -> BoxFuture<'static, ()> + Send + Sync + 'static;

/// A registered listener.
#[derive(Clone)]
pub struct AsyncListener {
    pub callback: Arc<AsyncCB>,
    /// Remaining number of calls; `None` means unlimited.
    pub limit: Option<u64>,
    pub id: String,
}

/// Event emitter that maps event names to their listeners.
#[derive(Default, Clone)]
pub struct AsyncEventEmitter {
    pub listeners: DashMap<String, Vec<AsyncListener>>,
}

impl AsyncEventEmitter {
    pub fn new() -> Self {
        Self::default()
    }

    /// Emits an event with the given value and runs every callback that is listening to that event.
    /// The callbacks' futures are polled concurrently, and `emit` returns once all of them have completed.
    ///
    /// # Example
    ///
    /// ```rust
    /// use async_event_emitter::AsyncEventEmitter;
    /// #[tokio::main]
    /// async fn main() -> anyhow::Result<()> {
    ///     let event_emitter = AsyncEventEmitter::new();
    ///
    ///     // Emits the <"Some event"> event and a value <"Hello programmer!">
    ///     // The value can be of any type as long as it implements the serde Serialize and Deserialize traits
    ///     event_emitter.emit("Some event", "Hello programmer!").await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    pub async fn emit<'a, T>(&self, event: &str, value: T) -> anyhow::Result<()>
    where
        T: Serialize + Deserialize<'a> + Send + Sync + 'a,
    {
        let mut futures: FuturesUnordered<_> = FuturesUnordered::new();

        if let Some(ref mut listeners) = self.listeners.get_mut(event) {
            let mut listeners_to_remove: Vec<usize> = Vec::new();
            for (index, listener) in listeners.iter_mut().enumerate() {
                // Each callback receives its own copy of the serialized value
                let bytes: Vec<u8> = bincode::serialize(&value)?;

                let callback = Arc::clone(&listener.callback);

                match listener.limit {
                    // Unlimited listener: always called
                    None => {
                        futures.push(callback(bytes));
                    }
                    Some(limit) => {
                        if limit != 0 {
                            futures.push(callback(bytes));

                            listener.limit = Some(limit - 1);
                        } else {
                            // Limit already used up by earlier emits: remove instead of calling
                            listeners_to_remove.push(index);
                        }
                    }
                }
            }

            // Remove from the back so the remaining indices stay valid
            for index in listeners_to_remove.into_iter().rev() {
                listeners.remove(index);
            }
        }

        // The map guard has been dropped by now; drive all callback futures to completion
        while futures.next().await.is_some() {}
        Ok(())
    }

    /// Removes the event listener with the given id.
    /// Returns the id if a listener was removed, or `None` if no listener has that id.
    ///
    /// # Example
    ///
    /// ```
    /// use async_event_emitter::AsyncEventEmitter;
    /// let event_emitter = AsyncEventEmitter::new();
    /// let listener_id =
    ///     event_emitter.on("Some event", |value: ()| async { println!("Hello world!") });
    /// println!("{:?}", event_emitter.listeners);
    ///
    /// // Removes the listener that we just added
    /// event_emitter.remove_listener(&listener_id);
    /// ```
    pub fn remove_listener(&self, id_to_delete: &str) -> Option<String> {
        for mut mut_ref in self.listeners.iter_mut() {
            let event_listeners = mut_ref.value_mut();
            if let Some(index) = event_listeners
                .iter()
                .position(|listener| listener.id == id_to_delete)
            {
                event_listeners.remove(index);
                return Some(id_to_delete.to_string());
            }
        }

        None
    }

    /// Adds an event listener whose callback runs at most `limit` times (`None` means no limit).
    /// Once the limit is used up, the listener is removed by the next `emit` of that event.
    /// Returns the id of the newly added listener.
    ///
    /// # Example
    ///
    /// ```
    /// use async_event_emitter::AsyncEventEmitter;
    /// #[tokio::main]
    /// async fn main() {
    ///     let event_emitter = AsyncEventEmitter::new();
    ///     // Listener will be executed 3 times. After the third time, the listener will be deleted.
    ///     event_emitter.on_limited("Some event", Some(3), |value: ()| async { println!("Hello world!") });
    ///     event_emitter.emit("Some event", ()).await; // 1 >> "Hello world!"
    ///     event_emitter.emit("Some event", ()).await; // 2 >> "Hello world!"
    ///     event_emitter.emit("Some event", ()).await; // 3 >> "Hello world!"
    ///     event_emitter.emit("Some event", ()).await; // 4 >> <Nothing is printed: the exhausted listener is removed instead of called>
    /// }
    /// ```
    pub fn on_limited<F, T, C>(&self, event: &str, limit: Option<u64>, callback: C) -> String
    where
        for<'de> T: Deserialize<'de>,
        C: Fn(T) -> F + Send + Sync + 'static,
        F: Future<Output = ()> + Send + Sync + 'static,
    {
        let id = Uuid::new_v4().to_string();
        // Wrap the typed callback so it can be stored behind the byte-based `AsyncCB` type
        let parsed_callback = move |bytes: Vec<u8>| {
            let value: T = bincode::deserialize(&bytes).unwrap_or_else(|_| {
                panic!(
                    " value can't be deserialized into type {}",
                    std::any::type_name::<T>()
                )
            });

            callback(value).boxed()
        };

        let listener = AsyncListener {
            id: id.clone(),
            limit,
            callback: Arc::new(parsed_callback),
        };

        // `entry` holds the lock for the whole lookup-or-insert, so two threads adding
        // the first listener for the same event cannot overwrite each other
        self.listeners
            .entry(event.to_string())
            .or_default()
            .push(listener);

        id
    }

    /// Adds an event listener that will execute its callback only once; after that the listener is removed.
    /// Returns the id of the newly added listener.
    ///
    /// # Example
    ///
    /// ```rust
    /// use async_event_emitter::AsyncEventEmitter;
    /// #[tokio::main]
    /// async fn main() {
    ///     let event_emitter = AsyncEventEmitter::new();
    ///
    ///     event_emitter.once("Some event", |value: ()| async { println!("Hello world!") });
    ///     // `emit` returns a future, so it must be awaited for the callback to run
    ///     event_emitter.emit("Some event", ()).await; // First event is emitted and the listener's callback is called once
    ///     // >> "Hello world!"
    ///
    ///     event_emitter.emit("Some event", ()).await;
    ///     // >> <Nothing happens here since the listener has used up its single call>
    /// }
    /// ```
    pub fn once<F, T, C>(&self, event: &str, callback: C) -> String
    where
        for<'de> T: Deserialize<'de>,
        C: Fn(T) -> F + Send + Sync + 'static,
        F: Future<Output = ()> + Send + Sync + 'static,
    {
        self.on_limited(event, Some(1), callback)
    }

    /// Adds an event listener with a callback that will get called whenever the given event is emitted.
    /// Returns the id of the newly added listener.
    ///
    /// # Example
    ///
    /// ```rust
    /// use async_event_emitter::AsyncEventEmitter;
    /// let event_emitter = AsyncEventEmitter::new();
    ///
    /// // This will print <"Hello world!"> whenever the <"Some event"> event is emitted
    /// // The type of the `value` parameter for the closure MUST be specified and, if you plan to use the `value`, the `value` type
    /// // MUST also match the type that is being emitted (here we just use a throwaway `()` type since we don't care about using the `value`)
    /// event_emitter.on("Some event", |value: ()| async { println!("Hello world!") });
    /// ```
    pub fn on<F, T, C>(&self, event: &str, callback: C) -> String
    where
        for<'de> T: Deserialize<'de>,
        C: Fn(T) -> F + Send + Sync + 'static,
        F: Future<Output = ()> + Send + Sync + 'static,
    {
        self.on_limited(event, None, callback)
    }
}

// Manual `fmt::Debug` for `AsyncListener`: the callback field is skipped
// because closures do not implement `Debug`
use std::fmt;
impl fmt::Debug for AsyncListener {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("AsyncListener")
            .field("id", &self.id)
            .field("limit", &self.limit)
            .finish()
    }
}

// Manual `fmt::Debug` for `AsyncEventEmitter`
impl fmt::Debug for AsyncEventEmitter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("AsyncEventEmitter")
            .field("listeners", &self.listeners)
            .finish()
    }
}