async_event_emitter/lib.rs
1/*!
2
3 an Async implementation of the [`event-emitter-rs`](https://crates.io/crates/event-emitter-rs) crate
4
5 Allows you to subscribe to events with callbacks and also fire those events.
6 Events are in the form of (strings, value) and callbacks are in the form of closures that take in a value parameter;
7
8 ## Differences between this crate and [`event-emitter-rs`](https://crates.io/crates/event-emitter-rs)
9 - This is an async implementation that works for all common async runtimes (Tokio, async-std and smol)
10 - The listener methods ***(on and once)*** take a callback that returns a future instead of a merely a closure.
11 - The emit methods executes each callback on each event by running async intra-task instead of spawning a std::thread
12 - This emitter is thread safe and can also be used lock-free (supports interior mutability).
13
14 ***Note***: To use strict return and event types, use [typed-emitter](https://crates.io/crates/typed-emitter), that crate solves [this issue](https://github.com/spencerjibz/async-event-emitter-rs/issues/31) too.
15
16 ## Getting Started
17
18 ```
19 use async_event_emitter::AsyncEventEmitter;
20 #[tokio::main]
21 async fn main() {
22 let event_emitter = AsyncEventEmitter::new();
23 // This will print <"Hello world!"> whenever the <"Say Hello"> event is emitted
24 event_emitter.on("Say Hello", |_:()| async move { println!("Hello world!")});
25 event_emitter.emit("Say Hello", ()).await;
26 // >> "Hello world!"
27
28 }
29 ```
30 ## Basic Usage
31 We can emit and listen to values of any type so long as they implement serde's Serialize and Deserialize traits.
32 A single EventEmitter instance can have listeners to values of multiple types.
33
34 ```
35 use async_event_emitter::AsyncEventEmitter as EventEmitter;
36 use serde::{Deserialize, Serialize};
37 #[tokio::main]
38 async fn main () {
39 let event_emitter = EventEmitter::new();
40 event_emitter.on("Add three", |number: f32| async move {println!("{}", number + 3.0)});
41 event_emitter.emit("Add three", 5.0 as f32).await;
42 event_emitter.emit("Add three", 4.0 as f32).await;
43
44 // >> "8.0"
45 // >> "7.0"
46
47 // Using a more advanced value type such as a struct by implementing the serde traits
48 #[derive(Serialize, Deserialize,Debug)]
49 struct Date {
50 month: String,
51 day: String,
52 }
53
54 event_emitter.on("LOG_DATE", |date: Date| async move {
55 println!("Month: {} - Day: {}", date.month, date.day)
56 });
57 event_emitter.emit("LOG_DATE", Date {
58 month: "January".to_string(),
59 day: "Tuesday".to_string()
60 }).await;
61 // >> "Month: January - Day: Tuesday"
62 }
63 ```
64
65 Removing listeners is also easy
66
67 ```
68 use async_event_emitter::AsyncEventEmitter as EventEmitter;
69 let event_emitter = EventEmitter::new();
70
71 let listener_id = event_emitter.on("Hello", |_: ()| async {println!("Hello World")});
72 match event_emitter.remove_listener(listener_id) {
73 Some(listener_id) => print!("Removed event listener!"),
74 None => print!("No event listener of that id exists")
75 }
76 ```
77
78 Listening to all emitted events with a single listener
79
80 ```rust
81 use async_event_emitter::AsyncEventEmitter as EventEmitter;
82 #[tokio::main]
83 async fn main() {
84 let mut event_emitter = EventEmitter::new();
85 // this will print Hello world two because of
86 event_emitter.on_all(|value: i32| async move { println!("Hello world! - {value}") });
87 // >> "Hello world! - 1"
88 // >> "Hello world! - 2"
89 event_emitter.emit("Some event", 1).await;
90 event_emitter.emit("next event", 2).await;
91 }
92
93 ```
94 ## Creating a Global EventEmitter
95
96 It's likely that you'll want to have a single EventEmitter instance that can be shared across files;
97
98 After all, one of the main points of using an EventEmitter is to avoid passing down a value through several nested functions/types and having a global subscription service.
99
100 ```
101 // global_event_emitter.rs
102 use lazy_static::lazy_static;
103 use async_event_emitter::AsyncEventEmitter;
104
105 // Use lazy_static! because the size of EventEmitter is not known at compile time
106 lazy_static! {
107 // Export the emitter with `pub` keyword
108 pub static ref EVENT_EMITTER: AsyncEventEmitter = AsyncEventEmitter::new();
109 }
110
111 #[tokio::main]
112 async fn main() {
113 EVENT_EMITTER.on("Hello", |_:()| async {println!("hello there!")});
114 EVENT_EMITTER.emit("Hello", ()).await;
115 }
116
117 async fn random_function() {
118 // When the <"Hello"> event is emitted in main.rs then print <"Random stuff!">
119 EVENT_EMITTER.on("Hello", |_: ()| async { println!("Random stuff!")});
120 }
121 ```
122 ### Usage with other runtimes
123 Check out the examples from the [typed version of this crate](https://docs.rs/typed-emitter/0.1.2/typed_emitter/#getting-started), just replace the emntter type.
124
125 ### Testing
126 Run the tests on this crate with all-features enabled as follows:
127 ``` cargo test --all-features```
128
129
130 License: MIT
131*/
132
133use dashmap::DashMap;
134use futures::future::{BoxFuture, Future, FutureExt};
135use futures::stream::FuturesUnordered;
136use futures::StreamExt;
137use serde::{Deserialize, Serialize};
138use uuid::Uuid;
139pub type AsyncCB = dyn Fn(Vec<u8>) -> BoxFuture<'static, ()> + Send + Sync + 'static;
140use std::sync::{Arc, Mutex};
141#[derive(Clone)]
142pub struct AsyncListener {
143 pub callback: Arc<AsyncCB>,
144 pub limit: Option<u64>,
145 pub id: Uuid,
146}
147
148#[derive(Default, Clone)]
149pub struct AsyncEventEmitter {
150 listeners: DashMap<String, Vec<AsyncListener>>,
151 all_listener: Arc<Mutex<Option<AsyncListener>>>,
152}
153
154impl AsyncEventEmitter {
155 pub fn new() -> Self {
156 Self::default()
157 }
158 /// Returns the numbers of events
159 /// # Example
160 ///
161 /// ```rust
162 /// use async_event_emitter::AsyncEventEmitter;
163 /// let event_emitter = AsyncEventEmitter::new();
164 /// event_emitter.event_count(); // returns 0
165 /// ```
166 pub fn event_count(&self) -> usize {
167 self.listeners.len()
168 }
169
170 /// Returns all listeners on the specified event
171 /// # Example
172 /// ```rust
173 /// use async_event_emitter::AsyncEventEmitter;
174 /// #[tokio::main]
175 /// async fn main() {
176 /// let emitter = AsyncEventEmitter::new();
177 /// emitter.on("test", |value: ()| async { println!("Hello world!") });
178 /// emitter.emit("test", ()).await;
179 /// let listeners = emitter.listeners_by_event("test");
180 /// println!("{listeners:?}");
181 /// }
182 /// ```
183 pub fn listeners_by_event(&self, event: &str) -> Vec<AsyncListener> {
184 if let Some(listeners) = self.listeners.get(event) {
185 let values = listeners.to_vec();
186 return values;
187 }
188 vec![]
189 }
190 /// Returns the numbers of listners per event
191 /// # Example
192 ///
193 /// ```rust
194 /// use async_event_emitter::AsyncEventEmitter;
195 /// #[tokio::main]
196 /// async fn main() {
197 /// let emitter = AsyncEventEmitter::new();
198 /// emitter.on("test", |value: ()| async { println!("Hello world!") });
199 /// emitter.emit("test", ()).await;
200 /// emitter.listener_count_by_event("test"); // returns 1
201 /// }
202 /// ```
203 pub fn listener_count_by_event(&self, event: &str) -> usize {
204 if let Some(listeners) = self.listeners.get(event) {
205 return listeners.len();
206 }
207 0
208 }
209
210 /// Emits an event of the given parameters and executes each callback that is listening to that event asynchronously by spawning a task for each callback.
211 ///
212 /// # Example
213 ///
214 /// ```rust
215 /// use async_event_emitter::AsyncEventEmitter;
216 /// #[tokio::main]
217 /// async fn main() -> anyhow::Result<()> {
218 /// let event_emitter = AsyncEventEmitter::new();
219 ///
220 /// // Emits the <"Some event"> event and a value <"Hello programmer">
221 /// // The value can be of any type as long as it implements the serde Serialize trait
222 /// event_emitter.emit("Some event", "Hello programmer!").await;
223 ///
224 /// Ok(())
225 /// }
226 /// ```
227 pub async fn emit<T>(&self, event: &str, value: T) -> anyhow::Result<()>
228 where
229 T: Serialize,
230 {
231 let mut futures: FuturesUnordered<_> = FuturesUnordered::new();
232
233 if let Some(ref mut listeners) = self.listeners.get_mut(event) {
234 let mut listeners_to_remove: Vec<usize> = Vec::new();
235 for (index, listener) in listeners.iter_mut().enumerate() {
236 let bytes: Vec<u8> = bincode::serialize(&value)?;
237
238 let callback = Arc::clone(&listener.callback);
239
240 match listener.limit {
241 None => {
242 futures.push(callback(bytes));
243 }
244 Some(limit) => {
245 if limit != 0 {
246 futures.push(callback(bytes));
247
248 listener.limit = Some(limit - 1);
249 } else {
250 listeners_to_remove.push(index);
251 }
252 }
253 }
254 }
255
256 // Reverse here so we don't mess up the ordering of the vector
257 for index in listeners_to_remove.into_iter().rev() {
258 listeners.remove(index);
259 }
260 }
261
262 if let Some(global_listener) = self.all_listener.lock().unwrap().as_ref() {
263 let callback = global_listener.callback.clone();
264 let bytes: Vec<u8> = bincode::serialize(&value)?;
265 futures.push(callback(bytes));
266 }
267
268 while futures.next().await.is_some() {}
269 Ok(())
270 }
271
272 /// Removes an event listener with the given id
273 ///
274 /// # Example
275 ///
276 /// ```
277 /// use async_event_emitter::AsyncEventEmitter;
278 /// let event_emitter = AsyncEventEmitter::new();
279 /// let listener_id =
280 /// event_emitter.on("Some event", |value: ()| async { println!("Hello world!") });
281 /// // Removes the listener that we just added
282 /// event_emitter.remove_listener(listener_id);
283 /// ```
284 pub fn remove_listener(&self, id_to_delete: Uuid) -> Option<Uuid> {
285 for mut mut_ref in self.listeners.iter_mut() {
286 let event_listeners = mut_ref.value_mut();
287 if let Some(index) = event_listeners
288 .iter()
289 .position(|listener| listener.id == id_to_delete)
290 {
291 event_listeners.remove(index);
292 return Some(id_to_delete);
293 }
294 }
295 let mut all_listener = self.all_listener.lock().unwrap();
296 if let Some(listener) = all_listener.as_ref() {
297 if id_to_delete == listener.id {
298 all_listener.take();
299 }
300 }
301 None
302 }
303
304 /// Adds an event listener that will only execute the listener x amount of times - Then the listener will be deleted.
305 /// Returns the id of the newly added listener.
306 ///
307 /// # Example
308 ///
309 /// ```
310 /// use async_event_emitter::AsyncEventEmitter;
311 /// #[tokio::main]
312 /// async fn main() {
313 /// let event_emitter = AsyncEventEmitter::new();
314 /// // Listener will be executed 3 times. After the third time, the listener will be deleted.
315 /// event_emitter.on_limited("Some event", Some(3), |value: ()| async{ println!("Hello world!")});
316 /// event_emitter.emit("Some event", ()).await; // 1 >> "Hello world!"
317 /// event_emitter.emit("Some event", ()).await; // 2 >> "Hello world!"
318 /// event_emitter.emit("Some event", ()).await; // 3 >> "Hello world!"
319 /// event_emitter.emit("Some event", ()).await; // 4 >> <Nothing happens here because listener was deleted after the 3rd call>
320 /// }
321 /// ```
322 pub fn on_limited<F, T, C>(&self, event: &str, limit: Option<u64>, callback: C) -> Uuid
323 where
324 for<'de> T: Deserialize<'de>,
325 C: Fn(T) -> F + Send + Sync + 'static,
326 F: Future<Output = ()> + Send + Sync + 'static,
327 {
328 let id = Uuid::new_v4();
329 let parsed_callback = move |bytes: Vec<u8>| {
330 let value: T = bincode::deserialize(&bytes).unwrap_or_else(|_| {
331 panic!(
332 " value can't be deserialized into type {}",
333 std::any::type_name::<T>()
334 )
335 });
336
337 callback(value).boxed()
338 };
339
340 let listener = AsyncListener {
341 id,
342 limit,
343 callback: Arc::new(parsed_callback),
344 };
345
346 match self.listeners.get_mut(event) {
347 Some(ref mut callbacks) => {
348 callbacks.push(listener);
349 }
350 None => {
351 self.listeners.insert(event.to_string(), vec![listener]);
352 }
353 }
354
355 id
356 }
357
358 /// Adds an event listener that will only execute the callback once - Then the listener will be deleted.
359 /// Returns the id of the newly added listener.
360 ///
361 /// # Example
362 ///
363 /// ```rust
364 /// use async_event_emitter::AsyncEventEmitter;
365 /// let event_emitter = AsyncEventEmitter::new();
366 ///
367 /// event_emitter.once("Some event", |value: ()| async {println!("Hello world!")});
368 /// event_emitter.emit("Some event", ()); // First event is emitted and the listener's callback is called once
369 /// // >> "Hello world!"
370 ///
371 /// event_emitter.emit("Some event", ());
372 /// // >> <Nothing happens here since listener was deleted>
373 /// ```
374 pub fn once<F, T, C>(&self, event: &str, callback: C) -> Uuid
375 where
376 for<'de> T: Deserialize<'de>,
377 C: Fn(T) -> F + Send + Sync + 'static,
378 F: Future<Output = ()> + Send + Sync + 'static,
379 {
380 self.on_limited(event, Some(1), callback)
381 }
382
383 /// Adds an event listener with a callback that will get called whenever the given event is emitted.
384 /// Returns the id of the newly added listener.
385 ///
386 /// # Example
387 ///
388 /// ```rust
389 /// use async_event_emitter::AsyncEventEmitter;
390 /// let event_emitter = AsyncEventEmitter::new();
391 ///
392 /// // This will print <"Hello world!"> whenever the <"Some event"> event is emitted
393 /// // The type of the `value` parameter for the closure MUST be specified and, if you plan to use the `value`, the `value` type
394 /// // MUST also match the type that is being emitted (here we just use a throwaway `()` type since we don't care about using the `value`)
395 /// event_emitter.on("Some event", |value: ()| async { println!("Hello world!")});
396 /// ```
397 pub fn on<F, T, C>(&self, event: &str, callback: C) -> Uuid
398 where
399 for<'de> T: Deserialize<'de>,
400 C: Fn(T) -> F + Send + Sync + 'static,
401 F: Future<Output = ()> + Send + Sync + 'static,
402 {
403 self.on_limited(event, None, callback)
404 }
405 /// Adds an event listener called for whenever every event is called
406 /// Returns the id of the newly added listener.
407 ///
408 /// # Example
409 /// ```rust
410 /// use async_event_emitter::AsyncEventEmitter;
411 /// #[tokio::main]
412 /// async fn main() {
413 /// let mut event_emitter = AsyncEventEmitter::new();
414 /// // this will print Hello world two because of
415 /// event_emitter.on_all(|value: ()| async { println!("Hello world!") });
416 /// event_emitter.emit("Some event", ()).await;
417 /// // >> "Hello world!"
418 ///
419 /// event_emitter.emit("next event", ()).await;
420 /// // >> <Nothing happens here since listener was deleted>
421 /// }
422 /// ```
423 pub fn on_all<F, T, C>(&self, callback: C) -> Uuid
424 where
425 for<'de> T: Deserialize<'de>,
426 C: Fn(T) -> F + Send + Sync + 'static,
427 F: Future<Output = ()> + Send + Sync + 'static,
428 {
429 assert!(
430 self.all_listener.lock().unwrap().is_none(),
431 "only one global listener is allowed"
432 );
433 let id = Uuid::new_v4();
434 let parsed_callback = move |bytes: Vec<u8>| {
435 let value: T = bincode::deserialize(&bytes).unwrap_or_else(|_| {
436 panic!(
437 " value can't be deserialized into type {}",
438 std::any::type_name::<T>()
439 )
440 });
441 callback(value).boxed()
442 };
443
444 let listener = AsyncListener {
445 id,
446 limit: None,
447 callback: Arc::new(parsed_callback),
448 };
449
450 self.all_listener.lock().unwrap().replace(listener);
451
452 id
453 }
454}
455
456// test the AsyncEventEmitter
457// implement fmt::Debug for AsyncEventListener
458use std::fmt;
459impl fmt::Debug for AsyncListener {
460 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
461 f.debug_struct("AsyncListener")
462 .field("id", &self.id)
463 .field("limit", &self.limit)
464 .finish()
465 }
466}
467
468// implement fmt::Debug for AsyncEventEmitter
469impl fmt::Debug for AsyncEventEmitter {
470 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
471 f.debug_struct("AsyncEventEmitter")
472 .field("listeners", &self.listeners)
473 .finish()
474 }
475}