maniac_runtime/future/event/mod.rs
1//! An efficient async condition variable for lock-free algorithms, a.k.a.
2//! "eventcount".
3//!
4//! [Eventcount][eventcount]-like primitives are useful to make some operations
5//! on a lock-free structure blocking, for instance to transform bounded queues
6//! into bounded channels. Such a primitive allows an interested task to block
7//! until a predicate is satisfied by checking the predicate each time it
8//! receives a notification.
9//!
10//! While functionally similar to the [event_listener] crate, this
11//! implementation is more opinionated and limited to the `async` case. It
12//! strives to be more efficient, however, by limiting the amount of locking
13//! operations on the mutex-protected list of notifiers: the lock is typically
14//! taken only once for each time a waiter is blocked and once for notifying,
15//! thus reducing the need for synchronization operations. Finally, spurious
16//! wake-ups are only generated in very rare circumstances.
17//!
18//! This library is an offshoot of [Asynchronix][asynchronix], an ongoing effort
19//! at a high performance asynchronous computation framework for system
20//! simulation.
21//!
22//! [event_listener]: https://docs.rs/event_listener/latest/event_listener/
23//! [eventcount]:
24//! https://www.1024cores.net/home/lock-free-algorithms/eventcounts
25//! [asynchronix]: https://github.com/asynchronics/asynchronix
26//!
27//! # Examples
28//!
29//! Wait until a non-zero value has been sent asynchronously.
30//!
31//! ```ignore
32//! use std::sync::atomic::{AtomicUsize, Ordering};
33//! use std::sync::Arc;
34//! use std::thread;
35//!
36//! use futures_executor::block_on;
37//!
38//! use async_event::Event;
39//!
40//!
41//! let value = Arc::new(AtomicUsize::new(0));
42//! let event = Arc::new(Event::new());
43//!
44//! // Set a non-zero value concurrently.
45//! thread::spawn({
46//! let value = value.clone();
47//! let event = event.clone();
48//!
49//! move || {
50//! // A relaxed store is sufficient here: `Event::notify*` methods insert
51//! // atomic fences to warrant adequate synchronization.
52//! value.store(42, Ordering::Relaxed);
53//! event.notify_one();
54//! }
55//! });
56//!
57//! // Wait until the value is set.
58//! block_on(async move {
59//! let v = event
60//! .wait_until(|| {
61//! // A relaxed load is sufficient here: `Event::wait_until` inserts
62//! // atomic fences to warrant adequate synchronization.
63//! let v = value.load(Ordering::Relaxed);
64//! if v != 0 { Some(v) } else { None }
65//! })
66//! .await;
67//!
68//! assert_eq!(v, 42);
69//! });
//! ```
71#![warn(missing_docs, missing_debug_implementations, unreachable_pub)]
72
73mod loom_exports;
74
75use std::future::Future;
76use std::mem;
77use std::pin::Pin;
78use std::ptr::NonNull;
79use std::sync::atomic::Ordering;
80use std::task::{Context, Poll, Waker};
81
82use loom_exports::cell::UnsafeCell;
83use loom_exports::sync::Mutex;
84use loom_exports::sync::atomic::{self, AtomicBool};
85use pin_project_lite::pin_project;
86
/// An object that can receive or send notifications.
#[derive(Debug)]
pub struct Event {
    /// The set of notifiers registered by currently blocked waiters.
    wait_set: WaitSet,
}
92
impl Event {
    /// Creates a new event.
    pub fn new() -> Self {
        Self {
            wait_set: WaitSet::default(),
        }
    }

    /// Notify a number of awaiting events that the predicate should be checked.
    ///
    /// If fewer events than requested are currently awaiting, then all
    /// awaiting events are notified.
    #[inline(always)]
    pub fn notify(&self, n: usize) {
        // This fence synchronizes with the other fence in `WaitUntil::poll` and
        // ensures that either the `poll` method will successfully check the
        // predicate set before this call, or the notifier inserted by `poll`
        // will be visible in the wait list when calling `WaitSet::notify` (or
        // both).
        atomic::fence(Ordering::SeqCst);

        // Safety: all notifiers in the wait set are guaranteed to be alive
        // since the `WaitUntil` drop handler ensures that notifiers are removed
        // from the wait set before they are deallocated.
        unsafe {
            self.wait_set.notify_relaxed(n);
        }
    }

    /// Notify one awaiting event (if any) that the predicate should be checked.
    #[inline(always)]
    pub fn notify_one(&self) {
        self.notify(1);
    }

    /// Notify all awaiting events that the predicate should be checked.
    #[inline(always)]
    pub fn notify_all(&self) {
        // `usize::MAX` acts as an effectively unbounded notification count.
        self.notify(usize::MAX);
    }

    /// Returns a future that can be `await`ed until the provided predicate is
    /// satisfied.
    ///
    /// The predicate returns `Some(value)` once satisfied; `value` becomes the
    /// output of the returned future.
    pub fn wait_until<F, T>(&self, predicate: F) -> WaitUntil<'_, F, T>
    where
        F: FnMut() -> Option<T>,
    {
        WaitUntil::new(&self.wait_set, predicate)
    }

