mod loom_exports;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering;
use std::task::{Context, Poll, Waker};
use loom_exports::cell::UnsafeCell;
use loom_exports::sync::atomic::{self, AtomicBool};
use loom_exports::sync::Mutex;
use pin_project_lite::pin_project;
/// A notification primitive on which futures can wait until a predicate holds.
///
/// Waiters are created with [`Event::wait_until`] or
/// [`Event::wait_until_or_timeout`] and are woken by [`Event::notify`],
/// [`Event::notify_one`] or [`Event::notify_all`].
pub struct Event {
// Notifiers registered by the futures currently waiting on this event.
wait_set: WaitSet,
}
impl Event {
    /// Creates an event with no registered waiter.
    pub fn new() -> Self {
        let wait_set = WaitSet::default();

        Self { wait_set }
    }

    /// Wakes at most `n` waiters, starting with the one registered first.
    ///
    /// Does nothing when no waiter is registered.
    #[inline(always)]
    pub fn notify(&self, n: usize) {
        // Full fence before the wait set is inspected; `WaitUntil::poll`
        // issues a matching `SeqCst` fence between registering its notifier
        // and re-testing the predicate.
        atomic::fence(Ordering::SeqCst);

        // NOTE(review): presumably sound because a notifier is always taken
        // out of the wait set before it is deallocated (see `WaitUntil::poll`
        // and its `Drop` impl) — confirm.
        unsafe { self.wait_set.notify_relaxed(n) };
    }

    /// Wakes at most one waiter; shorthand for `notify(1)`.
    #[inline(always)]
    pub fn notify_one(&self) {
        self.notify(1)
    }

    /// Wakes every registered waiter; shorthand for `notify(usize::MAX)`.
    #[inline(always)]
    pub fn notify_all(&self) {
        self.notify(usize::MAX)
    }

    /// Returns a future that completes with `v` once `predicate` returns
    /// `Some(v)`.
    ///
    /// The predicate is evaluated each time the returned future is polled.
    pub fn wait_until<F, T>(&self, predicate: F) -> WaitUntil<F, T>
    where
        F: FnMut() -> Option<T>,
    {
        let wait_set = &self.wait_set;

        WaitUntil::new(wait_set, predicate)
    }

    /// Like [`Event::wait_until`], but the returned future also completes,
    /// with `None`, as soon as `deadline` resolves.
    ///
    /// The predicate is checked before the deadline on every poll, so a
    /// satisfied predicate wins when both are ready.
    pub fn wait_until_or_timeout<F, T, D>(
        &self,
        predicate: F,
        deadline: D,
    ) -> WaitUntilOrTimeout<F, T, D>
    where
        F: FnMut() -> Option<T>,
        D: Future<Output = ()>,
    {
        let wait_set = &self.wait_set;

        WaitUntilOrTimeout::new(wait_set, predicate, deadline)
    }
}
impl Default for Event {
fn default() -> Self {
Self::new()
}
}
// `WaitSet` stores raw `NonNull<Notifier>` pointers inside its `List`, so
// `Send` and `Sync` are not derived automatically and are asserted manually.
// NOTE(review): soundness presumably relies on the intrusive list only being
// touched while the `WaitSet` mutex is held — confirm.
unsafe impl Send for Event {}
unsafe impl Sync for Event {}
/// Node of the intrusive wait list: a waker plus the links to its neighbours.
struct Notifier {
    /// Waker of the task to wake, if one has been registered.
    waker: Option<Waker>,
    /// Previous node in the wait list, or `None` at the front.
    prev: UnsafeCell<Option<NonNull<Notifier>>>,
    /// Next node in the wait list, or `None` at the back.
    next: UnsafeCell<Option<NonNull<Notifier>>>,
    /// Whether the node is currently linked into a wait set.
    in_wait_set: AtomicBool,
}

impl Notifier {
    /// Creates an unlinked notifier without a waker.
    fn new() -> Self {
        Notifier {
            waker: None,
            prev: UnsafeCell::new(None),
            next: UnsafeCell::new(None),
            in_wait_set: AtomicBool::new(false),
        }
    }

    /// Registers `waker`, cloning it only if the stored waker would not
    /// already wake the same task.
    fn set_waker(&mut self, waker: &Waker) {
        let already_registered = self
            .waker
            .as_ref()
            .map_or(false, |current| current.will_wake(waker));

        if !already_registered {
            self.waker = Some(waker.clone());
        }
    }

    /// Wakes the registered task; a no-op if no waker was registered.
    fn wake(&self) {
        if let Some(registered) = self.waker.as_ref() {
            registered.wake_by_ref();
        }
    }
}

