diatomic_waker/
lib.rs

1//! Async, fast synchronization primitives for task wakeup.
2//!
3//! `diatomic-waker` is similar to [`atomic-waker`][atomic-waker] in that it
4//! enables concurrent updates and notifications to a wrapped `Waker`. Unlike
5//! the latter, however, it does not use spinlocks[^spinlocks] and is faster, in
6//! particular when the consumer is notified periodically rather than just once.
7//! It can in particular be used as a very fast, single-consumer [eventcount] to
8//! turn a non-blocking data structure into an asynchronous one (see MPSC
9//! channel receiver example).
10//!
11//! The API distinguishes between the entity that registers wakers ([`WakeSink`]
12//! or [`WakeSinkRef`]) and the possibly many entities that notify the waker
13//! ([`WakeSource`]s or [`WakeSourceRef`]s).
14//!
15//! Most users will prefer to use [`WakeSink`] and [`WakeSource`], which readily
16//! store a shared [`DiatomicWaker`] within an `Arc`. You may otherwise elect to
17//! allocate a [`DiatomicWaker`] yourself, but will then need to use the
18//! lifetime-bounded [`WakeSinkRef`] and [`WakeSourceRef`], or ensure by other
19//! means that waker registration is not performed concurrently.
20//!
21//! [atomic-waker]: https://docs.rs/atomic-waker/latest/atomic_waker/
22//! [eventcount]:
23//!     https://www.1024cores.net/home/lock-free-algorithms/eventcounts
24//! [^spinlocks]: The implementation of [AtomicWaker][atomic-waker] yields to the
25//!     runtime on contention, which is in effect an executor-mediated spinlock.
26//!
//! # Feature flags
28//!
29//! By default, this crate enables the `alloc` feature to provide the owned
30//! [`WakeSink`] and [`WakeSource`]. It can be made `no-std`-compatible by
31//! specifying `default-features = false`.
32//!
33//!
34//! # Examples
35//!
36//! A multi-producer, single-consumer channel of capacity 1 for sending
37//! `NonZeroUsize` values, with an asynchronous receiver:
38//!
39//! ```
40//! use std::num::NonZeroUsize;
41//! use std::sync::atomic::{AtomicUsize, Ordering};
42//! use std::sync::Arc;
43//!
44//! use diatomic_waker::{WakeSink, WakeSource};
45//!
46//! // The sending side of the channel.
47//! #[derive(Clone)]
48//! struct Sender {
49//!     wake_src: WakeSource,
50//!     value: Arc<AtomicUsize>,
51//! }
52//!
53//! // The receiving side of the channel.
54//! struct Receiver {
55//!     wake_sink: WakeSink,
56//!     value: Arc<AtomicUsize>,
57//! }
58//!
59//! // Creates an empty channel.
60//! fn channel() -> (Sender, Receiver) {
61//!     let value = Arc::new(AtomicUsize::new(0));
62//!     let wake_sink = WakeSink::new();
63//!     let wake_src = wake_sink.source();
64//!
65//!     (
66//!         Sender {
67//!             wake_src,
68//!             value: value.clone(),
69//!         },
70//!         Receiver { wake_sink, value },
71//!     )
72//! }
73//!
74//! impl Sender {
75//!     // Sends a value if the channel is empty.
76//!     fn try_send(&self, value: NonZeroUsize) -> bool {
77//!         let success = self
78//!             .value
79//!             .compare_exchange(0, value.get(), Ordering::Relaxed, Ordering::Relaxed)
80//!             .is_ok();
81//!         if success {
82//!             self.wake_src.notify()
83//!         };
84//!
85//!         success
86//!     }
87//! }
88//!
89//! impl Receiver {
90//!     // Receives a value asynchronously.
91//!     async fn recv(&mut self) -> NonZeroUsize {
92//!         // Wait until the predicate returns `Some(value)`, i.e. when the atomic
93//!         // value becomes non-zero.
94//!         self.wake_sink
95//!             .wait_until(|| NonZeroUsize::new(self.value.swap(0, Ordering::Relaxed)))
96//!             .await
97//!     }
98//! }
99//! ```
100//!
101//!
//! In some cases, it may be necessary to use the lower-level
103//! [`register`](WakeSink::register) and [`unregister`](WakeSink::unregister)
104//! methods rather than the [`wait_until`](WakeSink::wait_until) convenience
105//! method.
106//!
107//! This is how the behavior of the above `recv` method could be
108//! reproduced with a hand-coded future:
109//!
110//! ```
111//! use std::future::Future;
112//! # use std::num::NonZeroUsize;
113//! use std::pin::Pin;
114//! # use std::sync::atomic::{AtomicUsize, Ordering};
115//! # use std::sync::Arc;
116//! use std::task::{Context, Poll};
117//! # use diatomic_waker::WakeSink;
118//!
119//! # struct Receiver {
120//! #     wake_sink: WakeSink,
121//! #     value: Arc<AtomicUsize>,
122//! # }
123//! struct Recv<'a> {
124//!     receiver: &'a mut Receiver,
125//! }
126//!
127//! impl Future for Recv<'_> {
128//!     type Output = NonZeroUsize;
129//!
130//!     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<NonZeroUsize> {
131//!         // Avoid waker registration if a value is readily available.
132//!         let value = NonZeroUsize::new(self.receiver.value.swap(0, Ordering::Relaxed));
133//!         if let Some(value) = value {
134//!             return Poll::Ready(value);
135//!         }
136//!
137//!         // Register the waker to be polled again once a value is available.
138//!         self.receiver.wake_sink.register(cx.waker());
139//!
140//!         // Check again after registering the waker to prevent a race condition.
141//!         let value = NonZeroUsize::new(self.receiver.value.swap(0, Ordering::Relaxed));
142//!         if let Some(value) = value {
143//!             // Avoid a spurious wake-up.
144//!             self.receiver.wake_sink.unregister();
145//!
146//!             return Poll::Ready(value);
147//!         }
148//!
149//!         Poll::Pending
150//!     }
151//! }
152//! ```
153#![warn(missing_docs, missing_debug_implementations, unreachable_pub)]
154#![cfg_attr(not(test), no_std)]
155#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg_hide))]
156
157#[cfg(feature = "alloc")]
158extern crate alloc;
159
160#[cfg(feature = "alloc")]
161mod arc_waker;
162mod borrowed_waker;
163mod loom_exports;
164#[deprecated(
165    since = "0.2.0",
166    note = "items from this module are now available in the root module"
167)]
168pub mod primitives;
169mod waker;
170
171#[cfg(feature = "alloc")]
172pub use arc_waker::{WakeSink, WakeSource};
173pub use borrowed_waker::{WakeSinkRef, WakeSourceRef};
174pub use waker::{DiatomicWaker, WaitUntil};
175
176/// Tests.
177#[cfg(all(test, not(diatomic_waker_loom)))]
178mod tests {
179    use std::sync::atomic::{AtomicBool, Ordering};
180    use std::thread;
181    use std::time::Duration;
182
183    use pollster::block_on;
184
185    use super::*;
186
187    #[test]
188    fn waker_wait_until() {
189        let mut sink = WakeSink::new();
190        let source = sink.source();
191        static FLAG: AtomicBool = AtomicBool::new(false);
192
193        let t1 = thread::spawn(move || {
194            std::thread::sleep(Duration::from_millis(10));
195            source.notify(); // force a spurious notification
196            std::thread::sleep(Duration::from_millis(10));
197            FLAG.store(true, Ordering::Relaxed);
198            source.notify();
199        });
200
201        let t2 = thread::spawn(move || {
202            block_on(sink.wait_until(|| {
203                if FLAG.load(Ordering::Relaxed) {
204                    Some(())
205                } else {
206                    None
207                }
208            }));
209
210            assert!(FLAG.load(Ordering::Relaxed));
211        });
212
213        t1.join().unwrap();
214        t2.join().unwrap();
215    }
216
217    #[test]
218    fn waker_ref_wait_until() {
219        let mut w = DiatomicWaker::new();
220        let mut sink = w.sink_ref();
221        let source = sink.source_ref();
222        static FLAG: AtomicBool = AtomicBool::new(false);
223
224        thread::scope(|s| {
225            s.spawn(move || {
226                std::thread::sleep(Duration::from_millis(10));
227                source.notify(); // force a spurious notification
228                std::thread::sleep(Duration::from_millis(10));
229                FLAG.store(true, Ordering::Relaxed);
230                source.notify();
231            });
232
233            s.spawn(move || {
234                block_on(sink.wait_until(|| {
235                    if FLAG.load(Ordering::Relaxed) {
236                        Some(())
237                    } else {
238                        None
239                    }
240                }));
241
242                assert!(FLAG.load(Ordering::Relaxed));
243            });
244        });
245    }
246}
247
248/// Loom tests.
249#[cfg(all(test, diatomic_waker_loom))]
250mod tests {
251    use super::*;
252
253    use core::task::Waker;
254    use std::future::Future;
255    use std::pin::Pin;
256    use std::sync::atomic::Ordering;
257    use std::sync::Arc;
258    use std::task::{Context, Poll};
259
260    use loom::model::Builder;
261    use loom::sync::atomic::{AtomicU32, AtomicUsize};
262    use loom::thread;
263
264    use waker_fn::waker_fn;
265
266    /// A waker factory that registers notifications from the newest waker only.
267    #[derive(Clone, Default)]
268    struct MultiWaker {
269        state: Arc<AtomicU32>,
270    }
271    impl MultiWaker {
272        /// Clears the notification flag and returns the former notification
273        /// status.
274        ///
275        /// This operation has Acquire semantic when a notification is indeed
276        /// present, and Relaxed otherwise. It is therefore appropriate to
277        /// simulate a scheduler receiving a notification as it ensures that all
278        /// memory operations preceding the notification of a task are visible.
279        fn take_notification(&self) -> bool {
280            // Clear the notification flag.
281            let mut state = self.state.load(Ordering::Relaxed);
282            loop {
283                // This is basically a `fetch_or` but with an atomic memory
284                // ordering that depends on the LSB.
285                let notified_stated = state | 1;
286                let unnotified_stated = state & !1;
287                match self.state.compare_exchange_weak(
288                    notified_stated,
289                    unnotified_stated,
290                    Ordering::Acquire,
291                    Ordering::Relaxed,
292                ) {
293                    Ok(_) => return true,
294                    Err(s) => {
295                        state = s;
296                        if state == unnotified_stated {
297                            return false;
298                        }
299                    }
300                }
301            }
302        }
303
304        /// Clears the notification flag and creates a new waker.
305        fn new_waker(&self) -> Waker {
306            // Increase the epoch and clear the notification flag.
307            let mut state = self.state.load(Ordering::Relaxed);
308            let mut epoch;
309            loop {
310                // Increase the epoch by 2.
311                epoch = (state & !1) + 2;
312                match self.state.compare_exchange_weak(
313                    state,
314                    epoch,
315                    Ordering::Relaxed,
316                    Ordering::Relaxed,
317                ) {
318                    Ok(_) => break,
319                    Err(s) => state = s,
320                }
321            }
322
323            // Create a waker that only notifies if it is the newest waker.
324            let waker_state = self.state.clone();
325            waker_fn(move || {
326                let mut state = waker_state.load(Ordering::Relaxed);
327                loop {
328                    let new_state = if state & !1 == epoch {
329                        epoch | 1
330                    } else {
331                        break;
332                    };
333                    match waker_state.compare_exchange(
334                        state,
335                        new_state,
336                        Ordering::Release,
337                        Ordering::Relaxed,
338                    ) {
339                        Ok(_) => break,
340                        Err(s) => state = s,
341                    }
342                }
343            })
344        }
345    }
346
347    // A simple counter that can be used to simulate the availability of a
348    // certain number of tokens. In order to model the weakest possible
349    // predicate from the viewpoint of atomic memory ordering, only Relaxed
350    // atomic operations are used.
351    #[derive(Clone, Default)]
352    struct Counter {
353        count: Arc<AtomicUsize>,
354    }
355    impl Counter {
356        fn increment(&self) {
357            self.count.fetch_add(1, Ordering::Relaxed);
358        }
359        fn try_decrement(&self) -> bool {
360            let mut count = self.count.load(Ordering::Relaxed);
361            loop {
362                if count == 0 {
363                    return false;
364                }
365                match self.count.compare_exchange(
366                    count,
367                    count - 1,
368                    Ordering::Relaxed,
369                    Ordering::Relaxed,
370                ) {
371                    Ok(_) => return true,
372                    Err(c) => count = c,
373                }
374            }
375        }
376    }
377
378    /// Test whether notifications may be lost.
379    ///
380    /// Make a certain amount of tokens available and notify the sink each time
381    /// a token is made available. Optionally, it is possible to:
382    /// - request that `max_spurious_wake` threads will simulate a spurious
383    ///   wake-up,
384    /// - change the waker each time it is polled.
385    ///
386    /// A default preemption bound will be applied if none was specified through
387    /// an environment variable.
388    fn loom_notify(
389        token_count: usize,
390        max_spurious_wake: usize,
391        change_waker: bool,
392        preemption_bound: usize,
393    ) {
394        // Only set the preemption bound if it wasn't already specified via a environment variable.
395        let mut builder = Builder::new();
396        if builder.preemption_bound.is_none() {
397            builder.preemption_bound = Some(preemption_bound);
398        }
399
400        builder.check(move || {
401            let token_counter = Counter::default();
402            let mut wake_sink = WakeSink::new();
403
404            for src_id in 0..token_count {
405                thread::spawn({
406                    let token_counter = token_counter.clone();
407                    let wake_src = wake_sink.source();
408
409                    move || {
410                        if src_id < max_spurious_wake {
411                            wake_src.notify();
412                        }
413                        token_counter.increment();
414                        wake_src.notify();
415                    }
416                });
417            }
418
419            let multi_waker = MultiWaker::default();
420            let mut waker = multi_waker.new_waker();
421            let mut satisfied_predicates_count = 0;
422
423            // Iterate until all tokens are "received".
424            //
425            // Note: the loop does not have any assertion. This is by design:
426            // missed notifications will be caught by Loom with a `Model
427            // exceeded maximum number of branches` error because the spin loop
428            // will then spin forever.
429            while satisfied_predicates_count < token_count {
430                let mut wait_until = wake_sink.wait_until(|| {
431                    if token_counter.try_decrement() {
432                        Some(())
433                    } else {
434                        None
435                    }
436                });
437
438                // Poll the predicate until it is satisfied.
439                loop {
440                    let mut cx = Context::from_waker(&waker);
441                    let poll_state = Pin::new(&mut wait_until).poll(&mut cx);
442
443                    if poll_state == Poll::Ready(()) {
444                        satisfied_predicates_count += 1;
445                        break;
446                    }
447
448                    // Simulate the scheduler by spinning until the next
449                    // notification.
450                    while !multi_waker.take_notification() {
451                        thread::yield_now();
452                    }
453
454                    if change_waker {
455                        waker = multi_waker.new_waker();
456                    }
457                }
458            }
459        });
460    }
461
462    #[test]
463    fn loom_notify_two_tokens() {
464        const DEFAULT_PREEMPTION_BOUND: usize = 4;
465
466        loom_notify(2, 0, false, DEFAULT_PREEMPTION_BOUND);
467    }
468
469    #[test]
470    fn loom_notify_two_tokens_one_spurious() {
471        const DEFAULT_PREEMPTION_BOUND: usize = 4;
472
473        loom_notify(2, 1, false, DEFAULT_PREEMPTION_BOUND);
474    }
475
476    #[test]
477    fn loom_notify_two_tokens_change_waker() {
478        const DEFAULT_PREEMPTION_BOUND: usize = 3;
479
480        loom_notify(2, 0, true, DEFAULT_PREEMPTION_BOUND);
481    }
482
483    #[test]
484    fn loom_notify_two_tokens_one_spurious_change_waker() {
485        const DEFAULT_PREEMPTION_BOUND: usize = 3;
486
487        loom_notify(2, 1, true, DEFAULT_PREEMPTION_BOUND);
488    }
489
490    #[test]
491    fn loom_notify_three_tokens() {
492        const DEFAULT_PREEMPTION_BOUND: usize = 2;
493
494        loom_notify(3, 0, false, DEFAULT_PREEMPTION_BOUND);
495    }
496
497    #[test]
498    /// Test whether concurrent read and write access to the waker is possible.
499    ///
500    /// 3 different wakers are registered to force a waker slot to be re-used.
501    fn loom_waker_slot_reuse() {
502        // This tests require a high preemption bound to catch typical atomic
503        // memory ordering mistakes.
504        const DEFAULT_PREEMPTION_BOUND: usize = 5;
505
506        // Only set the preemption bound if it wasn't already specified via a
507        // environment variable.
508        let mut builder = Builder::new();
509        if builder.preemption_bound.is_none() {
510            builder.preemption_bound = Some(DEFAULT_PREEMPTION_BOUND);
511        }
512
513        builder.check(move || {
514            let mut wake_sink = WakeSink::new();
515
516            thread::spawn({
517                let wake_src = wake_sink.source();
518
519                move || {
520                    wake_src.notify();
521                }
522            });
523            thread::spawn({
524                let wake_src = wake_sink.source();
525
526                move || {
527                    wake_src.notify();
528                    wake_src.notify();
529                }
530            });
531
532            let multi_waker = MultiWaker::default();
533            for _ in 0..3 {
534                let waker = multi_waker.new_waker();
535                wake_sink.register(&waker);
536            }
537        });
538    }
539}