    /// Returns a future that can be `await`ed until the provided predicate is
    /// satisfied or until the provided future completes.
    ///
    /// The deadline is specified as a `Future` that is expected to resolve to
    /// `()` after some duration, such as a `tokio::time::Sleep` future.
    pub fn wait_until_or_timeout<F, T, D>(
        &self,
        predicate: F,
        deadline: D,
    ) -> WaitUntilOrTimeout<'_, F, T, D>
    where
        F: FnMut() -> Option<T>,
        D: Future<Output = ()>,
    {
        WaitUntilOrTimeout::new(&self.wait_set, predicate, deadline)
    }
}
160
161impl Default for Event {
162 fn default() -> Self {
163 Self::new()
164 }
165}
166
// Safety: the raw `NonNull<Notifier>` pointers reachable through `wait_set`
// are only ever dereferenced under the wait-set mutex (or by a notifier's
// exclusive owner), so an `Event` can be sent to and shared between threads.
unsafe impl Send for Event {}
unsafe impl Sync for Event {}
169
/// A waker wrapper that can be inserted in a list.
///
/// A notifier always has an exclusive owner or borrower, except in one edge
/// case: the `WaitSet::remove_relaxed()` method may create a shared reference
/// while the notifier is concurrently accessed under the `wait_set` mutex by
/// one of the `WaitSet` methods. So occasionally 2 references to a `Notifier`
/// will exist at the same time, meaning that even when accessed under the
/// `wait_set` mutex, a notifier can only be accessed by reference.
struct Notifier {
    /// The current waker, if any; only replaced on re-poll when the cached
    /// waker would not wake the same task.
    waker: Option<Waker>,
    /// Pointer to the previous wait set notifier; only accessed while the
    /// wait-set list is locked.
    prev: UnsafeCell<Option<NonNull<Notifier>>>,
    /// Pointer to the next wait set notifier; only accessed while the
    /// wait-set list is locked.
    next: UnsafeCell<Option<NonNull<Notifier>>>,
    /// Flag indicating whether the notifier is currently in the wait set.
    in_wait_set: AtomicBool,
}
188
189impl Notifier {
190 /// Creates a new Notifier without any registered waker.
191 fn new() -> Self {
192 Self {
193 waker: None,
194 prev: UnsafeCell::new(None),
195 next: UnsafeCell::new(None),
196 in_wait_set: AtomicBool::new(false),
197 }
198 }
199
200 /// Stores the specified waker if it differs from the cached waker.
201 fn set_waker(&mut self, waker: &Waker) {
202 if match &self.waker {
203 Some(w) => !w.will_wake(waker),
204 None => true,
205 } {
206 self.waker = Some(waker.clone());
207 }
208 }
209
210 /// Notifies the task.
211 fn wake(&self) {
212 // Safety: the waker is only ever accessed mutably when the notifier is
213 // itself accessed mutably. The caller claims shared (non-mutable)
214 // ownership of the notifier, so there is not possible concurrent
215 // mutable access to the notifier and therefore to the waker.
216 if let Some(w) = &self.waker {
217 w.wake_by_ref();
218 }
219 }
220}
221
// Safety: a notifier is accessed either by its exclusive owner or under the
// wait-set mutex (see the type-level comment on `Notifier` for the one
// shared-reference edge case), so it may be sent to and referenced from other
// threads.
unsafe impl Send for Notifier {}
unsafe impl Sync for Notifier {}
224
/// A future that can be `await`ed until a predicate is satisfied.
#[derive(Debug)]
pub struct WaitUntil<'a, F: FnMut() -> Option<T>, T> {
    /// Current polling state: idle, polled (owning a registered notifier), or
    /// completed.
    state: WaitUntilState,
    /// Predicate checked on each poll; its `Some` value completes the future.
    predicate: F,
    /// The wait set this future registers its notifier with.
    wait_set: &'a WaitSet,
}
232
233impl<'a, F: FnMut() -> Option<T>, T> WaitUntil<'a, F, T> {
234 /// Creates a future associated with the specified event sink that can be
235 /// `await`ed until the specified predicate is satisfied.
236 fn new(wait_set: &'a WaitSet, predicate: F) -> Self {
237 Self {
238 state: WaitUntilState::Idle,
239 predicate,
240 wait_set,
241 }
242 }
243}
244
impl<F: FnMut() -> Option<T>, T> Drop for WaitUntil<'_, F, T> {
    fn drop(&mut self) {
        if let WaitUntilState::Polled(notifier) = self.state {
            // If we are in the `Polled` state, it means that the future was
            // cancelled and its notifier may still be in the wait set: it is
            // necessary to cancel the notifier so that another event sink can
            // be notified if one is registered, and then to deallocate the
            // notifier.
            //
            // Safety: all notifiers in the wait set are guaranteed to be alive
            // since this drop handler ensures that notifiers are removed from
            // the wait set before they are deallocated. After the notifier is
            // removed from the list we can claim unique ownership and
            // deallocate the notifier.
            unsafe {
                self.wait_set.cancel(notifier);
                let _ = Box::from_raw(notifier.as_ptr());
            }
        }
    }
}
266
// `WaitUntil` holds no self-references: the `Polled` state stores the notifier
// behind a stable heap pointer, so the future can be freely moved.
impl<'a, F: FnMut() -> Option<T>, T> Unpin for WaitUntil<'a, F, T> {}

// Safety: the `NonNull<Notifier>` stored in the `Polled` state is what inhibits
// an automatic `Send` implementation; the pointed-to notifier is heap-allocated
// and owned by this future, and concurrent accesses from the wait set are
// synchronized by the wait-set mutex, so the future may move between threads
// provided the predicate and its output are themselves `Send`.
unsafe impl<F: (FnMut() -> Option<T>) + Send, T: Send> Send for WaitUntil<'_, F, T> {}
270
impl<'a, F: FnMut() -> Option<T>, T> Future for WaitUntil<'a, F, T> {
    type Output = T;

    #[inline]
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Polling a completed future again is a contract violation.
        assert!(self.state != WaitUntilState::Completed);

        // Remove the notifier if it is in the wait set. In most cases this will
        // be a cheap no-op because, unless the wake-up is spurious, the
        // notifier was already removed from the wait set.
        //
        // Removing the notifier before checking the predicate is necessary to
        // avoid races such as this one:
        //
        // 1) event sink A unsuccessfully checks the predicate, inserts its
        //    notifier in the wait set, unsuccessfully re-checks the predicate,
        //    returns `Poll::Pending`,
        // 2) event sink B unsuccessfully checks the predicate, inserts its
        //    notifier in the wait set, unsuccessfully re-checks the predicate,
        //    returns `Poll::Pending`,
        // 3) the event source makes one predicate satisfiable,
        // 4) event sink A is spuriously woken and successfully checks the
        //    predicate, returns `Poll::Ready`,
        // 5) the event source notifies event sink B,
        // 6) event sink B is woken and unsuccessfully checks the predicate,
        //    inserts its notifier in the wait set, unsuccessfully re-checks the
        //    predicate, returns `Poll::Pending`,
        // 7) the event source makes another predicate satisfiable.
        // 8) if now the notifier of event sink A was not removed from the wait
        //    set, the event source may notify event sink A (which is no longer
        //    interested) rather than event sink B, meaning that event sink B
        //    will never be notified.
        if let WaitUntilState::Polled(notifier) = self.state {
            // Safety: all notifiers in the wait set are guaranteed to be alive
            // since the `WaitUntil` drop handler ensures that notifiers are
            // removed from the wait set before they are deallocated. Using the
            // relaxed version of `notify` is enough since the notifier was
            // inserted in the same future so there exists a happen-before
            // relationship with the insertion operation.
            unsafe { self.wait_set.remove_relaxed(notifier) };
        }

        // Fast path.
        if let Some(v) = (self.predicate)() {
            if let WaitUntilState::Polled(notifier) = self.state {
                // Safety: the notifier is no longer in the wait set so we can
                // claim unique ownership and deallocate the notifier.
                let _ = unsafe { Box::from_raw(notifier.as_ptr()) };
            }

            self.state = WaitUntilState::Completed;

            return Poll::Ready(v);
        }