// The raw neighbour links prevent `Send`/`Sync` from being derived.
// NOTE(review): presumably sound because the links are only accessed through
// `List`, which lives behind the `WaitSet` mutex — confirm.
unsafe impl Send for Notifier {}
unsafe impl Sync for Notifier {}
/// Future returned by [`Event::wait_until`]; it resolves with the value
/// produced by the predicate once the predicate returns `Some`.
pub struct WaitUntil<'a, F: FnMut() -> Option<T>, T> {
// Progress of the future; the `Polled` variant carries the heap-allocated notifier.
state: WaitUntilState,
// Closure evaluated on each poll; `Some(v)` completes the future with `v`.
predicate: F,
// Wait set of the `Event` this future was created from.
wait_set: &'a WaitSet,
}
impl<'a, F: FnMut() -> Option<T>, T> WaitUntil<'a, F, T> {
fn new(wait_set: &'a WaitSet, predicate: F) -> Self {
Self {
state: WaitUntilState::Idle,
predicate,
wait_set,
}
}
}
impl<F: FnMut() -> Option<T>, T> Drop for WaitUntil<'_, F, T> {
    /// Unregisters and frees the notifier of a future dropped while pending.
    fn drop(&mut self) {
        // Only the `Polled` state owns a notifier.
        let notifier = match self.state {
            WaitUntilState::Polled(notifier) => notifier,
            _ => return,
        };

        // `cancel` unlinks the notifier if it is still queued; if it was
        // already popped by a notification, `cancel` wakes the next queued
        // waiter instead. Either way the wait set no longer references the
        // notifier afterwards, so the allocation made in `poll` with
        // `Box::into_raw` can be reclaimed.
        unsafe {
            self.wait_set.cancel(notifier);
            drop(Box::from_raw(notifier.as_ptr()));
        }
    }
}
// The notifier registered in the wait set lives in its own `Box` allocation
// (see `poll`), so the wait set never points into the future itself and the
// future may be moved between polls.
impl<'a, F: FnMut() -> Option<T>, T> Unpin for WaitUntil<'a, F, T> {}
// The `NonNull<Notifier>` held in the state prevents `Send` from being derived.
// NOTE(review): presumably sound because the notifier is only shared through
// the mutex-protected wait set — confirm.
unsafe impl<F: (FnMut() -> Option<T>) + Send, T: Send> Send for WaitUntil<'_, F, T> {}
// Polling protocol: (1) unregister any notifier left over from a previous
// poll, (2) test the predicate, (3) register a notifier carrying the current
// waker, (4) test the predicate once more before returning `Pending`.
impl<'a, F: FnMut() -> Option<T>, T> Future for WaitUntil<'a, F, T> {
type Output = T;
/// Evaluates the predicate and, while it is unsatisfied, registers the
/// task's waker in the wait set.
///
/// # Panics
///
/// Panics if polled again after having returned `Poll::Ready`, or if the
/// wait set mutex is poisoned.
#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
assert!(self.state != WaitUntilState::Completed);
// A notifier from an earlier poll may still be queued (for instance when the
// task was woken by something other than `notify`); unlink it so that it can
// be mutated or freed below without the wait set still referencing it.
if let WaitUntilState::Polled(notifier) = self.state {
unsafe { self.wait_set.remove_relaxed(notifier) };
}
// Fast path: the predicate is already satisfied, nothing needs registering.
if let Some(v) = (self.predicate)() {
if let WaitUntilState::Polled(notifier) = self.state {
// The notifier was unlinked above, so the allocation made with
// `Box::into_raw` is reclaimed here.
let _ = unsafe { Box::from_raw(notifier.as_ptr()) };
}
self.state = WaitUntilState::Completed;
return Poll::Ready(v);
}
// Reuse the notifier of a previous poll, or allocate one on the first
// pending poll. `Box::into_raw` never returns null, hence `new_unchecked`.
let mut notifier = if let WaitUntilState::Polled(notifier) = self.state {
notifier
} else {
unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(Notifier::new()))) }
};
let waker = cx.waker();
// The notifier is not in the wait set at this point, so this future is the
// only one accessing it and may update the waker.
unsafe { notifier.as_mut().set_waker(waker) };
unsafe { self.wait_set.insert(notifier) };
// Full fence between registering the notifier and re-testing the predicate;
// `Event::notify` issues a matching `SeqCst` fence before inspecting the
// wait set.
// NOTE(review): presumably this guarantees that either this poll observes
// the satisfied predicate or the notifying thread observes the registered
// notifier — confirm.
atomic::fence(Ordering::SeqCst);
if let Some(v) = (self.predicate)() {
// `cancel` rather than a plain removal: if a concurrent `notify` already
// popped this notifier, `cancel` wakes the next queued waiter so that the
// notification is not lost.
unsafe {
self.wait_set.cancel(notifier);
}
self.state = WaitUntilState::Completed;
let _ = unsafe { Box::from_raw(notifier.as_ptr()) };
return Poll::Ready(v);
}
// Keep ownership of the notifier until the next poll or until drop.
self.state = WaitUntilState::Polled(notifier);
Poll::Pending
}
}
/// Lifecycle of a `WaitUntil` future.
#[derive(PartialEq)]
enum WaitUntilState {
/// Initial state: never polled, no notifier allocated.
Idle,
/// Returned `Pending` at least once; owns the notifier allocated in `poll`.
Polled(NonNull<Notifier>),
/// Returned `Ready`; polling again panics.
Completed,
}
pin_project! {
// Future returned by `Event::wait_until_or_timeout`: resolves with
// `Some(value)` when the predicate is satisfied, or with `None` when the
// deadline future completes first.
pub struct WaitUntilOrTimeout<'a, F: FnMut() -> Option<T>, T, D: Future<Output = ()>> {
// Inner predicate-waiting future; it is `Unpin`, so it is not pin-projected.
wait_until: WaitUntil<'a, F, T>,
// Caller-supplied deadline future, polled through a pinned projection.
#[pin]
deadline: D,
}
}
impl<'a, F, T, D> WaitUntilOrTimeout<'a, F, T, D>
where
    F: FnMut() -> Option<T>,
    D: Future<Output = ()>,
{
    /// Pairs a fresh `WaitUntil` future on `wait_set` with `deadline`.
    fn new(wait_set: &'a WaitSet, predicate: F, deadline: D) -> Self {
        let wait_until = WaitUntil::new(wait_set, predicate);

        Self {
            wait_until,
            deadline,
        }
    }
}
impl<'a, F, T, D> Future for WaitUntilOrTimeout<'a, F, T, D>
where
    F: FnMut() -> Option<T>,
    D: Future<Output = ()>,
{
    type Output = Option<T>;

    /// Polls the predicate future first and the deadline second, so that a
    /// satisfied predicate wins when both are ready.
    #[inline]
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        match Pin::new(this.wait_until).poll(cx) {
            Poll::Ready(value) => Poll::Ready(Some(value)),
            // The deadline is only polled while the predicate is unsatisfied;
            // its completion maps to `None`.
            Poll::Pending => this.deadline.poll(cx).map(|()| None),
        }
    }
}
/// Mutex-protected queue of waiting notifiers plus a lock-free emptiness hint.
struct WaitSet {
// Intrusive doubly-linked list of the registered notifiers.
list: Mutex<List>,
// Cleared on insertion and set again when a removal or a notification
// empties the list; read without the lock by `notify_relaxed`.
is_empty: AtomicBool,
}
impl WaitSet {
    /// Appends `notifier` to the back of the wait set.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not spelled out in this file;
    /// presumably `notifier` must point to a live `Notifier` that is not
    /// currently in any wait set and that stays alive until it has been
    /// removed again — confirm.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned and, in debug or loom builds, if the
    /// notifier is already flagged as being in the wait set.
    unsafe fn insert(&self, notifier: NonNull<Notifier>) {
        let mut list = self.list.lock().unwrap();

        #[cfg(any(debug_assertions, async_event_loom))]
        if notifier.as_ref().in_wait_set.load(Ordering::Relaxed) {
            // Release the guard before panicking so the mutex is not poisoned.
            drop(list);
            panic!("the notifier was already in the wait set");
        }

        notifier.as_ref().in_wait_set.store(true, Ordering::Relaxed);
        list.push_back(notifier);

        // Only written while the lock is held.
        self.is_empty.store(false, Ordering::Relaxed);
    }

