use std::cell::UnsafeCell;
use std::fmt;
use std::future::Future;
use std::marker::PhantomPinned;
use std::mem;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::{Arc, Mutex};
use std::task::{self, Poll, Waker};
use crate::NEVER_POISONED;
use crate::waiter_list::{WaiterList, WaiterNode};
/// Handle to an auto-reset event whose state lives behind a shared `Arc`.
///
/// Cloning the handle clones the `Arc`, so every clone (and every wait future
/// created from a clone) operates on the same underlying state.
#[derive(Clone)]
pub struct AutoResetEvent {
// Mutex-guarded state shared by all clones of this handle and their wait futures.
state: Arc<Mutex<State>>,
}
/// State of an event, always accessed while holding the surrounding mutex.
enum State {
/// No signal is stored. The list holds the waiters registered so far (it may be empty).
Unset(WaiterList),
/// A signal is stored; the next `try_wait` or poll consumes it and switches back to `Unset`.
Set,
}
// SAFETY: NOTE(review): `WaiterList` is defined elsewhere in the crate, so the reason it is
// not automatically `Send` is not visible here (presumably it stores raw node pointers).
// Every access to the list and its nodes in this file happens while the mutex is held —
// confirm that this is the invariant the impl relies on.
unsafe impl Send for State {}
/// Signals the event guarded by `mutex`.
///
/// * If a signal is already stored, nothing changes.
/// * If at least one waiter is queued, the waiter at the front of the list is
///   unlinked, flagged as notified and its stored waker (if any) is woken; the
///   state stays `Unset`.
/// * If nobody is queued, the signal is stored by switching the state to `Set`.
///
/// The waker is invoked only after the lock has been released.
#[cfg_attr(test, mutants::skip)]
fn set(mutex: &Mutex<State>) {
    let to_wake: Option<Waker> = {
        let mut guard = mutex.lock().expect(NEVER_POISONED);
        match &mut *guard {
            State::Set => None,
            State::Unset(queue) => {
                // SAFETY: NOTE(review): the list's contract lives in `waiter_list`; the only
                // pointers pushed in this file are addresses of live wait-future nodes, and
                // the list is only touched under this lock — confirm against `WaiterList`.
                let front = unsafe { queue.pop_front() };
                match front {
                    Some(front) => {
                        // SAFETY: same reasoning as above; the node was linked, so its owner
                        // has not yet run its drop-time cleanup, which needs this lock.
                        unsafe {
                            (*front).notified = true;
                        }
                        unsafe { (*front).waker.take() }
                    }
                    None => {
                        // Nobody to hand the signal to: remember it instead.
                        *guard = State::Set;
                        None
                    }
                }
            }
        }
    };

    // Run the waker outside the critical section.
    if let Some(w) = to_wake {
        w.wake();
    }
}
/// Attempts to consume a stored signal without waiting.
///
/// Returns `true` and resets the state to `Unset` (with a fresh, empty waiter
/// list) when a signal was stored; returns `false` and leaves the state
/// untouched otherwise.
#[cfg_attr(test, mutants::skip)]
fn try_wait(mutex: &Mutex<State>) -> bool {
    let mut guard = mutex.lock().expect(NEVER_POISONED);
    let signalled = matches!(*guard, State::Set);
    if signalled {
        *guard = State::Unset(WaiterList::new());
    }
    signalled
}
/// Polls one waiter against the event state guarded by `mutex`.
///
/// Returns `Poll::Ready(())` when
/// * this waiter's node has been flagged as notified (then `*registered` is
///   cleared), or
/// * a signal is stored (it is consumed by switching the state back to `Unset`).
///
/// Otherwise `waker` is stored in the node, the node is linked at the back of
/// the waiter list unless it is already registered, `*registered` is set, and
/// `Poll::Pending` is returned.
///
/// Any waker previously stored in the node is dropped only after the lock has
/// been released, so waker drop code never runs inside the critical section
/// (mirroring how wakers are only woken after unlocking elsewhere in this file).
///
/// # Safety
///
/// NOTE(review): the contract is not written down in this file; from the body the
/// caller must at least guarantee that
/// * `node` stays alive at a stable address for as long as `*registered` is
///   `true`, because its address is stored in the waiter list;
/// * `*registered` accurately reflects whether `node` is linked into the list
///   guarded by this same `mutex`.
unsafe fn poll_wait(
    mutex: &Mutex<State>,
    node: &UnsafeCell<WaiterNode>,
    registered: &mut bool,
    waker: Waker,
) -> Poll<()> {
    let node_ptr = node.get();

    // Receives the waker that was stored by an earlier poll (if any) so that it
    // can be dropped once the guard below has gone out of scope.
    let stale_waker: Option<Waker>;
    {
        let mut state = mutex.lock().expect(NEVER_POISONED);

        // SAFETY: `node_ptr` comes from the live `node` reference; the node's fields
        // are only accessed while holding `mutex` in this file.
        if unsafe { (*node_ptr).notified } {
            // The notifier already unlinked this node from the list.
            *registered = false;
            return Poll::Ready(());
        }

        match &mut *state {
            State::Set => {
                debug_assert!(
                    !*registered,
                    "Set state is exclusive with registered waiters"
                );
                // Consume the stored signal.
                *state = State::Unset(WaiterList::new());
                return Poll::Ready(());
            }
            State::Unset(waiters) => {
                // SAFETY: see above; swapping (rather than assigning) hands the old
                // waker back to us instead of dropping it under the lock.
                stale_waker = unsafe { (*node_ptr).waker.replace(waker) };
                if !*registered {
                    // SAFETY: NOTE(review): relies on the caller keeping the node alive
                    // and pinned while registered — see the function-level notes.
                    unsafe {
                        waiters.push_back(node_ptr);
                    }
                    *registered = true;
                }
            }
        }
    }

    // The lock is released; dropping a waker may run arbitrary code.
    drop(stale_waker);
    Poll::Pending
}
/// Cleans up a registered waiter whose future is being dropped.
///
/// * If the node was never notified, it is unlinked from the waiter list.
/// * If the node was notified but the future is dropped without having observed
///   it, the signal is passed on: the next queued waiter (if any) is unlinked,
///   flagged as notified and woken after the lock is released; with no waiter
///   queued the signal is stored by switching the state to `Set`.
///
/// # Safety
///
/// NOTE(review): the contract is not written down in this file; from the body the
/// caller must pass the same `mutex` and `node` that were used for polling, and
/// `registered` must be `true` (checked by a debug assertion only).
unsafe fn drop_wait(mutex: &Mutex<State>, node: &UnsafeCell<WaiterNode>, registered: bool) {
    debug_assert!(registered);
    let own_node = node.get();
    let mut guard = mutex.lock().expect(NEVER_POISONED);

    // SAFETY: `own_node` comes from the live `node` reference and is read under the lock.
    let was_notified = unsafe { (*own_node).notified };

    if !was_notified {
        // Plain cancellation: just take the node out of the queue.
        match &mut *guard {
            State::Unset(queue) => {
                // SAFETY: NOTE(review): assumes the node is currently linked into this
                // list, which is what `registered` is meant to express.
                unsafe {
                    queue.remove(own_node);
                }
            }
            State::Set => {
                debug_assert!(false, "registered non-notified node requires Unset state");
            }
        }
        return;
    }

    // The signal was handed to this waiter but never observed: pass it on.
    let successor = match &mut *guard {
        // SAFETY: list access under the lock; see the notes on the functions that link nodes.
        State::Unset(queue) => unsafe { queue.pop_front() },
        // A signal is already stored; there is nothing further to record.
        State::Set => return,
    };

    let Some(next) = successor else {
        // Nobody is waiting, so keep the signal for a later `try_wait` or poll.
        *guard = State::Set;
        return;
    };

    // SAFETY: `next` was linked in the list until just now, so its owner has not yet
    // completed its own drop-time cleanup, which requires this lock.
    unsafe {
        (*next).notified = true;
    }
    let forwarded = unsafe { (*next).waker.take() };

    // Wake outside the critical section.
    drop(guard);
    if let Some(w) = forwarded {
        w.wake();
    }
}
impl AutoResetEvent {
#[must_use]
pub fn boxed() -> Self {
Self {
state: Arc::new(Mutex::new(State::Unset(WaiterList::new()))),
}
}
#[must_use]
pub unsafe fn embedded(place: Pin<&EmbeddedAutoResetEvent>) -> RawAutoResetEvent {
let state = NonNull::from(&place.get_ref().state);
RawAutoResetEvent { state }
}
#[cfg_attr(test, mutants::skip)]
#[cfg_attr(coverage_nightly, coverage(off))] pub fn set(&self) {
set(&self.state);
}
#[must_use]
#[cfg_attr(test, mutants::skip)]
#[cfg_attr(coverage_nightly, coverage(off))] pub fn try_wait(&self) -> bool {
try_wait(&self.state)
}
#[must_use]
pub fn wait(&self) -> AutoResetWaitFuture {
AutoResetWaitFuture {
state: Arc::clone(&self.state),
node: UnsafeCell::new(WaiterNode::new()),
registered: false,
_pinned: PhantomPinned,
}
}
}
/// Future returned by `AutoResetEvent::wait`; resolves to `()` once this waiter
/// obtains a signal.
pub struct AutoResetWaitFuture {
// Shared event state; the `Arc` clone keeps it alive for the future's lifetime.
state: Arc<Mutex<State>>,
// This waiter's node. While `registered` is true its address is stored in the waiter list.
node: UnsafeCell<WaiterNode>,
// True while `node` is linked into the waiter list (set by polling, cleared on notification).
registered: bool,
// Makes the type `!Unpin`, since the waiter list may hold a pointer to `node`.
_pinned: PhantomPinned,
}
// SAFETY: NOTE(review): the justification is not stated in the original. In this file the
// node's contents are only read or written while the event's mutex is held — confirm that
// this (plus `WaiterNode`'s own definition) is what makes cross-thread transfer sound.
unsafe impl Send for AutoResetWaitFuture {}
// Explicit opt-ins; presumably needed because `UnsafeCell` suppresses the automatic
// unwind-safety impls — verify.
impl UnwindSafe for AutoResetWaitFuture {}
impl RefUnwindSafe for AutoResetWaitFuture {}
impl Future for AutoResetWaitFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<()> {
let waker = cx.waker().clone();
let this = unsafe { self.get_unchecked_mut() };
unsafe { poll_wait(&this.state, &this.node, &mut this.registered, waker) }
}
}
impl Drop for AutoResetWaitFuture {
    /// Runs the waiter cleanup, but only if this future is currently registered;
    /// an unregistered future has nothing linked into the waiter list.
    fn drop(&mut self) {
        if self.registered {
            // SAFETY: NOTE(review): passes the same state and node that polling used,
            // with `registered` known to be true — confirm against `drop_wait`'s contract.
            unsafe { drop_wait(&self.state, &self.node, self.registered) }
        }
    }
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for AutoResetEvent {
    /// Prints only the type name; the internal state is deliberately omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("AutoResetEvent");
        builder.finish_non_exhaustive()
    }
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for AutoResetWaitFuture {
    /// Prints the type name and the `registered` flag; other fields are omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("AutoResetWaitFuture");
        builder.field("registered", &self.registered);
        builder.finish_non_exhaustive()
    }
}
/// Storage for an auto-reset event's state that the caller places inline in its own
/// data instead of behind an `Arc`. Handles are obtained via `AutoResetEvent::embedded`.
pub struct EmbeddedAutoResetEvent {
// Mutex-guarded event state; raw handles point directly at this field.
state: Mutex<State>,
// Makes the type `!Unpin`, since raw handles store this value's address.
_pinned: PhantomPinned,
}
impl EmbeddedAutoResetEvent {
    /// Creates embedded event storage in the unset state with an empty waiter list.
    #[must_use]
    pub fn new() -> Self {
        let state = Mutex::new(State::Unset(WaiterList::new()));
        Self {
            state,
            _pinned: PhantomPinned,
        }
    }
}
impl Default for EmbeddedAutoResetEvent {
#[cfg_attr(coverage_nightly, coverage(off))] fn default() -> Self {
Self::new()
}
}
/// Copyable handle to event state stored inside an `EmbeddedAutoResetEvent`.
///
/// The handle holds a raw pointer rather than a reference count, so it does not
/// keep the embedded state alive.
#[derive(Clone, Copy)]
pub struct RawAutoResetEvent {
// Pointer to the `state` field of the `EmbeddedAutoResetEvent` this handle was created from.
state: NonNull<Mutex<State>>,
}
// SAFETY: NOTE(review): the justification is not stated in the original. The pointee is a
// `Mutex<State>`, and all access through the handle goes through that mutex; validity of the
// pointer itself is the responsibility of whoever called `AutoResetEvent::embedded` — confirm.
unsafe impl Send for RawAutoResetEvent {}
unsafe impl Sync for RawAutoResetEvent {}
// Explicit opt-ins; presumably needed because `NonNull` suppresses the automatic
// unwind-safety impls — verify.
impl UnwindSafe for RawAutoResetEvent {}
impl RefUnwindSafe for RawAutoResetEvent {}
impl RawAutoResetEvent {
    /// Borrows the embedded state this handle points at.
    fn state(&self) -> &Mutex<State> {
        // SAFETY: NOTE(review): sound only while the embedded event outlives this
        // handle, which is what the caller of `AutoResetEvent::embedded` must ensure.
        unsafe { &*self.state.as_ptr() }
    }

