async_event/lib.rs
1//! An efficient async condition variable for lock-free algorithms, a.k.a.
2//! "eventcount".
3//!
4//! [Eventcount][eventcount]-like primitives are useful to make some operations
5//! on a lock-free structure blocking, for instance to transform bounded queues
6//! into bounded channels. Such a primitive allows an interested task to block
7//! until a predicate is satisfied by checking the predicate each time it
8//! receives a notification.
9//!
10//! While functionally similar to the [event_listener] crate, this
11//! implementation is more opinionated and limited to the `async` case. It
12//! strives to be more efficient, however, by limiting the amount of locking
13//! operations on the mutex-protected list of notifiers: the lock is typically
14//! taken only once for each time a waiter is blocked and once for notifying,
15//! thus reducing the need for synchronization operations. Finally, spurious
16//! wake-ups are only generated in very rare circumstances.
17//!
18//! This library is an offshoot of [Asynchronix][asynchronix], an ongoing effort
19//! at a high performance asynchronous computation framework for system
20//! simulation.
21//!
22//! [event_listener]: https://docs.rs/event_listener/latest/event_listener/
23//! [eventcount]:
24//! https://www.1024cores.net/home/lock-free-algorithms/eventcounts
25//! [asynchronix]: https://github.com/asynchronics/asynchronix
26//!
27//! # Examples
28//!
29//! Wait until a non-zero value has been sent asynchronously.
30//!
31//! ```
32//! use std::sync::atomic::{AtomicUsize, Ordering};
33//! use std::sync::Arc;
34//! use std::thread;
35//!
36//! use futures_executor::block_on;
37//!
38//! use async_event::Event;
39//!
40//!
41//! let value = Arc::new(AtomicUsize::new(0));
42//! let event = Arc::new(Event::new());
43//!
44//! // Set a non-zero value concurrently.
45//! thread::spawn({
46//! let value = value.clone();
47//! let event = event.clone();
48//!
49//! move || {
50//! // A relaxed store is sufficient here: `Event::notify*` methods insert
51//! // atomic fences to warrant adequate synchronization.
52//! value.store(42, Ordering::Relaxed);
53//! event.notify_one();
54//! }
55//! });
56//!
57//! // Wait until the value is set.
58//! block_on(async move {
59//! let v = event
60//! .wait_until(|| {
61//! // A relaxed load is sufficient here: `Event::wait_until` inserts
62//! // atomic fences to warrant adequate synchronization.
63//! let v = value.load(Ordering::Relaxed);
64//! if v != 0 { Some(v) } else { None }
65//! })
66//! .await;
67//!
68//! assert_eq!(v, 42);
69//! });
70//! ```
71mod loom_exports;
72
73use std::future::Future;
74use std::mem;
75use std::pin::Pin;
76use std::ptr::NonNull;
77use std::sync::atomic::Ordering;
78use std::task::{Context, Poll, Waker};
79
80use loom_exports::cell::UnsafeCell;
81use loom_exports::sync::atomic::{self, AtomicBool};
82use loom_exports::sync::Mutex;
83use pin_project_lite::pin_project;
84
85/// An object that can receive or send notifications.
86pub struct Event {
87 wait_set: WaitSet,
88}
89
90impl Event {
91 /// Creates a new event.
92 pub fn new() -> Self {
93 Self {
94 wait_set: WaitSet::default(),
95 }
96 }
97
98 /// Notify a number of awaiting events that the predicate should be checked.
99 ///
100 /// If less events than requested are currently awaiting, then all awaiting
101 /// event are notified.
102 #[inline(always)]
103 pub fn notify(&self, n: usize) {
104 // This fence synchronizes with the other fence in `WaitUntil::poll` and
105 // ensures that either the `poll` method will successfully check the
106 // predicate set before this call, or the notifier inserted by `poll`
107 // will be visible in the wait list when calling `WaitSet::notify` (or
108 // both).
109 atomic::fence(Ordering::SeqCst);
110
111 // Safety: all notifiers in the wait set are guaranteed to be alive
112 // since the `WaitUntil` drop handler ensures that notifiers are removed
113 // from the wait set before they are deallocated.
114 unsafe {
115 self.wait_set.notify_relaxed(n);
116 }
117 }
118
119 /// Notify one awaiting event (if any) that the predicate should be checked.
120 #[inline(always)]
121 pub fn notify_one(&self) {
122 self.notify(1);
123 }
124
125 /// Notify all awaiting events that the predicate should be checked.
126 #[inline(always)]
127 pub fn notify_all(&self) {
128 self.notify(usize::MAX);
129 }
130
131 /// Returns a future that can be `await`ed until the provided predicate is
132 /// satisfied.
133 pub fn wait_until<F, T>(&self, predicate: F) -> WaitUntil<F, T>
134 where
135 F: FnMut() -> Option<T>,
136 {
137 WaitUntil::new(&self.wait_set, predicate)
138 }
139
140 /// Returns a future that can be `await`ed until the provided predicate is
141 /// satisfied or until the provided future completes.
142 ///
143 /// The deadline is specified as a `Future` that is expected to resolves to
144 /// `()` after some duration, such as a `tokio::time::Sleep` future.
145 pub fn wait_until_or_timeout<F, T, D>(
146 &self,
147 predicate: F,
148 deadline: D,
149 ) -> WaitUntilOrTimeout<F, T, D>
150 where
151 F: FnMut() -> Option<T>,
152 D: Future<Output = ()>,
153 {
154 WaitUntilOrTimeout::new(&self.wait_set, predicate, deadline)
155 }
156}
157
158impl Default for Event {
159 fn default() -> Self {
160 Self::new()
161 }
162}
163
164unsafe impl Send for Event {}
165unsafe impl Sync for Event {}
166
167/// A waker wrapper that can be inserted in a list.
168///
169/// A notifier always has an exclusive owner or borrower, except in one edge
170/// case: the `WaitSet::remove_relaxed()` method may create a shared reference
171/// while the notifier is concurrently accessed under the `wait_set` mutex by
172/// one of the `WaitSet` methods. So occasionally 2 references to a `Notifier`
173/// will exist at the same time, meaning that even when accessed under the
174/// `wait_set` mutex, a notifier can only be accessed by reference.
175struct Notifier {
176 /// The current waker, if any.
177 waker: Option<Waker>,
178 /// Pointer to the previous wait set notifier.
179 prev: UnsafeCell<Option<NonNull<Notifier>>>,
180 /// Pointer to the next wait set notifier.
181 next: UnsafeCell<Option<NonNull<Notifier>>>,
182 /// Flag indicating whether the notifier is currently in the wait set.
183 in_wait_set: AtomicBool,
184}
185
186impl Notifier {
187 /// Creates a new Notifier without any registered waker.
188 fn new() -> Self {
189 Self {
190 waker: None,
191 prev: UnsafeCell::new(None),
192 next: UnsafeCell::new(None),
193 in_wait_set: AtomicBool::new(false),
194 }
195 }
196
197 /// Stores the specified waker if it differs from the cached waker.
198 fn set_waker(&mut self, waker: &Waker) {
199 if match &self.waker {
200 Some(w) => !w.will_wake(waker),
201 None => true,
202 } {
203 self.waker = Some(waker.clone());
204 }
205 }
206
207 /// Notifies the task.
208 fn wake(&self) {
209 // Safety: the waker is only ever accessed mutably when the notifier is
210 // itself accessed mutably. The caller claims shared (non-mutable)
211 // ownership of the notifier, so there is not possible concurrent
212 // mutable access to the notifier and therefore to the waker.
213 if let Some(w) = &self.waker {
214 w.wake_by_ref();
215 }
216 }
217}
218
219unsafe impl Send for Notifier {}
220unsafe impl Sync for Notifier {}
221
222/// A future that can be `await`ed until a predicate is satisfied.
223pub struct WaitUntil<'a, F: FnMut() -> Option<T>, T> {
224 state: WaitUntilState,
225 predicate: F,
226 wait_set: &'a WaitSet,
227}
228
229impl<'a, F: FnMut() -> Option<T>, T> WaitUntil<'a, F, T> {
230 /// Creates a future associated with the specified event sink that can be
231 /// `await`ed until the specified predicate is satisfied.
232 fn new(wait_set: &'a WaitSet, predicate: F) -> Self {
233 Self {
234 state: WaitUntilState::Idle,
235 predicate,
236 wait_set,
237 }
238 }
239}
240
241impl<F: FnMut() -> Option<T>, T> Drop for WaitUntil<'_, F, T> {
242 fn drop(&mut self) {
243 if let WaitUntilState::Polled(notifier) = self.state {
244 // If we are in the `Polled` stated, it means that the future was
245 // cancelled and its notifier may still be in the wait set: it is
246 // necessary to cancel the notifier so that another event sink can
247 // be notified if one is registered, and then to deallocate the
248 // notifier.
249 //
250 // Safety: all notifiers in the wait set are guaranteed to be alive
251 // since this drop handler ensures that notifiers are removed from
252 // the wait set before they are deallocated. After the notifier is
253 // removed from the list we can claim unique ownership and
254 // deallocate the notifier.
255 unsafe {
256 self.wait_set.cancel(notifier);
257 let _ = Box::from_raw(notifier.as_ptr());
258 }
259 }
260 }
261}
262
263impl<'a, F: FnMut() -> Option<T>, T> Unpin for WaitUntil<'a, F, T> {}
264
265unsafe impl<F: (FnMut() -> Option<T>) + Send, T: Send> Send for WaitUntil<'_, F, T> {}
266
267impl<'a, F: FnMut() -> Option<T>, T> Future for WaitUntil<'a, F, T> {
268 type Output = T;
269
270 #[inline]
271 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
272 assert!(self.state != WaitUntilState::Completed);
273
274 // Remove the notifier if it is in the wait set. In most cases this will
275 // be a cheap no-op because, unless the wake-up is spurious, the
276 // notifier was already removed from the wait set.
277 //
278 // Removing the notifier before checking the predicate is necessary to
279 // avoid races such as this one:
280 //
281 // 1) event sink A unsuccessfully checks the predicate, inserts its
282 // notifier in the wait set, unsuccessfully re-checks the predicate,
283 // returns `Poll::Pending`,
284 // 2) event sink B unsuccessfully checks the predicate, inserts its
285 // notifier in the wait set, unsuccessfully re-checks the predicate,
286 // returns `Poll::Pending`,
287 // 3) the event source makes one predicate satisfiable,
288 // 4) event sink A is spuriously awaken and successfully checks the
289 // predicates, returns `Poll::Ready`,
290 // 5) the event source notifies event sink B,
291 // 6) event sink B is awaken and unsuccessfully checks the predicate,
292 // inserts its notifier in the wait set, unsuccessfully re-checks the
293 // predicate, returns `Poll::Pending`,
294 // 7) the event source makes another predicate satisfiable.
295 // 8) if now the notifier of event sink A was not removed from the wait
296 // set, the event source may notify event sink A (which is no longer
297 // interested) rather than event sink B, meaning that event sink B
298 // will never be notified.
299 if let WaitUntilState::Polled(notifier) = self.state {
300 // Safety: all notifiers in the wait set are guaranteed to be alive
301 // since the `WaitUntil` drop handler ensures that notifiers are
302 // removed from the wait set before they are deallocated. Using the
303 // relaxed version of `notify` is enough since the notifier was
304 // inserted in the same future so there exists a happen-before
305 // relationship with the insertion operation.
306 unsafe { self.wait_set.remove_relaxed(notifier) };
307 }
308
309 // Fast path.
310 if let Some(v) = (self.predicate)() {
311 if let WaitUntilState::Polled(notifier) = self.state {
312 // Safety: the notifier is no longer in the wait set so we can
313 // claim unique ownership and deallocate the notifier.
314 let _ = unsafe { Box::from_raw(notifier.as_ptr()) };
315 }
316
317 self.state = WaitUntilState::Completed;
318
319 return Poll::Ready(v);
320 }
321
322 let mut notifier = if let WaitUntilState::Polled(notifier) = self.state {
323 notifier
324 } else {
325 unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(Notifier::new()))) }
326 };
327
328 // Set or update the notifier.
329 //
330 // Safety: the notifier is not (or no longer) in the wait list so we
331 // have exclusive ownership.
332 let waker = cx.waker();
333 unsafe { notifier.as_mut().set_waker(waker) };
334
335 // Safety: all notifiers in the wait set are guaranteed to be alive
336 // since the `WaitUntil` drop handler ensures that notifiers are removed
337 // from the wait set before they are deallocated.
338 unsafe { self.wait_set.insert(notifier) };
339
340 // This fence synchronizes with the other fence in `Event::notify` and
341 // ensures that either the predicate below will be satisfied or the
342 // event source will see the notifier inserted above in the wait list
343 // after it makes the predicate satisfiable (or both).
344 atomic::fence(Ordering::SeqCst);
345
346 if let Some(v) = (self.predicate)() {
347 // We need to cancel and not merely remove the notifier from the
348 // wait set so that another event sink can be notified in case we
349 // have been notified just after checking the predicate. This is an
350 // example of race that makes this necessary:
351 //
352 // 1) event sink A and event sink B both unsuccessfully check the
353 // predicate,
354 // 2) the event source makes one predicate satisfiable and tries to
355 // notify an event sink but fails since no notifier has been
356 // inserted in the wait set yet,
357 // 3) event sink A and event sink B both insert their notifier in
358 // the wait set,
359 // 4) event sink A re-checks the predicate, successfully,
360 // 5) event sink B re-checks the predicate, unsuccessfully,
361 // 6) the event source makes another predicate satisfiable,
362 // 7) the event source sends a notification for the second predicate
363 // but unfortunately chooses the "wrong" notifier in the wait
364 // set, i.e. that of event sink A -- note that this is always
365 // possible irrespective of FIFO or LIFO ordering because it also
366 // depends on the order of notifier insertion in step 3)
367 // 8) if, before returning, event sink A merely removes itself from
368 // the wait set without notifying another event sink, then event
369 // sink B will never be notified.
370 //
371 // Safety: all notifiers in the wait set are guaranteed to be alive
372 // since the `WaitUntil` drop handler ensures that notifiers are
373 // removed from the wait set before they are deallocated.
374 unsafe {
375 self.wait_set.cancel(notifier);
376 }
377
378 self.state = WaitUntilState::Completed;
379
380 // Safety: the notifier is not longer in the wait set so we can
381 // claim unique ownership and deallocate the notifier.
382 let _ = unsafe { Box::from_raw(notifier.as_ptr()) };
383
384 return Poll::Ready(v);
385 }
386
387 self.state = WaitUntilState::Polled(notifier);
388
389 Poll::Pending
390 }
391}
392
393/// State of the `WaitUntil` future.
394#[derive(PartialEq)]
395enum WaitUntilState {
396 Idle,
397 Polled(NonNull<Notifier>),
398 Completed,
399}
400
401pin_project! {
402 /// A future that can be `await`ed until a predicate is satisfied or until a
403 /// deadline elapses.
404 pub struct WaitUntilOrTimeout<'a, F: FnMut() -> Option<T>, T, D: Future<Output = ()>> {
405 wait_until: WaitUntil<'a, F, T>,
406 #[pin]
407 deadline: D,
408 }
409}
410
411impl<'a, F, T, D> WaitUntilOrTimeout<'a, F, T, D>
412where
413 F: FnMut() -> Option<T>,
414 D: Future<Output = ()>,
415{
416 /// Creates a future associated with the specified event sink that can be
417 /// `await`ed until the specified predicate is satisfied, or until the
418 /// specified timeout future completes.
419 fn new(wait_set: &'a WaitSet, predicate: F, deadline: D) -> Self {
420 Self {
421 wait_until: WaitUntil::new(wait_set, predicate),
422 deadline,
423 }
424 }
425}
426
427impl<'a, F, T, D> Future for WaitUntilOrTimeout<'a, F, T, D>
428where
429 F: FnMut() -> Option<T>,
430 D: Future<Output = ()>,
431{
432 type Output = Option<T>;
433
434 #[inline]
435 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
436 let this = self.project();
437
438 if let Poll::Ready(value) = Pin::new(this.wait_until).poll(cx) {
439 Poll::Ready(Some(value))
440 } else if this.deadline.poll(cx).is_ready() {
441 Poll::Ready(None)
442 } else {
443 Poll::Pending
444 }
445 }
446}
447
448/// A set of notifiers.
449///
450/// The set wraps a Mutex-protected list of notifiers and manages a flag for
451/// fast assessment of list emptiness.
452struct WaitSet {
453 list: Mutex<List>,
454 is_empty: AtomicBool,
455}
456
457impl WaitSet {
458 /// Inserts a node in the wait set.
459 ///
460 /// # Safety
461 ///
462 /// The specified notifier and all notifiers in the wait set must be alive.
463 /// The notifier should not be already in the wait set.
464 unsafe fn insert(&self, notifier: NonNull<Notifier>) {
465 let mut list = self.list.lock().unwrap();
466
467 #[cfg(any(debug_assertions, async_event_loom))]
468 if notifier.as_ref().in_wait_set.load(Ordering::Relaxed) {
469 drop(list); // avoids poisoning the lock
470 panic!("the notifier was already in the wait set");
471 }
472
473 // Orderings: Relaxed ordering is sufficient since before this point the
474 // notifier was not in the list and therefore not shared.
475 notifier.as_ref().in_wait_set.store(true, Ordering::Relaxed);
476
477 list.push_back(notifier);
478
479 // Ordering: since this flag is only ever mutated within the
480 // mutex-protected critical section, Relaxed ordering is sufficient.
481 self.is_empty.store(false, Ordering::Relaxed);
482 }
483
484 /// Remove the specified notifier if it is still in the wait set.
485 ///
486 /// After a call to `remove`, the caller is guaranteed that the wait set
487 /// will no longer access the specified notifier.
488 ///
489 /// Note that for performance reasons, the presence of the notifier in the
490 /// list is checked without acquiring the lock. This fast check will never
491 /// lead to a notifier staying in the list as long as there exists an
492 /// happens-before relationship between this call and the earlier call to
493 /// `insert`. A happens-before relationship always exists if these calls are
494 /// made on the same thread or across `await` points.
495 ///
496 /// # Safety
497 ///
498 /// The specified notifier and all notifiers in the wait set must be alive.
499 /// This function may fail to remove the notifier if a happens-before
500 /// relationship does not exist with the previous call to `insert`.
501 unsafe fn remove_relaxed(&self, notifier: NonNull<Notifier>) {
502 // Preliminarily check whether the notifier is already in the list (fast
503 // path).
504 //
505 // This is the only instance where the `in_wait_set` flag is accessed
506 // outside the mutex-protected critical section while the notifier may
507 // still be in the list. The only risk is that the load will be stale
508 // and will read `true` even though the notifier is no longer in the
509 // list, but this is not an issue since in that case the actual state
510 // will be checked again after taking the lock.
511 //
512 // Ordering: Acquire synchronizes with the `Release` orderings in the
513 // `notify` and `cancel` methods; it is necessary to ensure that the
514 // waker is no longer in use by the wait set and can therefore be
515 // modified after returning from `remove`.
516 let in_wait_set = notifier.as_ref().in_wait_set.load(Ordering::Acquire);
517 if !in_wait_set {
518 return;
519 }
520
521 self.remove(notifier);
522 }
523
524 /// Remove the specified notifier if it is still in the wait set.
525 ///
526 /// After a call to `remove`, the caller is guaranteed that the wait set
527 /// will no longer access the specified notifier.
528 ///
529 /// # Safety
530 ///
531 /// The specified notifier and all notifiers in the wait set must be alive.
532 unsafe fn remove(&self, notifier: NonNull<Notifier>) {
533 let mut list = self.list.lock().unwrap();
534
535 // Check again whether the notifier is already in the list
536 //
537 // Ordering: since this flag is only ever mutated within the
538 // mutex-protected critical section and since the wait set also accesses
539 // the waker only in the critical section, even with Relaxed ordering it
540 // is guaranteed that if `in_wait_set` reads `false` then the waker is
541 // no longer in use by the wait set.
542 let in_wait_set = notifier.as_ref().in_wait_set.load(Ordering::Relaxed);
543 if !in_wait_set {
544 return;
545 }
546
547 list.remove(notifier);
548 if list.is_empty() {
549 // Ordering: since this flag is only ever mutated within the
550 // mutex-protected critical section, Relaxed ordering is sufficient.
551 self.is_empty.store(true, Ordering::Relaxed);
552 }
553
554 // Ordering: this flag is only ever mutated within the mutex-protected
555 // critical section and since the waker is not accessed in this method,
556 // it does not need to synchronize with a later call to `remove`;
557 // therefore, Relaxed ordering is sufficient.
558 notifier
559 .as_ref()
560 .in_wait_set
561 .store(false, Ordering::Relaxed);
562 }
563
564 /// Remove the specified notifier if it is still in the wait set, otherwise
565 /// notify another event sink.
566 ///
567 /// After a call to `cancel`, the caller is guaranteed that the wait set
568 /// will no longer access the specified notifier.
569 ///
570 /// # Safety
571 ///
572 /// The specified notifier and all notifiers in the wait set must be alive.
573 /// Wakers of notifiers which pointer is in the wait set may not be accessed
574 /// mutably.
575 unsafe fn cancel(&self, notifier: NonNull<Notifier>) {
576 let mut list = self.list.lock().unwrap();
577
578 let in_wait_set = notifier.as_ref().in_wait_set.load(Ordering::Relaxed);
579 if in_wait_set {
580 list.remove(notifier);
581 if list.is_empty() {
582 self.is_empty.store(true, Ordering::Relaxed);
583 }
584
585 // Ordering: this flag is only ever mutated within the
586 // mutex-protected critical section and since the waker is not
587 // accessed, it does not need to synchronize with the Acquire load
588 // in the `remove` method; therefore, Relaxed ordering is
589 // sufficient.
590 notifier
591 .as_ref()
592 .in_wait_set
593 .store(false, Ordering::Relaxed);
594 } else if let Some(other_notifier) = list.pop_front() {
595 // Safety: the waker can be accessed by reference because the
596 // event sink is not allowed to access the waker mutably before
597 // `in_wait_set` is cleared.
598 other_notifier.as_ref().wake();
599
600 // Ordering: the Release memory ordering synchronizes with the
601 // Acquire ordering in the `remove` method; it is required to
602 // ensure that once `in_wait_set` reads `false` (using Acquire
603 // ordering), the waker is no longer in use by the wait set and
604 // can therefore be modified.
605 other_notifier
606 .as_ref()
607 .in_wait_set
608 .store(false, Ordering::Release);
609 }
610 }
611
612 /// Send a notification to `count` notifiers within the wait set, or to all
613 /// notifiers if the wait set contains less than `count` notifiers.
614 ///
615 /// Note that for performance reasons, list emptiness is checked without
616 /// acquiring the wait set lock. Therefore, in order to prevent the
617 /// possibility that a wait set is seen as empty when it isn't, external
618 /// synchronization is required to make sure that all side effects of a
619 /// previous call to `insert` are fully visible. For instance, an atomic
620 /// memory fence maye be placed before this call and another one after the
621 /// insertion of a notifier.
622 ///
623 /// # Safety
624 ///
625 /// All notifiers in the wait set must be alive. Wakers of notifiers which
626 /// pointer is in the wait set may not be accessed mutably.
627 #[inline(always)]
628 unsafe fn notify_relaxed(&self, count: usize) {
629 let is_empty = self.is_empty.load(Ordering::Relaxed);
630 if is_empty {
631 return;
632 }
633
634 self.notify(count);
635 }
636
637 /// Send a notification to `count` notifiers within the wait set, or to all
638 /// notifiers if the wait set contains less than `count` notifiers.
639 ///
640 /// # Safety
641 ///
642 /// All notifiers in the wait set must be alive. Wakers of notifiers which
643 /// pointer is in the wait set may not be accessed mutably.
644 unsafe fn notify(&self, count: usize) {
645 let mut list = self.list.lock().unwrap();
646 for _ in 0..count {
647 let notifier = {
648 if let Some(notifier) = list.pop_front() {
649 if list.is_empty() {
650 self.is_empty.store(true, Ordering::Relaxed);
651 }
652 notifier
653 } else {
654 return;
655 }
656 };
657
658 // Note: the event sink must be notified before the end of the
659 // mutex-protected critical section. Otherwise, a concurrent call to
660 // `remove` could succeed in taking the lock before the waker has
661 // been called, and seeing that the notifier is no longer in the
662 // list would lead its caller to believe that it has now sole
663 // ownership on the notifier even though the call to `wake` has yet
664 // to be made.
665 //
666 // Safety: the waker can be accessed by reference since the event
667 // sink is not allowed to access the waker mutably before
668 // `in_wait_set` is cleared.
669 notifier.as_ref().wake();
670
671 // Ordering: the Release memory ordering synchronizes with the
672 // Acquire ordering in the `remove` method; it is required to ensure
673 // that once `in_wait_set` reads `false` (using Acquire ordering),
674 // the waker can be safely modified.
675 notifier
676 .as_ref()
677 .in_wait_set
678 .store(false, Ordering::Release);
679 }
680 }
681}
682
683impl Default for WaitSet {
684 fn default() -> Self {
685 Self {
686 list: Default::default(),
687 is_empty: AtomicBool::new(true),
688 }
689 }
690}
691
692#[derive(Default)]
693struct List {
694 front: Option<NonNull<Notifier>>,
695 back: Option<NonNull<Notifier>>,
696}
697
698impl List {
699 /// Inserts a node at the back of the list.
700 ///
701 /// # Safety
702 ///
703 /// The provided notifier and all notifiers which pointer is in the list
704 /// must be alive.
705 unsafe fn push_back(&mut self, notifier: NonNull<Notifier>) {
706 // Safety: the `prev` and `next` pointers are only be accessed when the
707 // list is locked.
708 let old_back = mem::replace(&mut self.back, Some(notifier));
709 match old_back {
710 None => self.front = Some(notifier),
711 Some(prev) => prev.as_ref().next.with_mut(|n| *n = Some(notifier)),
712 }
713
714 // Link the new notifier.
715 let notifier = notifier.as_ref();
716 notifier.prev.with_mut(|n| *n = old_back);
717 notifier.next.with_mut(|n| *n = None);
718 }
719
720 /// Removes and returns the notifier at the front of the list, if any.
721 ///
722 /// # Safety
723 ///
724 /// All notifiers which pointer is in the list must be alive.
725 unsafe fn pop_front(&mut self) -> Option<NonNull<Notifier>> {
726 let notifier = self.front?;
727
728 // Unlink from the next notifier.
729 let next = notifier.as_ref().next.with(|n| *n);
730 self.front = next;
731 match next {
732 None => self.back = None,
733 Some(next) => next.as_ref().prev.with_mut(|n| *n = None),
734 }
735
736 Some(notifier)
737 }
738
739 /// Removes the specified notifier.
740 ///
741 /// # Safety
742 ///
743 /// The specified notifier and all notifiers which pointer is in the list
744 /// must be alive.
745 unsafe fn remove(&mut self, notifier: NonNull<Notifier>) {
746 // Unlink from the previous and next notifiers.
747 let prev = notifier.as_ref().prev.with(|n| *n);
748 let next = notifier.as_ref().next.with(|n| *n);
749 match prev {
750 None => self.front = next,
751 Some(prev) => prev.as_ref().next.with_mut(|n| *n = next),
752 }
753 match next {
754 None => self.back = prev,
755 Some(next) => next.as_ref().prev.with_mut(|n| *n = prev),
756 }
757 }
758
759 /// Returns `true` if the list is empty.
760 fn is_empty(&self) -> bool {
761 self.front.is_none()
762 }
763}
764
765/// Non-loom tests.
766#[cfg(all(test, not(async_event_loom)))]
767mod tests {
768 use super::*;
769
770 use std::sync::atomic::AtomicUsize;
771 use std::sync::Arc;
772 use std::thread;
773
774 use futures_executor::block_on;
775
776 #[test]
777 fn smoke() {
778 static SIGNAL: AtomicBool = AtomicBool::new(false);
779
780 let event = Arc::new(Event::new());
781
782 let th_recv = {
783 let event = event.clone();
784 thread::spawn(move || {
785 block_on(async move {
786 event
787 .wait_until(|| {
788 if SIGNAL.load(Ordering::Relaxed) {
789 Some(())
790 } else {
791 None
792 }
793 })
794 .await;
795
796 assert!(SIGNAL.load(Ordering::Relaxed));
797 })
798 })
799 };
800
801 SIGNAL.store(true, Ordering::Relaxed);
802 event.notify_one();
803
804 th_recv.join().unwrap();
805 }
806
807 #[test]
808 fn one_to_many() {
809 const RECEIVER_COUNT: usize = 4;
810 static SIGNAL: AtomicBool = AtomicBool::new(false);
811
812 let event = Arc::new(Event::new());
813
814 let th_recv: Vec<_> = (0..RECEIVER_COUNT)
815 .map(|_| {
816 let event = event.clone();
817 thread::spawn(move || {
818 block_on(async move {
819 event
820 .wait_until(|| {
821 if SIGNAL.load(Ordering::Relaxed) {
822 Some(())
823 } else {
824 None
825 }
826 })
827 .await;
828
829 assert!(SIGNAL.load(Ordering::Relaxed));
830 })
831 })
832 })
833 .collect();
834
835 SIGNAL.store(true, Ordering::Relaxed);
836 event.notify_one();
837 event.notify(3);
838
839 for th in th_recv {
840 th.join().unwrap();
841 }
842 }
843
844 #[test]
845 fn many_to_many() {
846 const TOKEN_COUNT: usize = 4;
847 static AVAILABLE_TOKENS: AtomicUsize = AtomicUsize::new(0);
848
849 let event = Arc::new(Event::new());
850
851 // Receive tokens from multiple threads.
852 let th_recv: Vec<_> = (0..TOKEN_COUNT)
853 .map(|_| {
854 let event = event.clone();
855 thread::spawn(move || {
856 block_on(async move {
857 event
858 .wait_until(|| {
859 AVAILABLE_TOKENS
860 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |t| {
861 if t > 0 {
862 Some(t - 1)
863 } else {
864 None
865 }
866 })
867 .ok()
868 })
869 .await;
870 })
871 })
872 })
873 .collect();
874
875 // Make tokens available from multiple threads.
876 let th_send: Vec<_> = (0..TOKEN_COUNT)
877 .map(|_| {
878 let event = event.clone();
879 thread::spawn(move || {
880 AVAILABLE_TOKENS.fetch_add(1, Ordering::Relaxed);
881 event.notify_one();
882 })
883 })
884 .collect();
885
886 for th in th_recv {
887 th.join().unwrap();
888 }
889 for th in th_send {
890 th.join().unwrap();
891 }
892
893 assert!(AVAILABLE_TOKENS.load(Ordering::Relaxed) == 0);
894 }
895
896 #[test]
897 fn notify_all() {
898 const RECEIVER_COUNT: usize = 4;
899 static SIGNAL: AtomicBool = AtomicBool::new(false);
900
901 let event = Arc::new(Event::new());
902
903 let th_recv: Vec<_> = (0..RECEIVER_COUNT)
904 .map(|_| {
905 let event = event.clone();
906 thread::spawn(move || {
907 block_on(async move {
908 event
909 .wait_until(|| {
910 if SIGNAL.load(Ordering::Relaxed) {
911 Some(())
912 } else {
913 None
914 }
915 })
916 .await;
917
918 assert!(SIGNAL.load(Ordering::Relaxed));
919 })
920 })
921 })
922 .collect();
923
924 SIGNAL.store(true, Ordering::Relaxed);
925 event.notify_all();
926
927 for th in th_recv {
928 th.join().unwrap();
929 }
930 }
931}
932
933/// Loom tests.
934#[cfg(all(test, async_event_loom))]
935mod tests {
936 use super::*;
937
938 use std::future::Future;
939 use std::marker::PhantomPinned;
940 use std::task::{Context, Poll};
941
942 use loom::model::Builder;
943 use loom::sync::atomic::AtomicUsize;
944 use loom::sync::Arc;
945 use loom::thread;
946
947 use waker_fn::waker_fn;
948
949 /// A waker factory that accepts notifications from the newest waker only.
950 #[derive(Clone, Default)]
951 struct MultiWaker {
952 state: Arc<AtomicUsize>,
953 }
954
955 impl MultiWaker {
956 /// Clears the notification flag.
957 ///
958 /// This operation has unconditional Relaxed semantic and for this
959 /// reason should be used instead of `take_notification` when the intent
960 /// is only to cancel a notification for book-keeping purposes, e.g. to
961 /// simulate a spurious wake-up, without introducing unwanted
962 /// synchronization.
963 fn clear_notification(&self) {
964 self.state.fetch_and(!1, Ordering::Relaxed);
965 }
966
967 /// Clears the notification flag and returns the former notification
968 /// status.
969 ///
970 /// This operation has Acquire semantic when a notification is indeed
971 /// present, and Relaxed otherwise. It is therefore appropriate to
972 /// simulate a scheduler receiving a notification as it ensures that all
973 /// memory operations preceding the notification of a task are visible.
974 fn take_notification(&self) -> bool {
975 // Clear the notification flag.
976 let mut state = self.state.load(Ordering::Relaxed);
977 loop {
978 let notified_stated = state | 1;
979 let unnotified_stated = state & !1;
980 match self.state.compare_exchange_weak(
981 notified_stated,
982 unnotified_stated,
983 Ordering::Acquire,
984 Ordering::Relaxed,
985 ) {
986 Ok(_) => return true,
987 Err(s) => {
988 state = s;
989 if state == unnotified_stated {
990 return false;
991 }
992 }
993 }
994 }
995 }
996
997 /// Clears the notification flag and creates a new waker.
998 fn new_waker(&self) -> Waker {
999 // Increase the epoch and clear the notification flag.
1000 let mut state = self.state.load(Ordering::Relaxed);
1001 let mut epoch;
1002 loop {
1003 // Increase the epoch by 2.
1004 epoch = (state & !1) + 2;
1005 match self.state.compare_exchange_weak(
1006 state,
1007 epoch,
1008 Ordering::Relaxed,
1009 Ordering::Relaxed,
1010 ) {
1011 Ok(_) => break,
1012 Err(s) => state = s,
1013 }
1014 }
1015
1016 // Create a waker that only notifies if it is the newest waker.
1017 let waker_state = self.state.clone();
1018 waker_fn(move || {
1019 let mut state = waker_state.load(Ordering::Relaxed);
1020 loop {
1021 let new_state = if state & !1 == epoch {
1022 epoch | 1
1023 } else {
1024 break;
1025 };
1026 match waker_state.compare_exchange(
1027 state,
1028 new_state,
1029 Ordering::Release,
1030 Ordering::Relaxed,
1031 ) {
1032 Ok(_) => break,
1033 Err(s) => state = s,
1034 }
1035 }
1036 })
1037 }
1038 }
1039
1040 /// A simple counter that can be used to simulate the availability of a
1041 /// certain number of AVAILABLE_TOKENS. In order to model the weakest possible
1042 /// predicate from the viewpoint of atomic memory ordering, only Relaxed
1043 /// atomic operations are used.
1044 #[derive(Default)]
1045 struct Counter {
1046 count: AtomicUsize,
1047 }
1048
1049 impl Counter {
1050 fn increment(&self) {
1051 self.count.fetch_add(1, Ordering::Relaxed);
1052 }
1053 fn try_decrement(&self) -> Option<()> {
1054 let mut count = self.count.load(Ordering::Relaxed);
1055 loop {
1056 if count == 0 {
1057 return None;
1058 }
1059 match self.count.compare_exchange(
1060 count,
1061 count - 1,
1062 Ordering::Relaxed,
1063 Ordering::Relaxed,
1064 ) {
1065 Ok(_) => return Some(()),
1066 Err(c) => count = c,
1067 }
1068 }
1069 }
1070 }
1071
1072 /// A closure that contains the targets of all references captured by a
1073 /// `WaitUntil` Future.
1074 ///
1075 /// This ugly thing is needed to arbitrarily extend the lifetime of a
1076 /// `WaitUntil` future and thus mimic the behavior of an executor task.
1077 struct WaitUntilClosure {
1078 event: Arc<Event>,
1079 token_counter: Arc<Counter>,
1080 wait_until: Option<Box<dyn Future<Output = ()>>>,
1081 _pin: PhantomPinned,
1082 }
1083
1084 impl WaitUntilClosure {
1085 /// Creates a `WaitUntil` future embedded together with the targets
1086 /// captured by reference.
1087 fn new(event: Arc<Event>, token_counter: Arc<Counter>) -> Pin<Box<Self>> {
1088 let res = Self {
1089 event,
1090 token_counter,
1091 wait_until: None,
1092 _pin: PhantomPinned,
1093 };
1094 let boxed = Box::new(res);
1095
1096 // Artificially extend the lifetimes of the captured references.
1097 let event_ptr = &*boxed.event as *const Event;
1098 let token_counter_ptr = &boxed.token_counter as *const Arc<Counter>;
1099
1100 // Safety: we now commit to never move the closure and to ensure
1101 // that the `WaitUntil` future does not outlive the captured
1102 // references.
1103 let wait_until: Box<dyn Future<Output = _>> = unsafe {
1104 Box::new((*event_ptr).wait_until(move || (*token_counter_ptr).try_decrement()))
1105 };
1106 let mut pinned_box: Pin<Box<WaitUntilClosure>> = boxed.into();
1107
1108 let mut_ref: Pin<&mut Self> = Pin::as_mut(&mut pinned_box);
1109 unsafe {
1110 // This is safe: we are not moving the closure.
1111 Pin::get_unchecked_mut(mut_ref).wait_until = Some(wait_until);
1112 }
1113
1114 pinned_box
1115 }
1116
1117 /// Returns a pinned, type-erased `WaitUntil` future.
1118 fn as_pinned_future(self: Pin<&mut Self>) -> Pin<&mut dyn Future<Output = ()>> {
1119 unsafe { self.map_unchecked_mut(|s| s.wait_until.as_mut().unwrap().as_mut()) }
1120 }
1121 }
1122
1123 impl Drop for WaitUntilClosure {
1124 fn drop(&mut self) {
1125 // Make sure that the `WaitUntil` future does not outlive its
1126 // captured references.
1127 self.wait_until = None;
1128 }
1129 }
1130
1131 /// An enum that registers the final state of a `WaitUntil` future at the
1132 /// completion of a thread.
1133 ///
1134 /// When the future is still in a `Polled` state, this future is moved into
1135 /// the enum so as to extend its lifetime and allow it to be further
1136 /// notified.
1137 #[allow(dead_code)]
1138 enum FutureState {
1139 Completed,
1140 Polled(Pin<Box<WaitUntilClosure>>),
1141 Cancelled,
1142 }
1143
1144 /// Make a certain amount of AVAILABLE_TOKENS available and notify as many waiters
1145 /// among all registered waiters, possibly from several notifier threads.
1146 /// Optionally, it is possible to:
1147 /// - request that `max_spurious_wake` threads will simulate a spurious
1148 /// wake-up if the waiter is polled and returns `Poll::Pending`,
1149 /// - request that `max_cancellations` threads will cancel the waiter if the
1150 /// waiter is polled and returns `Poll::Pending`,
1151 /// - change the waker each time it is polled.
1152 ///
1153 /// Note that the aggregate number of specified cancellations and spurious
1154 /// wake-ups cannot exceed the number of waiters.
1155 fn loom_notify(
1156 token_count: usize,
1157 waiter_count: usize,
1158 notifier_count: usize,
1159 max_spurious_wake: usize,
1160 max_cancellations: usize,
1161 change_waker: bool,
1162 preemption_bound: usize,
1163 ) {
1164 let mut builder = Builder::new();
1165 if builder.preemption_bound.is_none() {
1166 builder.preemption_bound = Some(preemption_bound);
1167 }
1168
1169 builder.check(move || {
1170 let token_counter = Arc::new(Counter::default());
1171 let event = Arc::new(Event::new());
1172
1173 let mut wakers: Vec<MultiWaker> = Vec::new();
1174 wakers.resize_with(waiter_count, Default::default);
1175
1176 let waiter_threads: Vec<_> = wakers
1177 .iter()
1178 .enumerate()
1179 .map(|(i, multi_waker)| {
1180 thread::spawn({
1181 let multi_waker = multi_waker.clone();
1182 let mut wait_until =
1183 WaitUntilClosure::new(event.clone(), token_counter.clone());
1184
1185 move || {
1186 // `max_cancellations` threads will cancel the
1187 // waiter if the waiter returns `Poll::Pending`.
1188 let cancel_waiter = i < max_cancellations;
1189 // `max_spurious_wake` threads will simulate a
1190 // spurious wake-up if the waiter returns
1191 // `Poll::Pending`.
1192 let mut spurious_wake = i >= max_cancellations
1193 && i < (max_cancellations + max_spurious_wake);
1194
1195 let mut waker = multi_waker.new_waker();
1196 loop {
1197 let mut cx = Context::from_waker(&waker);
1198 let poll_state =
1199 wait_until.as_mut().as_pinned_future().poll(&mut cx);
1200
1201 // Return successfully if the predicate was
1202 // checked successfully.
1203 if matches!(poll_state, Poll::Ready(_)) {
1204 return FutureState::Completed;
1205 }
1206
1207 // The future has returned Poll::Pending.
1208 // Depending on the situation, we will either
1209 // cancel the future, return and wait for a
1210 // notification, or poll again.
1211
1212 if cancel_waiter {
1213 // The `wait_until` future is dropped while
1214 // in pending state, which simulates future
1215 // cancellation. Note that the notification
1216 // was intentionally cleared earlier so the
1217 // task will not be counted as a task that
1218 // should eventually succeed.
1219 return FutureState::Cancelled;
1220 }
1221 if spurious_wake {
1222 // Clear the notification, if any.
1223 multi_waker.clear_notification();
1224 } else if !multi_waker.take_notification() {
1225 // The async runtime would normally keep the
1226 // `wait_until` future alive after `poll`
1227 // returns `Pending`. This behavior is
1228 // emulated by returning the `WaitUntil`
1229 // closure from the thread so as to extend
1230 // it lifetime.
1231 return FutureState::Polled(wait_until);
1232 }
1233
1234 // The task was notified or spuriously awaken.
1235 spurious_wake = false;
1236 if change_waker {
1237 waker = multi_waker.new_waker();
1238 }
1239 }
1240 }
1241 })
1242 })
1243 .collect();
1244
1245 // Increment the token count and notify a consumer after each
1246 // increment.
1247 assert!(notifier_count >= 1);
1248 assert!(token_count >= notifier_count);
1249
1250 // Each notifier thread but the last one makes one and only one
1251 // token available.
1252 let notifier_threads: Vec<_> = (0..(notifier_count - 1))
1253 .map(|_| {
1254 let token_counter = token_counter.clone();
1255 let event = event.clone();
1256 thread::spawn(move || {
1257 token_counter.increment();
1258 event.notify(1);
1259 })
1260 })
1261 .collect();
1262
1263 // The last notifier thread completes the number of AVAILABLE_TOKENS as
1264 // needed.
1265 for _ in 0..(token_count - (notifier_count - 1)) {
1266 token_counter.increment();
1267 event.notify(1);
1268 }
1269
1270 // Join the remaining notifier threads.
1271 for th in notifier_threads {
1272 th.join().unwrap();
1273 }
1274
1275 // Join all waiter threads and check which of them have successfully
1276 // checked the predicate. It is important that all `FutureState`
1277 // returned by the threads be kept alive until _all_ threads have
1278 // joined because `FutureState::Polled` items extend the lifetime of
1279 // their future so they can still be notified.
1280 let future_state: Vec<_> = waiter_threads
1281 .into_iter()
1282 .map(|th| th.join().unwrap())
1283 .collect();
1284
1285 // See which threads have successfully completed. It is now OK to drop
1286 // the returned `FutureState`s.
1287 let success: Vec<_> = future_state
1288 .into_iter()
1289 .map(|state| match state {
1290 FutureState::Completed => true,
1291 _ => false,
1292 })
1293 .collect();
1294
1295 // Check which threads have been notified, excluding those which
1296 // future was cancelled.
1297 let notified: Vec<_> = wakers
1298 .iter()
1299 .enumerate()
1300 .map(|(i, test_waker)| {
1301 // Count the notification unless the thread was cancelled
1302 // since in that case the notification would be missed.
1303 test_waker.take_notification() && i >= max_cancellations
1304 })
1305 .collect();
1306
1307 // Count how many threads have either succeeded or have been
1308 // notified.
1309 let actual_aggregate_count =
1310 success
1311 .iter()
1312 .zip(notified.iter())
1313 .fold(0, |count, (&success, ¬ified)| {
1314 if success || notified {
1315 count + 1
1316 } else {
1317 count
1318 }
1319 });
1320
1321 // Compare with the number of event sinks that should eventually succeed.
1322 let min_expected_success_count = token_count.min(waiter_count - max_cancellations);
1323 if actual_aggregate_count < min_expected_success_count {
1324 panic!(
1325 "Successful threads: {:?}; Notified threads: {:?}",
1326 success, notified
1327 );
1328 }
1329 });
1330 }
1331
1332 #[test]
1333 fn loom_two_consumers() {
1334 const DEFAULT_PREEMPTION_BOUND: usize = 4;
1335 loom_notify(2, 2, 1, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1336 }
1337 #[test]
1338 fn loom_two_consumers_spurious() {
1339 const DEFAULT_PREEMPTION_BOUND: usize = 4;
1340 loom_notify(2, 2, 1, 1, 0, false, DEFAULT_PREEMPTION_BOUND);
1341 }
1342 #[test]
1343 fn loom_two_consumers_cancellation() {
1344 const DEFAULT_PREEMPTION_BOUND: usize = 4;
1345 loom_notify(2, 2, 1, 1, 1, false, DEFAULT_PREEMPTION_BOUND);
1346 }
1347 #[test]
1348 fn loom_two_consumers_change_waker() {
1349 const DEFAULT_PREEMPTION_BOUND: usize = 4;
1350 loom_notify(2, 2, 1, 0, 0, true, DEFAULT_PREEMPTION_BOUND);
1351 }
1352 #[test]
1353 fn loom_two_consumers_change_waker_spurious() {
1354 const DEFAULT_PREEMPTION_BOUND: usize = 4;
1355 loom_notify(2, 2, 1, 1, 0, true, DEFAULT_PREEMPTION_BOUND);
1356 }
1357 #[test]
1358 fn loom_two_consumers_change_waker_cancellation() {
1359 const DEFAULT_PREEMPTION_BOUND: usize = 4;
1360 loom_notify(1, 2, 1, 0, 1, true, DEFAULT_PREEMPTION_BOUND);
1361 }
1362 #[test]
1363 fn loom_two_consumers_change_waker_spurious_cancellation() {
1364 const DEFAULT_PREEMPTION_BOUND: usize = 4;
1365 loom_notify(2, 2, 1, 1, 1, true, DEFAULT_PREEMPTION_BOUND);
1366 }
1367 #[test]
1368 fn loom_two_consumers_three_tokens() {
1369 const DEFAULT_PREEMPTION_BOUND: usize = 3;
1370 loom_notify(3, 2, 1, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1371 }
1372 #[test]
1373 fn loom_three_consumers() {
1374 const DEFAULT_PREEMPTION_BOUND: usize = 2;
1375 loom_notify(3, 3, 1, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1376 }
1377 #[test]
1378 fn loom_three_consumers_spurious() {
1379 const DEFAULT_PREEMPTION_BOUND: usize = 2;
1380 loom_notify(3, 3, 1, 1, 0, false, DEFAULT_PREEMPTION_BOUND);
1381 }
1382 #[test]
1383 fn loom_three_consumers_cancellation() {
1384 const DEFAULT_PREEMPTION_BOUND: usize = 2;
1385 loom_notify(2, 3, 1, 0, 1, false, DEFAULT_PREEMPTION_BOUND);
1386 }
1387 #[test]
1388 fn loom_three_consumers_change_waker() {
1389 const DEFAULT_PREEMPTION_BOUND: usize = 2;
1390 loom_notify(3, 3, 1, 0, 0, true, DEFAULT_PREEMPTION_BOUND);
1391 }
1392 #[test]
1393 fn loom_three_consumers_change_waker_spurious() {
1394 const DEFAULT_PREEMPTION_BOUND: usize = 2;
1395 loom_notify(3, 3, 1, 1, 0, true, DEFAULT_PREEMPTION_BOUND);
1396 }
1397 #[test]
1398 fn loom_three_consumers_change_waker_cancellation() {
1399 const DEFAULT_PREEMPTION_BOUND: usize = 2;
1400 loom_notify(3, 3, 1, 0, 1, true, DEFAULT_PREEMPTION_BOUND);
1401 }
1402 #[test]
1403 fn loom_three_consumers_change_waker_spurious_cancellation() {
1404 const DEFAULT_PREEMPTION_BOUND: usize = 2;
1405 loom_notify(3, 3, 1, 1, 1, true, DEFAULT_PREEMPTION_BOUND);
1406 }
1407 #[test]
1408 fn loom_three_consumers_two_tokens() {
1409 const DEFAULT_PREEMPTION_BOUND: usize = 2;
1410 loom_notify(2, 3, 1, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1411 }
1412 #[test]
1413 fn loom_two_consumers_two_notifiers() {
1414 const DEFAULT_PREEMPTION_BOUND: usize = 3;
1415 loom_notify(2, 2, 2, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1416 }
1417 #[test]
1418 fn loom_one_consumer_three_notifiers() {
1419 const DEFAULT_PREEMPTION_BOUND: usize = 4;
1420 loom_notify(3, 1, 3, 0, 0, false, DEFAULT_PREEMPTION_BOUND);
1421 }
1422}