    /// Removes `notifier` from the wait set if it is still in it, skipping
    /// the lock when the notifier's flag shows it was already removed.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably `notifier` must point to a live `Notifier`
    /// that was only ever inserted into this wait set — confirm.
    unsafe fn remove_relaxed(&self, notifier: NonNull<Notifier>) {
        // `Acquire` pairs with the `Release` stores that clear the flag in
        // `notify` and `cancel` after the waker has been used.
        let in_wait_set = notifier.as_ref().in_wait_set.load(Ordering::Acquire);
        if !in_wait_set {
            return;
        }

        self.remove(notifier);
    }

    /// Removes `notifier` from the wait set if it is still in it.
    ///
    /// # Safety
    ///
    /// NOTE(review): same presumed contract as `remove_relaxed` — confirm.
    unsafe fn remove(&self, notifier: NonNull<Notifier>) {
        let mut list = self.list.lock().unwrap();

        // Re-check under the lock: the notifier may have been popped by a
        // concurrent notification in the meantime.
        let in_wait_set = notifier.as_ref().in_wait_set.load(Ordering::Relaxed);
        if !in_wait_set {
            return;
        }

        list.remove(notifier);
        if list.is_empty() {
            self.is_empty.store(true, Ordering::Relaxed);
        }
        notifier
            .as_ref()
            .in_wait_set
            .store(false, Ordering::Relaxed);
    }

