1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex, Weak},
    task::{Context, Poll, Waker},
};

/// State shared between [`Observer`] and [`AsyncEvents`]
struct Shared<T> {
    result: Option<T>,
    /// A waker is used to tell the task execute that a futures task may have proceeded and it is
    /// sensible to poll them again. This one offers methods to make sure only Futures for the
    /// events those status may have changed get woken.
    waker: Option<Waker>,
}

/// A Future which is completed, once its associated event is resolved. See
/// [`AsyncEvents::output_of`].
pub struct Observer<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

impl<T> Future for Observer<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut shared = self.shared.lock().unwrap();
        match &shared.result {
            None => {
                if let Some(ref mut waker) = &mut shared.waker {
                    // If a waker has been previously set, let's reuse the resources from the old
                    // one, rather than allocating a new one.
                    waker.clone_from(cx.waker())
                } else {
                    shared.waker = Some(cx.waker().clone());
                }
                Poll::Pending
            }
            Some(_) => Poll::Ready(shared.result.take().unwrap()),
        }
    }
}

/// Allows to create futures which will not complete until an associated event id is resolved. This
/// is useful for creating futures waiting for completion on external events which are driven to
/// completion outside of the current process.
///
/// ```
/// use std::time::Duration;
/// use async_events::AsyncEvents;
///
/// async fn foo(events: &AsyncEvents<u32, &'static str>) {
///     // This is a future waiting for an event with key `1` to resover its output.
///     let observer = events.output_of(1);
///     // We can have multiple observers for the same event, if we want to.
///     let another_observer = events.output_of(1);
///
///     // This will block until the event is resolved
///     let result = observer.await;
///     // Do something awesome with result
///     println!("{result}");
/// }
///
/// async fn bar(events: &AsyncEvents<u32, &'static str>) {
///     // All observers waiting for `1` wake up and their threads may continue. You could resolve
///     // multiple events at once with the same result. This wakes up every observer associated
///     // with the event.
///     events.resolve_all_with(&[1], "Hello, World");
/// }
/// ```
pub struct AsyncEvents<K, T> {
    wakers: Mutex<Vec<Promise<K, T>>>,
}

impl<K, T> AsyncEvents<K, T> {
    pub fn new() -> Self {
        Self {
            wakers: Mutex::new(Vec::new()),
        }
    }
}

impl<K, T> Default for AsyncEvents<K, T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<K, V> AsyncEvents<K, V>
where
    K: Eq,
{
    /// A future associated with a peer, which can be resolved using `resolve_with`. You can call
    /// this method repeatedly to create multiple observers waiting for the same event.
    #[deprecated(note = "Please use output_of instead")]
    pub fn wait_for_output(&self, event_id: K) -> Observer<V> {
        let strong = Arc::new(Mutex::new(Shared {
            result: None,
            waker: None,
        }));
        let weak = Arc::downgrade(&strong);
        {
            let mut wakers = self.wakers.lock().unwrap();
            wakers.retain(|promise| !promise.is_orphan());
            wakers.push(Promise {
                key: event_id,
                shared: weak,
            });
        }
        Observer { shared: strong }
    }

    /// A future associated with a peer, which can be resolved using `resolve_with`. You can call
    /// this method repeatedly to create multiple observers waiting for the same event.
    ///
    /// Events are created **implicitly** by creating futures waiting for them. They are removed
    /// then it is resolved. Waiting on an already resolved event will hang forever.
    ///
    /// ```
    /// use async_events::AsyncEvents;
    ///
    /// # async fn example() {
    /// let events = AsyncEvents::<u32, u32>::new();
    ///
    /// // Event occurs before we created the observer
    /// events.resolve_all_with(&[1], 42);
    ///
    /// // Oh no, event `1` has already been resolved. This is likely to wait forever.
    /// let answer = events.output_of(1).await;
    /// # }
    /// ```
    pub fn output_of(&self, event_id: K) -> Observer<V> {
        let strong = Arc::new(Mutex::new(Shared {
            result: None,
            waker: None,
        }));
        let weak = Arc::downgrade(&strong);
        {
            let mut wakers = self.wakers.lock().unwrap();
            wakers.retain(|promise| !promise.is_orphan());
            wakers.push(Promise {
                key: event_id,
                shared: weak,
            });
        }
        Observer { shared: strong }
    }

    /// Resolves all the pending [`Observer`]s associated with the given ids.
    ///
    /// * `event_ids`: Observers associated with these ids are resolved. It would be typical to call
    ///   this method with only one element in `event_ids`. However in some error code paths it is
    ///   not unusual that you can rule provide a result (usually `Err`) for many events at once.
    /// * `output`: The result these [`Observer`]s will return in their `.await` call
    pub fn resolve_all_with(&self, event_ids: &[K], output: V)
    where
        V: Clone,
    {
        self.resolve_all_if(|event_id| {
            if event_ids.contains(event_id) {
                Some(output.clone())
            } else {
                None
            }
        })
    }

    /// Resolves all the pending [`Observer`]s. This resolves all events independent of their id.
    /// This might come in useful e.g. during application shutdown.
    ///
    /// * `output`: The result these [`Observer`]s will return in their `.await` call
    /// * `f`: Function acting as a filter for event ids which are to be resolved, and as a factory
    ///   for their results. If `f` returns `None` observers associated with the event id are not
    ///   resolved. If `Some` all observers with this Id are resolved.
    pub fn resolve_all_if(&self, f: impl Fn(&K) -> Option<V>)
    where
        V: Clone,
    {
        let mut wakers = self.wakers.lock().unwrap();
        for promise in wakers.iter_mut() {
            if let Some(output) = f(&promise.key) {
                promise.resolve(output)
            }
        }
    }

    /// Resolves one pending [`Observer`] associated with the given event id. If no observer with
    /// such an id exists, nothing happens.
    ///
    /// * `event_id`: One [`Observer`] associated with this ids is resolved.
    /// * `output`: The result the [`Observer`] will return in its `.await` call
    pub fn resolve_one(&self, event_id: K, output: V) {
        let mut wakers = self.wakers.lock().unwrap();
        if let Some(promise) = wakers.iter_mut().find(|p| p.key == event_id) {
            promise.resolve(output);
        }
    }
}

