async_events/lib.rs
1use std::{
2 future::Future,
3 pin::Pin,
4 sync::{Arc, Mutex, Weak},
5 task::{Context, Poll, Waker},
6};
7
/// Mutable state that an [`Observer`] and the promise registered in [`AsyncEvents`] both point
/// to.
struct Shared<T> {
    /// Output of the event. `None` as long as the event has not been resolved, and `None` again
    /// once the observer has moved the value out.
    result: Option<T>,
    /// Waker of the task which polled the observer last while no result was available. Waking it
    /// tells the executor that polling this particular future again is worthwhile, so only the
    /// futures whose event status may have changed are woken.
    waker: Option<Waker>,
}
16
17/// A Future which is completed, once its associated event is resolved. See
18/// [`AsyncEvents::output_of`].
19pub struct Observer<T> {
20 shared: Arc<Mutex<Shared<T>>>,
21}
22
23impl<T> Future for Observer<T> {
24 type Output = T;
25
26 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
27 let mut shared = self.shared.lock().unwrap();
28 match &shared.result {
29 None => {
30 if let Some(ref mut waker) = &mut shared.waker {
31 // If a waker has been previously set, let's reuse the resources from the old
32 // one, rather than allocating a new one.
33 waker.clone_from(cx.waker())
34 } else {
35 shared.waker = Some(cx.waker().clone());
36 }
37 Poll::Pending
38 }
39 Some(_) => Poll::Ready(shared.result.take().unwrap()),
40 }
41 }
42}
43
44/// Allows to create futures which will not complete until an associated event id is resolved. This
45/// is useful for creating futures waiting for completion on external events which are driven to
46/// completion outside of the current process.
47///
48/// ```
49/// use std::time::Duration;
50/// use async_events::AsyncEvents;
51///
52/// async fn foo(events: &AsyncEvents<u32, &'static str>) {
53/// // This is a future waiting for an event with key `1` to resover its output.
54/// let observer = events.output_of(1);
55/// // We can have multiple observers for the same event, if we want to.
56/// let another_observer = events.output_of(1);
57///
58/// // This will block until the event is resolved
59/// let result = observer.await;
60/// // Do something awesome with result
61/// println!("{result}");
62/// }
63///
64/// async fn bar(events: &AsyncEvents<u32, &'static str>) {
65/// // All observers waiting for `1` wake up and their threads may continue. You could resolve
66/// // multiple events at once with the same result. This wakes up every observer associated
67/// // with the event.
68/// events.resolve_all_with(&[1], "Hello, World");
69/// }
70/// ```
71pub struct AsyncEvents<K, T> {
72 wakers: Mutex<Vec<Promise<K, T>>>,
73}
74
75impl<K, T> AsyncEvents<K, T> {
76 pub fn new() -> Self {
77 Self {
78 wakers: Mutex::new(Vec::new()),
79 }
80 }
81}
82
83impl<K, T> Default for AsyncEvents<K, T> {
84 fn default() -> Self {
85 Self::new()
86 }
87}
88
89impl<K, V> AsyncEvents<K, V>
90where
91 K: Eq,
92{
93 /// A future associated with a peer, which can be resolved using `resolve_with`. You can call
94 /// this method repeatedly to create multiple observers waiting for the same event.
95 #[deprecated(note = "Please use output_of instead")]
96 pub fn wait_for_output(&self, event_id: K) -> Observer<V> {
97 let strong = Arc::new(Mutex::new(Shared {
98 result: None,
99 waker: None,
100 }));
101 let weak = Arc::downgrade(&strong);
102 {
103 let mut wakers = self.wakers.lock().unwrap();
104 wakers.retain(|promise| !promise.is_orphan());
105 wakers.push(Promise {
106 key: event_id,
107 shared: weak,
108 });
109 }
110 Observer { shared: strong }
111 }
112
113 /// A future associated with a peer, which can be resolved using `resolve_with`. You can call
114 /// this method repeatedly to create multiple observers waiting for the same event.
115 ///
116 /// Events are created **implicitly** by creating futures waiting for them. They are removed
117 /// then it is resolved. Waiting on an already resolved event will hang forever.
118 ///
119 /// ```
120 /// use async_events::AsyncEvents;
121 ///
122 /// # async fn example() {
123 /// let events = AsyncEvents::<u32, u32>::new();
124 ///
125 /// // Event occurs before we created the observer
126 /// events.resolve_all_with(&[1], 42);
127 ///
128 /// // Oh no, event `1` has already been resolved. This is likely to wait forever.
129 /// let answer = events.output_of(1).await;
130 /// # }
131 /// ```
132 pub fn output_of(&self, event_id: K) -> Observer<V> {
133 let strong = Arc::new(Mutex::new(Shared {
134 result: None,
135 waker: None,
136 }));
137 let weak = Arc::downgrade(&strong);
138 {
139 let mut wakers = self.wakers.lock().unwrap();
140 wakers.retain(|promise| !promise.is_orphan());
141 wakers.push(Promise {
142 key: event_id,
143 shared: weak,
144 });
145 }
146 Observer { shared: strong }
147 }
148
149 /// Resolves all the pending [`Observer`]s associated with the given ids.
150 ///
151 /// * `event_ids`: Observers associated with these ids are resolved. It would be typical to call
152 /// this method with only one element in `event_ids`. However in some error code paths it is
153 /// not unusual that you can rule provide a result (usually `Err`) for many events at once.
154 /// * `output`: The result these [`Observer`]s will return in their `.await` call
155 pub fn resolve_all_with(&self, event_ids: &[K], output: V)
156 where
157 V: Clone,
158 {
159 self.resolve_all_if(|event_id| {
160 if event_ids.contains(event_id) {
161 Some(output.clone())
162 } else {
163 None
164 }
165 })
166 }
167
168 /// Resolves all the pending [`Observer`]s. This resolves all events independent of their id.
169 /// This might come in useful e.g. during application shutdown.
170 ///
171 /// * `output`: The result these [`Observer`]s will return in their `.await` call
172 /// * `f`: Function acting as a filter for event ids which are to be resolved, and as a factory
173 /// for their results. If `f` returns `None` observers associated with the event id are not
174 /// resolved. If `Some` all observers with this Id are resolved.
175 pub fn resolve_all_if(&self, f: impl Fn(&K) -> Option<V>)
176 where
177 V: Clone,
178 {
179 let mut wakers = self.wakers.lock().unwrap();
180 for promise in wakers.iter_mut() {
181 if let Some(output) = f(&promise.key) {
182 promise.resolve(output)
183 }
184 }
185 }
186
187 /// Resolves one pending [`Observer`] associated with the given event id. If no observer with
188 /// such an id exists, nothing happens.
189 ///
190 /// * `event_id`: One [`Observer`] associated with this ids is resolved.
191 /// * `output`: The result the [`Observer`] will return in its `.await` call
192 pub fn resolve_one(&self, event_id: K, output: V) {
193 let mut wakers = self.wakers.lock().unwrap();
194 if let Some(promise) = wakers.iter_mut().find(|p| p.key == event_id) {
195 promise.resolve(output);
196 }
197 }
198}
199
200/// For every [`Observer`] future, we create an associated promise, which we can use to send the
201/// result and notify the async runtime that it should poll the future again.
202struct Promise<K, T> {
203 /// Identifiere of the event the associated future is waiting on.
204 key: K,
205 /// Weak reference to the shared result state.
206 shared: Weak<Mutex<Shared<T>>>,
207}
208
209impl<K, T> Promise<K, T> {
210 /// Set result and notify the runtime to poll the observing Future
211 fn resolve(&mut self, result: T) {
212 if let Some(strong) = self.shared.upgrade() {
213 let mut shared = strong.lock().unwrap();
214 shared.result = Some(result);
215 if let Some(waker) = shared.waker.take() {
216 waker.wake()
217 }
218 }
219 }
220
221 /// No Observer is watining anymore for this promise to be resolved.
222 fn is_orphan(&self) -> bool {
223 self.shared.strong_count() == 0
224 }
225}
226
#[cfg(test)]
mod tests {

