Skip to main content

async_notify/
lib.rs

1//! A general version async Notify, like `tokio` Notify but can work with any async runtime.
2
3use std::future::Future;
4use std::num::NonZeroUsize;
5use std::ops::Deref;
6use std::pin::Pin;
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::task::{Context, Poll, ready};
9
10use event_listener::{Event, EventListener, listener};
11use futures_core::Stream;
12use pin_project_lite::pin_project;
13
14/// Notify a single task to wake up.
15///
16/// `Notify` provides a basic mechanism to notify a single task of an event.
17/// `Notify` itself does not carry any data. Instead, it is to be used to signal
18/// another task to perform an operation.
19///
20/// If [`notify()`] is called **before** [`notified().await`], then the next call to
21/// [`notified().await`] will complete immediately, consuming one permit. Permits
22/// accumulate: each call to [`notify()`] or [`notify_n()`] adds permits that can
23/// be consumed by subsequent [`notified().await`] calls.
24///
25/// [`notify()`]: Notify::notify
26/// [`notified().await`]: Notify::notified()
27///
28/// # Examples
29///
30/// Basic usage.
31///
32/// ```
33/// use std::sync::Arc;
34/// use async_notify::Notify;
35///
36/// async_global_executor::block_on(async {
37///    let notify = Arc::new(Notify::new());
38///    let notify2 = notify.clone();
39///
40///    async_global_executor::spawn(async move {
41///        notify2.notify();
42///        println!("sent notification");
43///    })
44///    .detach();
45///
46///    println!("received notification");
47///    notify.notified().await;
48/// })
49/// ```
50#[derive(Debug, Default)]
51pub struct Notify {
52    count: AtomicUsize,
53    event: Event,
54}
55
56/// Like tokio Notify, this is a runtime independent Notify.
57impl Notify {
58    /// Create a [`Notify`]
59    pub const fn new() -> Self {
60        Self {
61            count: AtomicUsize::new(0),
62            event: Event::new(),
63        }
64    }
65
66    /// Notifies a waiting task
67    ///
68    /// Adds one permit to this `Notify`. If a task is currently waiting on
69    /// [`notified().await`], that task will be woken and complete. Otherwise,
70    /// the permit is stored and the next call to [`notified().await`] will
71    /// complete immediately. Permits accumulate across multiple `notify()` calls.
72    ///
73    /// [`notified().await`]: Notify::notified()
74    ///
75    /// # Examples
76    ///
77    /// ```
78    /// use std::sync::Arc;
79    /// use async_notify::Notify;
80    ///
81    /// async_global_executor::block_on(async {
82    ///    let notify = Arc::new(Notify::new());
83    ///    let notify2 = notify.clone();
84    ///
85    ///    async_global_executor::spawn(async move {
86    ///        notify2.notify();
87    ///        println!("sent notification");
88    ///    })
89    ///    .detach();
90    ///
91    ///    println!("received notification");
92    ///    notify.notified().await;
93    /// })
94    /// ```
95    #[inline]
96    pub fn notify(&self) {
97        self.notify_n(NonZeroUsize::new(1).unwrap())
98    }
99
100    /// Grants `n` permits and notifies up to `n` waiting tasks.
101    ///
102    /// Adds `n` permits to this `Notify`. If there are tasks currently waiting
103    /// on [`notified().await`], up to `n` of them will be woken and complete,
104    /// each consuming one permit. If no tasks are waiting, the permits are
105    /// stored and the next up to `n` calls to [`notified().await`] will complete
106    /// immediately.
107    ///
108    /// This is a generalization of [`notify()`] which is equivalent to
109    /// `notify_n(NonZeroUsize::MIN)`.
110    ///
111    /// [`notified().await`]: Notify::notified()
112    /// [`notify()`]: Notify::notify
113    #[inline]
114    pub fn notify_n(&self, n: NonZeroUsize) {
115        let n = n.get();
116        self.count.fetch_add(n, Ordering::Release);
117        self.event.notify(n);
118    }
119
120    /// Wakes up to `n` waiting tasks to compete for existing permits.
121    ///
122    /// Unlike [`notify_n()`], this does **not** add any permits. It only wakes
123    /// up to `n` tasks that are waiting on [`notified().await`]. Those tasks
124    /// will then compete for whatever permits are currently available. At most
125    /// one task can consume each available permit; the rest will wait for the
126    /// next notification.
127    ///
128    /// Use this when you want to wake multiple waiters to race for a single
129    /// resource (e.g. thundering herd mitigation).
130    ///
131    /// [`notified().await`]: Notify::notified()
132    /// [`notify_n()`]: Notify::notify_n
133    #[inline]
134    pub fn notify_waiters(&self, n: NonZeroUsize) {
135        self.event.notify(n.get());
136    }
137
138    /// Wait for a notification.
139    ///
140    /// Each `Notify` value holds a number of permits. If a permit is available
141    /// from an earlier call to [`notify()`] or [`notify_n()`], then
142    /// `notified().await` will complete immediately, consuming one permit.
143    /// Otherwise, `notified().await` waits for a permit to be made available.
144    ///
145    /// This method is cancel safety.
146    ///
147    /// [`notify()`]: Notify::notify
148    /// [`notify_n()`]: Notify::notify_n
149    #[inline]
150    pub async fn notified(&self) {
151        loop {
152            if self.fast_path() {
153                return;
154            }
155
156            listener!(self.event => listener);
157
158            if self.fast_path() {
159                return;
160            }
161
162            listener.await;
163        }
164    }
165
166    fn fast_path(&self) -> bool {
167        self.count
168            .try_update(Ordering::AcqRel, Ordering::Acquire, |c| c.checked_sub(1))
169            .is_ok()
170    }
171}
172
173pin_project! {
174    /// A [`Stream`](Stream) [`Notify`] wrapper
175    pub struct NotifyStream<T: Deref<Target=Notify>> {
176        #[pin]
177        notify: T,
178        listener: Option<EventListener>,
179    }
180}
181
182impl<T: Deref<Target = Notify>> NotifyStream<T> {
183    /// Create [`NotifyStream`] from `T`
184    pub const fn new(notify: T) -> Self {
185        Self {
186            notify,
187            listener: None,
188        }
189    }
190
191    /// acquire the inner [`T`]
192    pub fn into_inner(self) -> T {
193        self.notify
194    }
195}
196
197impl<T: Deref<Target = Notify>> AsRef<Notify> for NotifyStream<T> {
198    fn as_ref(&self) -> &Notify {
199        self.notify.deref()
200    }
201}
202
203impl<T: Deref<Target = Notify>> Stream for NotifyStream<T> {
204    type Item = ();
205
206    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
207        let this = self.project();
208        let notify = this.notify.deref();
209
210        loop {
211            if notify.fast_path() {
212                *this.listener = None;
213
214                return Poll::Ready(Some(()));
215            }
216
217            match this.listener.as_mut() {
218                None => {
219                    let listener = notify.event.listen();
220                    *this.listener = Some(listener);
221                }
222                Some(listener) => {
223                    ready!(Pin::new(listener).poll(cx));
224                }
225            }
226        }
227    }
228}
229
230#[cfg(test)]
231mod tests {
232    use std::sync::Arc;
233
234    use futures_util::{FutureExt, StreamExt, select};
235
236    use super::*;
237
238    #[test]
239    fn test() {
240        async_global_executor::block_on(async {
241            let notify = Arc::new(Notify::new());
242            let notify2 = notify.clone();
243
244            async_global_executor::spawn(async move {
245                notify2.notify();
246                println!("sent notification");
247            })
248            .detach();
249
250            println!("received notification");
251            notify.notified().await;
252        })
253    }
254
255    #[test]
256    fn test_multi_notify() {
257        async_global_executor::block_on(async {
258            let notify = Arc::new(Notify::new());
259            let notify2 = notify.clone();
260
261            // 2 permits
262            notify.notify();
263            notify.notify();
264
265            // First completes (1 permit consumed)
266            select! {
267                _ = notify2.notified().fuse() => {}
268                default => unreachable!("there should be notified")
269            }
270
271            // Second completes (1 permit consumed)
272            select! {
273                _ = notify2.notified().fuse() => {}
274                default => unreachable!("there should be notified")
275            }
276
277            // No permits left, third would block
278            select! {
279                _ = notify2.notified().fuse() => unreachable!("there should not be notified"),
280                default => {}
281            }
282
283            notify.notify();
284
285            // Third completes
286            select! {
287                _ = notify2.notified().fuse() => {}
288                default => unreachable!("there should be notified")
289            }
290        })
291    }
292
293    #[test]
294    fn test_notify_n() {
295        async_global_executor::block_on(async {
296            let notify = Arc::new(Notify::new());
297            let notify2 = notify.clone();
298
299            notify.notify_n(3.try_into().unwrap());
300
301            for _ in 0..3 {
302                select! {
303                    _ = notify2.notified().fuse() => {}
304                    default => unreachable!("there should be notified")
305                }
306            }
307
308            select! {
309                _ = notify2.notified().fuse() => unreachable!("there should not be notified"),
310                default => {}
311            }
312        })
313    }
314
315    #[test]
316    fn test_notify_waiters() {
317        async_global_executor::block_on(async {
318            let notify = Arc::new(Notify::new());
319            let notify2 = notify.clone();
320            let notify3 = notify.clone();
321
322            let t1 = async_global_executor::spawn(async move {
323                notify2.notified().await;
324            });
325            let t2 = async_global_executor::spawn(async move {
326                notify3.notified().await;
327            });
328
329            // Give tasks time to start waiting
330            async_global_executor::spawn(async {}).await;
331
332            // 1 permit, wake 2 waiters - only 1 can complete
333            notify.notify();
334            notify.notify_waiters(NonZeroUsize::new(2).unwrap());
335
336            // One completes. Add permit for the other.
337            notify.notify();
338
339            t1.await;
340            t2.await;
341        })
342    }
343
344    #[test]
345    fn stream() {
346        async_global_executor::block_on(async {
347            let notify = Arc::new(Notify::new());
348            let mut notify_stream = NotifyStream::new(notify.clone());
349
350            async_global_executor::spawn(async move {
351                notify.notify();
352                println!("sent notification");
353            })
354            .detach();
355
356            notify_stream.next().await.unwrap();
357        })
358    }
359}