    /// Removes `notifier` from the wait set or, if it was already popped by
    /// a notification, forwards that notification to the next queued waiter.
    ///
    /// # Safety
    ///
    /// NOTE(review): same presumed contract as `remove_relaxed`; in addition
    /// every notifier still queued must be alive, since one of them may be
    /// woken here — confirm.
    unsafe fn cancel(&self, notifier: NonNull<Notifier>) {
        let mut list = self.list.lock().unwrap();

        let in_wait_set = notifier.as_ref().in_wait_set.load(Ordering::Relaxed);
        if in_wait_set {
            list.remove(notifier);
            if list.is_empty() {
                self.is_empty.store(true, Ordering::Relaxed);
            }
            notifier
                .as_ref()
                .in_wait_set
                .store(false, Ordering::Relaxed);
        } else if let Some(other_notifier) = list.pop_front() {
            // Keep the emptiness hint in sync with the list, as `notify` and
            // `remove` do. Without this the hint stays `false` after the last
            // waiter has been popped here, and later `notify_relaxed` calls
            // take the mutex on an empty list for nothing.
            if list.is_empty() {
                self.is_empty.store(true, Ordering::Relaxed);
            }

            other_notifier.as_ref().wake();

            // `Release` pairs with the `Acquire` load in `remove_relaxed`:
            // the flag is cleared only after the waker has been used.
            other_notifier
                .as_ref()
                .in_wait_set
                .store(false, Ordering::Release);
        }
    }

    /// Wakes up to `count` waiters, returning without locking when the
    /// emptiness hint says that nobody is waiting.
    ///
    /// The hint is read with `Relaxed` ordering; the only caller in this
    /// file, `Event::notify`, issues a `SeqCst` fence beforehand.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably every notifier in the wait set must be
    /// alive — confirm.
    #[inline(always)]
    unsafe fn notify_relaxed(&self, count: usize) {
        let is_empty = self.is_empty.load(Ordering::Relaxed);
        if is_empty {
            return;
        }

        self.notify(count);
    }

    /// Pops and wakes up to `count` waiters from the front of the list.
    ///
    /// # Safety
    ///
    /// NOTE(review): same presumed contract as `notify_relaxed` — confirm.
    unsafe fn notify(&self, count: usize) {
        let mut list = self.list.lock().unwrap();

        for _ in 0..count {
            let notifier = {
                if let Some(notifier) = list.pop_front() {
                    if list.is_empty() {
                        self.is_empty.store(true, Ordering::Relaxed);
                    }
                    notifier
                } else {
                    // Fewer than `count` waiters were registered.
                    return;
                }
            };

            notifier.as_ref().wake();

            // `Release` pairs with the `Acquire` load in `remove_relaxed`:
            // the flag is cleared only after the waker has been used.
            notifier
                .as_ref()
                .in_wait_set
                .store(false, Ordering::Release);
        }
    }
}
impl Default for WaitSet {
fn default() -> Self {
Self {
list: Default::default(),
is_empty: AtomicBool::new(true),
}
}
}
/// Intrusive doubly-linked list of notifiers; the links are stored in the
/// notifiers themselves.
#[derive(Default)]
struct List {
    /// First notifier of the list, or `None` when the list is empty.
    front: Option<NonNull<Notifier>>,
    /// Last notifier of the list, or `None` when the list is empty.
    back: Option<NonNull<Notifier>>,
}

impl List {
    /// Links `notifier` at the back of the list.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably `notifier` and every node already in the
    /// list must be alive, and `notifier` must not already be linked —
    /// confirm.
    unsafe fn push_back(&mut self, notifier: NonNull<Notifier>) {
        let previous_back = mem::replace(&mut self.back, Some(notifier));

        if let Some(tail) = previous_back {
            tail.as_ref().next.with_mut(|slot| *slot = Some(notifier));
        } else {
            // The list was empty: the new node is also the front.
            self.front = Some(notifier);
        }

        let node = notifier.as_ref();
        node.prev.with_mut(|slot| *slot = previous_back);
        node.next.with_mut(|slot| *slot = None);
    }

    /// Unlinks and returns the front notifier, or `None` if the list is empty.
    ///
    /// The links of the returned node are left untouched.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably every node in the list must be alive —
    /// confirm.
    unsafe fn pop_front(&mut self) -> Option<NonNull<Notifier>> {
        let head = match self.front {
            Some(head) => head,
            None => return None,
        };

        let successor = head.as_ref().next.with(|slot| *slot);
        self.front = successor;

        if let Some(new_head) = successor {
            new_head.as_ref().prev.with_mut(|slot| *slot = None);
        } else {
            // The popped node was the only one.
            self.back = None;
        }

        Some(head)
    }

    /// Unlinks `notifier` from wherever it sits in the list.
    ///
    /// The links of the removed node are left untouched.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably `notifier` must currently be linked in this
    /// list and every node in the list must be alive — confirm.
    unsafe fn remove(&mut self, notifier: NonNull<Notifier>) {
        let node = notifier.as_ref();
        let before = node.prev.with(|slot| *slot);
        let after = node.next.with(|slot| *slot);

        if let Some(predecessor) = before {
            predecessor.as_ref().next.with_mut(|slot| *slot = after);
        } else {
            self.front = after;
        }

        if let Some(successor) = after {
            successor.as_ref().prev.with_mut(|slot| *slot = before);
        } else {
            self.back = before;
        }
    }

