async_notify/
lib.rs

1//! A general version async Notify, like `tokio` Notify but can work with any async runtime.
2
3use std::future::Future;
4use std::ops::Deref;
5use std::pin::{pin, Pin};
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::task::{ready, Context, Poll};
8
9use event_listener::{Event, EventListener};
10use futures_core::Stream;
11use pin_project_lite::pin_project;
12
13/// Notify a single task to wake up.
14///
15/// `Notify` provides a basic mechanism to notify a single task of an event.
16/// `Notify` itself does not carry any data. Instead, it is to be used to signal
17/// another task to perform an operation.
18///
19/// If [`notify()`] is called **before** [`notified().await`], then the next call to
20/// [`notified().await`] will complete immediately, consuming the permit. Any
21/// subsequent calls to [`notified().await`] will wait for a new permit.
22///
23/// If [`notify()`] is called **multiple** times before [`notified().await`], only a
24/// **single** permit is stored. The next call to [`notified().await`] will
25/// complete immediately, but the one after will wait for a new permit.
26///
27/// [`notify()`]: Notify::notify
28/// [`notified().await`]: Notify::notified()
29///
30/// # Examples
31///
32/// Basic usage.
33///
34/// ```
35/// use std::sync::Arc;
36/// use async_notify::Notify;
37///
38/// async_global_executor::block_on(async {
39///    let notify = Arc::new(Notify::new());
40///    let notify2 = notify.clone();
41///
42///    async_global_executor::spawn(async move {
43///        notify2.notify();
44///        println!("sent notification");
45///    })
46///    .detach();
47///
48///    println!("received notification");
49///    notify.notified().await;
50/// })
51/// ```
52#[derive(Debug, Default)]
53pub struct Notify {
54    count: AtomicBool,
55    event: Event,
56}
57
58/// Like tokio Notify, this is a runtime independent Notify.
59impl Notify {
60    /// Create a [`Notify`]
61    pub const fn new() -> Self {
62        Self {
63            count: AtomicBool::new(false),
64            event: Event::new(),
65        }
66    }
67
68    /// Notifies a waiting task
69    ///
70    /// If a task is currently waiting, that task is notified. Otherwise, a
71    /// permit is stored in this `Notify` value and the **next** call to
72    /// [`notified().await`] will complete immediately consuming the permit made
73    /// available by this call to `notify()`.
74    ///
75    /// At most one permit may be stored by `Notify`. Many sequential calls to
76    /// `notify` will result in a single permit being stored. The next call to
77    /// `notified().await` will complete immediately, but the one after that
78    /// will wait.
79    ///
80    /// [`notified().await`]: Notify::notified()
81    ///
82    /// # Examples
83    ///
84    /// ```
85    /// use std::sync::Arc;
86    /// use async_notify::Notify;
87    ///
88    /// async_global_executor::block_on(async {
89    ///    let notify = Arc::new(Notify::new());
90    ///    let notify2 = notify.clone();
91    ///
92    ///    async_global_executor::spawn(async move {
93    ///        notify2.notify();
94    ///        println!("sent notification");
95    ///    })
96    ///    .detach();
97    ///
98    ///    println!("received notification");
99    ///    notify.notified().await;
100    /// })
101    /// ```
102    #[inline]
103    pub fn notify(&self) {
104        self.count.store(true, Ordering::Release);
105        self.event.notify(1);
106    }
107
108    /// Wait for a notification.
109    ///
110    /// Each `Notify` value holds a single permit. If a permit is available from
111    /// an earlier call to [`notify()`], then `notified().await` will complete
112    /// immediately, consuming that permit. Otherwise, `notified().await` waits
113    /// for a permit to be made available by the next call to `notify()`.
114    ///
115    /// This method is cancel safety.
116    ///
117    /// [`notify()`]: Notify::notify
118    #[inline]
119    pub async fn notified(&self) {
120        loop {
121            if self.fast_path() {
122                return;
123            }
124
125            let listener = EventListener::new();
126            let mut listener = pin!(listener);
127            listener.as_mut().listen(&self.event);
128
129            if self.fast_path() {
130                return;
131            }
132
133            listener.await;
134        }
135    }
136
137    fn fast_path(&self) -> bool {
138        self.count
139            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
140            .is_ok()
141    }
142}
143
144pin_project! {
145    /// A [`Stream`](Stream) [`Notify`] wrapper
146    pub struct NotifyStream<T: Deref<Target=Notify>> {
147        #[pin]
148        notify: T,
149        listener: Option<Pin<Box<EventListener>>>,
150    }
151}
152
153impl<T: Deref<Target = Notify>> NotifyStream<T> {
154    /// Create [`NotifyStream`] from `T`
155    pub fn new(notify: T) -> Self {
156        Self {
157            notify,
158            listener: None,
159        }
160    }
161
162    /// acquire the inner [`T`]
163    pub fn into_inner(self) -> T {
164        self.notify
165    }
166}
167
168impl<T: Deref<Target = Notify>> AsRef<Notify> for NotifyStream<T> {
169    fn as_ref(&self) -> &Notify {
170        self.notify.deref()
171    }
172}
173
174impl<T: Deref<Target = Notify>> Stream for NotifyStream<T> {
175    type Item = ();
176
177    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
178        let this = self.project();
179        let notify = this.notify.deref();
180
181        loop {
182            if notify.fast_path() {
183                *this.listener = None;
184
185                return Poll::Ready(Some(()));
186            }
187
188            match this.listener.as_mut() {
189                None => {
190                    let listener = notify.event.listen();
191                    *this.listener = Some(listener);
192                }
193                Some(listener) => {
194                    ready!(listener.as_mut().poll(cx));
195                }
196            }
197        }
198    }
199}
200
201#[cfg(test)]
202mod tests {
203    use std::sync::Arc;
204
205    use futures_util::{select, FutureExt, StreamExt};
206
207    use super::*;
208
209    #[test]
210    fn test() {
211        async_global_executor::block_on(async {
212            let notify = Arc::new(Notify::new());
213            let notify2 = notify.clone();
214
215            async_global_executor::spawn(async move {
216                notify2.notify();
217                println!("sent notification");
218            })
219            .detach();
220
221            println!("received notification");
222            notify.notified().await;
223        })
224    }
225
226    #[test]
227    fn test_multi_notify() {
228        async_global_executor::block_on(async {
229            let notify = Arc::new(Notify::new());
230            let notify2 = notify.clone();
231
232            notify.notify();
233            notify.notify();
234
235            select! {
236                _ = notify2.notified().fuse() => {}
237                default => unreachable!("there should be notified")
238            }
239
240            select! {
241                _ = notify2.notified().fuse() => unreachable!("there should not be notified"),
242                default => {}
243            }
244
245            notify.notify();
246
247            select! {
248                _ = notify2.notified().fuse() => {}
249                default => unreachable!("there should be notified")
250            }
251        })
252    }
253
254    #[test]
255    fn stream() {
256        async_global_executor::block_on(async {
257            let notify = Arc::new(Notify::new());
258            let mut notify_stream = NotifyStream::new(notify.clone());
259
260            async_global_executor::spawn(async move {
261                notify.notify();
262                println!("sent notification");
263            })
264            .detach();
265
266            notify_stream.next().await.unwrap();
267        })
268    }
269}