        // Re-use the previously allocated notifier, or allocate one lazily on
        // the first poll that does not immediately succeed.
        let mut notifier = if let WaitUntilState::Polled(notifier) = self.state {
            notifier
        } else {
            // Safety: `Box::into_raw` never returns a null pointer.
            unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(Notifier::new()))) }
        };

        // Set or update the notifier.
        //
        // Safety: the notifier is not (or no longer) in the wait list so we
        // have exclusive ownership.
        let waker = cx.waker();
        unsafe { notifier.as_mut().set_waker(waker) };

        // Safety: all notifiers in the wait set are guaranteed to be alive
        // since the `WaitUntil` drop handler ensures that notifiers are removed
        // from the wait set before they are deallocated.
        unsafe { self.wait_set.insert(notifier) };

        // This fence synchronizes with the other fence in `Event::notify` and
        // ensures that either the predicate below will be satisfied or the
        // event source will see the notifier inserted above in the wait list
        // after it makes the predicate satisfiable (or both).
        atomic::fence(Ordering::SeqCst);

        if let Some(v) = (self.predicate)() {
            // We need to cancel and not merely remove the notifier from the
            // wait set so that another event sink can be notified in case we
            // have been notified just after checking the predicate. This is an
            // example of race that makes this necessary:
            //
            // 1) event sink A and event sink B both unsuccessfully check the
            //    predicate,
            // 2) the event source makes one predicate satisfiable and tries to
            //    notify an event sink but fails since no notifier has been
            //    inserted in the wait set yet,
            // 3) event sink A and event sink B both insert their notifier in
            //    the wait set,
            // 4) event sink A re-checks the predicate, successfully,
            // 5) event sink B re-checks the predicate, unsuccessfully,
            // 6) the event source makes another predicate satisfiable,
            // 7) the event source sends a notification for the second predicate
            //    but unfortunately chooses the "wrong" notifier in the wait
            //    set, i.e. that of event sink A -- note that this is always
            //    possible irrespective of FIFO or LIFO ordering because it also
            //    depends on the order of notifier insertion in step 3)
            // 8) if, before returning, event sink A merely removes itself from
            //    the wait set without notifying another event sink, then event
            //    sink B will never be notified.
            //
            // Safety: all notifiers in the wait set are guaranteed to be alive
            // since the `WaitUntil` drop handler ensures that notifiers are
            // removed from the wait set before they are deallocated.
            unsafe {
                self.wait_set.cancel(notifier);
            }

            self.state = WaitUntilState::Completed;

            // Safety: the notifier is no longer in the wait set so we can
            // claim unique ownership and deallocate the notifier.
            let _ = unsafe { Box::from_raw(notifier.as_ptr()) };

            return Poll::Ready(v);
        }

        self.state = WaitUntilState::Polled(notifier);

        Poll::Pending
    }
}
396
/// State of the `WaitUntil` future.
#[derive(Debug, PartialEq)]
enum WaitUntilState {
    /// The future has not been polled yet; no notifier is allocated.
    Idle,
    /// The future returned `Poll::Pending`; it owns the heap-allocated
    /// notifier, which may still be linked in the wait set.
    Polled(NonNull<Notifier>),
    /// The future returned `Poll::Ready`; polling it again will panic.
    Completed,
}
404
pin_project! {
    /// A future that can be `await`ed until a predicate is satisfied or until a
    /// deadline elapses.
    pub struct WaitUntilOrTimeout<'a, F: FnMut() -> Option<T>, T, D: Future<Output = ()>> {
        /// The predicate-waiting future; it is `Unpin` so it needs no pin
        /// projection.
        wait_until: WaitUntil<'a, F, T>,
        /// The deadline future; structurally pinned since `D` may be `!Unpin`.
        #[pin]
        deadline: D,
    }
}
414
impl<'a, F, T, D> WaitUntilOrTimeout<'a, F, T, D>
where
    F: FnMut() -> Option<T>,
    D: Future<Output = ()>,
{
    /// Creates a future associated with the specified event sink that can be
    /// `await`ed until the specified predicate is satisfied, or until the
    /// specified timeout future completes.
    fn new(wait_set: &'a WaitSet, predicate: F, deadline: D) -> Self {
        Self {
            wait_until: WaitUntil::new(wait_set, predicate),
            deadline,
        }
    }
}
430
431impl<'a, F, T, D> Future for WaitUntilOrTimeout<'a, F, T, D>
432where
433 F: FnMut() -> Option<T>,
434 D: Future<Output = ()>,
435{
436 type Output = Option<T>;
437
438 #[inline]
439 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
440 let this = self.project();
441
442 if let Poll::Ready(value) = Pin::new(this.wait_until).poll(cx) {
443 Poll::Ready(Some(value))
444 } else if this.deadline.poll(cx).is_ready() {
445 Poll::Ready(None)
446 } else {
447 Poll::Pending
448 }
449 }
450}
451
/// A set of notifiers.
///
/// The set wraps a Mutex-protected list of notifiers and manages a flag for
/// fast assessment of list emptiness.
#[derive(Debug)]
struct WaitSet {
    /// The linked list of notifiers, in insertion (FIFO) order.
    list: Mutex<List>,
    /// Mirror of `list.is_empty()`; only mutated inside the mutex-protected
    /// critical section, but readable without taking the lock.
    is_empty: AtomicBool,
}
461
impl WaitSet {
    /// Inserts a node in the wait set.
    ///
    /// # Safety
    ///
    /// The specified notifier and all notifiers in the wait set must be alive.
    /// The notifier should not be already in the wait set.
    unsafe fn insert(&self, notifier: NonNull<Notifier>) {
        let mut list = self.list.lock().unwrap();

        // Debug-only sanity check: double insertion would corrupt the list.
        #[cfg(any(debug_assertions, all(test, async_event_loom)))]
        if unsafe { notifier.as_ref().in_wait_set.load(Ordering::Relaxed) } {
            drop(list); // avoids poisoning the lock
            panic!("the notifier was already in the wait set");
        }

        // Orderings: Relaxed ordering is sufficient since before this point the
        // notifier was not in the list and therefore not shared.
        unsafe { notifier.as_ref().in_wait_set.store(true, Ordering::Relaxed) };

        unsafe { list.push_back(notifier) };

        // Ordering: since this flag is only ever mutated within the
        // mutex-protected critical section, Relaxed ordering is sufficient.
        self.is_empty.store(false, Ordering::Relaxed);
    }

