async_notify/lib.rs
//! A runtime-agnostic async `Notify`: like `tokio`'s `Notify`, but usable with any async runtime.
2
3use std::future::Future;
4use std::num::NonZeroUsize;
5use std::ops::Deref;
6use std::pin::Pin;
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::task::{Context, Poll, ready};
9
10use event_listener::{Event, EventListener, listener};
11use futures_core::Stream;
12use pin_project_lite::pin_project;
13
14/// Notify a single task to wake up.
15///
16/// `Notify` provides a basic mechanism to notify a single task of an event.
17/// `Notify` itself does not carry any data. Instead, it is to be used to signal
18/// another task to perform an operation.
19///
20/// If [`notify()`] is called **before** [`notified().await`], then the next call to
21/// [`notified().await`] will complete immediately, consuming one permit. Permits
22/// accumulate: each call to [`notify()`] or [`notify_n()`] adds permits that can
23/// be consumed by subsequent [`notified().await`] calls.
24///
25/// [`notify()`]: Notify::notify
26/// [`notified().await`]: Notify::notified()
27///
28/// # Examples
29///
30/// Basic usage.
31///
32/// ```
33/// use std::sync::Arc;
34/// use async_notify::Notify;
35///
36/// async_global_executor::block_on(async {
37/// let notify = Arc::new(Notify::new());
38/// let notify2 = notify.clone();
39///
40/// async_global_executor::spawn(async move {
41/// notify2.notify();
42/// println!("sent notification");
43/// })
44/// .detach();
45///
46/// println!("received notification");
47/// notify.notified().await;
48/// })
49/// ```
50#[derive(Debug, Default)]
51pub struct Notify {
52 count: AtomicUsize,
53 event: Event,
54}
55
56/// Like tokio Notify, this is a runtime independent Notify.
57impl Notify {
58 /// Create a [`Notify`]
59 pub const fn new() -> Self {
60 Self {
61 count: AtomicUsize::new(0),
62 event: Event::new(),
63 }
64 }
65
66 /// Notifies a waiting task
67 ///
68 /// Adds one permit to this `Notify`. If a task is currently waiting on
69 /// [`notified().await`], that task will be woken and complete. Otherwise,
70 /// the permit is stored and the next call to [`notified().await`] will
71 /// complete immediately. Permits accumulate across multiple `notify()` calls.
72 ///
73 /// [`notified().await`]: Notify::notified()
74 ///
75 /// # Examples
76 ///
77 /// ```
78 /// use std::sync::Arc;
79 /// use async_notify::Notify;
80 ///
81 /// async_global_executor::block_on(async {
82 /// let notify = Arc::new(Notify::new());
83 /// let notify2 = notify.clone();
84 ///
85 /// async_global_executor::spawn(async move {
86 /// notify2.notify();
87 /// println!("sent notification");
88 /// })
89 /// .detach();
90 ///
91 /// println!("received notification");
92 /// notify.notified().await;
93 /// })
94 /// ```
95 #[inline]
96 pub fn notify(&self) {
97 self.notify_n(NonZeroUsize::new(1).unwrap())
98 }
99
100 /// Grants `n` permits and notifies up to `n` waiting tasks.
101 ///
102 /// Adds `n` permits to this `Notify`. If there are tasks currently waiting
103 /// on [`notified().await`], up to `n` of them will be woken and complete,
104 /// each consuming one permit. If no tasks are waiting, the permits are
105 /// stored and the next up to `n` calls to [`notified().await`] will complete
106 /// immediately.
107 ///
108 /// This is a generalization of [`notify()`] which is equivalent to
109 /// `notify_n(NonZeroUsize::MIN)`.
110 ///
111 /// [`notified().await`]: Notify::notified()
112 /// [`notify()`]: Notify::notify
113 #[inline]
114 pub fn notify_n(&self, n: NonZeroUsize) {
115 let n = n.get();
116 self.count.fetch_add(n, Ordering::Release);
117 self.event.notify(n);
118 }
119
120 /// Wakes up to `n` waiting tasks to compete for existing permits.
121 ///
122 /// Unlike [`notify_n()`], this does **not** add any permits. It only wakes
123 /// up to `n` tasks that are waiting on [`notified().await`]. Those tasks
124 /// will then compete for whatever permits are currently available. At most
125 /// one task can consume each available permit; the rest will wait for the
126 /// next notification.
127 ///
128 /// Use this when you want to wake multiple waiters to race for a single
129 /// resource (e.g. thundering herd mitigation).
130 ///
131 /// [`notified().await`]: Notify::notified()
132 /// [`notify_n()`]: Notify::notify_n
133 #[inline]
134 pub fn notify_waiters(&self, n: NonZeroUsize) {
135 self.event.notify(n.get());
136 }
137
138 /// Wait for a notification.
139 ///
140 /// Each `Notify` value holds a number of permits. If a permit is available
141 /// from an earlier call to [`notify()`] or [`notify_n()`], then
142 /// `notified().await` will complete immediately, consuming one permit.
143 /// Otherwise, `notified().await` waits for a permit to be made available.
144 ///
145 /// This method is cancel safety.
146 ///
147 /// [`notify()`]: Notify::notify
148 /// [`notify_n()`]: Notify::notify_n
149 #[inline]
150 pub async fn notified(&self) {
151 loop {
152 if self.fast_path() {
153 return;
154 }
155
156 listener!(self.event => listener);
157
158 if self.fast_path() {
159 return;
160 }
161
162 listener.await;
163 }
164 }
165
166 fn fast_path(&self) -> bool {
167 // to support old version rustc
168 #[allow(deprecated)]
169 self.count
170 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |c| c.checked_sub(1))
171 .is_ok()
172 }
173}
174
175pin_project! {
176 /// A [`Stream`](Stream) [`Notify`] wrapper
177 pub struct NotifyStream<T: Deref<Target=Notify>> {
178 #[pin]
179 notify: T,
180 listener: Option<EventListener>,
181 }
182}
183
184impl<T: Deref<Target = Notify>> NotifyStream<T> {
185 /// Create [`NotifyStream`] from `T`
186 pub const fn new(notify: T) -> Self {
187 Self {
188 notify,
189 listener: None,
190 }
191 }
192
193 /// acquire the inner [`T`]
194 pub fn into_inner(self) -> T {
195 self.notify
196 }
197}
198
199impl<T: Deref<Target = Notify>> AsRef<Notify> for NotifyStream<T> {
200 fn as_ref(&self) -> &Notify {
201 self.notify.deref()
202 }
203}
204
205impl<T: Deref<Target = Notify>> Stream for NotifyStream<T> {
206 type Item = ();
207
208 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
209 let this = self.project();
210 let notify = this.notify.deref();
211
212 loop {
213 if notify.fast_path() {
214 *this.listener = None;
215
216 return Poll::Ready(Some(()));
217 }
218
219 match this.listener.as_mut() {
220 None => {
221 let listener = notify.event.listen();
222 *this.listener = Some(listener);
223 }
224 Some(listener) => {
225 ready!(Pin::new(listener).poll(cx));
226 }
227 }
228 }
229 }
230}
231
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures_util::{FutureExt, StreamExt, select};

    use super::*;

    /// A permit granted from a spawned task completes `notified()`.
    #[test]
    fn test() {
        async_global_executor::block_on(async {
            let notify = Arc::new(Notify::new());
            let notify2 = notify.clone();

            async_global_executor::spawn(async move {
                notify2.notify();
                println!("sent notification");
            })
            .detach();

            notify.notified().await;
            println!("received notification");
        })
    }

    /// Permits accumulate across `notify()` calls and are consumed one at a time.
    #[test]
    fn test_multi_notify() {
        async_global_executor::block_on(async {
            let notify = Arc::new(Notify::new());
            let notify2 = notify.clone();

            // 2 permits
            notify.notify();
            notify.notify();

            // First completes (1 permit consumed)
            select! {
                _ = notify2.notified().fuse() => {}
                default => unreachable!("there should be notified")
            }

            // Second completes (1 permit consumed)
            select! {
                _ = notify2.notified().fuse() => {}
                default => unreachable!("there should be notified")
            }

            // No permits left, third would block
            select! {
                _ = notify2.notified().fuse() => unreachable!("there should not be notified"),
                default => {}
            }

            notify.notify();

            // Third completes
            select! {
                _ = notify2.notified().fuse() => {}
                default => unreachable!("there should be notified")
            }
        })
    }

    /// `notify_n(3)` grants exactly three permits.
    #[test]
    fn test_notify_n() {
        async_global_executor::block_on(async {
            let notify = Arc::new(Notify::new());
            let notify2 = notify.clone();

            notify.notify_n(3.try_into().unwrap());

            for _ in 0..3 {
                select! {
                    _ = notify2.notified().fuse() => {}
                    default => unreachable!("there should be notified")
                }
            }

            select! {
                _ = notify2.notified().fuse() => unreachable!("there should not be notified"),
                default => {}
            }
        })
    }

    /// `notify_waiters` wakes waiters without granting permits; both waiters
    /// finish once two permits have been granted in total.
    #[test]
    fn test_notify_waiters() {
        async_global_executor::block_on(async {
            let notify = Arc::new(Notify::new());
            let notify2 = notify.clone();
            let notify3 = notify.clone();

            let t1 = async_global_executor::spawn(async move {
                notify2.notified().await;
            });
            let t2 = async_global_executor::spawn(async move {
                notify3.notified().await;
            });

            // Best-effort yield so the spawned tasks get a chance to start
            // waiting. The test does not rely on it: each waiter ends up with
            // its own permit regardless of when it starts.
            async_global_executor::spawn(async {}).await;

            // 1 permit, wake 2 waiters - only 1 can complete
            notify.notify();
            notify.notify_waiters(NonZeroUsize::new(2).unwrap());

            // One completes. Add permit for the other.
            notify.notify();

            t1.await;
            t2.await;
        })
    }

    /// A permit granted from a spawned task produces one stream item.
    #[test]
    fn stream() {
        async_global_executor::block_on(async {
            let notify = Arc::new(Notify::new());
            let mut notify_stream = NotifyStream::new(notify.clone());

            async_global_executor::spawn(async move {
                notify.notify();
                println!("sent notification");
            })
            .detach();

            notify_stream.next().await.unwrap();
        })
    }
}