maniac_runtime/future/event/
mod.rs

1//! An efficient async condition variable for lock-free algorithms, a.k.a.
2//! "eventcount".
3//!
4//! [Eventcount][eventcount]-like primitives are useful to make some operations
5//! on a lock-free structure blocking, for instance to transform bounded queues
6//! into bounded channels. Such a primitive allows an interested task to block
7//! until a predicate is satisfied by checking the predicate each time it
8//! receives a notification.
9//!
10//! While functionally similar to the [event_listener] crate, this
11//! implementation is more opinionated and limited to the `async` case. It
12//! strives to be more efficient, however, by limiting the amount of locking
13//! operations on the mutex-protected list of notifiers: the lock is typically
14//! taken only once for each time a waiter is blocked and once for notifying,
15//! thus reducing the need for synchronization operations. Finally, spurious
16//! wake-ups are only generated in very rare circumstances.
17//!
18//! This library is an offshoot of [Asynchronix][asynchronix], an ongoing effort
19//! at a high performance asynchronous computation framework for system
20//! simulation.
21//!
22//! [event_listener]: https://docs.rs/event_listener/latest/event_listener/
23//! [eventcount]:
24//!     https://www.1024cores.net/home/lock-free-algorithms/eventcounts
25//! [asynchronix]: https://github.com/asynchronics/asynchronix
26//!
27//! # Examples
28//!
29//! Wait until a non-zero value has been sent asynchronously.
30//!
31//! ```ignore
32//! use std::sync::atomic::{AtomicUsize, Ordering};
33//! use std::sync::Arc;
34//! use std::thread;
35//!
36//! use futures_executor::block_on;
37//!
38//! use async_event::Event;
39//!
40//!
41//! let value = Arc::new(AtomicUsize::new(0));
42//! let event = Arc::new(Event::new());
43//!
44//! // Set a non-zero value concurrently.
45//! thread::spawn({
46//!     let value = value.clone();
47//!     let event = event.clone();
48//!
49//!     move || {
50//!         // A relaxed store is sufficient here: `Event::notify*` methods insert
51//!         // atomic fences to warrant adequate synchronization.
52//!         value.store(42, Ordering::Relaxed);
53//!         event.notify_one();
54//!     }
55//! });
56//!
57//! // Wait until the value is set.
58//! block_on(async move {
59//!     let v = event
60//!         .wait_until(|| {
61//!             // A relaxed load is sufficient here: `Event::wait_until` inserts
62//!             // atomic fences to warrant adequate synchronization.
63//!             let v = value.load(Ordering::Relaxed);
64//!             if v != 0 { Some(v) } else { None }
65//!         })
66//!         .await;
67//!
68//!      assert_eq!(v, 42);
69//! });
70//! ```ignore
71#![warn(missing_docs, missing_debug_implementations, unreachable_pub)]
72
73mod loom_exports;
74
75use std::future::Future;
76use std::mem;
77use std::pin::Pin;
78use std::ptr::NonNull;
79use std::sync::atomic::Ordering;
80use std::task::{Context, Poll, Waker};
81
82use loom_exports::cell::UnsafeCell;
83use loom_exports::sync::Mutex;
84use loom_exports::sync::atomic::{self, AtomicBool};
85use pin_project_lite::pin_project;
86
87/// An object that can receive or send notifications.
88#[derive(Debug)]
89pub struct Event {
90    wait_set: WaitSet,
91}
92
93impl Event {
94    /// Creates a new event.
95    pub fn new() -> Self {
96        Self {
97            wait_set: WaitSet::default(),
98        }
99    }
100
101    /// Notify a number of awaiting events that the predicate should be checked.
102    ///
103    /// If less events than requested are currently awaiting, then all awaiting
104    /// event are notified.
105    #[inline(always)]
106    pub fn notify(&self, n: usize) {
107        // This fence synchronizes with the other fence in `WaitUntil::poll` and
108        // ensures that either the `poll` method will successfully check the
109        // predicate set before this call, or the notifier inserted by `poll`
110        // will be visible in the wait list when calling `WaitSet::notify` (or
111        // both).
112        atomic::fence(Ordering::SeqCst);
113
114        // Safety: all notifiers in the wait set are guaranteed to be alive
115        // since the `WaitUntil` drop handler ensures that notifiers are removed
116        // from the wait set before they are deallocated.
117        unsafe {
118            self.wait_set.notify_relaxed(n);
119        }
120    }
121
122    /// Notify one awaiting event (if any) that the predicate should be checked.
123    #[inline(always)]
124    pub fn notify_one(&self) {
125        self.notify(1);
126    }
127
128    /// Notify all awaiting events that the predicate should be checked.
129    #[inline(always)]
130    pub fn notify_all(&self) {
131        self.notify(usize::MAX);
132    }
133
134    /// Returns a future that can be `await`ed until the provided predicate is
135    /// satisfied.
136    pub fn wait_until<F, T>(&self, predicate: F) -> WaitUntil<'_, F, T>
137    where
138        F: FnMut() -> Option<T>,
139    {
140        WaitUntil::new(&self.wait_set, predicate)
141    }
142
143    /// Returns a future that can be `await`ed until the provided predicate is
144    /// satisfied or until the provided future completes.
145    ///
146    /// The deadline is specified as a `Future` that is expected to resolves to
147    /// `()` after some duration, such as a `tokio::time::Sleep` future.
148    pub fn wait_until_or_timeout<F, T, D>(
149        &self,
150        predicate: F,
151        deadline: D,
152    ) -> WaitUntilOrTimeout<'_, F, T, D>
153    where
154        F: FnMut() -> Option<T>,
155        D: Future<Output = ()>,
156    {
157        WaitUntilOrTimeout::new(&self.wait_set, predicate, deadline)
158    }
159}
160
161impl Default for Event {
162    fn default() -> Self {
163        Self::new()
164    }
165}
166
167unsafe impl Send for Event {}
168unsafe impl Sync for Event {}
169
170/// A waker wrapper that can be inserted in a list.
171///
172/// A notifier always has an exclusive owner or borrower, except in one edge
173/// case: the `WaitSet::remove_relaxed()` method may create a shared reference
174/// while the notifier is concurrently accessed under the `wait_set` mutex by
175/// one of the `WaitSet` methods. So occasionally 2 references to a `Notifier`
176/// will exist at the same time, meaning that even when accessed under the
177/// `wait_set` mutex, a notifier can only be accessed by reference.
178struct Notifier {
179    /// The current waker, if any.
180    waker: Option<Waker>,
181    /// Pointer to the previous wait set notifier.
182    prev: UnsafeCell<Option<NonNull<Notifier>>>,
183    /// Pointer to the next wait set notifier.
184    next: UnsafeCell<Option<NonNull<Notifier>>>,
185    /// Flag indicating whether the notifier is currently in the wait set.
186    in_wait_set: AtomicBool,
187}
188
189impl Notifier {
190    /// Creates a new Notifier without any registered waker.
191    fn new() -> Self {
192        Self {
193            waker: None,
194            prev: UnsafeCell::new(None),
195            next: UnsafeCell::new(None),
196            in_wait_set: AtomicBool::new(false),
197        }
198    }
199
200    /// Stores the specified waker if it differs from the cached waker.
201    fn set_waker(&mut self, waker: &Waker) {
202        if match &self.waker {
203            Some(w) => !w.will_wake(waker),
204            None => true,
205        } {
206            self.waker = Some(waker.clone());
207        }
208    }
209
210    /// Notifies the task.
211    fn wake(&self) {
212        // Safety: the waker is only ever accessed mutably when the notifier is
213        // itself accessed mutably. The caller claims shared (non-mutable)
214        // ownership of the notifier, so there is not possible concurrent
215        // mutable access to the notifier and therefore to the waker.
216        if let Some(w) = &self.waker {
217            w.wake_by_ref();
218        }
219    }
220}
221
222unsafe impl Send for Notifier {}
223unsafe impl Sync for Notifier {}
224
225/// A future that can be `await`ed until a predicate is satisfied.
226#[derive(Debug)]
227pub struct WaitUntil<'a, F: FnMut() -> Option<T>, T> {
228    state: WaitUntilState,
229    predicate: F,
230    wait_set: &'a WaitSet,
231}
232
233impl<'a, F: FnMut() -> Option<T>, T> WaitUntil<'a, F, T> {
234    /// Creates a future associated with the specified event sink that can be
235    /// `await`ed until the specified predicate is satisfied.
236    fn new(wait_set: &'a WaitSet, predicate: F) -> Self {
237        Self {
238            state: WaitUntilState::Idle,
239            predicate,
240            wait_set,
241        }
242    }
243}
244
245impl<F: FnMut() -> Option<T>, T> Drop for WaitUntil<'_, F, T> {
246    fn drop(&mut self) {
247        if let WaitUntilState::Polled(notifier) = self.state {
248            // If we are in the `Polled` stated, it means that the future was
249            // cancelled and its notifier may still be in the wait set: it is
250            // necessary to cancel the notifier so that another event sink can
251            // be notified if one is registered, and then to deallocate the
252            // notifier.
253            //
254            // Safety: all notifiers in the wait set are guaranteed to be alive
255            // since this drop handler ensures that notifiers are removed from
256            // the wait set before they are deallocated. After the notifier is
257            // removed from the list we can claim unique ownership and
258            // deallocate the notifier.
259            unsafe {
260                self.wait_set.cancel(notifier);
261                let _ = Box::from_raw(notifier.as_ptr());
262            }
263        }
264    }
265}
266
267impl<'a, F: FnMut() -> Option<T>, T> Unpin for WaitUntil<'a, F, T> {}
268
269unsafe impl<F: (FnMut() -> Option<T>) + Send, T: Send> Send for WaitUntil<'_, F, T> {}
270
271impl<'a, F: FnMut() -> Option<T>, T> Future for WaitUntil<'a, F, T> {
272    type Output = T;
273
274    #[inline]
275    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
276        assert!(self.state != WaitUntilState::Completed);
277
278        // Remove the notifier if it is in the wait set. In most cases this will
279        // be a cheap no-op because, unless the wake-up is spurious, the
280        // notifier was already removed from the wait set.
281        //
282        // Removing the notifier before checking the predicate is necessary to
283        // avoid races such as this one:
284        //
285        // 1) event sink A unsuccessfully checks the predicate, inserts its
286        //    notifier in the wait set, unsuccessfully re-checks the predicate,
287        //    returns `Poll::Pending`,
288        // 2) event sink B unsuccessfully checks the predicate, inserts its
289        //    notifier in the wait set, unsuccessfully re-checks the predicate,
290        //    returns `Poll::Pending`,
291        // 3) the event source makes one predicate satisfiable,
292        // 4) event sink A is spuriously awaken and successfully checks the
293        //    predicates, returns `Poll::Ready`,
294        // 5) the event source notifies event sink B,
295        // 6) event sink B is awaken and unsuccessfully checks the predicate,
296        //    inserts its notifier in the wait set, unsuccessfully re-checks the
297        //    predicate, returns `Poll::Pending`,
298        // 7) the event source makes another predicate satisfiable.
299        // 8) if now the notifier of event sink A was not removed from the wait
300        //    set, the event source may notify event sink A (which is no longer
301        //    interested) rather than event sink B, meaning that event sink B
302        //    will never be notified.
303        if let WaitUntilState::Polled(notifier) = self.state {
304            // Safety: all notifiers in the wait set are guaranteed to be alive
305            // since the `WaitUntil` drop handler ensures that notifiers are
306            // removed from the wait set before they are deallocated. Using the
307            // relaxed version of `notify` is enough since the notifier was
308            // inserted in the same future so there exists a happen-before
309            // relationship with the insertion operation.
310            unsafe { self.wait_set.remove_relaxed(notifier) };
311        }
312
313        // Fast path.
314        if let Some(v) = (self.predicate)() {
315            if let WaitUntilState::Polled(notifier) = self.state {
316                // Safety: the notifier is no longer in the wait set so we can
317                // claim unique ownership and deallocate the notifier.
318                let _ = unsafe { Box::from_raw(notifier.as_ptr()) };
319            }
320
321            self.state = WaitUntilState::Completed;
322
323            return Poll::Ready(v);
324        }
325
326        let mut notifier = if let WaitUntilState::Polled(notifier) = self.state {
327            notifier
328        } else {
329            unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(Notifier::new()))) }
330        };
331
332        // Set or update the notifier.
333        //
334        // Safety: the notifier is not (or no longer) in the wait list so we
335        // have exclusive ownership.
336        let waker = cx.waker();
337        unsafe { notifier.as_mut().set_waker(waker) };
338
339        // Safety: all notifiers in the wait set are guaranteed to be alive
340        // since the `WaitUntil` drop handler ensures that notifiers are removed
341        // from the wait set before they are deallocated.
342        unsafe { self.wait_set.insert(notifier) };
343
344        // This fence synchronizes with the other fence in `Event::notify` and
345        // ensures that either the predicate below will be satisfied or the
346        // event source will see the notifier inserted above in the wait list
347        // after it makes the predicate satisfiable (or both).
348        atomic::fence(Ordering::SeqCst);
349
350        if let Some(v) = (self.predicate)() {
351            // We need to cancel and not merely remove the notifier from the
352            // wait set so that another event sink can be notified in case we
353            // have been notified just after checking the predicate. This is an
354            // example of race that makes this necessary:
355            //
356            // 1) event sink A and event sink B both unsuccessfully check the
357            //    predicate,
358            // 2) the event source makes one predicate satisfiable and tries to
359            //    notify an event sink but fails since no notifier has been
360            //    inserted in the wait set yet,
361            // 3) event sink A and event sink B both insert their notifier in
362            //    the wait set,
363            // 4) event sink A re-checks the predicate, successfully,
364            // 5) event sink B re-checks the predicate, unsuccessfully,
365            // 6) the event source makes another predicate satisfiable,
366            // 7) the event source sends a notification for the second predicate
367            //    but unfortunately chooses the "wrong" notifier in the wait
368            //    set, i.e. that of event sink A -- note that this is always
369            //    possible irrespective of FIFO or LIFO ordering because it also
370            //    depends on the order of notifier insertion in step 3)
371            // 8) if, before returning, event sink A merely removes itself from
372            //    the wait set without notifying another event sink, then event
373            //    sink B will never be notified.
374            //
375            // Safety: all notifiers in the wait set are guaranteed to be alive
376            // since the `WaitUntil` drop handler ensures that notifiers are
377            // removed from the wait set before they are deallocated.
378            unsafe {
379                self.wait_set.cancel(notifier);
380            }
381
382            self.state = WaitUntilState::Completed;
383
384            // Safety: the notifier is not longer in the wait set so we can
385            // claim unique ownership and deallocate the notifier.
386            let _ = unsafe { Box::from_raw(notifier.as_ptr()) };
387
388            return Poll::Ready(v);
389        }
390
391        self.state = WaitUntilState::Polled(notifier);
392
393        Poll::Pending
394    }
395}
396
397/// State of the `WaitUntil` future.
398#[derive(Debug, PartialEq)]
399enum WaitUntilState {
400    Idle,
401    Polled(NonNull<Notifier>),
402    Completed,
403}
404
405pin_project! {
406    /// A future that can be `await`ed until a predicate is satisfied or until a
407    /// deadline elapses.
408    pub struct WaitUntilOrTimeout<'a, F: FnMut() -> Option<T>, T, D: Future<Output = ()>> {
409        wait_until: WaitUntil<'a, F, T>,
410        #[pin]
411        deadline: D,
412    }
413}
414
415impl<'a, F, T, D> WaitUntilOrTimeout<'a, F, T, D>
416where
417    F: FnMut() -> Option<T>,
418    D: Future<Output = ()>,
419{
420    /// Creates a future associated with the specified event sink that can be
421    /// `await`ed until the specified predicate is satisfied, or until the
422    /// specified timeout future completes.
423    fn new(wait_set: &'a WaitSet, predicate: F, deadline: D) -> Self {
424        Self {
425            wait_until: WaitUntil::new(wait_set, predicate),
426            deadline,
427        }
428    }
429}
430
431impl<'a, F, T, D> Future for WaitUntilOrTimeout<'a, F, T, D>
432where
433    F: FnMut() -> Option<T>,
434    D: Future<Output = ()>,
435{
436    type Output = Option<T>;
437
438    #[inline]
439    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
440        let this = self.project();
441
442        if let Poll::Ready(value) = Pin::new(this.wait_until).poll(cx) {
443            Poll::Ready(Some(value))
444        } else if this.deadline.poll(cx).is_ready() {
445            Poll::Ready(None)
446        } else {
447            Poll::Pending
448        }
449    }
450}
451
452/// A set of notifiers.
453///
454/// The set wraps a Mutex-protected list of notifiers and manages a flag for
455/// fast assessment of list emptiness.
456#[derive(Debug)]
457struct WaitSet {
458    list: Mutex<List>,
459    is_empty: AtomicBool,
460}
461
462impl WaitSet {
463    /// Inserts a node in the wait set.
464    ///
465    /// # Safety
466    ///
467    /// The specified notifier and all notifiers in the wait set must be alive.
468    /// The notifier should not be already in the wait set.
469    unsafe fn insert(&self, notifier: NonNull<Notifier>) {
470        let mut list = self.list.lock().unwrap();
471
472        #[cfg(any(debug_assertions, all(test, async_event_loom)))]
473        if unsafe { notifier.as_ref().in_wait_set.load(Ordering::Relaxed) } {
474            drop(list); // avoids poisoning the lock
475            panic!("the notifier was already in the wait set");
476        }
477
478        // Orderings: Relaxed ordering is sufficient since before this point the
479        // notifier was not in the list and therefore not shared.
480        unsafe { notifier.as_ref().in_wait_set.store(true, Ordering::Relaxed) };
481
482        unsafe { list.push_back(notifier) };
483
484        // Ordering: since this flag is only ever mutated within the
485        // mutex-protected critical section, Relaxed ordering is sufficient.
486        self.is_empty.store(false, Ordering::Relaxed);
487    }
488
489    /// Remove the specified notifier if it is still in the wait set.
490    ///
491    /// After a call to `remove`, the caller is guaranteed that the wait set
492    /// will no longer access the specified notifier.
493    ///
494    /// Note that for performance reasons, the presence of the notifier in the
495    /// list is checked without acquiring the lock. This fast check will never
496    /// lead to a notifier staying in the list as long as there exists an
497    /// happens-before relationship between this call and the earlier call to
498    /// `insert`. A happens-before relationship always exists if these calls are
499    /// made on the same thread or across `await` points.
500    ///
501    /// # Safety
502    ///
503    /// The specified notifier and all notifiers in the wait set must be alive.
504    /// This function may fail to remove the notifier if a happens-before
505    /// relationship does not exist with the previous call to `insert`.
506    unsafe fn remove_relaxed(&self, notifier: NonNull<Notifier>) {
507        // Preliminarily check whether the notifier is already in the list (fast
508        // path).
509        //
510        // This is the only instance where the `in_wait_set` flag is accessed
511        // outside the mutex-protected critical section while the notifier may
512        // still be in the list. The only risk is that the load will be stale
513        // and will read `true` even though the notifier is no longer in the
514        // list, but this is not an issue since in that case the actual state
515        // will be checked again after taking the lock.
516        //
517        // Ordering: Acquire synchronizes with the `Release` orderings in the
518        // `notify` and `cancel` methods; it is necessary to ensure that the
519        // waker is no longer in use by the wait set and can therefore be
520        // modified after returning from `remove`.
521        let in_wait_set = unsafe { notifier.as_ref().in_wait_set.load(Ordering::Acquire) };
522        if !in_wait_set {
523            return;
524        }
525
526        unsafe { self.remove(notifier) };
527    }
528
529    /// Remove the specified notifier if it is still in the wait set.
530    ///
531    /// After a call to `remove`, the caller is guaranteed that the wait set
532    /// will no longer access the specified notifier.
533    ///
534    /// # Safety
535    ///
536    /// The specified notifier and all notifiers in the wait set must be alive.
537    unsafe fn remove(&self, notifier: NonNull<Notifier>) {
538        let mut list = self.list.lock().unwrap();
539
540        // Check again whether the notifier is already in the list
541        //
542        // Ordering: since this flag is only ever mutated within the
543        // mutex-protected critical section and since the wait set also accesses
544        // the waker only in the critical section, even with Relaxed ordering it
545        // is guaranteed that if `in_wait_set` reads `false` then the waker is
546        // no longer in use by the wait set.
547        let in_wait_set = unsafe { notifier.as_ref().in_wait_set.load(Ordering::Relaxed) };
548        if !in_wait_set {
549            return;
550        }
551
552        unsafe { list.remove(notifier) };
553        if list.is_empty() {
554            // Ordering: since this flag is only ever mutated within the
555            // mutex-protected critical section, Relaxed ordering is sufficient.
556            self.is_empty.store(true, Ordering::Relaxed);
557        }
558
559        // Ordering: this flag is only ever mutated within the mutex-protected
560        // critical section and since the waker is not accessed in this method,
561        // it does not need to synchronize with a later call to `remove`;
562        // therefore, Relaxed ordering is sufficient.
563        unsafe {
564            notifier
565                .as_ref()
566                .in_wait_set
567                .store(false, Ordering::Relaxed)
568        };
569    }
570
571    /// Remove the specified notifier if it is still in the wait set, otherwise
572    /// notify another event sink.
573    ///
574    /// After a call to `cancel`, the caller is guaranteed that the wait set
575    /// will no longer access the specified notifier.
576    ///
577    /// # Safety
578    ///
579    /// The specified notifier and all notifiers in the wait set must be alive.
580    /// Wakers of notifiers which pointer is in the wait set may not be accessed
581    /// mutably.
582    unsafe fn cancel(&self, notifier: NonNull<Notifier>) {
583        let mut list = self.list.lock().unwrap();
584
585        let in_wait_set = unsafe { notifier.as_ref().in_wait_set.load(Ordering::Relaxed) };
586        if in_wait_set {
587            unsafe { list.remove(notifier) };
588            if list.is_empty() {
589                self.is_empty.store(true, Ordering::Relaxed);
590            }
591
592            // Ordering: this flag is only ever mutated within the
593            // mutex-protected critical section and since the waker is not
594            // accessed, it does not need to synchronize with the Acquire load
595            // in the `remove` method; therefore, Relaxed ordering is
596            // sufficient.
597            unsafe {
598                notifier
599                    .as_ref()
600                    .in_wait_set
601                    .store(false, Ordering::Relaxed)
602            };
603        } else if let Some(other_notifier) = unsafe { list.pop_front() } {
604            // Safety: the waker can be accessed by reference because the
605            // event sink is not allowed to access the waker mutably before
606            // `in_wait_set` is cleared.
607            unsafe { other_notifier.as_ref().wake() };
608
609            // Ordering: the Release memory ordering synchronizes with the
610            // Acquire ordering in the `remove` method; it is required to
611            // ensure that once `in_wait_set` reads `false` (using Acquire
612            // ordering), the waker is no longer in use by the wait set and
613            // can therefore be modified.
614            unsafe {
615                other_notifier
616                    .as_ref()
617                    .in_wait_set
618                    .store(false, Ordering::Release)
619            };
620        }
621    }
622
623    /// Send a notification to `count` notifiers within the wait set, or to all
624    /// notifiers if the wait set contains less than `count` notifiers.
625    ///
626    /// Note that for performance reasons, list emptiness is checked without
627    /// acquiring the wait set lock. Therefore, in order to prevent the
628    /// possibility that a wait set is seen as empty when it isn't, external
629    /// synchronization is required to make sure that all side effects of a
630    /// previous call to `insert` are fully visible. For instance, an atomic
631    /// memory fence maye be placed before this call and another one after the
632    /// insertion of a notifier.
633    ///
634    /// # Safety
635    ///
636    /// All notifiers in the wait set must be alive. Wakers of notifiers which
637    /// pointer is in the wait set may not be accessed mutably.
638    #[inline(always)]
639    unsafe fn notify_relaxed(&self, count: usize) {
640        let is_empty = self.is_empty.load(Ordering::Relaxed);
641        if is_empty {
642            return;
643        }
644
645        unsafe { self.notify(count) };
646    }
647
648    /// Send a notification to `count` notifiers within the wait set, or to all
649    /// notifiers if the wait set contains less than `count` notifiers.
650    ///
651    /// # Safety
652    ///
653    /// All notifiers in the wait set must be alive. Wakers of notifiers which
654    /// pointer is in the wait set may not be accessed mutably.
655    unsafe fn notify(&self, count: usize) {
656        let mut list = self.list.lock().unwrap();
657        for _ in 0..count {
658            let notifier = {
659                if let Some(notifier) = unsafe { list.pop_front() } {
660                    if list.is_empty() {
661                        self.is_empty.store(true, Ordering::Relaxed);
662                    }
663                    notifier
664                } else {
665                    return;
666                }
667            };
668
669            // Note: the event sink must be notified before the end of the
670            // mutex-protected critical section. Otherwise, a concurrent call to
671            // `remove` could succeed in taking the lock before the waker has
672            // been called, and seeing that the notifier is no longer in the
673            // list would lead its caller to believe that it has now sole
674            // ownership on the notifier even though the call to `wake` has yet
675            // to be made.
676            //
677            // Safety: the waker can be accessed by reference since the event
678            // sink is not allowed to access the waker mutably before
679            // `in_wait_set` is cleared.
680            unsafe { notifier.as_ref().wake() };
681
682            // Ordering: the Release memory ordering synchronizes with the
683            // Acquire ordering in the `remove` method; it is required to ensure
684            // that once `in_wait_set` reads `false` (using Acquire ordering),
685            // the waker can be safely modified.
686            unsafe {
687                notifier
688                    .as_ref()
689                    .in_wait_set
690                    .store(false, Ordering::Release)
691            };
692        }
693    }
694}
695
696impl Default for WaitSet {
697    fn default() -> Self {
698        Self {
699            list: Default::default(),
700            is_empty: AtomicBool::new(true),
701        }
702    }
703}
704
705#[derive(Default, Debug)]
706struct List {
707    front: Option<NonNull<Notifier>>,
708    back: Option<NonNull<Notifier>>,
709}
710
711impl List {
712    /// Inserts a node at the back of the list.
713    ///
714    /// # Safety
715    ///
716    /// The provided notifier and all notifiers which pointer is in the list
717    /// must be alive.
718    unsafe fn push_back(&mut self, notifier: NonNull<Notifier>) {
719        // Safety: the `prev` and `next` pointers are only be accessed when the
720        // list is locked.
721        let old_back = mem::replace(&mut self.back, Some(notifier));
722        match old_back {
723            None => self.front = Some(notifier),
724            Some(prev) => unsafe { prev.as_ref().next.with_mut(|n| *n = Some(notifier)) },
725        }
726
727        // Link the new notifier.
728        let notifier = unsafe { notifier.as_ref() };
729        notifier.prev.with_mut(|n| unsafe { *n = old_back });
730        notifier.next.with_mut(|n| unsafe { *n = None });
731    }
732
733    /// Removes and returns the notifier at the front of the list, if any.
734    ///
735    /// # Safety
736    ///
737    /// All notifiers which pointer is in the list must be alive.
738    unsafe fn pop_front(&mut self) -> Option<NonNull<Notifier>> {
739        let notifier = self.front?;
740
741        // Unlink from the next notifier.
742        let next = unsafe { notifier.as_ref().next.with(|n| *n) };
743        self.front = next;
744        match next {
745            None => self.back = None,
746            Some(next) => unsafe { next.as_ref().prev.with_mut(|n| *n = None) },
747        }
748
749        Some(notifier)
750    }
751
752    /// Removes the specified notifier.
753    ///
754    /// # Safety
755    ///
756    /// The specified notifier and all notifiers which pointer is in the list
757    /// must be alive.
758    unsafe fn remove(&mut self, notifier: NonNull<Notifier>) {
759        // Unlink from the previous and next notifiers.
760        let prev = unsafe { notifier.as_ref().prev.with(|n| *n) };
761        let next = unsafe { notifier.as_ref().next.with(|n| *n) };
762        match prev {
763            None => self.front = next,
764            Some(prev) => unsafe { prev.as_ref().next.with_mut(|n| *n = next) },
765        }
766        match next {
767            None => self.back = prev,
768            Some(next) => unsafe { next.as_ref().prev.with_mut(|n| *n = prev) },
769        }
770    }
771
772    /// Returns `true` if the list is empty.
773    fn is_empty(&self) -> bool {
774        self.front.is_none()
775    }
776}
777
778/// Non-loom tests.
779#[cfg(all(test, not(async_event_loom)))]
780mod tests {
781    use super::*;
782
783    use std::sync::Arc;
784    use std::sync::atomic::AtomicUsize;
785    use std::thread;
786
787    use crate::future::block_on;
788
789    #[test]
790    fn smoke() {
791        static SIGNAL: AtomicBool = AtomicBool::new(false);
792
793        let event = Arc::new(Event::new());
794
795        let th_recv = {
796            let event = event.clone();
797            thread::spawn(move || {
798                block_on(async move {
799                    event
800                        .wait_until(|| {
801                            if SIGNAL.load(Ordering::Relaxed) {
802                                Some(())
803                            } else {
804                                None
805                            }
806                        })
807                        .await;
808
809                    assert!(SIGNAL.load(Ordering::Relaxed));
810                })
811            })
812        };
813
814        SIGNAL.store(true, Ordering::Relaxed);
815        event.notify_one();
816
817        th_recv.join().unwrap();
818    }
819
820    #[test]
821    fn one_to_many() {
822        const RECEIVER_COUNT: usize = 4;
823        static SIGNAL: AtomicBool = AtomicBool::new(false);
824
825        let event = Arc::new(Event::new());
826
827        let th_recv: Vec<_> = (0..RECEIVER_COUNT)
828            .map(|_| {
829                let event = event.clone();
830                thread::spawn(move || {
831                    block_on(async move {
832                        event
833                            .wait_until(|| {
834                                if SIGNAL.load(Ordering::Relaxed) {
835                                    Some(())
836                                } else {
837                                    None
838                                }
839                            })
840                            .await;
841
842                        assert!(SIGNAL.load(Ordering::Relaxed));
843                    })
844                })
845            })
846            .collect();
847
848        SIGNAL.store(true, Ordering::Relaxed);
849        event.notify_one();
850        event.notify(3);
851
852        for th in th_recv {
853            th.join().unwrap();
854        }
855    }
856
857    #[test]
858    fn many_to_many() {
859        const TOKEN_COUNT: usize = 4;
860        static AVAILABLE_TOKENS: AtomicUsize = AtomicUsize::new(0);
861
862        let event = Arc::new(Event::new());
863
864        // Receive tokens from multiple threads.
865        let th_recv: Vec<_> = (0..TOKEN_COUNT)
866            .map(|_| {
867                let event = event.clone();
868                thread::spawn(move || {
869                    block_on(async move {
870                        event
871                            .wait_until(|| {
872                                AVAILABLE_TOKENS
873                                    .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |t| {
874                                        if t > 0 { Some(t - 1) } else { None }
875                                    })
876                                    .ok()
877                            })
878                            .await;
879                    })
880                })
881            })
882            .collect();
883
884        // Make tokens available from multiple threads.
885        let th_send: Vec<_> = (0..TOKEN_COUNT)
886            .map(|_| {
887                let event = event.clone();
888                thread::spawn(move || {
889                    AVAILABLE_TOKENS.fetch_add(1, Ordering::Relaxed);
890                    event.notify_one();
891                })
892            })
893            .collect();
894
895        for th in th_recv {
896            th.join().unwrap();
897        }
898        for th in th_send {
899            th.join().unwrap();
900        }
901
902        assert!(AVAILABLE_TOKENS.load(Ordering::Relaxed) == 0);
903    }
904
905    #[test]
906    fn notify_all() {
907        const RECEIVER_COUNT: usize = 4;
908        static SIGNAL: AtomicBool = AtomicBool::new(false);
909
910        let event = Arc::new(Event::new());
911
912        let th_recv: Vec<_> = (0..RECEIVER_COUNT)
913            .map(|_| {
914                let event = event.clone();
915                thread::spawn(move || {
916                    block_on(async move {
917                        event
918                            .wait_until(|| {
919                                if SIGNAL.load(Ordering::Relaxed) {
920                                    Some(())
921                                } else {
922                                    None
923                                }
924                            })
925                            .await;
926
927                        assert!(SIGNAL.load(Ordering::Relaxed));
928                    })
929                })
930            })
931            .collect();
932
933        SIGNAL.store(true, Ordering::Relaxed);
934        event.notify_all();
935
936        for th in th_recv {
937            th.join().unwrap();
938        }
939    }
940}
941
942/// Loom tests.
943#[cfg(all(test, async_event_loom))]
944mod tests {
945    use super::*;
946
947    use std::future::Future;
948    use std::marker::PhantomPinned;
949    use std::task::{Context, Poll};
950
951    use loom::model::Builder;
952    use loom::sync::Arc;
953    use loom::sync::atomic::AtomicUsize;
954    use loom::thread;
955
956    use waker_fn::waker_fn;
957
958    /// A waker factory that accepts notifications from the newest waker only.
959    #[derive(Clone, Default)]
960    struct MultiWaker {
961        state: Arc<AtomicUsize>,
962    }
963
964    impl MultiWaker {
965        /// Clears the notification flag.
966        ///
967        /// This operation has unconditional Relaxed semantic and for this
968        /// reason should be used instead of `take_notification` when the intent
969        /// is only to cancel a notification for book-keeping purposes, e.g. to
970        /// simulate a spurious wake-up, without introducing unwanted
971        /// synchronization.
972        fn clear_notification(&self) {
973            self.state.fetch_and(!1, Ordering::Relaxed);
974        }
975
976        /// Clears the notification flag and returns the former notification
977        /// status.
978        ///
979        /// This operation has Acquire semantic when a notification is indeed
980        /// present, and Relaxed otherwise. It is therefore appropriate to
981        /// simulate a scheduler receiving a notification as it ensures that all
982        /// memory operations preceding the notification of a task are visible.
983        fn take_notification(&self) -> bool {
984            // Clear the notification flag.
985            let mut state = self.state.load(Ordering::Relaxed);
986            loop {
987                let notified_stated = state | 1;
988                let unnotified_stated = state & !1;
989                match self.state.compare_exchange_weak(
990                    notified_stated,
991                    unnotified_stated,
992                    Ordering::Acquire,
993                    Ordering::Relaxed,
994                ) {
995                    Ok(_) => return true,
996                    Err(s) => {
997                        state = s;
998                        if state == unnotified_stated {
999                            return false;
1000                        }
1001                    }
1002                }
1003            }
1004        }
1005
1006        /// Clears the notification flag and creates a new waker.
1007        fn new_waker(&self) -> Waker {
1008            // Increase the epoch and clear the notification flag.
1009            let mut state = self.state.load(Ordering::Relaxed);
1010            let mut epoch;
1011            loop {
1012                // Increase the epoch by 2.
1013                epoch = (state & !1) + 2;
1014                match self.state.compare_exchange_weak(
1015                    state,
1016                    epoch,
1017                    Ordering::Relaxed,
1018                    Ordering::Relaxed,
1019                ) {
1020                    Ok(_) => break,
1021                    Err(s) => state = s,
1022                }
1023            }
1024
1025            // Create a waker that only notifies if it is the newest waker.
1026            let waker_state = self.state.clone();
1027            waker_fn(move || {
1028                let mut state = waker_state.load(Ordering::Relaxed);
1029                loop {
1030                    let new_state = if state & !1 == epoch {
1031                        epoch | 1
1032                    } else {
1033                        break;
1034                    };
1035                    match waker_state.compare_exchange(
1036                        state,
1037                        new_state,
1038                        Ordering::Release,
1039                        Ordering::Relaxed,
1040                    ) {
1041                        Ok(_) => break,
1042                        Err(s) => state = s,
1043                    }
1044                }
1045            })
1046        }
1047    }
1048
1049    /// A simple counter that can be used to simulate the availability of a
1050    /// certain number of AVAILABLE_TOKENS. In order to model the weakest possible
1051    /// predicate from the viewpoint of atomic memory ordering, only Relaxed
1052    /// atomic operations are used.
1053    #[derive(Default)]
1054    struct Counter {
1055        count: AtomicUsize,
1056    }
1057
1058    impl Counter {
1059        fn increment(&self) {
1060            self.count.fetch_add(1, Ordering::Relaxed);
1061        }
1062        fn try_decrement(&self) -> Option<()> {
1063            let mut count = self.count.load(Ordering::Relaxed);
1064            loop {
1065                if count == 0 {
1066                    return None;
1067                }
1068                match self.count.compare_exchange(
1069                    count,
1070                    count - 1,
1071                    Ordering::Relaxed,
1072                    Ordering::Relaxed,
1073                ) {
1074                    Ok(_) => return Some(()),
1075                    Err(c) => count = c,
1076                }
1077            }
1078        }
1079    }
1080
1081    /// A closure that contains the targets of all references captured by a
1082    /// `WaitUntil` Future.
1083    ///
1084    /// This ugly thing is needed to arbitrarily extend the lifetime of a
1085    /// `WaitUntil` future and thus mimic the behavior of an executor task.
1086    struct WaitUntilClosure {
1087        event: Arc<Event>,
1088        token_counter: Arc<Counter>,
1089        wait_until: Option<Box<dyn Future<Output = ()>>>,
1090        _pin: PhantomPinned,
1091    }
1092
1093    impl WaitUntilClosure {
1094        /// Creates a `WaitUntil` future embedded together with the targets
1095        /// captured by reference.
1096        fn new(event: Arc<Event>, token_counter: Arc<Counter>) -> Pin<Box<Self>> {
1097            let res = Self {
1098                event,
1099                token_counter,
1100                wait_until: None,
1101                _pin: PhantomPinned,
1102            };
1103            let boxed = Box::new(res);
1104
1105            // Artificially extend the lifetimes of the captured references.
1106            let event_ptr = &*boxed.event as *const Event;
1107            let token_counter_ptr = &boxed.token_counter as *const Arc<Counter>;
1108
1109            // Safety: we now commit to never move the closure and to ensure
1110            // that the `WaitUntil` future does not outlive the captured
1111            // references.
1112            let wait_until: Box<dyn Future<Output = _>> = unsafe {
1113                Box::new((*event_ptr).wait_until(move || (*token_counter_ptr).try_decrement()))
1114            };
1115            let mut pinned_box: Pin<Box<WaitUntilClosure>> = boxed.into();
1116
1117            let mut_ref: Pin<&mut Self> = Pin::as_mut(&mut pinned_box);
1118            unsafe {
1119                // This is safe: we are not moving the closure.
1120                Pin::get_unchecked_mut(mut_ref).wait_until = Some(wait_until);
1121            }
1122
1123            pinned_box
1124        }
1125
1126        /// Returns a pinned, type-erased `WaitUntil` future.
1127        fn as_pinned_future(self: Pin<&mut Self>) -> Pin<&mut dyn Future<Output = ()>> {
1128            unsafe { self.map_unchecked_mut(|s| s.wait_until.as_mut().unwrap().as_mut()) }
1129        }
1130    }
1131
1132    impl Drop for WaitUntilClosure {
1133        fn drop(&mut self) {
1134            // Make sure that the `WaitUntil` future does not outlive its
1135            // captured references.
1136            self.wait_until = None;
1137        }
1138    }
1139
1140    /// An enum that registers the final state of a `WaitUntil` future at the
1141    /// completion of a thread.
1142    ///
1143    /// When the future is still in a `Polled` state, this future is moved into
1144    /// the enum so as to extend its lifetime and allow it to be further
1145    /// notified.
1146    #[allow(dead_code)]
1147    enum FutureState {
1148        Completed,
1149        Polled(Pin<Box<WaitUntilClosure>>),
1150        Cancelled,
1151    }
1152
1153    /// Make a certain amount of AVAILABLE_TOKENS available and notify as many waiters
1154    /// among all registered waiters, possibly from several notifier threads.
1155    /// Optionally, it is possible to:
1156    /// - request that `max_spurious_wake` threads will simulate a spurious
1157    ///   wake-up if the waiter is polled and returns `Poll::Pending`,
1158    /// - request that `max_cancellations` threads will cancel the waiter if the
1159    ///   waiter is polled and returns `Poll::Pending`,
1160    /// - change the waker each time it is polled.
1161    ///
1162    /// Note that the aggregate number of specified cancellations and spurious
1163    /// wake-ups cannot exceed the number of waiters.
1164    fn loom_notify(
1165        token_count: usize,
1166        waiter_count: usize,
1167        notifier_count: usize,
1168        max_spurious_wake: usize,
1169        max_cancellations: usize,
1170        change_waker: bool,
1171        preemption_bound: usize,
1172    ) {
1173        let mut builder = Builder::new();
1174        if builder.preemption_bound.is_none() {
1175            builder.preemption_bound = Some(preemption_bound);
1176        }
1177
1178        builder.check(move || {
1179            let token_counter = Arc::new(Counter::default());
1180            let event = Arc::new(Event::new());
1181
1182            let mut wakers: Vec<MultiWaker> = Vec::new();
1183            wakers.resize_with(waiter_count, Default::default);
1184
1185            let waiter_threads: Vec<_> = wakers
1186                .iter()
1187                .enumerate()
1188                .map(|(i, multi_waker)| {
1189                    thread::spawn({
1190                        let multi_waker = multi_waker.clone();
1191                        let mut wait_until =
1192                            WaitUntilClosure::new(event.clone(), token_counter.clone());
1193
1194                        move || {
1195                            // `max_cancellations` threads will cancel the
1196                            // waiter if the waiter returns `Poll::Pending`.
1197                            let cancel_waiter = i < max_cancellations;
1198                            // `max_spurious_wake` threads will simulate a
1199                            // spurious wake-up if the waiter returns
1200                            // `Poll::Pending`.
1201                            let mut spurious_wake = i >= max_cancellations
1202                                && i < (max_cancellations + max_spurious_wake);
1203
1204                            let mut waker = multi_waker.new_waker();
1205                            loop {
1206                                let mut cx = Context::from_waker(&waker);
1207                                let poll_state =
1208                                    wait_until.as_mut().as_pinned_future().poll(&mut cx);
1209
1210                                // Return successfully if the predicate was
1211                                // checked successfully.
1212                                if matches!(poll_state, Poll::Ready(_)) {
1213                                    return FutureState::Completed;
1214                                }
1215
1216                                // The future has returned Poll::Pending.
1217                                // Depending on the situation, we will either
1218                                // cancel the future, return and wait for a
1219                                // notification, or poll again.
1220
1221                                if cancel_waiter {
1222                                    // The `wait_until` future is dropped while
1223                                    // in pending state, which simulates future
1224                                    // cancellation. Note that the notification
1225                                    // was intentionally cleared earlier so the
1226                                    // task will not be counted as a task that
1227                                    // should eventually succeed.
1228                                    return FutureState::Cancelled;
1229                                }
1230                                if spurious_wake {
1231                                    // Clear the notification, if any.
1232                                    multi_waker.clear_notification();
1233                                } else if !multi_waker.take_notification() {
1234                                    // The async runtime would normally keep the
1235                                    // `wait_until` future alive after `poll`
1236                                    // returns `Pending`. This behavior is
1237                                    // emulated by returning the `WaitUntil`
1238                                    // closure from the thread so as to extend
1239                                    // it lifetime.
1240                                    return FutureState::Polled(wait_until);
1241                                }
1242
1243                                // The task was notified or spuriously awaken.
1244                                spurious_wake = false;
1245                                if change_waker {
1246                                    waker = multi_waker.new_waker();
1247                                }
1248                            }
1249                        }
1250                    })
1251                })
1252                .collect();
1253
1254            // Increment the token count and notify a consumer after each
1255            // increment.
1256            assert!(notifier_count >= 1);
1257            assert!(token_count >= notifier_count);
1258
1259            // Each notifier thread but the last one makes one and only one
1260            // token available.
1261            let notifier_threads: Vec<_> = (0..(notifier_count - 1))
1262                .map(|_| {
1263                    let token_counter = token_counter.clone();
1264                    let event = event.clone();
1265                    thread::spawn(move || {
1266                        token_counter.increment();
1267                        event.notify(1);
1268                    })
1269                })
1270                .collect();
1271
1272            // The last notifier thread completes the number of AVAILABLE_TOKENS as
1273            // needed.
1274            for _ in 0..(token_count - (notifier_count - 1)) {
1275                token_counter.increment();
1276                event.notify(1);
1277            }
1278
1279            // Join the remaining notifier threads.
1280            for th in notifier_threads {
1281                th.join().unwrap();
1282            }
1283
1284            // Join all waiter threads and check which of them have successfully
1285            // checked the predicate. It is important that all `FutureState`
1286            // returned by the threads be kept alive until _all_ threads have
1287            // joined because `FutureState::Polled` items extend the lifetime of
1288            // their future so they can still be notified.
1289            let future_state: Vec<_> = waiter_threads
1290                .into_iter()
1291                .map(|th| th.join().unwrap())
1292                .collect();
1293
1294            // See which threads have successfully completed. It is now OK to drop
1295            // the returned `FutureState`s.
1296            let success: Vec<_> = future_state
1297                .into_iter()
1298                .map(|state| match state {
1299                    FutureState::Completed => true,
1300                    _ => false,
1301                })
1302                .collect();
1303
1304            // Check which threads have been notified, excluding those which
1305            // future was cancelled.
1306            let notified: Vec<_> = wakers
1307                .iter()
1308                .enumerate()
1309                .map(|(i, test_waker)| {
1310                    // Count the notification unless the thread was cancelled
1311                    // since in that case the notification would be missed.
1312                    test_waker.take_notification() && i >= max_cancellations
1313                })
1314                .collect();
1315
1316            // Count how many threads have either succeeded or have been
1317            // notified.
1318            let actual_aggregate_count =
1319                success
1320                    .iter()
1321                    .zip(notified.iter())
1322                    .fold(0, |count, (&success, &notified)| {
1323                        if success || notified {
1324                            count + 1
1325                        } else {
1326                            count
1327                        }
1328                    });
1329
1330            // Compare with the number of event sinks that should eventually succeed.
1331            let min_expected_success_count = token_count.min(waiter_count - max_cancellations);
1332            if actual_aggregate_count < min_expected_success_count {
1333                panic!(
1334                    "Successful threads: {:?}; Notified threads: {:?}",
1335                    success, notified
1336                );
1337            }
1338        });
1339    }
1340
1341    #[test]
1342    fn loom_two_consumers() {
1343        const DEFAULT_PREEMPTION_BOUND: usize = 4;
1344        loom_notify(2, 2, 1, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1345    }
1346    #[test]
1347    fn loom_two_consumers_spurious() {
1348        const DEFAULT_PREEMPTION_BOUND: usize = 4;
1349        loom_notify(2, 2, 1, 1, 0, false, DEFAULT_PREEMPTION_BOUND);
1350    }
1351    #[test]
1352    fn loom_two_consumers_cancellation() {
1353        const DEFAULT_PREEMPTION_BOUND: usize = 4;
1354        loom_notify(2, 2, 1, 1, 1, false, DEFAULT_PREEMPTION_BOUND);
1355    }
1356    #[test]
1357    fn loom_two_consumers_change_waker() {
1358        const DEFAULT_PREEMPTION_BOUND: usize = 4;
1359        loom_notify(2, 2, 1, 0, 0, true, DEFAULT_PREEMPTION_BOUND);
1360    }
1361    #[test]
1362    fn loom_two_consumers_change_waker_spurious() {
1363        const DEFAULT_PREEMPTION_BOUND: usize = 4;
1364        loom_notify(2, 2, 1, 1, 0, true, DEFAULT_PREEMPTION_BOUND);
1365    }
1366    #[test]
1367    fn loom_two_consumers_change_waker_cancellation() {
1368        const DEFAULT_PREEMPTION_BOUND: usize = 4;
1369        loom_notify(1, 2, 1, 0, 1, true, DEFAULT_PREEMPTION_BOUND);
1370    }
1371    #[test]
1372    fn loom_two_consumers_change_waker_spurious_cancellation() {
1373        const DEFAULT_PREEMPTION_BOUND: usize = 4;
1374        loom_notify(2, 2, 1, 1, 1, true, DEFAULT_PREEMPTION_BOUND);
1375    }
1376    #[test]
1377    fn loom_two_consumers_three_tokens() {
1378        const DEFAULT_PREEMPTION_BOUND: usize = 3;
1379        loom_notify(3, 2, 1, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1380    }
1381    #[test]
1382    fn loom_three_consumers() {
1383        const DEFAULT_PREEMPTION_BOUND: usize = 2;
1384        loom_notify(3, 3, 1, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1385    }
1386    #[test]
1387    fn loom_three_consumers_spurious() {
1388        const DEFAULT_PREEMPTION_BOUND: usize = 2;
1389        loom_notify(3, 3, 1, 1, 0, false, DEFAULT_PREEMPTION_BOUND);
1390    }
1391    #[test]
1392    fn loom_three_consumers_cancellation() {
1393        const DEFAULT_PREEMPTION_BOUND: usize = 2;
1394        loom_notify(2, 3, 1, 0, 1, false, DEFAULT_PREEMPTION_BOUND);
1395    }
1396    #[test]
1397    fn loom_three_consumers_change_waker() {
1398        const DEFAULT_PREEMPTION_BOUND: usize = 2;
1399        loom_notify(3, 3, 1, 0, 0, true, DEFAULT_PREEMPTION_BOUND);
1400    }
1401    #[test]
1402    fn loom_three_consumers_change_waker_spurious() {
1403        const DEFAULT_PREEMPTION_BOUND: usize = 2;
1404        loom_notify(3, 3, 1, 1, 0, true, DEFAULT_PREEMPTION_BOUND);
1405    }
1406    #[test]
1407    fn loom_three_consumers_change_waker_cancellation() {
1408        const DEFAULT_PREEMPTION_BOUND: usize = 2;
1409        loom_notify(3, 3, 1, 0, 1, true, DEFAULT_PREEMPTION_BOUND);
1410    }
1411    #[test]
1412    fn loom_three_consumers_change_waker_spurious_cancellation() {
1413        const DEFAULT_PREEMPTION_BOUND: usize = 2;
1414        loom_notify(3, 3, 1, 1, 1, true, DEFAULT_PREEMPTION_BOUND);
1415    }
1416    #[test]
1417    fn loom_three_consumers_two_tokens() {
1418        const DEFAULT_PREEMPTION_BOUND: usize = 2;
1419        loom_notify(2, 3, 1, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1420    }
1421    #[test]
1422    fn loom_two_consumers_two_notifiers() {
1423        const DEFAULT_PREEMPTION_BOUND: usize = 3;
1424        loom_notify(2, 2, 2, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1425    }
1426    #[test]
1427    fn loom_one_consumer_three_notifiers() {
1428        const DEFAULT_PREEMPTION_BOUND: usize = 4;
1429        loom_notify(3, 1, 3, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1430    }
1431}