async_event/
lib.rs

1//! An efficient async condition variable for lock-free algorithms, a.k.a.
2//! "eventcount".
3//!
4//! [Eventcount][eventcount]-like primitives are useful to make some operations
5//! on a lock-free structure blocking, for instance to transform bounded queues
6//! into bounded channels. Such a primitive allows an interested task to block
7//! until a predicate is satisfied by checking the predicate each time it
8//! receives a notification.
9//!
10//! While functionally similar to the [event_listener] crate, this
11//! implementation is more opinionated and limited to the `async` case. It
//! strives to be more efficient, however, by limiting the number of locking
13//! operations on the mutex-protected list of notifiers: the lock is typically
14//! taken only once for each time a waiter is blocked and once for notifying,
15//! thus reducing the need for synchronization operations. Finally, spurious
16//! wake-ups are only generated in very rare circumstances.
17//!
18//! This library is an offshoot of [Asynchronix][asynchronix], an ongoing effort
19//! at a high performance asynchronous computation framework for system
20//! simulation.
21//!
22//! [event_listener]: https://docs.rs/event_listener/latest/event_listener/
23//! [eventcount]:
24//!     https://www.1024cores.net/home/lock-free-algorithms/eventcounts
25//! [asynchronix]: https://github.com/asynchronics/asynchronix
26//!
27//! # Examples
28//!
29//! Wait until a non-zero value has been sent asynchronously.
30//!
31//! ```
32//! use std::sync::atomic::{AtomicUsize, Ordering};
33//! use std::sync::Arc;
34//! use std::thread;
35//!
36//! use futures_executor::block_on;
37//!
38//! use async_event::Event;
39//!
40//!
41//! let value = Arc::new(AtomicUsize::new(0));
42//! let event = Arc::new(Event::new());
43//!
44//! // Set a non-zero value concurrently.
45//! thread::spawn({
46//!     let value = value.clone();
47//!     let event = event.clone();
48//!
49//!     move || {
50//!         // A relaxed store is sufficient here: `Event::notify*` methods insert
51//!         // atomic fences to warrant adequate synchronization.
52//!         value.store(42, Ordering::Relaxed);
53//!         event.notify_one();
54//!     }
55//! });
56//!
57//! // Wait until the value is set.
58//! block_on(async move {
59//!     let v = event
60//!         .wait_until(|| {
61//!             // A relaxed load is sufficient here: `Event::wait_until` inserts
62//!             // atomic fences to warrant adequate synchronization.
63//!             let v = value.load(Ordering::Relaxed);
64//!             if v != 0 { Some(v) } else { None }
65//!         })
66//!         .await;
67//!
//!     assert_eq!(v, 42);
69//! });
70//! ```
71mod loom_exports;
72
73use std::future::Future;
74use std::mem;
75use std::pin::Pin;
76use std::ptr::NonNull;
77use std::sync::atomic::Ordering;
78use std::task::{Context, Poll, Waker};
79
80use loom_exports::cell::UnsafeCell;
81use loom_exports::sync::atomic::{self, AtomicBool};
82use loom_exports::sync::Mutex;
83use pin_project_lite::pin_project;
84
85/// An object that can receive or send notifications.
86pub struct Event {
87    wait_set: WaitSet,
88}
89
90impl Event {
91    /// Creates a new event.
92    pub fn new() -> Self {
93        Self {
94            wait_set: WaitSet::default(),
95        }
96    }
97
98    /// Notify a number of awaiting events that the predicate should be checked.
99    ///
    /// If fewer events than requested are currently awaiting, then all
    /// awaiting events are notified.
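    ///
    /// # Example
    ///
    /// A minimal sketch of a typical notification sequence; the token counter
    /// is only illustrative:
    ///
    /// ```
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    ///
    /// use async_event::Event;
    ///
    /// let tokens = AtomicUsize::new(0);
    /// let event = Event::new();
    ///
    /// // Make two tokens available, then wake up to two awaiting tasks so
    /// // they can re-check the predicate.
    /// tokens.fetch_add(2, Ordering::Relaxed);
    /// event.notify(2);
    /// ```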
102    #[inline(always)]
103    pub fn notify(&self, n: usize) {
104        // This fence synchronizes with the other fence in `WaitUntil::poll` and
105        // ensures that either the `poll` method will successfully check the
106        // predicate set before this call, or the notifier inserted by `poll`
107        // will be visible in the wait list when calling `WaitSet::notify` (or
108        // both).
109        atomic::fence(Ordering::SeqCst);
110
111        // Safety: all notifiers in the wait set are guaranteed to be alive
112        // since the `WaitUntil` drop handler ensures that notifiers are removed
113        // from the wait set before they are deallocated.
114        unsafe {
115            self.wait_set.notify_relaxed(n);
116        }
117    }
118
119    /// Notify one awaiting event (if any) that the predicate should be checked.
120    #[inline(always)]
121    pub fn notify_one(&self) {
122        self.notify(1);
123    }
124
125    /// Notify all awaiting events that the predicate should be checked.
126    #[inline(always)]
127    pub fn notify_all(&self) {
128        self.notify(usize::MAX);
129    }
130
131    /// Returns a future that can be `await`ed until the provided predicate is
132    /// satisfied.
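    ///
    /// # Example
    ///
    /// A minimal sketch with an already-satisfied predicate, using the
    /// `futures_executor` executor as in the crate-level example:
    ///
    /// ```
    /// use std::sync::atomic::{AtomicBool, Ordering};
    ///
    /// use async_event::Event;
    /// use futures_executor::block_on;
    ///
    /// let event = Event::new();
    /// let flag = AtomicBool::new(true);
    ///
    /// // The predicate is already satisfied, so the future completes at the
    /// // first poll without blocking.
    /// block_on(event.wait_until(|| flag.load(Ordering::Relaxed).then_some(())));
    /// ```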
133    pub fn wait_until<F, T>(&self, predicate: F) -> WaitUntil<F, T>
134    where
135        F: FnMut() -> Option<T>,
136    {
137        WaitUntil::new(&self.wait_set, predicate)
138    }
139
140    /// Returns a future that can be `await`ed until the provided predicate is
141    /// satisfied or until the provided future completes.
142    ///
    /// The deadline is specified as a `Future` that is expected to resolve to
144    /// `()` after some duration, such as a `tokio::time::Sleep` future.
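    ///
    /// # Example
    ///
    /// A minimal sketch where the deadline is an already-completed future so
    /// that the call times out immediately; a real application would typically
    /// pass a timer future instead. The `futures_executor` executor is used as
    /// in the crate-level example.
    ///
    /// ```
    /// use std::future::ready;
    ///
    /// use async_event::Event;
    /// use futures_executor::block_on;
    ///
    /// let event = Event::new();
    ///
    /// let res: Option<()> = block_on(event.wait_until_or_timeout(
    ///     || None,   // the predicate is never satisfied...
    ///     ready(()), // ...and the deadline has already elapsed
    /// ));
    /// assert!(res.is_none());
    /// ```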
145    pub fn wait_until_or_timeout<F, T, D>(
146        &self,
147        predicate: F,
148        deadline: D,
149    ) -> WaitUntilOrTimeout<F, T, D>
150    where
151        F: FnMut() -> Option<T>,
152        D: Future<Output = ()>,
153    {
154        WaitUntilOrTimeout::new(&self.wait_set, predicate, deadline)
155    }
156}
157
158impl Default for Event {
159    fn default() -> Self {
160        Self::new()
161    }
162}
163
164unsafe impl Send for Event {}
165unsafe impl Sync for Event {}
166
167/// A waker wrapper that can be inserted in a list.
168///
169/// A notifier always has an exclusive owner or borrower, except in one edge
170/// case: the `WaitSet::remove_relaxed()` method may create a shared reference
171/// while the notifier is concurrently accessed under the `wait_set` mutex by
172/// one of the `WaitSet` methods. So occasionally 2 references to a `Notifier`
173/// will exist at the same time, meaning that even when accessed under the
174/// `wait_set` mutex, a notifier can only be accessed by reference.
175struct Notifier {
176    /// The current waker, if any.
177    waker: Option<Waker>,
178    /// Pointer to the previous wait set notifier.
179    prev: UnsafeCell<Option<NonNull<Notifier>>>,
180    /// Pointer to the next wait set notifier.
181    next: UnsafeCell<Option<NonNull<Notifier>>>,
182    /// Flag indicating whether the notifier is currently in the wait set.
183    in_wait_set: AtomicBool,
184}
185
186impl Notifier {
187    /// Creates a new Notifier without any registered waker.
188    fn new() -> Self {
189        Self {
190            waker: None,
191            prev: UnsafeCell::new(None),
192            next: UnsafeCell::new(None),
193            in_wait_set: AtomicBool::new(false),
194        }
195    }
196
197    /// Stores the specified waker if it differs from the cached waker.
198    fn set_waker(&mut self, waker: &Waker) {
199        if match &self.waker {
200            Some(w) => !w.will_wake(waker),
201            None => true,
202        } {
203            self.waker = Some(waker.clone());
204        }
205    }
206
207    /// Notifies the task.
208    fn wake(&self) {
209        // Safety: the waker is only ever accessed mutably when the notifier is
210        // itself accessed mutably. The caller claims shared (non-mutable)
        // ownership of the notifier, so no concurrent mutable access to the
        // notifier, and therefore to the waker, is possible.
213        if let Some(w) = &self.waker {
214            w.wake_by_ref();
215        }
216    }
217}
218
219unsafe impl Send for Notifier {}
220unsafe impl Sync for Notifier {}
221
222/// A future that can be `await`ed until a predicate is satisfied.
223pub struct WaitUntil<'a, F: FnMut() -> Option<T>, T> {
224    state: WaitUntilState,
225    predicate: F,
226    wait_set: &'a WaitSet,
227}
228
229impl<'a, F: FnMut() -> Option<T>, T> WaitUntil<'a, F, T> {
230    /// Creates a future associated with the specified event sink that can be
231    /// `await`ed until the specified predicate is satisfied.
232    fn new(wait_set: &'a WaitSet, predicate: F) -> Self {
233        Self {
234            state: WaitUntilState::Idle,
235            predicate,
236            wait_set,
237        }
238    }
239}
240
241impl<F: FnMut() -> Option<T>, T> Drop for WaitUntil<'_, F, T> {
242    fn drop(&mut self) {
243        if let WaitUntilState::Polled(notifier) = self.state {
            // If we are in the `Polled` state, it means that the future was
245            // cancelled and its notifier may still be in the wait set: it is
246            // necessary to cancel the notifier so that another event sink can
247            // be notified if one is registered, and then to deallocate the
248            // notifier.
249            //
250            // Safety: all notifiers in the wait set are guaranteed to be alive
251            // since this drop handler ensures that notifiers are removed from
252            // the wait set before they are deallocated. After the notifier is
253            // removed from the list we can claim unique ownership and
254            // deallocate the notifier.
255            unsafe {
256                self.wait_set.cancel(notifier);
257                let _ = Box::from_raw(notifier.as_ptr());
258            }
259        }
260    }
261}
262
263impl<'a, F: FnMut() -> Option<T>, T> Unpin for WaitUntil<'a, F, T> {}
264
265unsafe impl<F: (FnMut() -> Option<T>) + Send, T: Send> Send for WaitUntil<'_, F, T> {}
266
267impl<'a, F: FnMut() -> Option<T>, T> Future for WaitUntil<'a, F, T> {
268    type Output = T;
269
270    #[inline]
271    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
272        assert!(self.state != WaitUntilState::Completed);
273
274        // Remove the notifier if it is in the wait set. In most cases this will
275        // be a cheap no-op because, unless the wake-up is spurious, the
276        // notifier was already removed from the wait set.
277        //
278        // Removing the notifier before checking the predicate is necessary to
279        // avoid races such as this one:
280        //
281        // 1) event sink A unsuccessfully checks the predicate, inserts its
282        //    notifier in the wait set, unsuccessfully re-checks the predicate,
283        //    returns `Poll::Pending`,
284        // 2) event sink B unsuccessfully checks the predicate, inserts its
285        //    notifier in the wait set, unsuccessfully re-checks the predicate,
286        //    returns `Poll::Pending`,
287        // 3) the event source makes one predicate satisfiable,
        // 4) event sink A is spuriously awakened and successfully checks the
        //    predicate, returns `Poll::Ready`,
        // 5) the event source notifies event sink B,
        // 6) event sink B is awakened and unsuccessfully checks the predicate,
292        //    inserts its notifier in the wait set, unsuccessfully re-checks the
293        //    predicate, returns `Poll::Pending`,
294        // 7) the event source makes another predicate satisfiable.
295        // 8) if now the notifier of event sink A was not removed from the wait
296        //    set, the event source may notify event sink A (which is no longer
297        //    interested) rather than event sink B, meaning that event sink B
298        //    will never be notified.
299        if let WaitUntilState::Polled(notifier) = self.state {
300            // Safety: all notifiers in the wait set are guaranteed to be alive
301            // since the `WaitUntil` drop handler ensures that notifiers are
302            // removed from the wait set before they are deallocated. Using the
            // relaxed version of `remove` is enough since the notifier was
            // inserted by this same future, so there exists a happens-before
305            // relationship with the insertion operation.
306            unsafe { self.wait_set.remove_relaxed(notifier) };
307        }
308
309        // Fast path.
310        if let Some(v) = (self.predicate)() {
311            if let WaitUntilState::Polled(notifier) = self.state {
312                // Safety: the notifier is no longer in the wait set so we can
313                // claim unique ownership and deallocate the notifier.
314                let _ = unsafe { Box::from_raw(notifier.as_ptr()) };
315            }
316
317            self.state = WaitUntilState::Completed;
318
319            return Poll::Ready(v);
320        }
321
322        let mut notifier = if let WaitUntilState::Polled(notifier) = self.state {
323            notifier
324        } else {
325            unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(Notifier::new()))) }
326        };
327
328        // Set or update the notifier.
329        //
330        // Safety: the notifier is not (or no longer) in the wait list so we
331        // have exclusive ownership.
332        let waker = cx.waker();
333        unsafe { notifier.as_mut().set_waker(waker) };
334
335        // Safety: all notifiers in the wait set are guaranteed to be alive
336        // since the `WaitUntil` drop handler ensures that notifiers are removed
337        // from the wait set before they are deallocated.
338        unsafe { self.wait_set.insert(notifier) };
339
340        // This fence synchronizes with the other fence in `Event::notify` and
341        // ensures that either the predicate below will be satisfied or the
342        // event source will see the notifier inserted above in the wait list
343        // after it makes the predicate satisfiable (or both).
344        atomic::fence(Ordering::SeqCst);
345
346        if let Some(v) = (self.predicate)() {
347            // We need to cancel and not merely remove the notifier from the
348            // wait set so that another event sink can be notified in case we
349            // have been notified just after checking the predicate. This is an
            // example of a race that makes this necessary:
351            //
352            // 1) event sink A and event sink B both unsuccessfully check the
353            //    predicate,
354            // 2) the event source makes one predicate satisfiable and tries to
355            //    notify an event sink but fails since no notifier has been
356            //    inserted in the wait set yet,
357            // 3) event sink A and event sink B both insert their notifier in
358            //    the wait set,
359            // 4) event sink A re-checks the predicate, successfully,
360            // 5) event sink B re-checks the predicate, unsuccessfully,
361            // 6) the event source makes another predicate satisfiable,
362            // 7) the event source sends a notification for the second predicate
363            //    but unfortunately chooses the "wrong" notifier in the wait
364            //    set, i.e. that of event sink A -- note that this is always
365            //    possible irrespective of FIFO or LIFO ordering because it also
366            //    depends on the order of notifier insertion in step 3)
367            // 8) if, before returning, event sink A merely removes itself from
368            //    the wait set without notifying another event sink, then event
369            //    sink B will never be notified.
370            //
371            // Safety: all notifiers in the wait set are guaranteed to be alive
372            // since the `WaitUntil` drop handler ensures that notifiers are
373            // removed from the wait set before they are deallocated.
374            unsafe {
375                self.wait_set.cancel(notifier);
376            }
377
378            self.state = WaitUntilState::Completed;
379
            // Safety: the notifier is no longer in the wait set so we can
381            // claim unique ownership and deallocate the notifier.
382            let _ = unsafe { Box::from_raw(notifier.as_ptr()) };
383
384            return Poll::Ready(v);
385        }
386
387        self.state = WaitUntilState::Polled(notifier);
388
389        Poll::Pending
390    }
391}
392
393/// State of the `WaitUntil` future.
394#[derive(PartialEq)]
395enum WaitUntilState {
396    Idle,
397    Polled(NonNull<Notifier>),
398    Completed,
399}
400
401pin_project! {
402    /// A future that can be `await`ed until a predicate is satisfied or until a
403    /// deadline elapses.
404    pub struct WaitUntilOrTimeout<'a, F: FnMut() -> Option<T>, T, D: Future<Output = ()>> {
405        wait_until: WaitUntil<'a, F, T>,
406        #[pin]
407        deadline: D,
408    }
409}
410
411impl<'a, F, T, D> WaitUntilOrTimeout<'a, F, T, D>
412where
413    F: FnMut() -> Option<T>,
414    D: Future<Output = ()>,
415{
416    /// Creates a future associated with the specified event sink that can be
417    /// `await`ed until the specified predicate is satisfied, or until the
418    /// specified timeout future completes.
419    fn new(wait_set: &'a WaitSet, predicate: F, deadline: D) -> Self {
420        Self {
421            wait_until: WaitUntil::new(wait_set, predicate),
422            deadline,
423        }
424    }
425}
426
427impl<'a, F, T, D> Future for WaitUntilOrTimeout<'a, F, T, D>
428where
429    F: FnMut() -> Option<T>,
430    D: Future<Output = ()>,
431{
432    type Output = Option<T>;
433
434    #[inline]
435    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
436        let this = self.project();
437
438        if let Poll::Ready(value) = Pin::new(this.wait_until).poll(cx) {
439            Poll::Ready(Some(value))
440        } else if this.deadline.poll(cx).is_ready() {
441            Poll::Ready(None)
442        } else {
443            Poll::Pending
444        }
445    }
446}
447
448/// A set of notifiers.
449///
450/// The set wraps a Mutex-protected list of notifiers and manages a flag for
451/// fast assessment of list emptiness.
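///
/// The intended protocol, as implemented by `WaitUntil` and `Event` rather
/// than as an additional API, is the following: a waiter `insert`s its
/// notifier before returning `Poll::Pending`, the notifying side calls
/// `notify_relaxed` to pop and wake up to `count` waiters, and the waiter
/// eventually calls `remove_relaxed` (or `cancel` when its future is dropped)
/// so that the wait set no longer references the notifier before it is
/// deallocated.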
452struct WaitSet {
453    list: Mutex<List>,
454    is_empty: AtomicBool,
455}
456
457impl WaitSet {
458    /// Inserts a node in the wait set.
459    ///
460    /// # Safety
461    ///
462    /// The specified notifier and all notifiers in the wait set must be alive.
    /// The notifier must not already be in the wait set.
464    unsafe fn insert(&self, notifier: NonNull<Notifier>) {
465        let mut list = self.list.lock().unwrap();
466
467        #[cfg(any(debug_assertions, async_event_loom))]
468        if notifier.as_ref().in_wait_set.load(Ordering::Relaxed) {
469            drop(list); // avoids poisoning the lock
470            panic!("the notifier was already in the wait set");
471        }
472
        // Ordering: Relaxed ordering is sufficient since before this point the
474        // notifier was not in the list and therefore not shared.
475        notifier.as_ref().in_wait_set.store(true, Ordering::Relaxed);
476
477        list.push_back(notifier);
478
479        // Ordering: since this flag is only ever mutated within the
480        // mutex-protected critical section, Relaxed ordering is sufficient.
481        self.is_empty.store(false, Ordering::Relaxed);
482    }
483
484    /// Remove the specified notifier if it is still in the wait set.
485    ///
486    /// After a call to `remove`, the caller is guaranteed that the wait set
487    /// will no longer access the specified notifier.
488    ///
489    /// Note that for performance reasons, the presence of the notifier in the
490    /// list is checked without acquiring the lock. This fast check will never
491    /// lead to a notifier staying in the list as long as there exists an
492    /// happens-before relationship between this call and the earlier call to
493    /// `insert`. A happens-before relationship always exists if these calls are
494    /// made on the same thread or across `await` points.
495    ///
496    /// # Safety
497    ///
498    /// The specified notifier and all notifiers in the wait set must be alive.
499    /// This function may fail to remove the notifier if a happens-before
500    /// relationship does not exist with the previous call to `insert`.
501    unsafe fn remove_relaxed(&self, notifier: NonNull<Notifier>) {
        // Preliminarily check whether the notifier is still in the list (fast
        // path).
504        //
505        // This is the only instance where the `in_wait_set` flag is accessed
506        // outside the mutex-protected critical section while the notifier may
507        // still be in the list. The only risk is that the load will be stale
508        // and will read `true` even though the notifier is no longer in the
509        // list, but this is not an issue since in that case the actual state
510        // will be checked again after taking the lock.
511        //
512        // Ordering: Acquire synchronizes with the `Release` orderings in the
513        // `notify` and `cancel` methods; it is necessary to ensure that the
514        // waker is no longer in use by the wait set and can therefore be
515        // modified after returning from `remove`.
516        let in_wait_set = notifier.as_ref().in_wait_set.load(Ordering::Acquire);
517        if !in_wait_set {
518            return;
519        }
520
521        self.remove(notifier);
522    }
523
524    /// Remove the specified notifier if it is still in the wait set.
525    ///
526    /// After a call to `remove`, the caller is guaranteed that the wait set
527    /// will no longer access the specified notifier.
528    ///
529    /// # Safety
530    ///
531    /// The specified notifier and all notifiers in the wait set must be alive.
532    unsafe fn remove(&self, notifier: NonNull<Notifier>) {
533        let mut list = self.list.lock().unwrap();
534
        // Check again whether the notifier is still in the list.
536        //
537        // Ordering: since this flag is only ever mutated within the
538        // mutex-protected critical section and since the wait set also accesses
539        // the waker only in the critical section, even with Relaxed ordering it
540        // is guaranteed that if `in_wait_set` reads `false` then the waker is
541        // no longer in use by the wait set.
542        let in_wait_set = notifier.as_ref().in_wait_set.load(Ordering::Relaxed);
543        if !in_wait_set {
544            return;
545        }
546
547        list.remove(notifier);
548        if list.is_empty() {
549            // Ordering: since this flag is only ever mutated within the
550            // mutex-protected critical section, Relaxed ordering is sufficient.
551            self.is_empty.store(true, Ordering::Relaxed);
552        }
553
554        // Ordering: this flag is only ever mutated within the mutex-protected
555        // critical section and since the waker is not accessed in this method,
556        // it does not need to synchronize with a later call to `remove`;
557        // therefore, Relaxed ordering is sufficient.
558        notifier
559            .as_ref()
560            .in_wait_set
561            .store(false, Ordering::Relaxed);
562    }
563
564    /// Remove the specified notifier if it is still in the wait set, otherwise
565    /// notify another event sink.
566    ///
567    /// After a call to `cancel`, the caller is guaranteed that the wait set
568    /// will no longer access the specified notifier.
569    ///
570    /// # Safety
571    ///
572    /// The specified notifier and all notifiers in the wait set must be alive.
    /// Wakers of notifiers whose pointer is in the wait set may not be accessed
574    /// mutably.
575    unsafe fn cancel(&self, notifier: NonNull<Notifier>) {
576        let mut list = self.list.lock().unwrap();
577
578        let in_wait_set = notifier.as_ref().in_wait_set.load(Ordering::Relaxed);
579        if in_wait_set {
580            list.remove(notifier);
581            if list.is_empty() {
582                self.is_empty.store(true, Ordering::Relaxed);
583            }
584
585            // Ordering: this flag is only ever mutated within the
586            // mutex-protected critical section and since the waker is not
587            // accessed, it does not need to synchronize with the Acquire load
588            // in the `remove` method; therefore, Relaxed ordering is
589            // sufficient.
590            notifier
591                .as_ref()
592                .in_wait_set
593                .store(false, Ordering::Relaxed);
594        } else if let Some(other_notifier) = list.pop_front() {
595            // Safety: the waker can be accessed by reference because the
596            // event sink is not allowed to access the waker mutably before
597            // `in_wait_set` is cleared.
598            other_notifier.as_ref().wake();
599
600            // Ordering: the Release memory ordering synchronizes with the
601            // Acquire ordering in the `remove` method; it is required to
602            // ensure that once `in_wait_set` reads `false` (using Acquire
603            // ordering), the waker is no longer in use by the wait set and
604            // can therefore be modified.
605            other_notifier
606                .as_ref()
607                .in_wait_set
608                .store(false, Ordering::Release);
609        }
610    }
611
612    /// Send a notification to `count` notifiers within the wait set, or to all
    /// notifiers if the wait set contains fewer than `count` notifiers.
614    ///
615    /// Note that for performance reasons, list emptiness is checked without
616    /// acquiring the wait set lock. Therefore, in order to prevent the
617    /// possibility that a wait set is seen as empty when it isn't, external
618    /// synchronization is required to make sure that all side effects of a
619    /// previous call to `insert` are fully visible. For instance, an atomic
    /// memory fence may be placed before this call and another one after the
621    /// insertion of a notifier.
622    ///
623    /// # Safety
624    ///
    /// All notifiers in the wait set must be alive. Wakers of notifiers whose
626    /// pointer is in the wait set may not be accessed mutably.
627    #[inline(always)]
628    unsafe fn notify_relaxed(&self, count: usize) {
629        let is_empty = self.is_empty.load(Ordering::Relaxed);
630        if is_empty {
631            return;
632        }
633
634        self.notify(count);
635    }
636
637    /// Send a notification to `count` notifiers within the wait set, or to all
    /// notifiers if the wait set contains fewer than `count` notifiers.
639    ///
640    /// # Safety
641    ///
    /// All notifiers in the wait set must be alive. Wakers of notifiers whose
643    /// pointer is in the wait set may not be accessed mutably.
644    unsafe fn notify(&self, count: usize) {
645        let mut list = self.list.lock().unwrap();
646        for _ in 0..count {
647            let notifier = {
648                if let Some(notifier) = list.pop_front() {
649                    if list.is_empty() {
650                        self.is_empty.store(true, Ordering::Relaxed);
651                    }
652                    notifier
653                } else {
654                    return;
655                }
656            };
657
658            // Note: the event sink must be notified before the end of the
659            // mutex-protected critical section. Otherwise, a concurrent call to
660            // `remove` could succeed in taking the lock before the waker has
661            // been called, and seeing that the notifier is no longer in the
662            // list would lead its caller to believe that it has now sole
            // ownership of the notifier even though the call to `wake` has yet
664            // to be made.
665            //
666            // Safety: the waker can be accessed by reference since the event
667            // sink is not allowed to access the waker mutably before
668            // `in_wait_set` is cleared.
669            notifier.as_ref().wake();
670
671            // Ordering: the Release memory ordering synchronizes with the
672            // Acquire ordering in the `remove` method; it is required to ensure
673            // that once `in_wait_set` reads `false` (using Acquire ordering),
674            // the waker can be safely modified.
675            notifier
676                .as_ref()
677                .in_wait_set
678                .store(false, Ordering::Release);
679        }
680    }
681}
682
683impl Default for WaitSet {
684    fn default() -> Self {
685        Self {
686            list: Default::default(),
687            is_empty: AtomicBool::new(true),
688        }
689    }
690}
691
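/// An intrusive doubly-linked list of notifiers.
///
/// The list itself only stores raw pointers to the front and back notifiers;
/// the `prev` and `next` links live within the `Notifier` nodes themselves, so
/// list operations never allocate.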
692#[derive(Default)]
693struct List {
694    front: Option<NonNull<Notifier>>,
695    back: Option<NonNull<Notifier>>,
696}
697
698impl List {
699    /// Inserts a node at the back of the list.
700    ///
701    /// # Safety
702    ///
    /// The provided notifier and all notifiers whose pointer is in the list
704    /// must be alive.
705    unsafe fn push_back(&mut self, notifier: NonNull<Notifier>) {
        // Safety: the `prev` and `next` pointers are only accessed when the
707        // list is locked.
708        let old_back = mem::replace(&mut self.back, Some(notifier));
709        match old_back {
710            None => self.front = Some(notifier),
711            Some(prev) => prev.as_ref().next.with_mut(|n| *n = Some(notifier)),
712        }
713
714        // Link the new notifier.
715        let notifier = notifier.as_ref();
716        notifier.prev.with_mut(|n| *n = old_back);
717        notifier.next.with_mut(|n| *n = None);
718    }
719
720    /// Removes and returns the notifier at the front of the list, if any.
721    ///
722    /// # Safety
723    ///
    /// All notifiers whose pointer is in the list must be alive.
725    unsafe fn pop_front(&mut self) -> Option<NonNull<Notifier>> {
726        let notifier = self.front?;
727
728        // Unlink from the next notifier.
729        let next = notifier.as_ref().next.with(|n| *n);
730        self.front = next;
731        match next {
732            None => self.back = None,
733            Some(next) => next.as_ref().prev.with_mut(|n| *n = None),
734        }
735
736        Some(notifier)
737    }
738
739    /// Removes the specified notifier.
740    ///
741    /// # Safety
742    ///
    /// The specified notifier and all notifiers whose pointer is in the list
744    /// must be alive.
745    unsafe fn remove(&mut self, notifier: NonNull<Notifier>) {
746        // Unlink from the previous and next notifiers.
747        let prev = notifier.as_ref().prev.with(|n| *n);
748        let next = notifier.as_ref().next.with(|n| *n);
749        match prev {
750            None => self.front = next,
751            Some(prev) => prev.as_ref().next.with_mut(|n| *n = next),
752        }
753        match next {
754            None => self.back = prev,
755            Some(next) => next.as_ref().prev.with_mut(|n| *n = prev),
756        }
757    }
758
759    /// Returns `true` if the list is empty.
760    fn is_empty(&self) -> bool {
761        self.front.is_none()
762    }
763}
764
765/// Non-loom tests.
766#[cfg(all(test, not(async_event_loom)))]
767mod tests {
768    use super::*;
769
770    use std::sync::atomic::AtomicUsize;
771    use std::sync::Arc;
772    use std::thread;
773
774    use futures_executor::block_on;
775
776    #[test]
777    fn smoke() {
778        static SIGNAL: AtomicBool = AtomicBool::new(false);
779
780        let event = Arc::new(Event::new());
781
782        let th_recv = {
783            let event = event.clone();
784            thread::spawn(move || {
785                block_on(async move {
786                    event
787                        .wait_until(|| {
788                            if SIGNAL.load(Ordering::Relaxed) {
789                                Some(())
790                            } else {
791                                None
792                            }
793                        })
794                        .await;
795
796                    assert!(SIGNAL.load(Ordering::Relaxed));
797                })
798            })
799        };
800
801        SIGNAL.store(true, Ordering::Relaxed);
802        event.notify_one();
803
804        th_recv.join().unwrap();
805    }
806
807    #[test]
808    fn one_to_many() {
809        const RECEIVER_COUNT: usize = 4;
810        static SIGNAL: AtomicBool = AtomicBool::new(false);
811
812        let event = Arc::new(Event::new());
813
814        let th_recv: Vec<_> = (0..RECEIVER_COUNT)
815            .map(|_| {
816                let event = event.clone();
817                thread::spawn(move || {
818                    block_on(async move {
819                        event
820                            .wait_until(|| {
821                                if SIGNAL.load(Ordering::Relaxed) {
822                                    Some(())
823                                } else {
824                                    None
825                                }
826                            })
827                            .await;
828
829                        assert!(SIGNAL.load(Ordering::Relaxed));
830                    })
831                })
832            })
833            .collect();
834
835        SIGNAL.store(true, Ordering::Relaxed);
836        event.notify_one();
837        event.notify(3);
838
839        for th in th_recv {
840            th.join().unwrap();
841        }
842    }
843
844    #[test]
845    fn many_to_many() {
846        const TOKEN_COUNT: usize = 4;
847        static AVAILABLE_TOKENS: AtomicUsize = AtomicUsize::new(0);
848
849        let event = Arc::new(Event::new());
850
851        // Receive tokens from multiple threads.
852        let th_recv: Vec<_> = (0..TOKEN_COUNT)
853            .map(|_| {
854                let event = event.clone();
855                thread::spawn(move || {
856                    block_on(async move {
857                        event
858                            .wait_until(|| {
859                                AVAILABLE_TOKENS
860                                    .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |t| {
861                                        if t > 0 {
862                                            Some(t - 1)
863                                        } else {
864                                            None
865                                        }
866                                    })
867                                    .ok()
868                            })
869                            .await;
870                    })
871                })
872            })
873            .collect();
874
875        // Make tokens available from multiple threads.
876        let th_send: Vec<_> = (0..TOKEN_COUNT)
877            .map(|_| {
878                let event = event.clone();
879                thread::spawn(move || {
880                    AVAILABLE_TOKENS.fetch_add(1, Ordering::Relaxed);
881                    event.notify_one();
882                })
883            })
884            .collect();
885
886        for th in th_recv {
887            th.join().unwrap();
888        }
889        for th in th_send {
890            th.join().unwrap();
891        }
892
893        assert!(AVAILABLE_TOKENS.load(Ordering::Relaxed) == 0);
894    }
895
896    #[test]
897    fn notify_all() {
898        const RECEIVER_COUNT: usize = 4;
899        static SIGNAL: AtomicBool = AtomicBool::new(false);
900
901        let event = Arc::new(Event::new());
902
903        let th_recv: Vec<_> = (0..RECEIVER_COUNT)
904            .map(|_| {
905                let event = event.clone();
906                thread::spawn(move || {
907                    block_on(async move {
908                        event
909                            .wait_until(|| {
910                                if SIGNAL.load(Ordering::Relaxed) {
911                                    Some(())
912                                } else {
913                                    None
914                                }
915                            })
916                            .await;
917
918                        assert!(SIGNAL.load(Ordering::Relaxed));
919                    })
920                })
921            })
922            .collect();
923
924        SIGNAL.store(true, Ordering::Relaxed);
925        event.notify_all();
926
927        for th in th_recv {
928            th.join().unwrap();
929        }
930    }
931}
932
933/// Loom tests.
934#[cfg(all(test, async_event_loom))]
935mod tests {
936    use super::*;
937
938    use std::future::Future;
939    use std::marker::PhantomPinned;
940    use std::task::{Context, Poll};
941
942    use loom::model::Builder;
943    use loom::sync::atomic::AtomicUsize;
944    use loom::sync::Arc;
945    use loom::thread;
946
947    use waker_fn::waker_fn;
948
949    /// A waker factory that accepts notifications from the newest waker only.
950    #[derive(Clone, Default)]
951    struct MultiWaker {
952        state: Arc<AtomicUsize>,
953    }
954
955    impl MultiWaker {
956        /// Clears the notification flag.
957        ///
        /// This operation has unconditional Relaxed semantics and for this
959        /// reason should be used instead of `take_notification` when the intent
960        /// is only to cancel a notification for book-keeping purposes, e.g. to
961        /// simulate a spurious wake-up, without introducing unwanted
962        /// synchronization.
963        fn clear_notification(&self) {
964            self.state.fetch_and(!1, Ordering::Relaxed);
965        }
966
967        /// Clears the notification flag and returns the former notification
968        /// status.
969        ///
        /// This operation has Acquire semantics when a notification is indeed
971        /// present, and Relaxed otherwise. It is therefore appropriate to
972        /// simulate a scheduler receiving a notification as it ensures that all
973        /// memory operations preceding the notification of a task are visible.
974        fn take_notification(&self) -> bool {
975            // Clear the notification flag.
976            let mut state = self.state.load(Ordering::Relaxed);
977            loop {
                let notified_state = state | 1;
                let unnotified_state = state & !1;
                match self.state.compare_exchange_weak(
                    notified_state,
                    unnotified_state,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(s) => {
                        state = s;
                        if state == unnotified_state {
990                            return false;
991                        }
992                    }
993                }
994            }
995        }
996
997        /// Clears the notification flag and creates a new waker.
998        fn new_waker(&self) -> Waker {
999            // Increase the epoch and clear the notification flag.
1000            let mut state = self.state.load(Ordering::Relaxed);
1001            let mut epoch;
1002            loop {
1003                // Increase the epoch by 2.
1004                epoch = (state & !1) + 2;
1005                match self.state.compare_exchange_weak(
1006                    state,
1007                    epoch,
1008                    Ordering::Relaxed,
1009                    Ordering::Relaxed,
1010                ) {
1011                    Ok(_) => break,
1012                    Err(s) => state = s,
1013                }
1014            }
1015
1016            // Create a waker that only notifies if it is the newest waker.
1017            let waker_state = self.state.clone();
1018            waker_fn(move || {
1019                let mut state = waker_state.load(Ordering::Relaxed);
1020                loop {
1021                    let new_state = if state & !1 == epoch {
1022                        epoch | 1
1023                    } else {
1024                        break;
1025                    };
1026                    match waker_state.compare_exchange(
1027                        state,
1028                        new_state,
1029                        Ordering::Release,
1030                        Ordering::Relaxed,
1031                    ) {
1032                        Ok(_) => break,
1033                        Err(s) => state = s,
1034                    }
1035                }
1036            })
1037        }
1038    }
1039
1040    /// A simple counter that can be used to simulate the availability of a
    /// certain number of tokens. In order to model the weakest possible
1042    /// predicate from the viewpoint of atomic memory ordering, only Relaxed
1043    /// atomic operations are used.
1044    #[derive(Default)]
1045    struct Counter {
1046        count: AtomicUsize,
1047    }
1048
1049    impl Counter {
1050        fn increment(&self) {
1051            self.count.fetch_add(1, Ordering::Relaxed);
1052        }
1053        fn try_decrement(&self) -> Option<()> {
1054            let mut count = self.count.load(Ordering::Relaxed);
1055            loop {
1056                if count == 0 {
1057                    return None;
1058                }
1059                match self.count.compare_exchange(
1060                    count,
1061                    count - 1,
1062                    Ordering::Relaxed,
1063                    Ordering::Relaxed,
1064                ) {
1065                    Ok(_) => return Some(()),
1066                    Err(c) => count = c,
1067                }
1068            }
1069        }
1070    }
1071
1072    /// A closure that contains the targets of all references captured by a
1073    /// `WaitUntil` Future.
1074    ///
1075    /// This ugly thing is needed to arbitrarily extend the lifetime of a
1076    /// `WaitUntil` future and thus mimic the behavior of an executor task.
1077    struct WaitUntilClosure {
1078        event: Arc<Event>,
1079        token_counter: Arc<Counter>,
1080        wait_until: Option<Box<dyn Future<Output = ()>>>,
1081        _pin: PhantomPinned,
1082    }
1083
1084    impl WaitUntilClosure {
1085        /// Creates a `WaitUntil` future embedded together with the targets
1086        /// captured by reference.
1087        fn new(event: Arc<Event>, token_counter: Arc<Counter>) -> Pin<Box<Self>> {
1088            let res = Self {
1089                event,
1090                token_counter,
1091                wait_until: None,
1092                _pin: PhantomPinned,
1093            };
1094            let boxed = Box::new(res);
1095
1096            // Artificially extend the lifetimes of the captured references.
1097            let event_ptr = &*boxed.event as *const Event;
1098            let token_counter_ptr = &boxed.token_counter as *const Arc<Counter>;
1099
1100            // Safety: we now commit to never move the closure and to ensure
1101            // that the `WaitUntil` future does not outlive the captured
1102            // references.
1103            let wait_until: Box<dyn Future<Output = _>> = unsafe {
1104                Box::new((*event_ptr).wait_until(move || (*token_counter_ptr).try_decrement()))
1105            };
1106            let mut pinned_box: Pin<Box<WaitUntilClosure>> = boxed.into();
1107
1108            let mut_ref: Pin<&mut Self> = Pin::as_mut(&mut pinned_box);
1109            unsafe {
1110                // This is safe: we are not moving the closure.
1111                Pin::get_unchecked_mut(mut_ref).wait_until = Some(wait_until);
1112            }
1113
1114            pinned_box
1115        }
1116
1117        /// Returns a pinned, type-erased `WaitUntil` future.
1118        fn as_pinned_future(self: Pin<&mut Self>) -> Pin<&mut dyn Future<Output = ()>> {
1119            unsafe { self.map_unchecked_mut(|s| s.wait_until.as_mut().unwrap().as_mut()) }
1120        }
1121    }
1122
1123    impl Drop for WaitUntilClosure {
1124        fn drop(&mut self) {
1125            // Make sure that the `WaitUntil` future does not outlive its
1126            // captured references.
1127            self.wait_until = None;
1128        }
1129    }
1130
1131    /// An enum that registers the final state of a `WaitUntil` future at the
1132    /// completion of a thread.
1133    ///
1134    /// When the future is still in a `Polled` state, this future is moved into
1135    /// the enum so as to extend its lifetime and allow it to be further
1136    /// notified.
1137    #[allow(dead_code)]
1138    enum FutureState {
1139        Completed,
1140        Polled(Pin<Box<WaitUntilClosure>>),
1141        Cancelled,
1142    }
1143
    /// Make a certain number of tokens available and notify as many waiters
1145    /// among all registered waiters, possibly from several notifier threads.
1146    /// Optionally, it is possible to:
1147    /// - request that `max_spurious_wake` threads will simulate a spurious
1148    ///   wake-up if the waiter is polled and returns `Poll::Pending`,
1149    /// - request that `max_cancellations` threads will cancel the waiter if the
1150    ///   waiter is polled and returns `Poll::Pending`,
1151    /// - change the waker each time it is polled.
1152    ///
1153    /// Note that the aggregate number of specified cancellations and spurious
1154    /// wake-ups cannot exceed the number of waiters.
1155    fn loom_notify(
1156        token_count: usize,
1157        waiter_count: usize,
1158        notifier_count: usize,
1159        max_spurious_wake: usize,
1160        max_cancellations: usize,
1161        change_waker: bool,
1162        preemption_bound: usize,
1163    ) {
1164        let mut builder = Builder::new();
1165        if builder.preemption_bound.is_none() {
1166            builder.preemption_bound = Some(preemption_bound);
1167        }
1168
1169        builder.check(move || {
1170            let token_counter = Arc::new(Counter::default());
1171            let event = Arc::new(Event::new());
1172
1173            let mut wakers: Vec<MultiWaker> = Vec::new();
1174            wakers.resize_with(waiter_count, Default::default);
1175
1176            let waiter_threads: Vec<_> = wakers
1177                .iter()
1178                .enumerate()
1179                .map(|(i, multi_waker)| {
1180                    thread::spawn({
1181                        let multi_waker = multi_waker.clone();
1182                        let mut wait_until =
1183                            WaitUntilClosure::new(event.clone(), token_counter.clone());
1184
1185                        move || {
1186                            // `max_cancellations` threads will cancel the
1187                            // waiter if the waiter returns `Poll::Pending`.
1188                            let cancel_waiter = i < max_cancellations;
1189                            // `max_spurious_wake` threads will simulate a
1190                            // spurious wake-up if the waiter returns
1191                            // `Poll::Pending`.
1192                            let mut spurious_wake = i >= max_cancellations
1193                                && i < (max_cancellations + max_spurious_wake);
1194
1195                            let mut waker = multi_waker.new_waker();
1196                            loop {
1197                                let mut cx = Context::from_waker(&waker);
1198                                let poll_state =
1199                                    wait_until.as_mut().as_pinned_future().poll(&mut cx);
1200
1201                                // Return successfully if the predicate was
1202                                // checked successfully.
1203                                if matches!(poll_state, Poll::Ready(_)) {
1204                                    return FutureState::Completed;
1205                                }
1206
1207                                // The future has returned Poll::Pending.
1208                                // Depending on the situation, we will either
1209                                // cancel the future, return and wait for a
1210                                // notification, or poll again.
1211
1212                                if cancel_waiter {
1213                                    // The `wait_until` future is dropped while
1214                                    // in pending state, which simulates future
1215                                    // cancellation. Note that the notification
1216                                    // was intentionally cleared earlier so the
1217                                    // task will not be counted as a task that
1218                                    // should eventually succeed.
1219                                    return FutureState::Cancelled;
1220                                }
1221                                if spurious_wake {
1222                                    // Clear the notification, if any.
1223                                    multi_waker.clear_notification();
1224                                } else if !multi_waker.take_notification() {
1225                                    // The async runtime would normally keep the
1226                                    // `wait_until` future alive after `poll`
1227                                    // returns `Pending`. This behavior is
1228                                    // emulated by returning the `WaitUntil`
1229                                    // closure from the thread so as to extend
                                    // its lifetime.
1231                                    return FutureState::Polled(wait_until);
1232                                }
1233
                                // The task was notified or spuriously awakened.
1235                                spurious_wake = false;
1236                                if change_waker {
1237                                    waker = multi_waker.new_waker();
1238                                }
1239                            }
1240                        }
1241                    })
1242                })
1243                .collect();
1244
1245            // Increment the token count and notify a consumer after each
1246            // increment.
1247            assert!(notifier_count >= 1);
1248            assert!(token_count >= notifier_count);
1249
1250            // Each notifier thread but the last one makes one and only one
1251            // token available.
1252            let notifier_threads: Vec<_> = (0..(notifier_count - 1))
1253                .map(|_| {
1254                    let token_counter = token_counter.clone();
1255                    let event = event.clone();
1256                    thread::spawn(move || {
1257                        token_counter.increment();
1258                        event.notify(1);
1259                    })
1260                })
1261                .collect();
1262
            // The last notifier thread completes the number of tokens as
1264            // needed.
1265            for _ in 0..(token_count - (notifier_count - 1)) {
1266                token_counter.increment();
1267                event.notify(1);
1268            }
1269
1270            // Join the remaining notifier threads.
1271            for th in notifier_threads {
1272                th.join().unwrap();
1273            }
1274
1275            // Join all waiter threads and check which of them have successfully
1276            // checked the predicate. It is important that all `FutureState`
1277            // returned by the threads be kept alive until _all_ threads have
1278            // joined because `FutureState::Polled` items extend the lifetime of
1279            // their future so they can still be notified.
1280            let future_state: Vec<_> = waiter_threads
1281                .into_iter()
1282                .map(|th| th.join().unwrap())
1283                .collect();
1284
1285            // See which threads have successfully completed. It is now OK to drop
1286            // the returned `FutureState`s.
1287            let success: Vec<_> = future_state
1288                .into_iter()
1289                .map(|state| match state {
1290                    FutureState::Completed => true,
1291                    _ => false,
1292                })
1293                .collect();
1294
            // Check which threads have been notified, excluding those whose
1296            // future was cancelled.
1297            let notified: Vec<_> = wakers
1298                .iter()
1299                .enumerate()
1300                .map(|(i, test_waker)| {
1301                    // Count the notification unless the thread was cancelled
1302                    // since in that case the notification would be missed.
1303                    test_waker.take_notification() && i >= max_cancellations
1304                })
1305                .collect();
1306
1307            // Count how many threads have either succeeded or have been
1308            // notified.
1309            let actual_aggregate_count =
1310                success
1311                    .iter()
1312                    .zip(notified.iter())
1313                    .fold(0, |count, (&success, &notified)| {
1314                        if success || notified {
1315                            count + 1
1316                        } else {
1317                            count
1318                        }
1319                    });
1320
1321            // Compare with the number of event sinks that should eventually succeed.
1322            let min_expected_success_count = token_count.min(waiter_count - max_cancellations);
1323            if actual_aggregate_count < min_expected_success_count {
1324                panic!(
1325                    "Successful threads: {:?}; Notified threads: {:?}",
1326                    success, notified
1327                );
1328            }
1329        });
1330    }
1331
1332    #[test]
1333    fn loom_two_consumers() {
1334        const DEFAULT_PREEMPTION_BOUND: usize = 4;
1335        loom_notify(2, 2, 1, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1336    }
1337    #[test]
1338    fn loom_two_consumers_spurious() {
1339        const DEFAULT_PREEMPTION_BOUND: usize = 4;
1340        loom_notify(2, 2, 1, 1, 0, false, DEFAULT_PREEMPTION_BOUND);
1341    }
1342    #[test]
1343    fn loom_two_consumers_cancellation() {
1344        const DEFAULT_PREEMPTION_BOUND: usize = 4;
1345        loom_notify(2, 2, 1, 1, 1, false, DEFAULT_PREEMPTION_BOUND);
1346    }
1347    #[test]
1348    fn loom_two_consumers_change_waker() {
1349        const DEFAULT_PREEMPTION_BOUND: usize = 4;
1350        loom_notify(2, 2, 1, 0, 0, true, DEFAULT_PREEMPTION_BOUND);
1351    }
1352    #[test]
1353    fn loom_two_consumers_change_waker_spurious() {
1354        const DEFAULT_PREEMPTION_BOUND: usize = 4;
1355        loom_notify(2, 2, 1, 1, 0, true, DEFAULT_PREEMPTION_BOUND);
1356    }
1357    #[test]
1358    fn loom_two_consumers_change_waker_cancellation() {
1359        const DEFAULT_PREEMPTION_BOUND: usize = 4;
1360        loom_notify(1, 2, 1, 0, 1, true, DEFAULT_PREEMPTION_BOUND);
1361    }
1362    #[test]
1363    fn loom_two_consumers_change_waker_spurious_cancellation() {
1364        const DEFAULT_PREEMPTION_BOUND: usize = 4;
1365        loom_notify(2, 2, 1, 1, 1, true, DEFAULT_PREEMPTION_BOUND);
1366    }
1367    #[test]
1368    fn loom_two_consumers_three_tokens() {
1369        const DEFAULT_PREEMPTION_BOUND: usize = 3;
1370        loom_notify(3, 2, 1, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1371    }
1372    #[test]
1373    fn loom_three_consumers() {
1374        const DEFAULT_PREEMPTION_BOUND: usize = 2;
1375        loom_notify(3, 3, 1, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1376    }
1377    #[test]
1378    fn loom_three_consumers_spurious() {
1379        const DEFAULT_PREEMPTION_BOUND: usize = 2;
1380        loom_notify(3, 3, 1, 1, 0, false, DEFAULT_PREEMPTION_BOUND);
1381    }
1382    #[test]
1383    fn loom_three_consumers_cancellation() {
1384        const DEFAULT_PREEMPTION_BOUND: usize = 2;
1385        loom_notify(2, 3, 1, 0, 1, false, DEFAULT_PREEMPTION_BOUND);
1386    }
1387    #[test]
1388    fn loom_three_consumers_change_waker() {
1389        const DEFAULT_PREEMPTION_BOUND: usize = 2;
1390        loom_notify(3, 3, 1, 0, 0, true, DEFAULT_PREEMPTION_BOUND);
1391    }
1392    #[test]
1393    fn loom_three_consumers_change_waker_spurious() {
1394        const DEFAULT_PREEMPTION_BOUND: usize = 2;
1395        loom_notify(3, 3, 1, 1, 0, true, DEFAULT_PREEMPTION_BOUND);
1396    }
1397    #[test]
1398    fn loom_three_consumers_change_waker_cancellation() {
1399        const DEFAULT_PREEMPTION_BOUND: usize = 2;
1400        loom_notify(3, 3, 1, 0, 1, true, DEFAULT_PREEMPTION_BOUND);
1401    }
1402    #[test]
1403    fn loom_three_consumers_change_waker_spurious_cancellation() {
1404        const DEFAULT_PREEMPTION_BOUND: usize = 2;
1405        loom_notify(3, 3, 1, 1, 1, true, DEFAULT_PREEMPTION_BOUND);
1406    }
1407    #[test]
1408    fn loom_three_consumers_two_tokens() {
1409        const DEFAULT_PREEMPTION_BOUND: usize = 2;
1410        loom_notify(2, 3, 1, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1411    }
1412    #[test]
1413    fn loom_two_consumers_two_notifiers() {
1414        const DEFAULT_PREEMPTION_BOUND: usize = 3;
1415        loom_notify(2, 2, 2, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1416    }
1417    #[test]
1418    fn loom_one_consumer_three_notifiers() {
1419        const DEFAULT_PREEMPTION_BOUND: usize = 4;
1420        loom_notify(3, 1, 3, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1421    }
1422}