    /// Remove the specified notifier if it is still in the wait set.
    ///
    /// After a call to `remove`, the caller is guaranteed that the wait set
    /// will no longer access the specified notifier.
    ///
    /// Note that for performance reasons, the presence of the notifier in the
    /// list is checked without acquiring the lock. This fast check will never
    /// lead to a notifier staying in the list as long as there exists a
    /// happens-before relationship between this call and the earlier call to
    /// `insert`. A happens-before relationship always exists if these calls are
    /// made on the same thread or across `await` points.
    ///
    /// # Safety
    ///
    /// The specified notifier and all notifiers in the wait set must be alive.
    /// This function may fail to remove the notifier if a happens-before
    /// relationship does not exist with the previous call to `insert`.
    unsafe fn remove_relaxed(&self, notifier: NonNull<Notifier>) {
        // Preliminarily check whether the notifier is already in the list (fast
        // path).
        //
        // This is the only instance where the `in_wait_set` flag is accessed
        // outside the mutex-protected critical section while the notifier may
        // still be in the list. The only risk is that the load will be stale
        // and will read `true` even though the notifier is no longer in the
        // list, but this is not an issue since in that case the actual state
        // will be checked again after taking the lock.
        //
        // Ordering: Acquire synchronizes with the `Release` orderings in the
        // `notify` and `cancel` methods; it is necessary to ensure that the
        // waker is no longer in use by the wait set and can therefore be
        // modified after returning from `remove`.
        let in_wait_set = unsafe { notifier.as_ref().in_wait_set.load(Ordering::Acquire) };
        if !in_wait_set {
            return;
        }

        unsafe { self.remove(notifier) };
    }

    /// Remove the specified notifier if it is still in the wait set.
    ///
    /// After a call to `remove`, the caller is guaranteed that the wait set
    /// will no longer access the specified notifier.
    ///
    /// # Safety
    ///
    /// The specified notifier and all notifiers in the wait set must be alive.
    unsafe fn remove(&self, notifier: NonNull<Notifier>) {
        let mut list = self.list.lock().unwrap();

        // Check again whether the notifier is already in the list.
        //
        // Ordering: since this flag is only ever mutated within the
        // mutex-protected critical section and since the wait set also accesses
        // the waker only in the critical section, even with Relaxed ordering it
        // is guaranteed that if `in_wait_set` reads `false` then the waker is
        // no longer in use by the wait set.
        let in_wait_set = unsafe { notifier.as_ref().in_wait_set.load(Ordering::Relaxed) };
        if !in_wait_set {
            return;
        }

        unsafe { list.remove(notifier) };
        if list.is_empty() {
            // Ordering: since this flag is only ever mutated within the
            // mutex-protected critical section, Relaxed ordering is sufficient.
            self.is_empty.store(true, Ordering::Relaxed);
        }

        // Ordering: this flag is only ever mutated within the mutex-protected
        // critical section and since the waker is not accessed in this method,
        // it does not need to synchronize with a later call to `remove`;
        // therefore, Relaxed ordering is sufficient.
        unsafe {
            notifier
                .as_ref()
                .in_wait_set
                .store(false, Ordering::Relaxed)
        };
    }

    /// Remove the specified notifier if it is still in the wait set, otherwise
    /// notify another event sink.
    ///
    /// After a call to `cancel`, the caller is guaranteed that the wait set
    /// will no longer access the specified notifier.
    ///
    /// # Safety
    ///
    /// The specified notifier and all notifiers in the wait set must be alive.
    /// Wakers of notifiers which pointer is in the wait set may not be accessed
    /// mutably.
    unsafe fn cancel(&self, notifier: NonNull<Notifier>) {
        let mut list = self.list.lock().unwrap();

        let in_wait_set = unsafe { notifier.as_ref().in_wait_set.load(Ordering::Relaxed) };
        if in_wait_set {
            unsafe { list.remove(notifier) };
            if list.is_empty() {
                self.is_empty.store(true, Ordering::Relaxed);
            }

            // Ordering: this flag is only ever mutated within the
            // mutex-protected critical section and since the waker is not
            // accessed, it does not need to synchronize with the Acquire load
            // in the `remove` method; therefore, Relaxed ordering is
            // sufficient.
            unsafe {
                notifier
                    .as_ref()
                    .in_wait_set
                    .store(false, Ordering::Relaxed)
            };
        } else if let Some(other_notifier) = unsafe { list.pop_front() } {
            // The notifier was already consumed by a notification; pass that
            // notification on to another waiter so it is not lost.
            //
            // Safety: the waker can be accessed by reference because the
            // event sink is not allowed to access the waker mutably before
            // `in_wait_set` is cleared.
            unsafe { other_notifier.as_ref().wake() };

            // Ordering: the Release memory ordering synchronizes with the
            // Acquire ordering in the `remove` method; it is required to
            // ensure that once `in_wait_set` reads `false` (using Acquire
            // ordering), the waker is no longer in use by the wait set and
            // can therefore be modified.
            unsafe {
                other_notifier
                    .as_ref()
                    .in_wait_set
                    .store(false, Ordering::Release)
            };
        }
    }

    /// Send a notification to `count` notifiers within the wait set, or to all
    /// notifiers if the wait set contains less than `count` notifiers.
    ///
    /// Note that for performance reasons, list emptiness is checked without
    /// acquiring the wait set lock. Therefore, in order to prevent the
    /// possibility that a wait set is seen as empty when it isn't, external
    /// synchronization is required to make sure that all side effects of a
    /// previous call to `insert` are fully visible. For instance, an atomic
    /// memory fence may be placed before this call and another one after the
    /// insertion of a notifier.
    ///
    /// # Safety
    ///
    /// All notifiers in the wait set must be alive. Wakers of notifiers which
    /// pointer is in the wait set may not be accessed mutably.
    #[inline(always)]
    unsafe fn notify_relaxed(&self, count: usize) {
        let is_empty = self.is_empty.load(Ordering::Relaxed);
        if is_empty {
            return;
        }

        unsafe { self.notify(count) };
    }

