async_notify/
lib.rs

//! A general-purpose async `Notify`: like `tokio`'s `Notify`, but usable with any async runtime.
2
3use std::future::Future;
4use std::ops::Deref;
5use std::pin::{pin, Pin};
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::task::{ready, Context, Poll};
8
9use event_listener::{Event, EventListener};
10use futures_core::Stream;
11use pin_project_lite::pin_project;
12
13/// Notify a single task to wake up.
14///
15/// `Notify` provides a basic mechanism to notify a single task of an event.
16/// `Notify` itself does not carry any data. Instead, it is to be used to signal
17/// another task to perform an operation.
18///
19/// If [`notify()`] is called **before** [`notified().await`], then the next call to
20/// [`notified().await`] will complete immediately, consuming the permit. Any
21/// subsequent calls to [`notified().await`] will wait for a new permit.
22///
23/// If [`notify()`] is called **multiple** times before [`notified().await`], only a
24/// **single** permit is stored. The next call to [`notified().await`] will
25/// complete immediately, but the one after will wait for a new permit.
26///
27/// [`notify()`]: Notify::notify
28/// [`notified().await`]: Notify::notified()
29///
30/// # Examples
31///
32/// Basic usage.
33///
34/// ```
35/// use std::sync::Arc;
36/// use async_notify::Notify;
37///
38/// async_global_executor::block_on(async {
39///    let notify = Arc::new(Notify::new());
40///    let notify2 = notify.clone();
41///
42///    async_global_executor::spawn(async move {
43///        notify2.notify();
44///        println!("sent notification");
45///    })
46///    .detach();
47///
48///    println!("received notification");
49///    notify.notified().await;
50/// })
51/// ```
52#[derive(Debug)]
53pub struct Notify {
54    count: AtomicBool,
55    event: Event,
56}
57
58/// Like tokio Notify, this is a runtime independent Notify.
59impl Notify {
60    pub fn new() -> Self {
61        Self {
62            count: Default::default(),
63            event: Default::default(),
64        }
65    }
66
67    /// Notifies a waiting task
68    ///
69    /// If a task is currently waiting, that task is notified. Otherwise, a
70    /// permit is stored in this `Notify` value and the **next** call to
71    /// [`notified().await`] will complete immediately consuming the permit made
72    /// available by this call to `notify()`.
73    ///
74    /// At most one permit may be stored by `Notify`. Many sequential calls to
75    /// `notify` will result in a single permit being stored. The next call to
76    /// `notified().await` will complete immediately, but the one after that
77    /// will wait.
78    ///
79    /// [`notified().await`]: Notify::notified()
80    ///
81    /// # Examples
82    ///
83    /// ```
84    /// use std::sync::Arc;
85    /// use async_notify::Notify;
86    ///
87    /// async_global_executor::block_on(async {
88    ///    let notify = Arc::new(Notify::new());
89    ///    let notify2 = notify.clone();
90    ///
91    ///    async_global_executor::spawn(async move {
92    ///        notify2.notify();
93    ///        println!("sent notification");
94    ///    })
95    ///    .detach();
96    ///
97    ///    println!("received notification");
98    ///    notify.notified().await;
99    /// })
100    /// ```
101    #[inline]
102    pub fn notify(&self) {
103        self.count.store(true, Ordering::Release);
104        self.event.notify(1);
105    }
106
107    /// Wait for a notification.
108    ///
109    /// Each `Notify` value holds a single permit. If a permit is available from
110    /// an earlier call to [`notify()`], then `notified().await` will complete
111    /// immediately, consuming that permit. Otherwise, `notified().await` waits
112    /// for a permit to be made available by the next call to `notify()`.
113    ///
114    /// This method is cancel safety.
115    ///
116    /// [`notify()`]: Notify::notify
117    #[inline]
118    pub async fn notified(&self) {
119        loop {
120            if self.fast_path() {
121                return;
122            }
123
124            let listener = EventListener::new();
125            let mut listener = pin!(listener);
126            listener.as_mut().listen(&self.event);
127
128            if self.fast_path() {
129                return;
130            }
131
132            listener.await;
133        }
134    }
135
136    fn fast_path(&self) -> bool {
137        self.count
138            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
139            .is_ok()
140    }
141}
142
143impl Default for Notify {
144    fn default() -> Notify {
145        Notify::new()
146    }
147}
148
149pin_project! {
150    /// A [`Stream`](Stream) [`Notify`] wrapper
151    pub struct NotifyStream<T: Deref<Target=Notify>> {
152        #[pin]
153        notify: T,
154        listener: Option<Pin<Box<EventListener>>>,
155    }
156}
157
158impl<T: Deref<Target = Notify>> NotifyStream<T> {
159    /// Create [`NotifyStream`] from `T`
160    pub fn new(notify: T) -> Self {
161        Self {
162            notify,
163            listener: None,
164        }
165    }
166
167    /// acquire the inner [`T`]
168    pub fn into_inner(self) -> T {
169        self.notify
170    }
171}
172
173impl<T: Deref<Target = Notify>> AsRef<Notify> for NotifyStream<T> {
174    fn as_ref(&self) -> &Notify {
175        self.notify.deref()
176    }
177}
178
179impl<T: Deref<Target = Notify>> Stream for NotifyStream<T> {
180    type Item = ();
181
182    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
183        let this = self.project();
184        let notify = this.notify.deref();
185
186        loop {
187            if notify.fast_path() {
188                *this.listener = None;
189
190                return Poll::Ready(Some(()));
191            }
192
193            match this.listener.as_mut() {
194                None => {
195                    let listener = notify.event.listen();
196                    *this.listener = Some(listener);
197                }
198                Some(listener) => {
199                    ready!(listener.as_mut().poll(cx));
200                }
201            }
202        }
203    }
204}
205
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures_util::{select, FutureExt, StreamExt};

    use super::*;

    /// A notification sent from a spawned task completes `notified()`.
    #[test]
    fn test() {
        async_global_executor::block_on(async {
            let receiver = Arc::new(Notify::new());
            let sender = Arc::clone(&receiver);

            let task = async_global_executor::spawn(async move {
                sender.notify();
                println!("sent notification");
            });
            task.detach();

            println!("received notification");
            receiver.notified().await;
        })
    }

    /// Several `notify()` calls in a row leave exactly one permit behind.
    #[test]
    fn test_multi_notify() {
        async_global_executor::block_on(async {
            let sender = Arc::new(Notify::new());
            let receiver = Arc::clone(&sender);

            // Two notifications, but only a single permit is stored.
            sender.notify();
            sender.notify();

            // The stored permit makes the first wait complete immediately.
            select! {
                _ = receiver.notified().fuse() => {}
                default => unreachable!("there should be notified")
            }

            // The permit is gone, so the second wait must not complete.
            select! {
                _ = receiver.notified().fuse() => unreachable!("there should not be notified"),
                default => {}
            }

            sender.notify();

            // A new permit has been stored and can be consumed again.
            select! {
                _ = receiver.notified().fuse() => {}
                default => unreachable!("there should be notified")
            }
        })
    }

    /// `NotifyStream` yields an item once the underlying `Notify` is notified.
    #[test]
    fn stream() {
        async_global_executor::block_on(async {
            let sender = Arc::new(Notify::new());
            let mut notifications = NotifyStream::new(Arc::clone(&sender));

            let task = async_global_executor::spawn(async move {
                sender.notify();
                println!("sent notification");
            });
            task.detach();

            assert_eq!(notifications.next().await, Some(()));
        })
    }
}