async_events/
lib.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    sync::{Arc, Mutex, Weak},
5    task::{Context, Poll, Waker},
6};
7
8/// State shared between [`Observer`] and [`AsyncEvents`]
9struct Shared<T> {
10    result: Option<T>,
11    /// A waker is used to tell the task execute that a futures task may have proceeded and it is
12    /// sensible to poll them again. This one offers methods to make sure only Futures for the
13    /// events those status may have changed get woken.
14    waker: Option<Waker>,
15}
16
17/// A Future which is completed, once its associated event is resolved. See
18/// [`AsyncEvents::output_of`].
19pub struct Observer<T> {
20    shared: Arc<Mutex<Shared<T>>>,
21}
22
23impl<T> Future for Observer<T> {
24    type Output = T;
25
26    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
27        let mut shared = self.shared.lock().unwrap();
28        match &shared.result {
29            None => {
30                if let Some(ref mut waker) = &mut shared.waker {
31                    // If a waker has been previously set, let's reuse the resources from the old
32                    // one, rather than allocating a new one.
33                    waker.clone_from(cx.waker())
34                } else {
35                    shared.waker = Some(cx.waker().clone());
36                }
37                Poll::Pending
38            }
39            Some(_) => Poll::Ready(shared.result.take().unwrap()),
40        }
41    }
42}
43
44/// Allows to create futures which will not complete until an associated event id is resolved. This
45/// is useful for creating futures waiting for completion on external events which are driven to
46/// completion outside of the current process.
47///
48/// ```
49/// use std::time::Duration;
50/// use async_events::AsyncEvents;
51///
52/// async fn foo(events: &AsyncEvents<u32, &'static str>) {
53///     // This is a future waiting for an event with key `1` to resover its output.
54///     let observer = events.output_of(1);
55///     // We can have multiple observers for the same event, if we want to.
56///     let another_observer = events.output_of(1);
57///
58///     // This will block until the event is resolved
59///     let result = observer.await;
60///     // Do something awesome with result
61///     println!("{result}");
62/// }
63///
64/// async fn bar(events: &AsyncEvents<u32, &'static str>) {
65///     // All observers waiting for `1` wake up and their threads may continue. You could resolve
66///     // multiple events at once with the same result. This wakes up every observer associated
67///     // with the event.
68///     events.resolve_all_with(&[1], "Hello, World");
69/// }
70/// ```
71pub struct AsyncEvents<K, T> {
72    wakers: Mutex<Vec<Promise<K, T>>>,
73}
74
75impl<K, T> AsyncEvents<K, T> {
76    pub fn new() -> Self {
77        Self {
78            wakers: Mutex::new(Vec::new()),
79        }
80    }
81}
82
83impl<K, T> Default for AsyncEvents<K, T> {
84    fn default() -> Self {
85        Self::new()
86    }
87}
88
89impl<K, V> AsyncEvents<K, V>
90where
91    K: Eq,
92{
93    /// A future associated with a peer, which can be resolved using `resolve_with`. You can call
94    /// this method repeatedly to create multiple observers waiting for the same event.
95    #[deprecated(note = "Please use output_of instead")]
96    pub fn wait_for_output(&self, event_id: K) -> Observer<V> {
97        let strong = Arc::new(Mutex::new(Shared {
98            result: None,
99            waker: None,
100        }));
101        let weak = Arc::downgrade(&strong);
102        {
103            let mut wakers = self.wakers.lock().unwrap();
104            wakers.retain(|promise| !promise.is_orphan());
105            wakers.push(Promise {
106                key: event_id,
107                shared: weak,
108            });
109        }
110        Observer { shared: strong }
111    }
112
113    /// A future associated with a peer, which can be resolved using `resolve_with`. You can call
114    /// this method repeatedly to create multiple observers waiting for the same event.
115    ///
116    /// Events are created **implicitly** by creating futures waiting for them. They are removed
117    /// then it is resolved. Waiting on an already resolved event will hang forever.
118    ///
119    /// ```
120    /// use async_events::AsyncEvents;
121    ///
122    /// # async fn example() {
123    /// let events = AsyncEvents::<u32, u32>::new();
124    ///
125    /// // Event occurs before we created the observer
126    /// events.resolve_all_with(&[1], 42);
127    ///
128    /// // Oh no, event `1` has already been resolved. This is likely to wait forever.
129    /// let answer = events.output_of(1).await;
130    /// # }
131    /// ```
132    pub fn output_of(&self, event_id: K) -> Observer<V> {
133        let strong = Arc::new(Mutex::new(Shared {
134            result: None,
135            waker: None,
136        }));
137        let weak = Arc::downgrade(&strong);
138        {
139            let mut wakers = self.wakers.lock().unwrap();
140            wakers.retain(|promise| !promise.is_orphan());
141            wakers.push(Promise {
142                key: event_id,
143                shared: weak,
144            });
145        }
146        Observer { shared: strong }
147    }
148
149    /// Resolves all the pending [`Observer`]s associated with the given ids.
150    ///
151    /// * `event_ids`: Observers associated with these ids are resolved. It would be typical to call
152    ///   this method with only one element in `event_ids`. However in some error code paths it is
153    ///   not unusual that you can rule provide a result (usually `Err`) for many events at once.
154    /// * `output`: The result these [`Observer`]s will return in their `.await` call
155    pub fn resolve_all_with(&self, event_ids: &[K], output: V)
156    where
157        V: Clone,
158    {
159        self.resolve_all_if(|event_id| {
160            if event_ids.contains(event_id) {
161                Some(output.clone())
162            } else {
163                None
164            }
165        })
166    }
167
168    /// Resolves all the pending [`Observer`]s. This resolves all events independent of their id.
169    /// This might come in useful e.g. during application shutdown.
170    ///
171    /// * `output`: The result these [`Observer`]s will return in their `.await` call
172    /// * `f`: Function acting as a filter for event ids which are to be resolved, and as a factory
173    ///   for their results. If `f` returns `None` observers associated with the event id are not
174    ///   resolved. If `Some` all observers with this Id are resolved.
175    pub fn resolve_all_if(&self, f: impl Fn(&K) -> Option<V>)
176    where
177        V: Clone,
178    {
179        let mut wakers = self.wakers.lock().unwrap();
180        for promise in wakers.iter_mut() {
181            if let Some(output) = f(&promise.key) {
182                promise.resolve(output)
183            }
184        }
185    }
186
187    /// Resolves one pending [`Observer`] associated with the given event id. If no observer with
188    /// such an id exists, nothing happens.
189    ///
190    /// * `event_id`: One [`Observer`] associated with this ids is resolved.
191    /// * `output`: The result the [`Observer`] will return in its `.await` call
192    pub fn resolve_one(&self, event_id: K, output: V) {
193        let mut wakers = self.wakers.lock().unwrap();
194        if let Some(promise) = wakers.iter_mut().find(|p| p.key == event_id) {
195            promise.resolve(output);
196        }
197    }
198}
199
200/// For every [`Observer`] future, we create an associated promise, which we can use to send the
201/// result and notify the async runtime that it should poll the future again.
202struct Promise<K, T> {
203    /// Identifiere of the event the associated future is waiting on.
204    key: K,
205    /// Weak reference to the shared result state.
206    shared: Weak<Mutex<Shared<T>>>,
207}
208
209impl<K, T> Promise<K, T> {
210    /// Set result and notify the runtime to poll the observing Future
211    fn resolve(&mut self, result: T) {
212        if let Some(strong) = self.shared.upgrade() {
213            let mut shared = strong.lock().unwrap();
214            shared.result = Some(result);
215            if let Some(waker) = shared.waker.take() {
216                waker.wake()
217            }
218        }
219    }
220
221    /// No Observer is watining anymore for this promise to be resolved.
222    fn is_orphan(&self) -> bool {
223        self.shared.strong_count() == 0
224    }
225}
226
227#[cfg(test)]
228mod tests {
229
230    use std::time::Duration;
231
232    use super::AsyncEvents;
233    use tokio::{self, time::timeout};
234
235    const ZERO: Duration = Duration::from_secs(0);
236
237    #[tokio::test]
238    async fn pending() {
239        let pm: AsyncEvents<i32, ()> = AsyncEvents::new();
240        let future = pm.output_of(1);
241        // Promise not yet fulfilled => Elapses due to timeout.
242        timeout(ZERO, future).await.unwrap_err();
243    }
244
245    #[tokio::test]
246    async fn resolved() {
247        let pm = AsyncEvents::new();
248        let future = pm.output_of(1);
249        pm.resolve_all_with(&[1], 42);
250        // Promise fulfilled => Return result
251        assert_eq!(42, timeout(ZERO, future).await.unwrap());
252    }
253
254    #[tokio::test]
255    async fn multiple_observers_resolve_all() {
256        let pm = AsyncEvents::new();
257        let obs_1 = pm.output_of(1);
258        let obs_2 = pm.output_of(1);
259        pm.resolve_all_with(&[1], 42);
260        assert_eq!(42, timeout(ZERO, obs_1).await.unwrap());
261        assert_eq!(42, timeout(ZERO, obs_2).await.unwrap());
262    }
263
264    #[tokio::test]
265    async fn multiple_observers_resolve_one() {
266        let pm = AsyncEvents::new();
267        let obs_1 = pm.output_of(1);
268        let obs_2 = pm.output_of(1);
269        pm.resolve_one(1, 42);
270        assert_eq!(42, timeout(ZERO, obs_1).await.unwrap());
271        // Second observer times out
272        assert!(timeout(ZERO, obs_2).await.is_err());
273    }
274
275    /// This has been proven usefull if shutting down an application, and wanting to stop waiting
276    /// on all pending futures.
277    #[tokio::test]
278    async fn resolve_all_observers_with_the_same_output() {
279        let pm = AsyncEvents::new();
280        let obs_1 = pm.output_of(1);
281        let obs_2 = pm.output_of(2);
282
283        // We ignore event ID, we want to give the same result to all observers.
284        pm.resolve_all_if(|_event_id| Some(42));
285
286        assert_eq!(42, timeout(ZERO, obs_1).await.unwrap());
287        assert_eq!(42, timeout(ZERO, obs_2).await.unwrap());
288    }
289}