Skip to main content

async_notify/
lib.rs

1//! A general version async Notify, like `tokio` Notify but can work with any async runtime.
2
3use std::future::Future;
4use std::num::NonZeroUsize;
5use std::ops::Deref;
6use std::pin::Pin;
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::task::{Context, Poll, ready};
9
10use event_listener::{Event, EventListener, listener};
11use futures_core::Stream;
12use pin_project_lite::pin_project;
13
14/// Notify a single task to wake up.
15///
16/// `Notify` provides a basic mechanism to notify a single task of an event.
17/// `Notify` itself does not carry any data. Instead, it is to be used to signal
18/// another task to perform an operation.
19///
20/// If [`notify()`] is called **before** [`notified().await`], then the next call to
21/// [`notified().await`] will complete immediately, consuming one permit. Permits
22/// accumulate: each call to [`notify()`] or [`notify_n()`] adds permits that can
23/// be consumed by subsequent [`notified().await`] calls.
24///
25/// [`notify()`]: Notify::notify
26/// [`notified().await`]: Notify::notified()
27///
28/// # Examples
29///
30/// Basic usage.
31///
32/// ```
33/// use std::sync::Arc;
34/// use async_notify::Notify;
35///
36/// async_global_executor::block_on(async {
37///    let notify = Arc::new(Notify::new());
38///    let notify2 = notify.clone();
39///
40///    async_global_executor::spawn(async move {
41///        notify2.notify();
42///        println!("sent notification");
43///    })
44///    .detach();
45///
46///    println!("received notification");
47///    notify.notified().await;
48/// })
49/// ```
50#[derive(Debug, Default)]
51pub struct Notify {
52    count: AtomicUsize,
53    event: Event,
54}
55
56/// Like tokio Notify, this is a runtime independent Notify.
57impl Notify {
58    /// Create a [`Notify`]
59    pub const fn new() -> Self {
60        Self {
61            count: AtomicUsize::new(0),
62            event: Event::new(),
63        }
64    }
65
66    /// Notifies a waiting task
67    ///
68    /// Adds one permit to this `Notify`. If a task is currently waiting on
69    /// [`notified().await`], that task will be woken and complete. Otherwise,
70    /// the permit is stored and the next call to [`notified().await`] will
71    /// complete immediately. Permits accumulate across multiple `notify()` calls.
72    ///
73    /// [`notified().await`]: Notify::notified()
74    ///
75    /// # Examples
76    ///
77    /// ```
78    /// use std::sync::Arc;
79    /// use async_notify::Notify;
80    ///
81    /// async_global_executor::block_on(async {
82    ///    let notify = Arc::new(Notify::new());
83    ///    let notify2 = notify.clone();
84    ///
85    ///    async_global_executor::spawn(async move {
86    ///        notify2.notify();
87    ///        println!("sent notification");
88    ///    })
89    ///    .detach();
90    ///
91    ///    println!("received notification");
92    ///    notify.notified().await;
93    /// })
94    /// ```
95    #[inline]
96    pub fn notify(&self) {
97        self.notify_n(NonZeroUsize::new(1).unwrap())
98    }
99
100    /// Grants `n` permits and notifies up to `n` waiting tasks.
101    ///
102    /// Adds `n` permits to this `Notify`. If there are tasks currently waiting
103    /// on [`notified().await`], up to `n` of them will be woken and complete,
104    /// each consuming one permit. If no tasks are waiting, the permits are
105    /// stored and the next up to `n` calls to [`notified().await`] will complete
106    /// immediately.
107    ///
108    /// This is a generalization of [`notify()`] which is equivalent to
109    /// `notify_n(NonZeroUsize::MIN)`.
110    ///
111    /// [`notified().await`]: Notify::notified()
112    /// [`notify()`]: Notify::notify
113    #[inline]
114    pub fn notify_n(&self, n: NonZeroUsize) {
115        let n = n.get();
116        self.count.fetch_add(n, Ordering::Release);
117        self.event.notify(n);
118    }
119
120    /// Wakes up to `n` waiting tasks to compete for existing permits.
121    ///
122    /// Unlike [`notify_n()`], this does **not** add any permits. It only wakes
123    /// up to `n` tasks that are waiting on [`notified().await`]. Those tasks
124    /// will then compete for whatever permits are currently available. At most
125    /// one task can consume each available permit; the rest will wait for the
126    /// next notification.
127    ///
128    /// Use this when you want to wake multiple waiters to race for a single
129    /// resource (e.g. thundering herd mitigation).
130    ///
131    /// [`notified().await`]: Notify::notified()
132    /// [`notify_n()`]: Notify::notify_n
133    #[inline]
134    pub fn notify_waiters(&self, n: NonZeroUsize) {
135        self.event.notify(n.get());
136    }
137
138    /// Wait for a notification.
139    ///
140    /// Each `Notify` value holds a number of permits. If a permit is available
141    /// from an earlier call to [`notify()`] or [`notify_n()`], then
142    /// `notified().await` will complete immediately, consuming one permit.
143    /// Otherwise, `notified().await` waits for a permit to be made available.
144    ///
145    /// This method is cancel safety.
146    ///
147    /// [`notify()`]: Notify::notify
148    /// [`notify_n()`]: Notify::notify_n
149    #[inline]
150    pub async fn notified(&self) {
151        loop {
152            if self.fast_path() {
153                return;
154            }
155
156            listener!(self.event => listener);
157
158            if self.fast_path() {
159                return;
160            }
161
162            listener.await;
163        }
164    }
165
166    fn fast_path(&self) -> bool {
167        // to support old version rustc
168        #[allow(deprecated)]
169        self.count
170            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |c| c.checked_sub(1))
171            .is_ok()
172    }
173}
174
175pin_project! {
176    /// A [`Stream`](Stream) [`Notify`] wrapper
177    pub struct NotifyStream<T: Deref<Target=Notify>> {
178        #[pin]
179        notify: T,
180        listener: Option<EventListener>,
181    }
182}
183
184impl<T: Deref<Target = Notify>> NotifyStream<T> {
185    /// Create [`NotifyStream`] from `T`
186    pub const fn new(notify: T) -> Self {
187        Self {
188            notify,
189            listener: None,
190        }
191    }
192
193    /// acquire the inner [`T`]
194    pub fn into_inner(self) -> T {
195        self.notify
196    }
197}
198
199impl<T: Deref<Target = Notify>> AsRef<Notify> for NotifyStream<T> {
200    fn as_ref(&self) -> &Notify {
201        self.notify.deref()
202    }
203}
204
205impl<T: Deref<Target = Notify>> Stream for NotifyStream<T> {
206    type Item = ();
207
208    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
209        let this = self.project();
210        let notify = this.notify.deref();
211
212        loop {
213            if notify.fast_path() {
214                *this.listener = None;
215
216                return Poll::Ready(Some(()));
217            }
218
219            match this.listener.as_mut() {
220                None => {
221                    let listener = notify.event.listen();
222                    *this.listener = Some(listener);
223                }
224                Some(listener) => {
225                    ready!(Pin::new(listener).poll(cx));
226                }
227            }
228        }
229    }
230}
231
232#[cfg(test)]
233mod tests {
234    use std::sync::Arc;
235
236    use futures_util::{FutureExt, StreamExt, select};
237
238    use super::*;
239
240    #[test]
241    fn test() {
242        async_global_executor::block_on(async {
243            let notify = Arc::new(Notify::new());
244            let notify2 = notify.clone();
245
246            async_global_executor::spawn(async move {
247                notify2.notify();
248                println!("sent notification");
249            })
250            .detach();
251
252            println!("received notification");
253            notify.notified().await;
254        })
255    }
256
257    #[test]
258    fn test_multi_notify() {
259        async_global_executor::block_on(async {
260            let notify = Arc::new(Notify::new());
261            let notify2 = notify.clone();
262
263            // 2 permits
264            notify.notify();
265            notify.notify();
266
267            // First completes (1 permit consumed)
268            select! {
269                _ = notify2.notified().fuse() => {}
270                default => unreachable!("there should be notified")
271            }
272
273            // Second completes (1 permit consumed)
274            select! {
275                _ = notify2.notified().fuse() => {}
276                default => unreachable!("there should be notified")
277            }
278
279            // No permits left, third would block
280            select! {
281                _ = notify2.notified().fuse() => unreachable!("there should not be notified"),
282                default => {}
283            }
284
285            notify.notify();
286
287            // Third completes
288            select! {
289                _ = notify2.notified().fuse() => {}
290                default => unreachable!("there should be notified")
291            }
292        })
293    }
294
295    #[test]
296    fn test_notify_n() {
297        async_global_executor::block_on(async {
298            let notify = Arc::new(Notify::new());
299            let notify2 = notify.clone();
300
301            notify.notify_n(3.try_into().unwrap());
302
303            for _ in 0..3 {
304                select! {
305                    _ = notify2.notified().fuse() => {}
306                    default => unreachable!("there should be notified")
307                }
308            }
309
310            select! {
311                _ = notify2.notified().fuse() => unreachable!("there should not be notified"),
312                default => {}
313            }
314        })
315    }
316
317    #[test]
318    fn test_notify_waiters() {
319        async_global_executor::block_on(async {
320            let notify = Arc::new(Notify::new());
321            let notify2 = notify.clone();
322            let notify3 = notify.clone();
323
324            let t1 = async_global_executor::spawn(async move {
325                notify2.notified().await;
326            });
327            let t2 = async_global_executor::spawn(async move {
328                notify3.notified().await;
329            });
330
331            // Give tasks time to start waiting
332            async_global_executor::spawn(async {}).await;
333
334            // 1 permit, wake 2 waiters - only 1 can complete
335            notify.notify();
336            notify.notify_waiters(NonZeroUsize::new(2).unwrap());
337
338            // One completes. Add permit for the other.
339            notify.notify();
340
341            t1.await;
342            t2.await;
343        })
344    }
345
346    #[test]
347    fn stream() {
348        async_global_executor::block_on(async {
349            let notify = Arc::new(Notify::new());
350            let mut notify_stream = NotifyStream::new(notify.clone());
351
352            async_global_executor::spawn(async move {
353                notify.notify();
354                println!("sent notification");
355            })
356            .detach();
357
358            notify_stream.next().await.unwrap();
359        })
360    }
361}