async_event_emitter/lib.rs
1/*!
2
3 an Async implementation of the [`event-emitter-rs`](https://crates.io/crates/event-emitter-rs) crate
4
5 Allows you to subscribe to events with callbacks and also fire those events.
6 Events are in the form of (strings, value) and callbacks are in the form of closures that take in a value parameter;
7
8 ## Differences between this crate and [`event-emitter-rs`](https://crates.io/crates/event-emitter-rs)
9 - This is an async implementation that works for all common async runtimes (Tokio, async-std and smol)
10 - The listener methods ***(on and once)*** take a callback that returns a future instead of a merely a closure.
11 - The emit methods executes each callback on each event by spawning a tokio task instead of a std::thread
12 - This emitter is thread safe and can also be used lock-free (supports interior mutability).
13
14
15 ***Note***: To use strict return and event types, use [typed-emitter](https://crates.io/crates/typed-emitter), that crate solves [this issue](https://github.com/spencerjibz/async-event-emitter-rs/issues/31) too.
16
17 ## Getting Started
18
19 ```
20 use async_event_emitter::AsyncEventEmitter;
21 #[tokio::main]
22 async fn main() {
23 let event_emitter = AsyncEventEmitter::new();
24 // This will print <"Hello world!"> whenever the <"Say Hello"> event is emitted
25 event_emitter.on("Say Hello", |_:()| async move { println!("Hello world!")});
26 event_emitter.emit("Say Hello", ()).await;
27 // >> "Hello world!"
28
29 }
30 ```
31 ## Basic Usage
32 We can emit and listen to values of any type so long as they implement serde's Serialize and Deserialize traits.
33 A single EventEmitter instance can have listeners to values of multiple types.
34
35 ```
36 use async_event_emitter::AsyncEventEmitter as EventEmitter;
37 use serde::{Deserialize, Serialize};
38 #[tokio::main]
39 async fn main () {
40 let event_emitter = EventEmitter::new();
41 event_emitter.on("Add three", |number: f32| async move {println!("{}", number + 3.0)});
42 event_emitter.emit("Add three", 5.0 as f32).await;
43 event_emitter.emit("Add three", 4.0 as f32).await;
44
45 // >> "8.0"
46 // >> "7.0"
47
48 // Using a more advanced value type such as a struct by implementing the serde traits
49 #[derive(Serialize, Deserialize,Debug)]
50 struct Date {
51 month: String,
52 day: String,
53 }
54
55 event_emitter.on("LOG_DATE", |date: Date| async move {
56 println!("Month: {} - Day: {}", date.month, date.day)
57 });
58 event_emitter.emit("LOG_DATE", Date {
59 month: "January".to_string(),
60 day: "Tuesday".to_string()
61 }).await;
62 // >> "Month: January - Day: Tuesday"
63 }
64 ```
65
66 Removing listeners is also easy
67
68 ```
69 use async_event_emitter::AsyncEventEmitter as EventEmitter;
70 let event_emitter = EventEmitter::new();
71
72 let listener_id = event_emitter.on("Hello", |_: ()| async {println!("Hello World")});
73 match event_emitter.remove_listener(&listener_id) {
74 Some(listener_id) => print!("Removed event listener!"),
75 None => print!("No event listener of that id exists")
76 }
77 ```
78 ## Creating a Global EventEmitter
79
80 It's likely that you'll want to have a single EventEmitter instance that can be shared across files;
81
82 After all, one of the main points of using an EventEmitter is to avoid passing down a value through several nested functions/types and having a global subscription service.
83
84 ```
85 // global_event_emitter.rs
86 use lazy_static::lazy_static;
87 use async_event_emitter::AsyncEventEmitter;
88
89 // Use lazy_static! because the size of EventEmitter is not known at compile time
90 lazy_static! {
91 // Export the emitter with `pub` keyword
92 pub static ref EVENT_EMITTER: AsyncEventEmitter = AsyncEventEmitter::new();
93 }
94
95 #[tokio::main]
96 async fn main() {
97 EVENT_EMITTER.on("Hello", |_:()| async {println!("hello there!")});
98 EVENT_EMITTER.emit("Hello", ()).await;
99 }
100
101 async fn random_function() {
102 // When the <"Hello"> event is emitted in main.rs then print <"Random stuff!">
103 EVENT_EMITTER.on("Hello", |_: ()| async { println!("Random stuff!")});
104 }
105 ```
106 ### Usage with other runtimes
107 Check out the examples from the [typed version of this crate](https://docs.rs/typed-emitter/0.1.2/typed_emitter/#getting-started), just replace the emntter type.
108
109 ### Testing
110 Run the tests on this crate with all-features enabled as follows:
111 ``` cargo test --all-features```
112
113
114 License: MIT
115*/
116
117use dashmap::DashMap;
118use futures::future::{BoxFuture, Future, FutureExt};
119use futures::stream::FuturesUnordered;
120use futures::StreamExt;
121use serde::{Deserialize, Serialize};
122use uuid::Uuid;
123pub type AsyncCB = dyn Fn(Vec<u8>) -> BoxFuture<'static, ()> + Send + Sync + 'static;
124use std::sync::Arc;
125#[derive(Clone)]
126pub struct AsyncListener {
127 pub callback: Arc<AsyncCB>,
128 pub limit: Option<u64>,
129 pub id: String,
130}
131
132#[derive(Default, Clone)]
133pub struct AsyncEventEmitter {
134 pub listeners: DashMap<String, Vec<AsyncListener>>,
135}
136
137impl AsyncEventEmitter {
138 pub fn new() -> Self {
139 Self::default()
140 }
141
142 /// Emits an event of the given parameters and executes each callback that is listening to that event asynchronously by spawning a task for each callback.
143 ///
144 /// # Example
145 ///
146 /// ```rust
147 /// use async_event_emitter::AsyncEventEmitter;
148 /// #[tokio::main]
149 /// async fn main() -> anyhow::Result<()> {
150 /// let event_emitter = AsyncEventEmitter::new();
151 ///
152 /// // Emits the <"Some event"> event and a value <"Hello programmer">
153 /// // The value can be of any type as long as it implements the serde Serialize trait
154 /// event_emitter.emit("Some event", "Hello programmer!").await;
155 ///
156 /// Ok(())
157 /// }
158 /// ```
159 pub async fn emit<'a, T>(&self, event: &str, value: T) -> anyhow::Result<()>
160 where
161 T: Serialize + Deserialize<'a> + Send + Sync + 'a,
162 {
163 let mut futures: FuturesUnordered<_> = FuturesUnordered::new();
164
165 if let Some(ref mut listeners) = self.listeners.get_mut(event) {
166 let mut listeners_to_remove: Vec<usize> = Vec::new();
167 for (index, listener) in listeners.iter_mut().enumerate() {
168 let bytes: Vec<u8> = bincode::serialize(&value)?;
169
170 let callback = Arc::clone(&listener.callback);
171
172 match listener.limit {
173 None => {
174 futures.push(callback(bytes));
175 }
176 Some(limit) => {
177 if limit != 0 {
178 futures.push(callback(bytes));
179
180 listener.limit = Some(limit - 1);
181 } else {
182 listeners_to_remove.push(index);
183 }
184 }
185 }
186 }
187
188 // Reverse here so we don't mess up the ordering of the vector
189 for index in listeners_to_remove.into_iter().rev() {
190 listeners.remove(index);
191 }
192 }
193
194 while futures.next().await.is_some() {}
195 Ok(())
196 }
197
198 /// Removes an event listener with the given id
199 ///
200 /// # Example
201 ///
202 /// ```
203 /// use async_event_emitter::AsyncEventEmitter;
204 /// let event_emitter = AsyncEventEmitter::new();
205 /// let listener_id =
206 /// event_emitter.on("Some event", |value: ()| async { println!("Hello world!") });
207 /// println!("{:?}", event_emitter.listeners);
208 ///
209 /// // Removes the listener that we just added
210 /// event_emitter.remove_listener(&listener_id);
211 /// ```
212 pub fn remove_listener(&self, id_to_delete: &str) -> Option<String> {
213 for mut mut_ref in self.listeners.iter_mut() {
214 let event_listeners = mut_ref.value_mut();
215 if let Some(index) = event_listeners
216 .iter()
217 .position(|listener| listener.id == id_to_delete)
218 {
219 event_listeners.remove(index);
220 return Some(id_to_delete.to_string());
221 }
222 }
223
224 None
225 }
226
227 /// Adds an event listener that will only execute the listener x amount of times - Then the listener will be deleted.
228 /// Returns the id of the newly added listener.
229 ///
230 /// # Example
231 ///
232 /// ```
233 /// use async_event_emitter::AsyncEventEmitter;
234 /// #[tokio::main]
235 /// async fn main() {
236 /// let event_emitter = AsyncEventEmitter::new();
237 /// // Listener will be executed 3 times. After the third time, the listener will be deleted.
238 /// event_emitter.on_limited("Some event", Some(3), |value: ()| async{ println!("Hello world!")});
239 /// event_emitter.emit("Some event", ()).await; // 1 >> "Hello world!"
240 /// event_emitter.emit("Some event", ()).await; // 2 >> "Hello world!"
241 /// event_emitter.emit("Some event", ()).await; // 3 >> "Hello world!"
242 /// event_emitter.emit("Some event", ()).await; // 4 >> <Nothing happens here because listener was deleted after the 3rd call>
243 /// }
244 /// ```
245 pub fn on_limited<F, T, C>(&self, event: &str, limit: Option<u64>, callback: C) -> String
246 where
247 for<'de> T: Deserialize<'de>,
248 C: Fn(T) -> F + Send + Sync + 'static,
249 F: Future<Output = ()> + Send + Sync + 'static,
250 {
251 let id = Uuid::new_v4().to_string();
252 let parsed_callback = move |bytes: Vec<u8>| {
253 let value: T = bincode::deserialize(&bytes).unwrap_or_else(|_| {
254 panic!(
255 " value can't be deserialized into type {}",
256 std::any::type_name::<T>()
257 )
258 });
259
260 callback(value).boxed()
261 };
262
263 let listener = AsyncListener {
264 id: id.clone(),
265 limit,
266 callback: Arc::new(parsed_callback),
267 };
268
269 match self.listeners.get_mut(event) {
270 Some(ref mut callbacks) => {
271 callbacks.push(listener);
272 }
273 None => {
274 self.listeners.insert(event.to_string(), vec![listener]);
275 }
276 }
277
278 id
279 }
280
281 /// Adds an event listener that will only execute the callback once - Then the listener will be deleted.
282 /// Returns the id of the newly added listener.
283 ///
284 /// # Example
285 ///
286 /// ```rust
287 /// use async_event_emitter::AsyncEventEmitter;
288 /// let event_emitter = AsyncEventEmitter::new();
289 ///
290 /// event_emitter.once("Some event", |value: ()| async {println!("Hello world!")});
291 /// event_emitter.emit("Some event", ()); // First event is emitted and the listener's callback is called once
292 /// // >> "Hello world!"
293 ///
294 /// event_emitter.emit("Some event", ());
295 /// // >> <Nothing happens here since listener was deleted>
296 /// ```
297 pub fn once<F, T, C>(&self, event: &str, callback: C) -> String
298 where
299 for<'de> T: Deserialize<'de>,
300 C: Fn(T) -> F + Send + Sync + 'static,
301 F: Future<Output = ()> + Send + Sync + 'static,
302 {
303 self.on_limited(event, Some(1), callback)
304 }
305
306 /// Adds an event listener with a callback that will get called whenever the given event is emitted.
307 /// Returns the id of the newly added listener.
308 ///
309 /// # Example
310 ///
311 /// ```rust
312 /// use async_event_emitter::AsyncEventEmitter;
313 /// let event_emitter = AsyncEventEmitter::new();
314 ///
315 /// // This will print <"Hello world!"> whenever the <"Some event"> event is emitted
316 /// // The type of the `value` parameter for the closure MUST be specified and, if you plan to use the `value`, the `value` type
317 /// // MUST also match the type that is being emitted (here we just use a throwaway `()` type since we don't care about using the `value`)
318 /// event_emitter.on("Some event", |value: ()| async { println!("Hello world!")});
319 /// ```
320 pub fn on<F, T, C>(&self, event: &str, callback: C) -> String
321 where
322 for<'de> T: Deserialize<'de>,
323 C: Fn(T) -> F + Send + Sync + 'static,
324 F: Future<Output = ()> + Send + Sync + 'static,
325 {
326 self.on_limited(event, None, callback)
327 }
328}
329
330// test the AsyncEventEmitter
331// implement fmt::Debug for AsyncEventListener
332use std::fmt;
333impl fmt::Debug for AsyncListener {
334 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
335 f.debug_struct("AsyncListener")
336 .field("id", &self.id)
337 .field("limit", &self.limit)
338 .finish()
339 }
340}
341
342// implement fmt::Debug for AsyncEventEmitter
343impl fmt::Debug for AsyncEventEmitter {
344 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
345 f.debug_struct("AsyncEventEmitter")
346 .field("listeners", &self.listeners)
347 .finish()
348 }
349}