    /// Signals the event: hands the signal to the first queued waiter, or
    /// stores it when nobody is waiting.
    #[cfg_attr(test, mutants::skip)]
    #[cfg_attr(coverage_nightly, coverage(off))]
    pub fn set(&self) {
        let mutex = self.state();
        set(mutex);
    }

    /// Consumes a stored signal if there is one, returning whether it did.
    #[must_use]
    #[cfg_attr(test, mutants::skip)]
    #[cfg_attr(coverage_nightly, coverage(off))]
    pub fn try_wait(&self) -> bool {
        let mutex = self.state();
        try_wait(mutex)
    }

    /// Returns a future that completes once this waiter obtains a signal.
    ///
    /// The future copies this handle's raw pointer and starts out unregistered.
    #[must_use]
    pub fn wait(&self) -> RawAutoResetWaitFuture {
        let node = UnsafeCell::new(WaiterNode::new());
        RawAutoResetWaitFuture {
            state: self.state,
            node,
            registered: false,
            _pinned: PhantomPinned,
        }
    }
}
/// Future returned by `RawAutoResetEvent::wait`; resolves to `()` once this waiter
/// obtains a signal.
pub struct RawAutoResetWaitFuture {
// Raw pointer to the embedded event state; it does not keep that state alive.
state: NonNull<Mutex<State>>,
// This waiter's node. While `registered` is true its address is stored in the waiter list.
node: UnsafeCell<WaiterNode>,
// True while `node` is linked into the waiter list (set by polling, cleared on notification).
registered: bool,
// Makes the type `!Unpin`, since the waiter list may hold a pointer to `node`.
_pinned: PhantomPinned,
}
// SAFETY: NOTE(review): the justification is not stated in the original. In this file the
// node's contents are only read or written while the event's mutex is held; pointer validity
// is delegated to the caller of `AutoResetEvent::embedded` — confirm.
unsafe impl Send for RawAutoResetWaitFuture {}
// Explicit opt-ins; presumably needed because `UnsafeCell`/`NonNull` suppress the automatic
// unwind-safety impls — verify.
impl UnwindSafe for RawAutoResetWaitFuture {}
impl RefUnwindSafe for RawAutoResetWaitFuture {}
impl Future for RawAutoResetWaitFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<()> {
let waker = cx.waker().clone();
let this = unsafe { self.get_unchecked_mut() };
let state = unsafe { this.state.as_ref() };
unsafe { poll_wait(state, &this.node, &mut this.registered, waker) }
}
}
impl Drop for RawAutoResetWaitFuture {
fn drop(&mut self) {
if !self.registered {
return;
}
let state = unsafe { self.state.as_ref() };
unsafe { drop_wait(state, &self.node, self.registered) }
}
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for EmbeddedAutoResetEvent {
    /// Prints only the type name; the internal state is deliberately omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("EmbeddedAutoResetEvent");
        builder.finish_non_exhaustive()
    }
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for RawAutoResetEvent {
    /// Prints only the type name; the raw state pointer is deliberately omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("RawAutoResetEvent");
        builder.finish_non_exhaustive()
    }
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for RawAutoResetWaitFuture {
    /// Prints the type name and the `registered` flag; other fields are omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("RawAutoResetWaitFuture");
        builder.field("registered", &self.registered);
        builder.finish_non_exhaustive()
    }
}
// Unit tests for the Arc-backed and the embedded/raw event variants.
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use std::sync::Barrier;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{iter, thread};
use static_assertions::{assert_impl_all, assert_not_impl_any};
use super::*;
// Compile-time checks of the auto-trait surface of every public type.
assert_impl_all!(AutoResetEvent: Send, Sync, Clone, UnwindSafe, RefUnwindSafe);
assert_impl_all!(AutoResetWaitFuture: Send, UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(AutoResetWaitFuture: Sync, Unpin);
assert_impl_all!(EmbeddedAutoResetEvent: Send, Sync, UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(EmbeddedAutoResetEvent: Unpin);
assert_impl_all!(RawAutoResetEvent: Send, Sync, Clone, Copy, UnwindSafe, RefUnwindSafe);
assert_impl_all!(RawAutoResetWaitFuture: Send, UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(RawAutoResetWaitFuture: Sync, Unpin);
// A freshly created event holds no signal.
#[test]
fn starts_unset() {
let event = AutoResetEvent::boxed();
assert!(!event.try_wait());
}
// A stored signal is consumed by exactly one `try_wait`.
#[test]
fn set_then_try_wait() {
let event = AutoResetEvent::boxed();
event.set();
assert!(event.try_wait());
assert!(!event.try_wait());
}
// A signal set through one clone is visible through another.
#[test]
fn clone_shares_state() {
let a = AutoResetEvent::boxed();
let b = a.clone();
a.set();
assert!(b.try_wait());
}
// Setting twice with nobody waiting still stores only a single signal.
#[test]
fn double_set_without_waiter_only_stores_one_signal() {
let event = AutoResetEvent::boxed();
event.set();
event.set();
assert!(event.try_wait());
assert!(!event.try_wait());
}
// Awaiting an already-set event completes and consumes the signal.
#[test]
fn wait_completes_when_already_set() {
futures::executor::block_on(async {
let event = AutoResetEvent::boxed();
event.set();
event.wait().await;
assert!(!event.try_wait());
});
}
// A pending waiter becomes ready once the event is set.
#[test]
fn wait_completes_after_set() {
let event = AutoResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(future.as_mut().poll(&mut cx).is_ready());
}
// Each `set` releases exactly one waiter, in registration order.
#[test]
fn only_one_waiter_released_per_set() {
let event = AutoResetEvent::boxed();
let mut f1 = Box::pin(event.wait());
let mut f2 = Box::pin(event.wait());
let mut f3 = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(f1.as_mut().poll(&mut cx).is_pending());
assert!(f2.as_mut().poll(&mut cx).is_pending());
assert!(f3.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(f1.as_mut().poll(&mut cx).is_ready());
assert!(f2.as_mut().poll(&mut cx).is_pending());
assert!(f3.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(f2.as_mut().poll(&mut cx).is_ready());
event.set();
assert!(f3.as_mut().poll(&mut cx).is_ready());
}
// A waiter dropped before the event is set is unregistered, so a later `set`
// stores the signal for the next waiter instead of losing it.
#[test]
fn cancelled_waiter_forwards_notification() {
let event = AutoResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
drop(future);
event.set();
let mut future2 = Box::pin(event.wait());
assert!(future2.as_mut().poll(&mut cx).is_ready());
}
// Dropping a future that was never polled leaves the event usable.
#[test]
fn drop_unpolled_future_is_safe() {
let event = AutoResetEvent::boxed();
{
let _future = event.wait();
}
event.set();
futures::executor::block_on(event.wait());
}
// A signal set on another thread is observed by spinning on `try_wait`.
// `testing::with_watchdog` is an external helper — presumably it fails the test if it hangs; confirm.
#[test]
fn set_from_another_thread() {
testing::with_watchdog(|| {
let event = AutoResetEvent::boxed();
let setter = event.clone();
let barrier = Arc::new(Barrier::new(2));
let b2 = Arc::clone(&barrier);
let handle = thread::spawn(move || {
b2.wait();
setter.set();
});
barrier.wait();
while !event.try_wait() {
std::hint::spin_loop();
}
handle.join().unwrap();
});
}
// With a single stored signal and several threads racing on `try_wait`,
// exactly one of them acquires it.
#[test]
fn only_one_thread_acquires_signal() {
testing::with_watchdog(|| {
let event = AutoResetEvent::boxed();
let waiter_count = 4;
let barrier = Arc::new(Barrier::new(waiter_count + 1));
let acquired_count = Arc::new(AtomicUsize::new(0));
let handles: Vec<_> = iter::repeat_with(|| {
let e = event.clone();
let b = Arc::clone(&barrier);
let count = Arc::clone(&acquired_count);
thread::spawn(move || {
b.wait();
for _ in 0..200 {
if e.try_wait() {
count.fetch_add(1, Ordering::Relaxed);
}
std::hint::spin_loop();
}
})
})
.take(waiter_count)
.collect();
// The signal is stored before the threads are released by the barrier.
event.set();
barrier.wait();
for h in handles {
h.join().unwrap();
}
assert_eq!(acquired_count.load(Ordering::Relaxed), 1);
});
}
// Repeated `set` calls eventually release every spinning thread.
#[test]
fn multiple_sets_release_multiple_threads() {
testing::with_watchdog(|| {
let event = AutoResetEvent::boxed();
let signal_count = 4;
let barrier = Arc::new(Barrier::new(signal_count + 1));
let acquired_count = Arc::new(AtomicUsize::new(0));
let handles: Vec<_> = iter::repeat_with(|| {
let e = event.clone();
let b = Arc::clone(&barrier);
let count = Arc::clone(&acquired_count);
thread::spawn(move || {
b.wait();
while !e.try_wait() {
std::hint::spin_loop();
}
count.fetch_add(1, Ordering::Relaxed);
})
})
.take(signal_count)
.collect();
barrier.wait();
// Keep signalling until every thread has reported an acquisition.
while acquired_count.load(Ordering::Relaxed) < signal_count {
event.set();
std::hint::spin_loop();
}
for h in handles {
h.join().unwrap();
}
});
}
// Embedded variant: a signal set on another thread is observed via `try_wait`.
// `container` stays alive until after the setter thread has been joined.
#[test]
fn embedded_set_from_another_thread() {
testing::with_watchdog(|| {
let container = Box::pin(EmbeddedAutoResetEvent::new());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
let setter = event;
let barrier = Arc::new(Barrier::new(2));
let b2 = Arc::clone(&barrier);
let handle = thread::spawn(move || {
b2.wait();
setter.set();
});
barrier.wait();
while !event.try_wait() {
std::hint::spin_loop();
}
handle.join().unwrap();
});
}
// Embedded variant: awaiting an already-set event completes.
#[test]
fn embedded_set_and_wait() {
futures::executor::block_on(async {
let container = Box::pin(EmbeddedAutoResetEvent::new());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
event.set();
event.wait().await;
});
}
// Embedded variant: a copied handle refers to the same state.
#[test]
fn embedded_clone_shares_state() {
let container = Box::pin(EmbeddedAutoResetEvent::new());
let a = unsafe { AutoResetEvent::embedded(container.as_ref()) };
let b = a;
a.set();
assert!(b.try_wait());
}
// Embedded variant: a stored signal is consumed by exactly one `try_wait`.
#[test]
fn embedded_signal_consumed() {
let container = Box::pin(EmbeddedAutoResetEvent::new());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
event.set();
assert!(event.try_wait());
assert!(!event.try_wait());
}
// Embedded variant: dropping a never-polled future leaves the event usable.
#[test]
fn embedded_drop_future_while_waiting() {
futures::executor::block_on(async {
let container = Box::pin(EmbeddedAutoResetEvent::new());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
{
let _future = event.wait();
}
event.set();
event.wait().await;
});
}
// A waiter that was notified but dropped before observing it puts the signal back.
#[test]
fn notified_then_dropped_re_sets_event() {
let event = AutoResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
drop(future);
assert!(event.try_wait());
}
// A waiter that was notified but dropped hands the signal to the next queued waiter.
#[test]
fn notified_then_dropped_forwards_to_next() {
let event = AutoResetEvent::boxed();
let mut future1 = Box::pin(event.wait());
let mut future2 = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future1.as_mut().poll(&mut cx).is_pending());
assert!(future2.as_mut().poll(&mut cx).is_pending());
event.set();
drop(future1);
assert!(future2.as_mut().poll(&mut cx).is_ready());
}
// `set` invokes the waker stored by a registered waiter.
// `AtomicWakeTracker` is a crate test helper; the requirements of its unsafe `waker()` are not visible here.
#[test]
fn set_wakes_registered_waiter() {
use crate::test_helpers::AtomicWakeTracker;
let event = AutoResetEvent::boxed();
let tracker = AtomicWakeTracker::new();
let waker = unsafe { tracker.waker() };
let mut cx = task::Context::from_waker(&waker);
let mut future = Box::pin(event.wait());
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(tracker.was_woken());
}
// Embedded variant: a pending waiter becomes ready once the event is set.
#[test]
fn embedded_wait_registers_then_completes() {
let container = Box::pin(EmbeddedAutoResetEvent::new());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(future.as_mut().poll(&mut cx).is_ready());
}
// Embedded variant: a registered waiter dropped before `set` is unregistered,
// so the later signal is stored rather than lost.
#[test]
fn embedded_drop_registered_future() {
let container = Box::pin(EmbeddedAutoResetEvent::new());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
drop(future);
event.set();
assert!(event.try_wait());
}
// Embedded variant: a notified-but-dropped waiter puts the signal back.
#[test]
fn embedded_notified_then_dropped_re_sets_event() {
let container = Box::pin(EmbeddedAutoResetEvent::new());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
drop(future);
assert!(event.try_wait());
}
// Embedded variant: a notified-but-dropped waiter hands the signal to the next waiter.
#[test]
fn embedded_notified_then_dropped_forwards_to_next() {
let container = Box::pin(EmbeddedAutoResetEvent::new());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
let mut future1 = Box::pin(event.wait());
let mut future2 = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future1.as_mut().poll(&mut cx).is_pending());
assert!(future2.as_mut().poll(&mut cx).is_pending());
event.set();
drop(future1);
assert!(future2.as_mut().poll(&mut cx).is_ready());
}
// Embedded variant: `set` invokes the waker stored by a registered waiter.
#[test]
fn embedded_set_wakes_registered_waiter() {
use crate::test_helpers::AtomicWakeTracker;
let container = Box::pin(EmbeddedAutoResetEvent::new());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
let tracker = AtomicWakeTracker::new();
let waker = unsafe { tracker.waker() };
let mut cx = task::Context::from_waker(&waker);
let mut future = Box::pin(event.wait());
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(tracker.was_woken());
}
// Number of waiters / repeated sets used by the bulk tests below.
const WAITER_COUNT: usize = 100;
// With many queued waiters, each `set` releases the next one in order and no
// signal is left over at the end.
#[test]
fn many_sets_release_all_waiters() {
let event = AutoResetEvent::boxed();
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
let mut futures: Vec<_> = iter::repeat_with(|| Box::pin(event.wait()))
.take(WAITER_COUNT)
.collect();
for f in &mut futures {
assert!(f.as_mut().poll(&mut cx).is_pending());
}
for f in &mut futures {
event.set();
assert!(f.as_mut().poll(&mut cx).is_ready());
}
assert!(!event.try_wait());
}
// Embedded variant of the bulk release test.
#[test]
fn embedded_many_sets_release_all_waiters() {
let container = Box::pin(EmbeddedAutoResetEvent::new());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
let mut futures: Vec<_> = iter::repeat_with(|| Box::pin(event.wait()))
.take(WAITER_COUNT)
.collect();
for f in &mut futures {
assert!(f.as_mut().poll(&mut cx).is_pending());
}
for f in &mut futures {
event.set();
assert!(f.as_mut().poll(&mut cx).is_ready());
}
assert!(!event.try_wait());
}
// Many sets with nobody waiting collapse into a single stored signal.
#[test]
fn many_sets_without_waiters_coalesce() {
let event = AutoResetEvent::boxed();
for _ in 0..WAITER_COUNT {
event.set();
}
assert!(event.try_wait());
assert!(!event.try_wait());
}
}