/// For every [`Observer`] future, we create an associated promise, which we can use to send the
/// result and notify the async runtime that it should poll the future again.
struct Promise<K, T> {
    /// Identifiere of the event the associated future is waiting on.
    key: K,
    /// Weak reference to the shared result state.
    shared: Weak<Mutex<Shared<T>>>,
}

impl<K, T> Promise<K, T> {
    /// Set result and notify the runtime to poll the observing Future
    fn resolve(&mut self, result: T) {
        if let Some(strong) = self.shared.upgrade() {
            let mut shared = strong.lock().unwrap();
            shared.result = Some(result);
            if let Some(waker) = shared.waker.take() {
                waker.wake()
            }
        }
    }

    /// No Observer is watining anymore for this promise to be resolved.
    fn is_orphan(&self) -> bool {
        self.shared.strong_count() == 0
    }
}

#[cfg(test)]
mod tests {

    use std::time::Duration;

    use super::AsyncEvents;
    use tokio::{self, time::timeout};

    const ZERO: Duration = Duration::from_secs(0);

    #[tokio::test]
    async fn pending() {
        let pm: AsyncEvents<i32, ()> = AsyncEvents::new();
        let future = pm.output_of(1);
        // Promise not yet fulfilled => Elapses due to timeout.
        timeout(ZERO, future).await.unwrap_err();
    }

    #[tokio::test]
    async fn resolved() {
        let pm = AsyncEvents::new();
        let future = pm.output_of(1);
        pm.resolve_all_with(&[1], 42);
        // Promise fulfilled => Return result
        assert_eq!(42, timeout(ZERO, future).await.unwrap());
    }

    #[tokio::test]
    async fn multiple_observers_resolve_all() {
        let pm = AsyncEvents::new();
        let obs_1 = pm.output_of(1);
        let obs_2 = pm.output_of(1);
        pm.resolve_all_with(&[1], 42);
        assert_eq!(42, timeout(ZERO, obs_1).await.unwrap());
        assert_eq!(42, timeout(ZERO, obs_2).await.unwrap());
    }

    #[tokio::test]
    async fn multiple_observers_resolve_one() {
        let pm = AsyncEvents::new();
        let obs_1 = pm.output_of(1);
        let obs_2 = pm.output_of(1);
        pm.resolve_one(1, 42);
        assert_eq!(42, timeout(ZERO, obs_1).await.unwrap());
        // Second observer times out
        assert!(timeout(ZERO, obs_2).await.is_err());
    }

    /// This has been proven usefull if shutting down an application, and wanting to stop waiting
    /// on all pending futures.
    #[tokio::test]
    async fn resolve_all_observers_with_the_same_output() {
        let pm = AsyncEvents::new();
        let obs_1 = pm.output_of(1);
        let obs_2 = pm.output_of(2);

        // We ignore event ID, we want to give the same result to all observers.
        pm.resolve_all_if(|_event_id| Some(42));

        assert_eq!(42, timeout(ZERO, obs_1).await.unwrap());
        assert_eq!(42, timeout(ZERO, obs_2).await.unwrap());
    }
}