    /// Returns `true` when no notifier is linked.
    fn is_empty(&self) -> bool {
        self.front.is_none()
    }
}
#[cfg(all(test, not(async_event_loom)))]
mod tests {
use super::*;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::thread;
use futures_executor::block_on;
/// One waiter and one `notify_one`: the waiter must complete and observe the
/// signal.
#[test]
fn smoke() {
    static SIGNAL: AtomicBool = AtomicBool::new(false);

    let event = Arc::new(Event::new());

    let receiver_event = Arc::clone(&event);
    let th_recv = thread::spawn(move || {
        block_on(async move {
            let signalled = || {
                if SIGNAL.load(Ordering::Relaxed) {
                    Some(())
                } else {
                    None
                }
            };
            receiver_event.wait_until(signalled).await;

            assert!(SIGNAL.load(Ordering::Relaxed));
        })
    });

    SIGNAL.store(true, Ordering::Relaxed);
    event.notify_one();

    th_recv.join().unwrap();
}
/// Four waiters woken by `notify_one` followed by `notify(3)`.
#[test]
fn one_to_many() {
    const RECEIVER_COUNT: usize = 4;
    static SIGNAL: AtomicBool = AtomicBool::new(false);

    let event = Arc::new(Event::new());

    let mut th_recv = Vec::with_capacity(RECEIVER_COUNT);
    for _ in 0..RECEIVER_COUNT {
        let event = Arc::clone(&event);
        th_recv.push(thread::spawn(move || {
            block_on(async move {
                let signalled = || {
                    if SIGNAL.load(Ordering::Relaxed) {
                        Some(())
                    } else {
                        None
                    }
                };
                event.wait_until(signalled).await;

                assert!(SIGNAL.load(Ordering::Relaxed));
            })
        }));
    }

    SIGNAL.store(true, Ordering::Relaxed);
    event.notify_one();
    event.notify(3);

    th_recv.into_iter().for_each(|th| th.join().unwrap());
}
/// Four senders each publish one token and notify one waiter; four waiters
/// each consume one token. No token may be left at the end.
#[test]
fn many_to_many() {
    const TOKEN_COUNT: usize = 4;
    static AVAILABLE_TOKENS: AtomicUsize = AtomicUsize::new(0);

    let event = Arc::new(Event::new());

    let mut th_recv = Vec::with_capacity(TOKEN_COUNT);
    for _ in 0..TOKEN_COUNT {
        let event = Arc::clone(&event);
        th_recv.push(thread::spawn(move || {
            block_on(async move {
                // Atomically takes one token if any is available.
                let take_token = || {
                    AVAILABLE_TOKENS
                        .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |tokens| {
                            tokens.checked_sub(1)
                        })
                        .ok()
                };
                event.wait_until(take_token).await;
            })
        }));
    }

    let mut th_send = Vec::with_capacity(TOKEN_COUNT);
    for _ in 0..TOKEN_COUNT {
        let event = Arc::clone(&event);
        th_send.push(thread::spawn(move || {
            AVAILABLE_TOKENS.fetch_add(1, Ordering::Relaxed);
            event.notify_one();
        }));
    }

    th_recv.into_iter().for_each(|th| th.join().unwrap());
    th_send.into_iter().for_each(|th| th.join().unwrap());