    /// Send a notification to `count` notifiers within the wait set, or to all
    /// notifiers if the wait set contains less than `count` notifiers.
    ///
    /// # Safety
    ///
    /// All notifiers in the wait set must be alive. Wakers of notifiers which
    /// pointer is in the wait set may not be accessed mutably.
    unsafe fn notify(&self, count: usize) {
        let mut list = self.list.lock().unwrap();
        for _ in 0..count {
            let notifier = {
                if let Some(notifier) = unsafe { list.pop_front() } {
                    if list.is_empty() {
                        self.is_empty.store(true, Ordering::Relaxed);
                    }
                    notifier
                } else {
                    return;
                }
            };

            // Note: the event sink must be notified before the end of the
            // mutex-protected critical section. Otherwise, a concurrent call to
            // `remove` could succeed in taking the lock before the waker has
            // been called, and seeing that the notifier is no longer in the
            // list would lead its caller to believe that it has now sole
            // ownership on the notifier even though the call to `wake` has yet
            // to be made.
            //
            // Safety: the waker can be accessed by reference since the event
            // sink is not allowed to access the waker mutably before
            // `in_wait_set` is cleared.
            unsafe { notifier.as_ref().wake() };

            // Ordering: the Release memory ordering synchronizes with the
            // Acquire ordering in the `remove` method; it is required to ensure
            // that once `in_wait_set` reads `false` (using Acquire ordering),
            // the waker can be safely modified.
            unsafe {
                notifier
                    .as_ref()
                    .in_wait_set
                    .store(false, Ordering::Release)
            };
        }
    }
}
695
696impl Default for WaitSet {
697 fn default() -> Self {
698 Self {
699 list: Default::default(),
700 is_empty: AtomicBool::new(true),
701 }
702 }
703}
704
/// An intrusive doubly-linked list of notifiers.
///
/// The list does not own its nodes: it only stores raw pointers to
/// caller-owned notifiers.
#[derive(Default, Debug)]
struct List {
    /// Pointer to the first (oldest) notifier, if any.
    front: Option<NonNull<Notifier>>,
    /// Pointer to the last (newest) notifier, if any.
    back: Option<NonNull<Notifier>>,
}
710
impl List {
    /// Inserts a node at the back of the list.
    ///
    /// # Safety
    ///
    /// The provided notifier and all notifiers which pointer is in the list
    /// must be alive.
    unsafe fn push_back(&mut self, notifier: NonNull<Notifier>) {
        // Safety: the `prev` and `next` pointers are only accessed when the
        // list is locked.
        let old_back = mem::replace(&mut self.back, Some(notifier));
        match old_back {
            None => self.front = Some(notifier),
            Some(prev) => unsafe { prev.as_ref().next.with_mut(|n| *n = Some(notifier)) },
        }

        // Link the new notifier, overwriting any stale pointers left over from
        // an earlier membership in the list.
        let notifier = unsafe { notifier.as_ref() };
        notifier.prev.with_mut(|n| unsafe { *n = old_back });
        notifier.next.with_mut(|n| unsafe { *n = None });
    }

    /// Removes and returns the notifier at the front of the list, if any.
    ///
    /// The popped notifier's own `prev`/`next` pointers are left as-is; they
    /// are re-initialized by `push_back` upon re-insertion.
    ///
    /// # Safety
    ///
    /// All notifiers which pointer is in the list must be alive.
    unsafe fn pop_front(&mut self) -> Option<NonNull<Notifier>> {
        let notifier = self.front?;

        // Unlink from the next notifier.
        let next = unsafe { notifier.as_ref().next.with(|n| *n) };
        self.front = next;
        match next {
            None => self.back = None,
            Some(next) => unsafe { next.as_ref().prev.with_mut(|n| *n = None) },
        }

        Some(notifier)
    }

    /// Removes the specified notifier.
    ///
    /// # Safety
    ///
    /// The specified notifier and all notifiers which pointer is in the list
    /// must be alive.
    unsafe fn remove(&mut self, notifier: NonNull<Notifier>) {
        // Unlink from the previous and next notifiers.
        let prev = unsafe { notifier.as_ref().prev.with(|n| *n) };
        let next = unsafe { notifier.as_ref().next.with(|n| *n) };
        match prev {
            None => self.front = next,
            Some(prev) => unsafe { prev.as_ref().next.with_mut(|n| *n = next) },
        }
        match next {
            None => self.back = prev,
            Some(next) => unsafe { next.as_ref().prev.with_mut(|n| *n = prev) },
        }
    }

    /// Returns `true` if the list is empty.
    fn is_empty(&self) -> bool {
        self.front.is_none()
    }
}
777
778/// Non-loom tests.
#[cfg(all(test, not(async_event_loom)))]
mod tests {
    use super::*;

    use std::sync::Arc;
    use std::sync::atomic::AtomicUsize;
    use std::thread;

    use crate::future::block_on;

    /// Single sender, single receiver: the receiver must observe the signal.
    #[test]
    fn smoke() {
        static SIGNAL: AtomicBool = AtomicBool::new(false);

        let event = Arc::new(Event::new());

        let th_recv = {
            let event = event.clone();
            thread::spawn(move || {
                block_on(async move {
                    event
                        .wait_until(|| {
                            if SIGNAL.load(Ordering::Relaxed) {
                                Some(())
                            } else {
                                None
                            }
                        })
                        .await;

                    assert!(SIGNAL.load(Ordering::Relaxed));
                })
            })
        };

        SIGNAL.store(true, Ordering::Relaxed);
        event.notify_one();

        th_recv.join().unwrap();
    }

    /// Single sender, several receivers, notified via `notify_one` plus a
    /// counted `notify`.
    #[test]
    fn one_to_many() {
        const RECEIVER_COUNT: usize = 4;
        static SIGNAL: AtomicBool = AtomicBool::new(false);

        let event = Arc::new(Event::new());

        let th_recv: Vec<_> = (0..RECEIVER_COUNT)
            .map(|_| {
                let event = event.clone();
                thread::spawn(move || {
                    block_on(async move {
                        event
                            .wait_until(|| {
                                if SIGNAL.load(Ordering::Relaxed) {
                                    Some(())
                                } else {
                                    None
                                }
                            })
                            .await;

                        assert!(SIGNAL.load(Ordering::Relaxed));
                    })
                })
            })
            .collect();

        SIGNAL.store(true, Ordering::Relaxed);
        event.notify_one();
        event.notify(3);

        for th in th_recv {
            th.join().unwrap();
        }
    }

    /// Several senders, several receivers: each receiver consumes exactly one
    /// token, so all tokens must be consumed in the end.
    #[test]
    fn many_to_many() {
        const TOKEN_COUNT: usize = 4;
        static AVAILABLE_TOKENS: AtomicUsize = AtomicUsize::new(0);

        let event = Arc::new(Event::new());

        // Receive tokens from multiple threads.
        let th_recv: Vec<_> = (0..TOKEN_COUNT)
            .map(|_| {
                let event = event.clone();
                thread::spawn(move || {
                    block_on(async move {
                        event
                            .wait_until(|| {
                                AVAILABLE_TOKENS
                                    .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |t| {
                                        if t > 0 { Some(t - 1) } else { None }
                                    })
                                    .ok()
                            })
                            .await;
                    })
                })
            })
            .collect();

