async_notify/lib.rs
//! A runtime-agnostic async `Notify`: like `tokio`'s `Notify`, but it works with any async runtime.
2
3use std::future::Future;
4use std::ops::Deref;
5use std::pin::{pin, Pin};
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::task::{ready, Context, Poll};
8
9use event_listener::{Event, EventListener};
10use futures_core::Stream;
11use pin_project_lite::pin_project;
12
13/// Notify a single task to wake up.
14///
15/// `Notify` provides a basic mechanism to notify a single task of an event.
16/// `Notify` itself does not carry any data. Instead, it is to be used to signal
17/// another task to perform an operation.
18///
19/// If [`notify()`] is called **before** [`notified().await`], then the next call to
20/// [`notified().await`] will complete immediately, consuming the permit. Any
21/// subsequent calls to [`notified().await`] will wait for a new permit.
22///
23/// If [`notify()`] is called **multiple** times before [`notified().await`], only a
24/// **single** permit is stored. The next call to [`notified().await`] will
25/// complete immediately, but the one after will wait for a new permit.
26///
27/// [`notify()`]: Notify::notify
28/// [`notified().await`]: Notify::notified()
29///
30/// # Examples
31///
32/// Basic usage.
33///
34/// ```
35/// use std::sync::Arc;
36/// use async_notify::Notify;
37///
38/// async_global_executor::block_on(async {
39/// let notify = Arc::new(Notify::new());
40/// let notify2 = notify.clone();
41///
42/// async_global_executor::spawn(async move {
43/// notify2.notify();
44/// println!("sent notification");
45/// })
46/// .detach();
47///
48/// println!("received notification");
49/// notify.notified().await;
50/// })
51/// ```
52#[derive(Debug, Default)]
53pub struct Notify {
54 count: AtomicBool,
55 event: Event,
56}
57
58/// Like tokio Notify, this is a runtime independent Notify.
59impl Notify {
60 /// Create a [`Notify`]
61 pub const fn new() -> Self {
62 Self {
63 count: AtomicBool::new(false),
64 event: Event::new(),
65 }
66 }
67
68 /// Notifies a waiting task
69 ///
70 /// If a task is currently waiting, that task is notified. Otherwise, a
71 /// permit is stored in this `Notify` value and the **next** call to
72 /// [`notified().await`] will complete immediately consuming the permit made
73 /// available by this call to `notify()`.
74 ///
75 /// At most one permit may be stored by `Notify`. Many sequential calls to
76 /// `notify` will result in a single permit being stored. The next call to
77 /// `notified().await` will complete immediately, but the one after that
78 /// will wait.
79 ///
80 /// [`notified().await`]: Notify::notified()
81 ///
82 /// # Examples
83 ///
84 /// ```
85 /// use std::sync::Arc;
86 /// use async_notify::Notify;
87 ///
88 /// async_global_executor::block_on(async {
89 /// let notify = Arc::new(Notify::new());
90 /// let notify2 = notify.clone();
91 ///
92 /// async_global_executor::spawn(async move {
93 /// notify2.notify();
94 /// println!("sent notification");
95 /// })
96 /// .detach();
97 ///
98 /// println!("received notification");
99 /// notify.notified().await;
100 /// })
101 /// ```
102 #[inline]
103 pub fn notify(&self) {
104 self.count.store(true, Ordering::Release);
105 self.event.notify(1);
106 }
107
108 /// Wait for a notification.
109 ///
110 /// Each `Notify` value holds a single permit. If a permit is available from
111 /// an earlier call to [`notify()`], then `notified().await` will complete
112 /// immediately, consuming that permit. Otherwise, `notified().await` waits
113 /// for a permit to be made available by the next call to `notify()`.
114 ///
115 /// This method is cancel safety.
116 ///
117 /// [`notify()`]: Notify::notify
118 #[inline]
119 pub async fn notified(&self) {
120 loop {
121 if self.fast_path() {
122 return;
123 }
124
125 let listener = EventListener::new();
126 let mut listener = pin!(listener);
127 listener.as_mut().listen(&self.event);
128
129 if self.fast_path() {
130 return;
131 }
132
133 listener.await;
134 }
135 }
136
137 fn fast_path(&self) -> bool {
138 self.count
139 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
140 .is_ok()
141 }
142}
143
144pin_project! {
145 /// A [`Stream`](Stream) [`Notify`] wrapper
146 pub struct NotifyStream<T: Deref<Target=Notify>> {
147 #[pin]
148 notify: T,
149 listener: Option<Pin<Box<EventListener>>>,
150 }
151}
152
153impl<T: Deref<Target = Notify>> NotifyStream<T> {
154 /// Create [`NotifyStream`] from `T`
155 pub fn new(notify: T) -> Self {
156 Self {
157 notify,
158 listener: None,
159 }
160 }
161
162 /// acquire the inner [`T`]
163 pub fn into_inner(self) -> T {
164 self.notify
165 }
166}
167
168impl<T: Deref<Target = Notify>> AsRef<Notify> for NotifyStream<T> {
169 fn as_ref(&self) -> &Notify {
170 self.notify.deref()
171 }
172}
173
174impl<T: Deref<Target = Notify>> Stream for NotifyStream<T> {
175 type Item = ();
176
177 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
178 let this = self.project();
179 let notify = this.notify.deref();
180
181 loop {
182 if notify.fast_path() {
183 *this.listener = None;
184
185 return Poll::Ready(Some(()));
186 }
187
188 match this.listener.as_mut() {
189 None => {
190 let listener = notify.event.listen();
191 *this.listener = Some(listener);
192 }
193 Some(listener) => {
194 ready!(listener.as_mut().poll(cx));
195 }
196 }
197 }
198 }
199}
200
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures_util::{select, FutureExt, StreamExt};

    use super::*;

    /// A notification sent from a spawned task wakes the waiting task.
    #[test]
    fn test() {
        async_global_executor::block_on(async {
            let waiter = Arc::new(Notify::new());
            let sender = waiter.clone();

            async_global_executor::spawn(async move {
                sender.notify();
                println!("sent notification");
            })
            .detach();

            println!("received notification");
            waiter.notified().await;
        })
    }

    /// Several `notify()` calls without a waiter store exactly one permit.
    #[test]
    fn test_multi_notify() {
        async_global_executor::block_on(async {
            let sender = Arc::new(Notify::new());
            let waiter = sender.clone();

            // Two notifications in a row must collapse into one permit.
            sender.notify();
            sender.notify();

            // The stored permit makes the first wait complete immediately.
            select! {
                _ = waiter.notified().fuse() => {}
                default => unreachable!("there should be notified")
            }

            // The permit has been consumed, so the second wait is pending.
            select! {
                _ = waiter.notified().fuse() => unreachable!("there should not be notified"),
                default => {}
            }

            sender.notify();

            // A new notification stores a new permit.
            select! {
                _ = waiter.notified().fuse() => {}
                default => unreachable!("there should be notified")
            }
        })
    }

    /// `NotifyStream` yields an item once the wrapped `Notify` is notified.
    #[test]
    fn stream() {
        async_global_executor::block_on(async {
            let sender = Arc::new(Notify::new());
            let mut notifications = NotifyStream::new(sender.clone());

            async_global_executor::spawn(async move {
                sender.notify();
                println!("sent notification");
            })
            .detach();

            notifications.next().await.unwrap();
        })
    }
}
269}