async_notify/lib.rs
1//! A general version async Notify, like `tokio` Notify but can work with any async runtime.
2
3use std::future::Future;
4use std::ops::Deref;
5use std::pin::{pin, Pin};
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::task::{ready, Context, Poll};
8
9use event_listener::{Event, EventListener};
10use futures_core::Stream;
11use pin_project_lite::pin_project;
12
13/// Notify a single task to wake up.
14///
15/// `Notify` provides a basic mechanism to notify a single task of an event.
16/// `Notify` itself does not carry any data. Instead, it is to be used to signal
17/// another task to perform an operation.
18///
19/// If [`notify()`] is called **before** [`notified().await`], then the next call to
20/// [`notified().await`] will complete immediately, consuming the permit. Any
21/// subsequent calls to [`notified().await`] will wait for a new permit.
22///
23/// If [`notify()`] is called **multiple** times before [`notified().await`], only a
24/// **single** permit is stored. The next call to [`notified().await`] will
25/// complete immediately, but the one after will wait for a new permit.
26///
27/// [`notify()`]: Notify::notify
28/// [`notified().await`]: Notify::notified()
29///
30/// # Examples
31///
32/// Basic usage.
33///
34/// ```
35/// use std::sync::Arc;
36/// use async_notify::Notify;
37///
38/// async_global_executor::block_on(async {
39/// let notify = Arc::new(Notify::new());
40/// let notify2 = notify.clone();
41///
42/// async_global_executor::spawn(async move {
43/// notify2.notify();
44/// println!("sent notification");
45/// })
46/// .detach();
47///
48/// println!("received notification");
49/// notify.notified().await;
50/// })
51/// ```
52#[derive(Debug)]
53pub struct Notify {
54 count: AtomicBool,
55 event: Event,
56}
57
58/// Like tokio Notify, this is a runtime independent Notify.
59impl Notify {
60 pub fn new() -> Self {
61 Self {
62 count: Default::default(),
63 event: Default::default(),
64 }
65 }
66
67 /// Notifies a waiting task
68 ///
69 /// If a task is currently waiting, that task is notified. Otherwise, a
70 /// permit is stored in this `Notify` value and the **next** call to
71 /// [`notified().await`] will complete immediately consuming the permit made
72 /// available by this call to `notify()`.
73 ///
74 /// At most one permit may be stored by `Notify`. Many sequential calls to
75 /// `notify` will result in a single permit being stored. The next call to
76 /// `notified().await` will complete immediately, but the one after that
77 /// will wait.
78 ///
79 /// [`notified().await`]: Notify::notified()
80 ///
81 /// # Examples
82 ///
83 /// ```
84 /// use std::sync::Arc;
85 /// use async_notify::Notify;
86 ///
87 /// async_global_executor::block_on(async {
88 /// let notify = Arc::new(Notify::new());
89 /// let notify2 = notify.clone();
90 ///
91 /// async_global_executor::spawn(async move {
92 /// notify2.notify();
93 /// println!("sent notification");
94 /// })
95 /// .detach();
96 ///
97 /// println!("received notification");
98 /// notify.notified().await;
99 /// })
100 /// ```
101 #[inline]
102 pub fn notify(&self) {
103 self.count.store(true, Ordering::Release);
104 self.event.notify(1);
105 }
106
107 /// Wait for a notification.
108 ///
109 /// Each `Notify` value holds a single permit. If a permit is available from
110 /// an earlier call to [`notify()`], then `notified().await` will complete
111 /// immediately, consuming that permit. Otherwise, `notified().await` waits
112 /// for a permit to be made available by the next call to `notify()`.
113 ///
114 /// This method is cancel safety.
115 ///
116 /// [`notify()`]: Notify::notify
117 #[inline]
118 pub async fn notified(&self) {
119 loop {
120 if self.fast_path() {
121 return;
122 }
123
124 let listener = EventListener::new();
125 let mut listener = pin!(listener);
126 listener.as_mut().listen(&self.event);
127
128 if self.fast_path() {
129 return;
130 }
131
132 listener.await;
133 }
134 }
135
136 fn fast_path(&self) -> bool {
137 self.count
138 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
139 .is_ok()
140 }
141}
142
143impl Default for Notify {
144 fn default() -> Notify {
145 Notify::new()
146 }
147}
148
149pin_project! {
150 /// A [`Stream`](Stream) [`Notify`] wrapper
151 pub struct NotifyStream<T: Deref<Target=Notify>> {
152 #[pin]
153 notify: T,
154 listener: Option<Pin<Box<EventListener>>>,
155 }
156}
157
158impl<T: Deref<Target = Notify>> NotifyStream<T> {
159 /// Create [`NotifyStream`] from `T`
160 pub fn new(notify: T) -> Self {
161 Self {
162 notify,
163 listener: None,
164 }
165 }
166
167 /// acquire the inner [`T`]
168 pub fn into_inner(self) -> T {
169 self.notify
170 }
171}
172
173impl<T: Deref<Target = Notify>> AsRef<Notify> for NotifyStream<T> {
174 fn as_ref(&self) -> &Notify {
175 self.notify.deref()
176 }
177}
178
179impl<T: Deref<Target = Notify>> Stream for NotifyStream<T> {
180 type Item = ();
181
182 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
183 let this = self.project();
184 let notify = this.notify.deref();
185
186 loop {
187 if notify.fast_path() {
188 *this.listener = None;
189
190 return Poll::Ready(Some(()));
191 }
192
193 match this.listener.as_mut() {
194 None => {
195 let listener = notify.event.listen();
196 *this.listener = Some(listener);
197 }
198 Some(listener) => {
199 ready!(listener.as_mut().poll(cx));
200 }
201 }
202 }
203 }
204}
205
206#[cfg(test)]
207mod tests {
208 use std::sync::Arc;
209
210 use futures_util::{select, FutureExt, StreamExt};
211
212 use super::*;
213
214 #[test]
215 fn test() {
216 async_global_executor::block_on(async {
217 let notify = Arc::new(Notify::new());
218 let notify2 = notify.clone();
219
220 async_global_executor::spawn(async move {
221 notify2.notify();
222 println!("sent notification");
223 })
224 .detach();
225
226 println!("received notification");
227 notify.notified().await;
228 })
229 }
230
231 #[test]
232 fn test_multi_notify() {
233 async_global_executor::block_on(async {
234 let notify = Arc::new(Notify::new());
235 let notify2 = notify.clone();
236
237 notify.notify();
238 notify.notify();
239
240 select! {
241 _ = notify2.notified().fuse() => {}
242 default => unreachable!("there should be notified")
243 }
244
245 select! {
246 _ = notify2.notified().fuse() => unreachable!("there should not be notified"),
247 default => {}
248 }
249
250 notify.notify();
251
252 select! {
253 _ = notify2.notified().fuse() => {}
254 default => unreachable!("there should be notified")
255 }
256 })
257 }
258
259 #[test]
260 fn stream() {
261 async_global_executor::block_on(async {
262 let notify = Arc::new(Notify::new());
263 let mut notify_stream = NotifyStream::new(notify.clone());
264
265 async_global_executor::spawn(async move {
266 notify.notify();
267 println!("sent notification");
268 })
269 .detach();
270
271 notify_stream.next().await.unwrap();
272 })
273 }
274}