        // Make tokens available from multiple threads.
        let th_send: Vec<_> = (0..TOKEN_COUNT)
            .map(|_| {
                let event = event.clone();
                thread::spawn(move || {
                    AVAILABLE_TOKENS.fetch_add(1, Ordering::Relaxed);
                    event.notify_one();
                })
            })
            .collect();

        for th in th_recv {
            th.join().unwrap();
        }
        for th in th_send {
            th.join().unwrap();
        }

        assert!(AVAILABLE_TOKENS.load(Ordering::Relaxed) == 0);
    }

    /// `notify_all` must wake every pending receiver.
    #[test]
    fn notify_all() {
        const RECEIVER_COUNT: usize = 4;
        static SIGNAL: AtomicBool = AtomicBool::new(false);

        let event = Arc::new(Event::new());

        let th_recv: Vec<_> = (0..RECEIVER_COUNT)
            .map(|_| {
                let event = event.clone();
                thread::spawn(move || {
                    block_on(async move {
                        event
                            .wait_until(|| {
                                if SIGNAL.load(Ordering::Relaxed) {
                                    Some(())
                                } else {
                                    None
                                }
                            })
                            .await;

                        assert!(SIGNAL.load(Ordering::Relaxed));
                    })
                })
            })
            .collect();

        SIGNAL.store(true, Ordering::Relaxed);
        event.notify_all();

        for th in th_recv {
            th.join().unwrap();
        }
    }
}
941
942/// Loom tests.
943#[cfg(all(test, async_event_loom))]
944mod tests {
945 use super::*;
946
947 use std::future::Future;
948 use std::marker::PhantomPinned;
949 use std::task::{Context, Poll};
950
951 use loom::model::Builder;
952 use loom::sync::Arc;
953 use loom::sync::atomic::AtomicUsize;
954 use loom::thread;
955
956 use waker_fn::waker_fn;
957
958 /// A waker factory that accepts notifications from the newest waker only.
959 #[derive(Clone, Default)]
960 struct MultiWaker {
961 state: Arc<AtomicUsize>,
962 }
963
    impl MultiWaker {
        /// Clears the notification flag.
        ///
        /// This operation has unconditional Relaxed semantic and for this
        /// reason should be used instead of `take_notification` when the intent
        /// is only to cancel a notification for book-keeping purposes, e.g. to
        /// simulate a spurious wake-up, without introducing unwanted
        /// synchronization.
        fn clear_notification(&self) {
            // Clear bit 0 (the notification flag) while leaving the epoch
            // bits untouched.
            self.state.fetch_and(!1, Ordering::Relaxed);
        }

        /// Clears the notification flag and returns the former notification
        /// status.
        ///
        /// This operation has Acquire semantic when a notification is indeed
        /// present, and Relaxed otherwise. It is therefore appropriate to
        /// simulate a scheduler receiving a notification as it ensures that all
        /// memory operations preceding the notification of a task are visible.
        fn take_notification(&self) -> bool {
            // Clear the notification flag.
            let mut state = self.state.load(Ordering::Relaxed);
            loop {
                // Candidate values with the flag set and cleared, preserving
                // the epoch bits of the last observed state.
                let notified_stated = state | 1;
                let unnotified_stated = state & !1;
                // The exchange only succeeds if the flag is currently set:
                // the expected value carries the flag, the new value clears it.
                match self.state.compare_exchange_weak(
                    notified_stated,
                    unnotified_stated,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(s) => {
                        state = s;
                        // If the freshly observed state already has the flag
                        // cleared, there is no notification to take; otherwise
                        // the failure was spurious or the epoch moved, so
                        // retry with the new state.
                        if state == unnotified_stated {
                            return false;
                        }
                    }
                }
            }
        }

        /// Clears the notification flag and creates a new waker.
        fn new_waker(&self) -> Waker {
            // Increase the epoch and clear the notification flag.
            let mut state = self.state.load(Ordering::Relaxed);
            let mut epoch;
            loop {
                // Increase the epoch by 2: bit 0 is the notification flag, so
                // epochs occupy the upper bits. Writing `epoch` also clears
                // the flag.
                epoch = (state & !1) + 2;
                match self.state.compare_exchange_weak(
                    state,
                    epoch,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => break,
                    Err(s) => state = s,
                }
            }

            // Create a waker that only notifies if it is the newest waker.
            let waker_state = self.state.clone();
            waker_fn(move || {
                let mut state = waker_state.load(Ordering::Relaxed);
                loop {
                    // Only set the flag if the epoch still matches the one
                    // this waker was created with; a newer waker supersedes
                    // this one and its notification is dropped.
                    let new_state = if state & !1 == epoch {
                        epoch | 1
                    } else {
                        break;
                    };
                    match waker_state.compare_exchange(
                        state,
                        new_state,
                        Ordering::Release,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => break,
                        Err(s) => state = s,
                    }
                }
            })
        }
    }
1048
    /// A simple counter that can be used to simulate the availability of a
    /// certain number of tokens. In order to model the weakest possible
    /// predicate from the viewpoint of atomic memory ordering, only Relaxed
    /// atomic operations are used.
    #[derive(Default)]
    struct Counter {
        // Number of currently available tokens; only ever accessed with
        // Relaxed ordering (see type-level comment).
        count: AtomicUsize,
    }
1057
1058 impl Counter {
1059 fn increment(&self) {
1060 self.count.fetch_add(1, Ordering::Relaxed);
1061 }
1062 fn try_decrement(&self) -> Option<()> {
1063 let mut count = self.count.load(Ordering::Relaxed);
1064 loop {
1065 if count == 0 {
1066 return None;
1067 }
1068 match self.count.compare_exchange(
1069 count,
1070 count - 1,
1071 Ordering::Relaxed,
1072 Ordering::Relaxed,
1073 ) {
1074 Ok(_) => return Some(()),
1075 Err(c) => count = c,
1076 }
1077 }
1078 }
1079 }
1080
    /// A closure that contains the targets of all references captured by a
    /// `WaitUntil` Future.
    ///
    /// This ugly thing is needed to arbitrarily extend the lifetime of a
    /// `WaitUntil` future and thus mimic the behavior of an executor task.
    struct WaitUntilClosure {
        // Owned event; the future below borrows it through a raw pointer.
        event: Arc<Event>,
        // Owned token counter, likewise borrowed by the future through a raw
        // pointer.
        token_counter: Arc<Counter>,
        // The type-erased `WaitUntil` future. It is cleared first on drop (see
        // the `Drop` impl) so it never outlives the fields it points into.
        wait_until: Option<Box<dyn Future<Output = ()>>>,
        // The future holds raw pointers into this struct, so the struct must
        // never move once the future has been installed.
        _pin: PhantomPinned,
    }
1092
    impl WaitUntilClosure {
        /// Creates a `WaitUntil` future embedded together with the targets
        /// captured by reference.
        fn new(event: Arc<Event>, token_counter: Arc<Counter>) -> Pin<Box<Self>> {
            let res = Self {
                event,
                token_counter,
                wait_until: None,
                _pin: PhantomPinned,
            };
            let boxed = Box::new(res);

            // Artificially extend the lifetimes of the captured references by
            // going through raw pointers into the heap-allocated closure.
            let event_ptr = &*boxed.event as *const Event;
            let token_counter_ptr = &boxed.token_counter as *const Arc<Counter>;

            // Safety: we now commit to never move the closure and to ensure
            // that the `WaitUntil` future does not outlive the captured
            // references (the `Drop` impl clears the future before the fields
            // it points into are torn down).
            let wait_until: Box<dyn Future<Output = _>> = unsafe {
                Box::new((*event_ptr).wait_until(move || (*token_counter_ptr).try_decrement()))
            };
            let mut pinned_box: Pin<Box<WaitUntilClosure>> = boxed.into();

            let mut_ref: Pin<&mut Self> = Pin::as_mut(&mut pinned_box);
            unsafe {
                // This is safe: we are not moving the closure, only writing
                // into one of its fields.
                Pin::get_unchecked_mut(mut_ref).wait_until = Some(wait_until);
            }

            pinned_box
        }

        /// Returns a pinned, type-erased `WaitUntil` future.
        fn as_pinned_future(self: Pin<&mut Self>) -> Pin<&mut dyn Future<Output = ()>> {
            // Safety: the future is structurally pinned inside the closure,
            // which itself never moves once boxed and pinned.
            unsafe { self.map_unchecked_mut(|s| s.wait_until.as_mut().unwrap().as_mut()) }
        }
    }
1131
1132 impl Drop for WaitUntilClosure {
1133 fn drop(&mut self) {
1134 // Make sure that the `WaitUntil` future does not outlive its
1135 // captured references.
1136 self.wait_until = None;
1137 }
1138 }
1139
    /// An enum that registers the final state of a `WaitUntil` future at the
    /// completion of a thread.
    ///
    /// When the future is still in a `Polled` state, this future is moved into
    /// the enum so as to extend its lifetime and allow it to be further
    /// notified.
    #[allow(dead_code)]
    enum FutureState {
        // The future resolved: the predicate succeeded.
        Completed,
        // The future returned `Pending` and is kept alive here so a late
        // notification can still reach its waker.
        Polled(Pin<Box<WaitUntilClosure>>),
        // The future was dropped while pending, simulating cancellation.
        Cancelled,
    }
1152
    /// Make a certain amount of tokens available and notify as many waiters
    /// among all registered waiters, possibly from several notifier threads.
    /// Optionally, it is possible to:
    /// - request that `max_spurious_wake` threads will simulate a spurious
    ///   wake-up if the waiter is polled and returns `Poll::Pending`,
    /// - request that `max_cancellations` threads will cancel the waiter if the
    ///   waiter is polled and returns `Poll::Pending`,
    /// - change the waker each time it is polled.
    ///
    /// Note that the aggregate number of specified cancellations and spurious
    /// wake-ups cannot exceed the number of waiters.
    fn loom_notify(
        token_count: usize,
        waiter_count: usize,
        notifier_count: usize,
        max_spurious_wake: usize,
        max_cancellations: usize,
        change_waker: bool,
        preemption_bound: usize,
    ) {
        // Only apply the caller-provided preemption bound if loom's builder
        // has not already been configured with one (e.g. externally).
        let mut builder = Builder::new();
        if builder.preemption_bound.is_none() {
            builder.preemption_bound = Some(preemption_bound);
        }

        builder.check(move || {
            let token_counter = Arc::new(Counter::default());
            let event = Arc::new(Event::new());

            // One waker factory per waiter so each waiter's notifications can
            // be tracked individually.
            let mut wakers: Vec<MultiWaker> = Vec::new();
            wakers.resize_with(waiter_count, Default::default);

            let waiter_threads: Vec<_> = wakers
                .iter()
                .enumerate()
                .map(|(i, multi_waker)| {
                    thread::spawn({
                        let multi_waker = multi_waker.clone();
                        let mut wait_until =
                            WaitUntilClosure::new(event.clone(), token_counter.clone());

                        move || {
                            // `max_cancellations` threads will cancel the
                            // waiter if the waiter returns `Poll::Pending`.
                            let cancel_waiter = i < max_cancellations;
                            // `max_spurious_wake` threads will simulate a
                            // spurious wake-up if the waiter returns
                            // `Poll::Pending`.
                            let mut spurious_wake = i >= max_cancellations
                                && i < (max_cancellations + max_spurious_wake);

                            let mut waker = multi_waker.new_waker();
                            loop {
                                let mut cx = Context::from_waker(&waker);
                                let poll_state =
                                    wait_until.as_mut().as_pinned_future().poll(&mut cx);

                                // Return successfully if the predicate was
                                // checked successfully.
                                if matches!(poll_state, Poll::Ready(_)) {
                                    return FutureState::Completed;
                                }

                                // The future has returned Poll::Pending.
                                // Depending on the situation, we will either
                                // cancel the future, return and wait for a
                                // notification, or poll again.

                                if cancel_waiter {
                                    // The `wait_until` future is dropped while
                                    // in pending state, which simulates future
                                    // cancellation. Cancelled waiters are
                                    // excluded from the notification
                                    // accounting at the end of the test.
                                    return FutureState::Cancelled;
                                }
                                if spurious_wake {
                                    // Clear the notification, if any.
                                    multi_waker.clear_notification();
                                } else if !multi_waker.take_notification() {
                                    // The async runtime would normally keep the
                                    // `wait_until` future alive after `poll`
                                    // returns `Pending`. This behavior is
                                    // emulated by returning the `WaitUntil`
                                    // closure from the thread so as to extend
                                    // its lifetime.
                                    return FutureState::Polled(wait_until);
                                }

                                // The task was notified or spuriously awakened:
                                // poll again, optionally with a fresh waker.
                                spurious_wake = false;
                                if change_waker {
                                    waker = multi_waker.new_waker();
                                }
                            }
                        }
                    })
                })
                .collect();

            // Increment the token count and notify a consumer after each
            // increment.
            assert!(notifier_count >= 1);
            assert!(token_count >= notifier_count);

            // Each notifier thread but the last one makes one and only one
            // token available.
            let notifier_threads: Vec<_> = (0..(notifier_count - 1))
                .map(|_| {
                    let token_counter = token_counter.clone();
                    let event = event.clone();
                    thread::spawn(move || {
                        token_counter.increment();
                        event.notify(1);
                    })
                })
                .collect();

            // The last notifier (this thread) completes the number of tokens
            // as needed.
            for _ in 0..(token_count - (notifier_count - 1)) {
                token_counter.increment();
                event.notify(1);
            }

            // Join the remaining notifier threads.
            for th in notifier_threads {
                th.join().unwrap();
            }

            // Join all waiter threads and check which of them have successfully
            // checked the predicate. It is important that all `FutureState`
            // returned by the threads be kept alive until _all_ threads have
            // joined because `FutureState::Polled` items extend the lifetime of
            // their future so they can still be notified.
            let future_state: Vec<_> = waiter_threads
                .into_iter()
                .map(|th| th.join().unwrap())
                .collect();

            // See which threads have successfully completed. It is now OK to
            // drop the returned `FutureState`s.
            let success: Vec<_> = future_state
                .into_iter()
                .map(|state| match state {
                    FutureState::Completed => true,
                    _ => false,
                })
                .collect();

            // Check which threads have been notified, excluding those whose
            // future was cancelled.
            let notified: Vec<_> = wakers
                .iter()
                .enumerate()
                .map(|(i, test_waker)| {
                    // Count the notification unless the thread was cancelled
                    // since in that case the notification would be missed.
                    test_waker.take_notification() && i >= max_cancellations
                })
                .collect();

            // Count how many threads have either succeeded or have been
            // notified.
            let actual_aggregate_count =
                success
                    .iter()
                    .zip(notified.iter())
                    .fold(0, |count, (&success, &notified)| {
                        if success || notified {
                            count + 1
                        } else {
                            count
                        }
                    });

            // Compare with the number of event sinks that should eventually
            // succeed: no more tokens than available, and cancelled waiters
            // cannot succeed.
            let min_expected_success_count = token_count.min(waiter_count - max_cancellations);
            if actual_aggregate_count < min_expected_success_count {
                panic!(
                    "Successful threads: {:?}; Notified threads: {:?}",
                    success, notified
                );
            }
        });
    }
1340
1341 #[test]
1342 fn loom_two_consumers() {
1343 const DEFAULT_PREEMPTION_BOUND: usize = 4;
1344 loom_notify(2, 2, 1, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1345 }
1346 #[test]
1347 fn loom_two_consumers_spurious() {
1348 const DEFAULT_PREEMPTION_BOUND: usize = 4;
1349 loom_notify(2, 2, 1, 1, 0, false, DEFAULT_PREEMPTION_BOUND);
1350 }
1351 #[test]
1352 fn loom_two_consumers_cancellation() {
1353 const DEFAULT_PREEMPTION_BOUND: usize = 4;
1354 loom_notify(2, 2, 1, 1, 1, false, DEFAULT_PREEMPTION_BOUND);
1355 }
1356 #[test]
1357 fn loom_two_consumers_change_waker() {
1358 const DEFAULT_PREEMPTION_BOUND: usize = 4;
1359 loom_notify(2, 2, 1, 0, 0, true, DEFAULT_PREEMPTION_BOUND);
1360 }
1361 #[test]
1362 fn loom_two_consumers_change_waker_spurious() {
1363 const DEFAULT_PREEMPTION_BOUND: usize = 4;
1364 loom_notify(2, 2, 1, 1, 0, true, DEFAULT_PREEMPTION_BOUND);
1365 }
1366 #[test]
1367 fn loom_two_consumers_change_waker_cancellation() {
1368 const DEFAULT_PREEMPTION_BOUND: usize = 4;
1369 loom_notify(1, 2, 1, 0, 1, true, DEFAULT_PREEMPTION_BOUND);
1370 }
1371 #[test]
1372 fn loom_two_consumers_change_waker_spurious_cancellation() {
1373 const DEFAULT_PREEMPTION_BOUND: usize = 4;
1374 loom_notify(2, 2, 1, 1, 1, true, DEFAULT_PREEMPTION_BOUND);
1375 }
1376 #[test]
1377 fn loom_two_consumers_three_tokens() {
1378 const DEFAULT_PREEMPTION_BOUND: usize = 3;
1379 loom_notify(3, 2, 1, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1380 }
1381 #[test]
1382 fn loom_three_consumers() {
1383 const DEFAULT_PREEMPTION_BOUND: usize = 2;
1384 loom_notify(3, 3, 1, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1385 }
1386 #[test]
1387 fn loom_three_consumers_spurious() {
1388 const DEFAULT_PREEMPTION_BOUND: usize = 2;
1389 loom_notify(3, 3, 1, 1, 0, false, DEFAULT_PREEMPTION_BOUND);
1390 }
1391 #[test]
1392 fn loom_three_consumers_cancellation() {
1393 const DEFAULT_PREEMPTION_BOUND: usize = 2;
1394 loom_notify(2, 3, 1, 0, 1, false, DEFAULT_PREEMPTION_BOUND);
1395 }
1396 #[test]
1397 fn loom_three_consumers_change_waker() {
1398 const DEFAULT_PREEMPTION_BOUND: usize = 2;
1399 loom_notify(3, 3, 1, 0, 0, true, DEFAULT_PREEMPTION_BOUND);
1400 }
1401 #[test]
1402 fn loom_three_consumers_change_waker_spurious() {
1403 const DEFAULT_PREEMPTION_BOUND: usize = 2;
1404 loom_notify(3, 3, 1, 1, 0, true, DEFAULT_PREEMPTION_BOUND);
1405 }
1406 #[test]
1407 fn loom_three_consumers_change_waker_cancellation() {
1408 const DEFAULT_PREEMPTION_BOUND: usize = 2;
1409 loom_notify(3, 3, 1, 0, 1, true, DEFAULT_PREEMPTION_BOUND);
1410 }
1411 #[test]
1412 fn loom_three_consumers_change_waker_spurious_cancellation() {
1413 const DEFAULT_PREEMPTION_BOUND: usize = 2;
1414 loom_notify(3, 3, 1, 1, 1, true, DEFAULT_PREEMPTION_BOUND);
1415 }
1416 #[test]
1417 fn loom_three_consumers_two_tokens() {
1418 const DEFAULT_PREEMPTION_BOUND: usize = 2;
1419 loom_notify(2, 3, 1, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1420 }
1421 #[test]
1422 fn loom_two_consumers_two_notifiers() {
1423 const DEFAULT_PREEMPTION_BOUND: usize = 3;
1424 loom_notify(2, 2, 2, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1425 }
1426 #[test]
1427 fn loom_one_consumer_three_notifiers() {
1428 const DEFAULT_PREEMPTION_BOUND: usize = 4;
1429 loom_notify(3, 1, 3, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1430 }
1431}