async_notify/lib.rs
1//! A general version async Notify, like `tokio` Notify but can work with any async runtime.
2
3use std::future::Future;
4use std::num::NonZeroUsize;
5use std::ops::Deref;
6use std::pin::Pin;
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::task::{Context, Poll, ready};
9
10use event_listener::{Event, EventListener, listener};
11use futures_core::Stream;
12use pin_project_lite::pin_project;
13
14/// Notify a single task to wake up.
15///
16/// `Notify` provides a basic mechanism to notify a single task of an event.
17/// `Notify` itself does not carry any data. Instead, it is to be used to signal
18/// another task to perform an operation.
19///
20/// If [`notify()`] is called **before** [`notified().await`], then the next call to
21/// [`notified().await`] will complete immediately, consuming one permit. Permits
22/// accumulate: each call to [`notify()`] or [`notify_n()`] adds permits that can
23/// be consumed by subsequent [`notified().await`] calls.
24///
25/// [`notify()`]: Notify::notify
26/// [`notified().await`]: Notify::notified()
27///
28/// # Examples
29///
30/// Basic usage.
31///
32/// ```
33/// use std::sync::Arc;
34/// use async_notify::Notify;
35///
36/// async_global_executor::block_on(async {
37/// let notify = Arc::new(Notify::new());
38/// let notify2 = notify.clone();
39///
40/// async_global_executor::spawn(async move {
41/// notify2.notify();
42/// println!("sent notification");
43/// })
44/// .detach();
45///
46/// println!("received notification");
47/// notify.notified().await;
48/// })
49/// ```
50#[derive(Debug, Default)]
51pub struct Notify {
52 count: AtomicUsize,
53 event: Event,
54}
55
56/// Like tokio Notify, this is a runtime independent Notify.
57impl Notify {
58 /// Create a [`Notify`]
59 pub const fn new() -> Self {
60 Self {
61 count: AtomicUsize::new(0),
62 event: Event::new(),
63 }
64 }
65
66 /// Notifies a waiting task
67 ///
68 /// Adds one permit to this `Notify`. If a task is currently waiting on
69 /// [`notified().await`], that task will be woken and complete. Otherwise,
70 /// the permit is stored and the next call to [`notified().await`] will
71 /// complete immediately. Permits accumulate across multiple `notify()` calls.
72 ///
73 /// [`notified().await`]: Notify::notified()
74 ///
75 /// # Examples
76 ///
77 /// ```
78 /// use std::sync::Arc;
79 /// use async_notify::Notify;
80 ///
81 /// async_global_executor::block_on(async {
82 /// let notify = Arc::new(Notify::new());
83 /// let notify2 = notify.clone();
84 ///
85 /// async_global_executor::spawn(async move {
86 /// notify2.notify();
87 /// println!("sent notification");
88 /// })
89 /// .detach();
90 ///
91 /// println!("received notification");
92 /// notify.notified().await;
93 /// })
94 /// ```
95 #[inline]
96 pub fn notify(&self) {
97 self.notify_n(NonZeroUsize::new(1).unwrap())
98 }
99
100 /// Grants `n` permits and notifies up to `n` waiting tasks.
101 ///
102 /// Adds `n` permits to this `Notify`. If there are tasks currently waiting
103 /// on [`notified().await`], up to `n` of them will be woken and complete,
104 /// each consuming one permit. If no tasks are waiting, the permits are
105 /// stored and the next up to `n` calls to [`notified().await`] will complete
106 /// immediately.
107 ///
108 /// This is a generalization of [`notify()`] which is equivalent to
109 /// `notify_n(NonZeroUsize::MIN)`.
110 ///
111 /// [`notified().await`]: Notify::notified()
112 /// [`notify()`]: Notify::notify
113 #[inline]
114 pub fn notify_n(&self, n: NonZeroUsize) {
115 let n = n.get();
116 self.count.fetch_add(n, Ordering::Release);
117 self.event.notify(n);
118 }
119
120 /// Wakes up to `n` waiting tasks to compete for existing permits.
121 ///
122 /// Unlike [`notify_n()`], this does **not** add any permits. It only wakes
123 /// up to `n` tasks that are waiting on [`notified().await`]. Those tasks
124 /// will then compete for whatever permits are currently available. At most
125 /// one task can consume each available permit; the rest will wait for the
126 /// next notification.
127 ///
128 /// Use this when you want to wake multiple waiters to race for a single
129 /// resource (e.g. thundering herd mitigation).
130 ///
131 /// [`notified().await`]: Notify::notified()
132 /// [`notify_n()`]: Notify::notify_n
133 #[inline]
134 pub fn notify_waiters(&self, n: NonZeroUsize) {
135 self.event.notify(n.get());
136 }
137
138 /// Wait for a notification.
139 ///
140 /// Each `Notify` value holds a number of permits. If a permit is available
141 /// from an earlier call to [`notify()`] or [`notify_n()`], then
142 /// `notified().await` will complete immediately, consuming one permit.
143 /// Otherwise, `notified().await` waits for a permit to be made available.
144 ///
145 /// This method is cancel safety.
146 ///
147 /// [`notify()`]: Notify::notify
148 /// [`notify_n()`]: Notify::notify_n
149 #[inline]
150 pub async fn notified(&self) {
151 loop {
152 if self.fast_path() {
153 return;
154 }
155
156 listener!(self.event => listener);
157
158 if self.fast_path() {
159 return;
160 }
161
162 listener.await;
163 }
164 }
165
166 fn fast_path(&self) -> bool {
167 self.count
168 .try_update(Ordering::AcqRel, Ordering::Acquire, |c| c.checked_sub(1))
169 .is_ok()
170 }
171}
172
173pin_project! {
174 /// A [`Stream`](Stream) [`Notify`] wrapper
175 pub struct NotifyStream<T: Deref<Target=Notify>> {
176 #[pin]
177 notify: T,
178 listener: Option<EventListener>,
179 }
180}
181
182impl<T: Deref<Target = Notify>> NotifyStream<T> {
183 /// Create [`NotifyStream`] from `T`
184 pub const fn new(notify: T) -> Self {
185 Self {
186 notify,
187 listener: None,
188 }
189 }
190
191 /// acquire the inner [`T`]
192 pub fn into_inner(self) -> T {
193 self.notify
194 }
195}
196
197impl<T: Deref<Target = Notify>> AsRef<Notify> for NotifyStream<T> {
198 fn as_ref(&self) -> &Notify {
199 self.notify.deref()
200 }
201}
202
203impl<T: Deref<Target = Notify>> Stream for NotifyStream<T> {
204 type Item = ();
205
206 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
207 let this = self.project();
208 let notify = this.notify.deref();
209
210 loop {
211 if notify.fast_path() {
212 *this.listener = None;
213
214 return Poll::Ready(Some(()));
215 }
216
217 match this.listener.as_mut() {
218 None => {
219 let listener = notify.event.listen();
220 *this.listener = Some(listener);
221 }
222 Some(listener) => {
223 ready!(Pin::new(listener).poll(cx));
224 }
225 }
226 }
227 }
228}
229
230#[cfg(test)]
231mod tests {
232 use std::sync::Arc;
233
234 use futures_util::{FutureExt, StreamExt, select};
235
236 use super::*;
237
238 #[test]
239 fn test() {
240 async_global_executor::block_on(async {
241 let notify = Arc::new(Notify::new());
242 let notify2 = notify.clone();
243
244 async_global_executor::spawn(async move {
245 notify2.notify();
246 println!("sent notification");
247 })
248 .detach();
249
250 println!("received notification");
251 notify.notified().await;
252 })
253 }
254
255 #[test]
256 fn test_multi_notify() {
257 async_global_executor::block_on(async {
258 let notify = Arc::new(Notify::new());
259 let notify2 = notify.clone();
260
261 // 2 permits
262 notify.notify();
263 notify.notify();
264
265 // First completes (1 permit consumed)
266 select! {
267 _ = notify2.notified().fuse() => {}
268 default => unreachable!("there should be notified")
269 }
270
271 // Second completes (1 permit consumed)
272 select! {
273 _ = notify2.notified().fuse() => {}
274 default => unreachable!("there should be notified")
275 }
276
277 // No permits left, third would block
278 select! {
279 _ = notify2.notified().fuse() => unreachable!("there should not be notified"),
280 default => {}
281 }
282
283 notify.notify();
284
285 // Third completes
286 select! {
287 _ = notify2.notified().fuse() => {}
288 default => unreachable!("there should be notified")
289 }
290 })
291 }
292
293 #[test]
294 fn test_notify_n() {
295 async_global_executor::block_on(async {
296 let notify = Arc::new(Notify::new());
297 let notify2 = notify.clone();
298
299 notify.notify_n(3.try_into().unwrap());
300
301 for _ in 0..3 {
302 select! {
303 _ = notify2.notified().fuse() => {}
304 default => unreachable!("there should be notified")
305 }
306 }
307
308 select! {
309 _ = notify2.notified().fuse() => unreachable!("there should not be notified"),
310 default => {}
311 }
312 })
313 }
314
315 #[test]
316 fn test_notify_waiters() {
317 async_global_executor::block_on(async {
318 let notify = Arc::new(Notify::new());
319 let notify2 = notify.clone();
320 let notify3 = notify.clone();
321
322 let t1 = async_global_executor::spawn(async move {
323 notify2.notified().await;
324 });
325 let t2 = async_global_executor::spawn(async move {
326 notify3.notified().await;
327 });
328
329 // Give tasks time to start waiting
330 async_global_executor::spawn(async {}).await;
331
332 // 1 permit, wake 2 waiters - only 1 can complete
333 notify.notify();
334 notify.notify_waiters(NonZeroUsize::new(2).unwrap());
335
336 // One completes. Add permit for the other.
337 notify.notify();
338
339 t1.await;
340 t2.await;
341 })
342 }
343
344 #[test]
345 fn stream() {
346 async_global_executor::block_on(async {
347 let notify = Arc::new(Notify::new());
348 let mut notify_stream = NotifyStream::new(notify.clone());
349
350 async_global_executor::spawn(async move {
351 notify.notify();
352 println!("sent notification");
353 })
354 .detach();
355
356 notify_stream.next().await.unwrap();
357 })
358 }
359}