    assert!(AVAILABLE_TOKENS.load(Ordering::Relaxed) == 0);
}
/// Four waiters woken by a single `notify_all`.
#[test]
fn notify_all() {
    const RECEIVER_COUNT: usize = 4;
    static SIGNAL: AtomicBool = AtomicBool::new(false);

    let event = Arc::new(Event::new());

    let mut th_recv = Vec::with_capacity(RECEIVER_COUNT);
    for _ in 0..RECEIVER_COUNT {
        let event = Arc::clone(&event);
        th_recv.push(thread::spawn(move || {
            block_on(async move {
                let signalled = || {
                    if SIGNAL.load(Ordering::Relaxed) {
                        Some(())
                    } else {
                        None
                    }
                };
                event.wait_until(signalled).await;

                assert!(SIGNAL.load(Ordering::Relaxed));
            })
        }));
    }

    SIGNAL.store(true, Ordering::Relaxed);
    event.notify_all();

    th_recv.into_iter().for_each(|th| th.join().unwrap());
}
}
#[cfg(all(test, async_event_loom))]
mod tests {
use super::*;
use std::future::Future;
use std::marker::PhantomPinned;
use std::task::{Context, Poll};
use loom::model::Builder;
use loom::sync::atomic::AtomicUsize;
use loom::sync::Arc;
use loom::thread;
use waker_fn::waker_fn;
/// Factory for test wakers that share a single atomic state word.
///
/// Bit 0 of the state is the notification flag; the remaining bits hold an
/// epoch that `new_waker` advances by 2 for each waker it creates.
#[derive(Clone, Default)]
struct MultiWaker {
// Shared state word: `epoch | notification flag`.
state: Arc<AtomicUsize>,
}
impl MultiWaker {
/// Clears the notification flag (bit 0), leaving the epoch untouched.
fn clear_notification(&self) {
self.state.fetch_and(!1, Ordering::Relaxed);
}
/// Clears the notification flag and returns whether it was set.
fn take_notification(&self) -> bool {
let mut state = self.state.load(Ordering::Relaxed);
loop {
// Attempt `epoch | 1` -> `epoch`: the exchange can only succeed if the flag
// is currently set.
let notified_stated = state | 1;
let unnotified_stated = state & !1;
match self.state.compare_exchange_weak(
notified_stated,
unnotified_stated,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => return true,
Err(s) => {
state = s;
// The flag is clear for the epoch that was tried: nothing to take. Any
// other failure (spurious, or the epoch changed) retries with the state
// just observed.
if state == unnotified_stated {
return false;
}
}
}
}
}
/// Starts a new epoch, which also clears the notification flag, and returns
/// a waker bound to that epoch.
///
/// The returned waker sets the notification flag only while its epoch is
/// still the current one; wakers from earlier epochs become no-ops.
fn new_waker(&self) -> Waker {
let mut state = self.state.load(Ordering::Relaxed);
let mut epoch;
loop {
// Drop the flag bit and advance the epoch part by one step.
epoch = (state & !1) + 2;
match self.state.compare_exchange_weak(
state,
epoch,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(s) => state = s,
}
}
let waker_state = self.state.clone();
waker_fn(move || {
let mut state = waker_state.load(Ordering::Relaxed);
loop {
// Stale waker: a newer epoch has started, so do nothing.
let new_state = if state & !1 == epoch {
epoch | 1
} else {
break;
};
// `Release` pairs with the `Acquire` in `take_notification`.
match waker_state.compare_exchange(
state,
new_state,
Ordering::Release,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(s) => state = s,
}
}
})
}
}
/// Token counter shared between the notifier and waiter threads.
#[derive(Default)]
struct Counter {
    /// Number of tokens currently available.
    count: AtomicUsize,
}

impl Counter {
    /// Makes one more token available.
    fn increment(&self) {
        self.count.fetch_add(1, Ordering::Relaxed);
    }