    use std::time::Duration;

    use super::AsyncEvents;
    use tokio::{self, time::timeout};

    /// Timeout of zero length, used to check whether a future is ready without waiting for it.
    const ZERO: Duration = Duration::ZERO;

    /// An observer whose event has not been resolved is not ready.
    #[tokio::test]
    async fn pending() {
        let events: AsyncEvents<i32, ()> = AsyncEvents::new();
        let observer = events.output_of(1);
        // Nothing resolved the event => the timeout elapses.
        let outcome = timeout(ZERO, observer).await;
        outcome.unwrap_err();
    }

    /// Resolving the event makes its observer yield the output.
    #[tokio::test]
    async fn resolved() {
        let events = AsyncEvents::new();
        let observer = events.output_of(1);
        events.resolve_all_with(&[1], 42);
        // Event resolved => the output is returned.
        let outcome = timeout(ZERO, observer).await;
        assert_eq!(42, outcome.unwrap());
    }

    /// `resolve_all_with` reaches every observer of the event.
    #[tokio::test]
    async fn multiple_observers_resolve_all() {
        let events = AsyncEvents::new();
        let first = events.output_of(1);
        let second = events.output_of(1);
        events.resolve_all_with(&[1], 42);
        let first_outcome = timeout(ZERO, first).await;
        assert_eq!(42, first_outcome.unwrap());
        let second_outcome = timeout(ZERO, second).await;
        assert_eq!(42, second_outcome.unwrap());
    }

    /// `resolve_one` reaches the first observer of the event only.
    #[tokio::test]
    async fn multiple_observers_resolve_one() {
        let events = AsyncEvents::new();
        let first = events.output_of(1);
        let second = events.output_of(1);
        events.resolve_one(1, 42);
        let first_outcome = timeout(ZERO, first).await;
        assert_eq!(42, first_outcome.unwrap());
        // The second observer has not been resolved, so it times out.
        let second_outcome = timeout(ZERO, second).await;
        assert!(second_outcome.is_err());
    }

    /// This has proven useful when shutting down an application and wanting to stop waiting on
    /// all pending futures.
    #[tokio::test]
    async fn resolve_all_observers_with_the_same_output() {
        let events = AsyncEvents::new();
        let first = events.output_of(1);
        let second = events.output_of(2);

        // The event id is ignored; every observer gets the same result.
        events.resolve_all_if(|_event_id| Some(42));

        let first_outcome = timeout(ZERO, first).await;
        assert_eq!(42, first_outcome.unwrap());
        let second_outcome = timeout(ZERO, second).await;
        assert_eq!(42, second_outcome.unwrap());
    }
}