    /// Takes one token, returning `Some(())` on success and `None` when no
    /// token is available.
    fn try_decrement(&self) -> Option<()> {
        let mut observed = self.count.load(Ordering::Relaxed);

        while observed != 0 {
            match self.count.compare_exchange(
                observed,
                observed - 1,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return Some(()),
                // Lost a race: retry with the value that was actually found.
                Err(actual) => observed = actual,
            }
        }

        None
    }
}
/// Self-referential test helper that owns an event, a token counter and a
/// `wait_until` future borrowing from both.
struct WaitUntilClosure {
// Keeps the `Event` alive for as long as the future may reference it.
event: Arc<Event>,
// Keeps the `Counter` alive; the future's predicate reads it through a raw pointer.
token_counter: Arc<Counter>,
// Type-erased future built in `new`; `None` only during construction and drop.
wait_until: Option<Box<dyn Future<Output = ()>>>,
// Makes the type `!Unpin`, since the future points into this struct.
_pin: PhantomPinned,
}
impl WaitUntilClosure {
/// Builds the pinned, boxed helper together with its inner future, which
/// waits until a token can be taken from `token_counter`.
fn new(event: Arc<Event>, token_counter: Arc<Counter>) -> Pin<Box<Self>> {
let res = Self {
event,
token_counter,
wait_until: None,
_pin: PhantomPinned,
};
// Box first so that the field addresses taken below stay stable.
let boxed = Box::new(res);
// Raw pointers erase the borrow lifetimes so that the future can be stored
// next to the values it refers to.
let event_ptr = &*boxed.event as *const Event;
let token_counter_ptr = &boxed.token_counter as *const Arc<Counter>;
// NOTE(review): presumably sound because the struct is never moved out of
// its box and the `Drop` impl releases the future before the other fields —
// confirm.
let wait_until: Box<dyn Future<Output = _>> = unsafe {
Box::new((*event_ptr).wait_until(move || (*token_counter_ptr).try_decrement()))
};
let mut pinned_box: Pin<Box<WaitUntilClosure>> = boxed.into();
let mut_ref: Pin<&mut Self> = Pin::as_mut(&mut pinned_box);
// Only a field is assigned; the pinned value itself is not moved.
unsafe {
Pin::get_unchecked_mut(mut_ref).wait_until = Some(wait_until);
}
pinned_box
}
/// Projects the pinned helper to its inner future so that it can be polled.
///
/// # Panics
///
/// Panics if the inner future is absent, which `new` never leaves it.
fn as_pinned_future(self: Pin<&mut Self>) -> Pin<&mut dyn Future<Output = ()>> {
unsafe { self.map_unchecked_mut(|s| s.wait_until.as_mut().unwrap().as_mut()) }
}
}
impl Drop for WaitUntilClosure {
    /// Drops the inner future before the fields it points into.
    ///
    /// Fields are otherwise dropped in declaration order, which would release
    /// `event` and `token_counter` while the future still refers to them.
    fn drop(&mut self) {
        drop(self.wait_until.take());
    }
}
/// Final state reported by a waiter thread in `loom_notify`.
// `dead_code` is allowed because the payload of `Polled` is never read: it is
// only carried out of the thread.
#[allow(dead_code)]
enum FutureState {
/// The future returned `Poll::Ready`.
Completed,
/// The future is still pending and is handed back to the joining thread.
Polled(Pin<Box<WaitUntilClosure>>),
/// The thread stopped waiting after a `Pending` poll.
Cancelled,
}
/// Model-checks token hand-over between notifier and waiter threads.
///
/// Each token is published with `Counter::increment` followed by
/// `Event::notify(1)`. Each waiter polls a future that completes once it has
/// taken a token. After all threads have been joined, the number of waiters
/// that either completed or hold an unconsumed notification must be at least
/// `min(token_count, waiter_count - max_cancellations)`.
///
/// * `token_count` - total number of tokens published.
/// * `waiter_count` - number of waiter threads.
/// * `notifier_count` - number of publishing threads, counting the main
///   thread: `notifier_count - 1` threads publish one token each and the main
///   thread publishes the remainder.
/// * `max_spurious_wake` - number of waiters, taken right after the cancelled
///   ones by index, that poll once more without having been notified.
/// * `max_cancellations` - number of waiters, taken from index 0, that give
///   up after their first `Pending` poll.
/// * `change_waker` - whether a fresh waker is created before each re-poll.
/// * `preemption_bound` - loom preemption bound, applied only when the
///   builder does not already carry one.
///
/// # Panics
///
/// Panics if `notifier_count` is 0, if `token_count < notifier_count`, or if
/// the final check described above fails.
fn loom_notify(
    token_count: usize,
    waiter_count: usize,
    notifier_count: usize,
    max_spurious_wake: usize,
    max_cancellations: usize,
    change_waker: bool,
    preemption_bound: usize,
) {
    let mut builder = Builder::new();
    if builder.preemption_bound.is_none() {
        builder.preemption_bound = Some(preemption_bound);
    }

    builder.check(move || {
        let token_counter = Arc::new(Counter::default());
        let event = Arc::new(Event::new());

        let mut wakers: Vec<MultiWaker> = Vec::new();
        wakers.resize_with(waiter_count, Default::default);

        let waiter_threads: Vec<_> = wakers
            .iter()
            .enumerate()
            .map(|(i, multi_waker)| {
                thread::spawn({
                    let multi_waker = multi_waker.clone();
                    let mut wait_until =
                        WaitUntilClosure::new(event.clone(), token_counter.clone());

                    move || {
                        // The first `max_cancellations` waiters cancel; the
                        // next `max_spurious_wake` ones wake spuriously once.
                        let cancel_waiter = i < max_cancellations;
                        let mut spurious_wake = i >= max_cancellations
                            && i < (max_cancellations + max_spurious_wake);

                        let mut waker = multi_waker.new_waker();
                        loop {
                            let mut cx = Context::from_waker(&waker);
                            let poll_state =
                                wait_until.as_mut().as_pinned_future().poll(&mut cx);

                            if matches!(poll_state, Poll::Ready(_)) {
                                return FutureState::Completed;
                            }
                            if cancel_waiter {
                                // Returning drops the pending future.
                                return FutureState::Cancelled;
                            }
                            if spurious_wake {
                                // Discard any notification and poll again as
                                // if woken for no reason.
                                multi_waker.clear_notification();
                            } else if !multi_waker.take_notification() {
                                // Not notified: hand the pending future back.
                                return FutureState::Polled(wait_until);
                            }
                            spurious_wake = false;

                            if change_waker {
                                waker = multi_waker.new_waker();
                            }
                        }
                    }
                })
            })
            .collect();

        assert!(notifier_count >= 1);
        assert!(token_count >= notifier_count);

        // All notifier threads except the main one publish a single token.
        let notifier_threads: Vec<_> = (0..(notifier_count - 1))
            .map(|_| {
                let token_counter = token_counter.clone();
                let event = event.clone();
                thread::spawn(move || {
                    token_counter.increment();
                    event.notify(1);
                })
            })
            .collect();

        // The main thread publishes the remaining tokens.
        for _ in 0..(token_count - (notifier_count - 1)) {
            token_counter.increment();
            event.notify(1);
        }

        for th in notifier_threads {
            th.join().unwrap();
        }

        let future_state: Vec<_> = waiter_threads
            .into_iter()
            .map(|th| th.join().unwrap())
            .collect();

        let success: Vec<_> = future_state
            .into_iter()
            .map(|state| matches!(state, FutureState::Completed))
            .collect();

        // A notification still pending on a waker counts, except for
        // cancelled waiters.
        let notified: Vec<_> = wakers
            .iter()
            .enumerate()
            .map(|(i, test_waker)| test_waker.take_notification() && i >= max_cancellations)
            .collect();

        let actual_aggregate_count =
            success
                .iter()
                .zip(notified.iter())
                .fold(0, |count, (&success, &notified)| {
                    if success || notified {
                        count + 1
                    } else {
                        count
                    }
                });

        let min_expected_success_count = token_count.min(waiter_count - max_cancellations);
        if actual_aggregate_count < min_expected_success_count {
            panic!(
                "Successful threads: {:?}; Notified threads: {:?}",
                success, notified
            );
        }
    });
}
// Argument order of every `loom_notify` call below:
// (token_count, waiter_count, notifier_count, max_spurious_wake,
//  max_cancellations, change_waker, preemption_bound).

#[test]
fn loom_two_consumers() {
    const PREEMPTION_BOUND: usize = 4;
    loom_notify(2, 2, 1, 0, 0, false, PREEMPTION_BOUND);
}

#[test]
fn loom_two_consumers_spurious() {
    const PREEMPTION_BOUND: usize = 4;
    loom_notify(2, 2, 1, 1, 0, false, PREEMPTION_BOUND);
}

#[test]
fn loom_two_consumers_cancellation() {
    const PREEMPTION_BOUND: usize = 4;
    loom_notify(2, 2, 1, 1, 1, false, PREEMPTION_BOUND);
}

#[test]
fn loom_two_consumers_change_waker() {
    const PREEMPTION_BOUND: usize = 4;
    loom_notify(2, 2, 1, 0, 0, true, PREEMPTION_BOUND);
}

#[test]
fn loom_two_consumers_change_waker_spurious() {
    const PREEMPTION_BOUND: usize = 4;
    loom_notify(2, 2, 1, 1, 0, true, PREEMPTION_BOUND);
}

#[test]
fn loom_two_consumers_change_waker_cancellation() {
    const PREEMPTION_BOUND: usize = 4;
    loom_notify(1, 2, 1, 0, 1, true, PREEMPTION_BOUND);
}

#[test]
fn loom_two_consumers_change_waker_spurious_cancellation() {
    const PREEMPTION_BOUND: usize = 4;
    loom_notify(2, 2, 1, 1, 1, true, PREEMPTION_BOUND);
}

#[test]
fn loom_two_consumers_three_tokens() {
    const PREEMPTION_BOUND: usize = 3;
    loom_notify(3, 2, 1, 0, 0, false, PREEMPTION_BOUND);
}

#[test]
fn loom_three_consumers() {
    const PREEMPTION_BOUND: usize = 2;
    loom_notify(3, 3, 1, 0, 0, false, PREEMPTION_BOUND);
}

#[test]
fn loom_three_consumers_spurious() {
    const PREEMPTION_BOUND: usize = 2;
    loom_notify(3, 3, 1, 1, 0, false, PREEMPTION_BOUND);
}

#[test]
fn loom_three_consumers_cancellation() {
    const PREEMPTION_BOUND: usize = 2;
    loom_notify(2, 3, 1, 0, 1, false, PREEMPTION_BOUND);
}

#[test]
fn loom_three_consumers_change_waker() {
    const PREEMPTION_BOUND: usize = 2;
    loom_notify(3, 3, 1, 0, 0, true, PREEMPTION_BOUND);
}

#[test]
fn loom_three_consumers_change_waker_spurious() {
    const PREEMPTION_BOUND: usize = 2;
    loom_notify(3, 3, 1, 1, 0, true, PREEMPTION_BOUND);
}

#[test]
fn loom_three_consumers_change_waker_cancellation() {
    const PREEMPTION_BOUND: usize = 2;
    loom_notify(3, 3, 1, 0, 1, true, PREEMPTION_BOUND);
}

#[test]
fn loom_three_consumers_change_waker_spurious_cancellation() {
    const PREEMPTION_BOUND: usize = 2;
    loom_notify(3, 3, 1, 1, 1, true, PREEMPTION_BOUND);
}

#[test]
fn loom_three_consumers_two_tokens() {
    const PREEMPTION_BOUND: usize = 2;
    loom_notify(2, 3, 1, 0, 0, false, PREEMPTION_BOUND);
}

#[test]
fn loom_two_consumers_two_notifiers() {
    const PREEMPTION_BOUND: usize = 3;
    loom_notify(2, 2, 2, 0, 0, false, PREEMPTION_BOUND);
}

#[test]
fn loom_one_consumer_three_notifiers() {
    const PREEMPTION_BOUND: usize = 4;
    loom_notify(3, 1, 3, 0, 0, false, PREEMPTION_BOUND);
}
}