use std::cell::UnsafeCell;
use std::fmt;
use std::future::Future;
use std::marker::PhantomPinned;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::{Arc, Mutex};
use std::task::{self, Poll, Waker};
use crate::NEVER_POISONED;
use crate::waiter_list::{WaiterList, WaiterNode};
/// A manual-reset event handle whose state lives behind an `Arc<Mutex<_>>`.
///
/// Cloning the handle (derived `Clone`) clones the `Arc`, so every clone
/// observes and modifies the same `is_set` flag and the same waiter list.
#[derive(Clone)]
pub struct ManualResetEvent {
// Shared flag + waiter list; `wait()` clones this `Arc` into each future it creates.
state: Arc<Mutex<State>>,
}
/// Mutex-protected state shared by an event handle and all of its wait futures.
struct State {
/// `true` after `set` has run and until `reset` stores `false` again.
is_set: bool,
/// Waiter nodes that were polled while the event was unset and have not yet
/// been unlinked (by a later poll that saw the event set, or by being dropped).
waiters: WaiterList,
}
// SAFETY(review): `WaiterList` is declared elsewhere in the crate. It hands out
// raw node pointers (see `head()` in `set`), which is presumably why `Send` is
// not derived automatically. Every access in this file goes through the owning
// `Mutex` — confirm that no other module touches the list without the lock.
unsafe impl Send for State {}
/// Signature of the test-only hook that `set` runs after each wake-up.
#[cfg(test)]
type SetHookFn = dyn Fn() + Send + Sync;
/// Test-only hook. When it holds `Some`, `set` invokes the closure after every
/// `Waker::wake` call, at a point where the state mutex is not held.
///
/// The static is process-global: while a hook is installed, every `set` call
/// that wakes a waiter — from any test — runs it.
#[cfg(test)]
static HOOK_SET_AFTER_WAKE: Mutex<Option<Arc<SetHookFn>>> = Mutex::new(None);
/// Marks the event as set and wakes the waiters stored in the waiter list.
///
/// Returns immediately if the event is already set. Otherwise the flag is
/// stored first and then wakers are taken out of the list one at a time. The
/// state mutex is released around each `wake()` call and re-acquired
/// afterwards; the loop stops early if the flag was cleared (by `reset`) while
/// the lock was released.
///
/// Nodes are not unlinked here: they stay in the list, with their waker slot
/// emptied, until their future is polled again or dropped.
///
/// Panics via `expect(NEVER_POISONED)` if the mutex is poisoned.
#[cfg_attr(test, mutants::skip)]
fn set(mutex: &Mutex<State>) {
let mut state = mutex.lock().expect(NEVER_POISONED);
if state.is_set {
// Already set: return without scanning the waiter list.
return;
}
state.is_set = true;
loop {
// Find the first node (starting from the head) that still holds a waker
// and take the waker out of it. The scan restarts from the head on every
// outer iteration because the lock was released in between, so the list
// may have changed.
let waker = {
let mut cursor = state.waiters.head();
loop {
if cursor.is_null() {
// Reached the end without finding a waker: nothing left to wake.
break None;
}
// SAFETY(review): `cursor` is non-null and was read from the list while
// the lock is held. This relies on every linked node staying valid until
// it is unlinked under this same lock (see `poll_wait` / `drop_wait`).
let w = unsafe { (*cursor).waker.take() };
if w.is_some() {
break w;
}
// SAFETY(review): same reasoning as above; `next` is read under the lock.
cursor = unsafe { (*cursor).next };
}
};
let Some(w) = waker else { break };
// Release the lock before waking, so the wake-up (and the test hook below)
// run without the state mutex held.
drop(state);
w.wake();
#[cfg(test)]
{
// Clone the hook out so the hook mutex is not held while the hook runs.
let hook = HOOK_SET_AFTER_WAKE.lock().expect(NEVER_POISONED).clone();
if let Some(hook) = hook {
hook();
}
}
state = mutex.lock().expect(NEVER_POISONED);
if !state.is_set {
// The flag was cleared while the lock was released: stop waking.
break;
}
}
}
/// Clears the `is_set` flag under the state lock.
///
/// The waiter list is left untouched. Panics via `expect(NEVER_POISONED)` if
/// the mutex is poisoned.
fn reset(mutex: &Mutex<State>) {
    // The guard is a temporary of this statement, so the lock is released as
    // soon as the store has happened.
    mutex.lock().expect(NEVER_POISONED).is_set = false;
}
/// Returns the current value of the `is_set` flag without registering a waiter.
///
/// Panics via `expect(NEVER_POISONED)` if the mutex is poisoned.
#[cfg_attr(test, mutants::skip)]
fn try_wait(mutex: &Mutex<State>) -> bool {
    // `bool` is `Copy`: the flag is read out through the temporary guard.
    mutex.lock().expect(NEVER_POISONED).is_set
}
/// Shared `poll` implementation for both wait-future types.
///
/// Returns `Poll::Ready(())` if the event is set, unlinking `node` from the
/// waiter list first when `*registered` is `true`. Otherwise stores `waker` in
/// the node, links the node at the back of the waiter list if it is not linked
/// yet, and returns `Poll::Pending`.
///
/// A waker left in the node by an earlier poll is taken out under the lock but
/// dropped only after the state mutex has been released. Dropping a waker can
/// run arbitrary code; doing so while holding the (non-reentrant) mutex could
/// deadlock if that code touches the same event. This mirrors `set`, which
/// also releases the lock before calling into a waker.
///
/// Panics via `expect(NEVER_POISONED)` if the mutex is poisoned.
///
/// # Safety
///
/// NOTE(review): contract inferred from how this function is used in this
/// file — confirm against `WaiterList`'s own requirements:
///
/// * `node` must stay at the same address, and stay alive, for as long as
///   `*registered` is `true` (the list stores its raw address).
/// * `*registered` must be `true` exactly when `node` is linked into the
///   waiter list guarded by `mutex`.
/// * The contents of `node` must only be accessed while `mutex` is locked.
unsafe fn poll_wait(
    mutex: &Mutex<State>,
    node: &UnsafeCell<WaiterNode>,
    registered: &mut bool,
    waker: Waker,
) -> Poll<()> {
    let mut state = mutex.lock().expect(NEVER_POISONED);
    let node_ptr = node.get();

    if state.is_set {
        if *registered {
            // SAFETY: per the contract above, `*registered` means the node is
            // linked into this list, and the lock is held.
            unsafe {
                state.waiters.remove(node_ptr);
            }
            *registered = false;
        }
        return Poll::Ready(());
    }

    // SAFETY: the lock is held, so nothing else is accessing the node's waker
    // slot right now. The old waker is moved out instead of being dropped here.
    let previous = unsafe { (*node_ptr).waker.replace(waker) };

    if !*registered {
        // SAFETY: the node is not linked yet and, per the contract above, will
        // stay valid until it is unlinked again.
        unsafe {
            state.waiters.push_back(node_ptr);
        }
        *registered = true;
    }

    // Unlock first, then drop the replaced waker outside the critical section.
    drop(state);
    drop(previous);
    Poll::Pending
}
/// Shared `Drop` implementation for both wait-future types: unlinks `node`
/// from the waiter list if it is currently registered.
///
/// Does not lock the mutex at all when `registered` is `false`. Panics via
/// `expect(NEVER_POISONED)` if the mutex is poisoned.
///
/// # Safety
///
/// NOTE(review): inferred from usage — `registered` must be `true` only if
/// `node` is linked into the waiter list guarded by `mutex`.
unsafe fn drop_wait(mutex: &Mutex<State>, node: &UnsafeCell<WaiterNode>, registered: bool) {
    if !registered {
        // Never linked (or already unlinked by a poll): nothing to clean up.
        return;
    }

    let node_ptr = node.get();
    let mut guard = mutex.lock().expect(NEVER_POISONED);
    // SAFETY: the caller guarantees the node is linked into this list, and
    // the lock is held for the duration of the removal.
    unsafe {
        guard.waiters.remove(node_ptr);
    }
}
impl ManualResetEvent {
#[must_use]
pub fn boxed() -> Self {
Self {
state: Arc::new(Mutex::new(State {
is_set: false,
waiters: WaiterList::new(),
})),
}
}
#[must_use]
pub unsafe fn embedded(place: Pin<&EmbeddedManualResetEvent>) -> RawManualResetEvent {
let state = NonNull::from(&place.get_ref().state);
RawManualResetEvent { state }
}
#[cfg_attr(test, mutants::skip)]
#[cfg_attr(coverage_nightly, coverage(off))] pub fn set(&self) {
set(&self.state);
}
#[cfg_attr(coverage_nightly, coverage(off))] pub fn reset(&self) {
reset(&self.state);
}
#[must_use]
#[cfg_attr(test, mutants::skip)]
#[cfg_attr(coverage_nightly, coverage(off))] pub fn try_wait(&self) -> bool {
try_wait(&self.state)
}
#[must_use]
pub fn wait(&self) -> ManualResetWaitFuture {
ManualResetWaitFuture {
state: Arc::clone(&self.state),
node: UnsafeCell::new(WaiterNode::new()),
registered: false,
_pinned: PhantomPinned,
}
}
}
/// Future returned by [`ManualResetEvent::wait`].
///
/// Polling returns `Poll::Ready(())` when the event is observed set; otherwise
/// the future's node is linked into the event's waiter list and the poll
/// returns `Poll::Pending`. Dropping the future unlinks the node again.
pub struct ManualResetWaitFuture {
// Own `Arc` clone of the shared state, so it stays alive as long as the future.
state: Arc<Mutex<State>>,
// Waiter node whose address is pushed into `State::waiters` while registered.
node: UnsafeCell<WaiterNode>,
// `true` while `node` is linked into the waiter list.
registered: bool,
// Makes the type `!Unpin`: the waiter list stores the node's raw address, so
// the future must not be moved once it has been polled.
_pinned: PhantomPinned,
}
// SAFETY(review): `Send` is presumably not automatic because of the
// `UnsafeCell<WaiterNode>` contents. In this file the node is only read or
// written by `set`, `poll_wait` and `drop_wait`, each of which holds the state
// mutex while doing so — confirm that `WaiterNode` has no thread affinity.
unsafe impl Send for ManualResetWaitFuture {}
impl UnwindSafe for ManualResetWaitFuture {}
impl RefUnwindSafe for ManualResetWaitFuture {}
impl Future for ManualResetWaitFuture {
type Output = ();
/// Delegates to `poll_wait` with this future's state, node and registration flag.
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<()> {
// `poll_wait` takes the waker by value, so clone it from the context.
let waker = cx.waker().clone();
// SAFETY: the mutable reference is only used to reach the fields below;
// nothing is moved out of the pinned value.
let this = unsafe { self.get_unchecked_mut() };
// SAFETY(review): `self` is pinned, so `node` keeps its address while it is
// registered, and `registered` is only ever changed by `poll_wait` itself.
unsafe { poll_wait(&this.state, &this.node, &mut this.registered, waker) }
}
}
impl Drop for ManualResetWaitFuture {
/// Unlinks the node from the waiter list (if registered) before the future's
/// memory goes away.
fn drop(&mut self) {
// SAFETY(review): `registered` is only set by `poll_wait` after linking
// `node` into the list guarded by `state`, so it accurately reports whether
// the node is linked.
unsafe { drop_wait(&self.state, &self.node, self.registered) }
}
}
/// Formats as `ManualResetEvent { is_set: <bool> }`.
///
/// Reading the flag briefly locks the state mutex.
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for ManualResetEvent {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Snapshot the flag before anything is written to the formatter.
        let signaled = self.try_wait();
        let mut out = f.debug_struct("ManualResetEvent");
        out.field("is_set", &signaled);
        out.finish()
    }
}
/// Formats the `registered` flag only; the remaining fields are elided
/// (non-exhaustive output).
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for ManualResetWaitFuture {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("ManualResetWaitFuture");
        out.field("registered", &self.registered);
        out.finish_non_exhaustive()
    }
}
/// Inline storage for a manual-reset event's state, without an `Arc`.
///
/// A handle is obtained by pinning the value and passing it to
/// `ManualResetEvent::embedded`.
pub struct EmbeddedManualResetEvent {
// Same state as the `Arc`-backed variant, stored inline in this struct.
state: Mutex<State>,
// Makes the type `!Unpin`: raw handles store the address of `state`.
_pinned: PhantomPinned,
}
impl EmbeddedManualResetEvent {
    /// Creates an unset event with an empty waiter list.
    #[must_use]
    pub fn new() -> Self {
        let initial = State {
            is_set: false,
            waiters: WaiterList::new(),
        };
        Self {
            state: Mutex::new(initial),
            _pinned: PhantomPinned,
        }
    }
}
impl Default for EmbeddedManualResetEvent {
#[cfg_attr(coverage_nightly, coverage(off))] fn default() -> Self {
Self::new()
}
}
/// Copyable handle to the state stored inside an `EmbeddedManualResetEvent`.
///
/// The handle is a bare pointer with no lifetime; it is created by the
/// `unsafe` function `ManualResetEvent::embedded`, whose caller is responsible
/// for keeping the pointed-to event alive.
#[derive(Clone, Copy)]
pub struct RawManualResetEvent {
// Address of the `state` field of the `EmbeddedManualResetEvent` this handle was created from.
state: NonNull<Mutex<State>>,
}
// SAFETY(review): the handle only ever produces `&Mutex<State>` from the
// pointer (see `state()`), and `State` is declared `Send` in this file, which
// makes `Mutex<State>` shareable across threads. Validity of the pointer itself
// is the obligation of whoever called `ManualResetEvent::embedded`.
unsafe impl Send for RawManualResetEvent {}
unsafe impl Sync for RawManualResetEvent {}
impl UnwindSafe for RawManualResetEvent {}
impl RefUnwindSafe for RawManualResetEvent {}
impl RawManualResetEvent {
    /// Borrows the mutex this handle points at.
    fn state(&self) -> &Mutex<State> {
        let target = &self.state;
        // SAFETY(review): relies on the caller of `ManualResetEvent::embedded`
        // keeping the embedded event alive while this handle is in use.
        unsafe { target.as_ref() }
    }

    /// Sets the event and wakes the registered waiters; see the free function `set`.
    #[cfg_attr(test, mutants::skip)]
    #[cfg_attr(coverage_nightly, coverage(off))]
    pub fn set(&self) {
        let shared = self.state();
        set(shared);
    }

    /// Clears the event so that subsequent polls report `Pending` again.
    #[cfg_attr(coverage_nightly, coverage(off))]
    pub fn reset(&self) {
        let shared = self.state();
        reset(shared);
    }

    /// Returns `true` if the event is currently set, without waiting.
    #[must_use]
    #[cfg_attr(test, mutants::skip)]
    #[cfg_attr(coverage_nightly, coverage(off))]
    pub fn try_wait(&self) -> bool {
        let shared = self.state();
        try_wait(shared)
    }

    /// Returns a future that completes once a poll observes the event as set.
    ///
    /// The future copies this handle's raw pointer. It is not linked into the
    /// waiter list until it has been polled.
    #[must_use]
    pub fn wait(&self) -> RawManualResetWaitFuture {
        let state = self.state;
        let node = UnsafeCell::new(WaiterNode::new());
        RawManualResetWaitFuture {
            state,
            node,
            registered: false,
            _pinned: PhantomPinned,
        }
    }
}
/// Future returned by [`RawManualResetEvent::wait`].
///
/// Behaves like `ManualResetWaitFuture`, but reaches the shared state through
/// a raw pointer instead of an `Arc`, so it does not keep the state alive.
pub struct RawManualResetWaitFuture {
// Raw pointer copied from the handle that created this future.
state: NonNull<Mutex<State>>,
// Waiter node whose address is pushed into `State::waiters` while registered.
node: UnsafeCell<WaiterNode>,
// `true` while `node` is linked into the waiter list.
registered: bool,
// Makes the type `!Unpin`: the waiter list stores the node's raw address, so
// the future must not be moved once it has been polled.
_pinned: PhantomPinned,
}
// SAFETY(review): same reasoning as for `ManualResetWaitFuture` — the node is
// only accessed under the state mutex in this file. The raw pointer's validity
// is the obligation of whoever called `ManualResetEvent::embedded`.
unsafe impl Send for RawManualResetWaitFuture {}
impl UnwindSafe for RawManualResetWaitFuture {}
impl RefUnwindSafe for RawManualResetWaitFuture {}
impl Future for RawManualResetWaitFuture {
type Output = ();
/// Delegates to `poll_wait` with this future's node and registration flag,
/// using the mutex behind the raw state pointer.
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<()> {
// `poll_wait` takes the waker by value, so clone it from the context.
let waker = cx.waker().clone();
// SAFETY: the mutable reference is only used to reach the fields below;
// nothing is moved out of the pinned value.
let this = unsafe { self.get_unchecked_mut() };
// SAFETY(review): relies on the caller of `ManualResetEvent::embedded`
// keeping the embedded event alive for as long as this future exists.
let state = unsafe { this.state.as_ref() };
// SAFETY(review): `self` is pinned, so `node` keeps its address while it is
// registered, and `registered` is only ever changed by `poll_wait` itself.
unsafe { poll_wait(state, &this.node, &mut this.registered, waker) }
}
}
impl Drop for RawManualResetWaitFuture {
/// Unlinks the node from the waiter list (if registered) before the future's
/// memory goes away.
fn drop(&mut self) {
// SAFETY(review): relies on the embedded event still being alive when the
// future is dropped — an obligation of the `ManualResetEvent::embedded` caller.
let state = unsafe { self.state.as_ref() };
// SAFETY(review): `registered` is only set by `poll_wait` after linking
// `node` into the list guarded by `state`.
unsafe { drop_wait(state, &self.node, self.registered) }
}
}
/// Prints only the type name with a non-exhaustive marker; the state mutex is
/// not locked.
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for EmbeddedManualResetEvent {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("EmbeddedManualResetEvent");
        out.finish_non_exhaustive()
    }
}
/// Formats as `RawManualResetEvent { is_set: <bool> }`.
///
/// Reading the flag dereferences the handle's pointer and briefly locks the
/// state mutex.
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for RawManualResetEvent {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Snapshot the flag before anything is written to the formatter.
        let signaled = self.try_wait();
        let mut out = f.debug_struct("RawManualResetEvent");
        out.field("is_set", &signaled);
        out.finish()
    }
}
/// Formats the `registered` flag only; the remaining fields are elided
/// (non-exhaustive output).
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for RawManualResetWaitFuture {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("RawManualResetWaitFuture");
        out.field("registered", &self.registered);
        out.finish_non_exhaustive()
    }
}
// Unit tests for the `Arc`-backed event and the embedded/raw variant.
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use std::sync::Barrier;
use std::task::Waker;
use std::{iter, thread};
use static_assertions::{assert_impl_all, assert_not_impl_any};
use super::*;
// Compile-time checks of the auto/marker traits each public type exposes.
assert_impl_all!(ManualResetEvent: Send, Sync, Clone, UnwindSafe, RefUnwindSafe);
assert_impl_all!(ManualResetWaitFuture: Send, UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(ManualResetWaitFuture: Sync, Unpin);
assert_impl_all!(EmbeddedManualResetEvent: Send, Sync, UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(EmbeddedManualResetEvent: Unpin);
assert_impl_all!(RawManualResetEvent: Send, Sync, Clone, Copy, UnwindSafe, RefUnwindSafe);
assert_impl_all!(RawManualResetWaitFuture: Send, UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(RawManualResetWaitFuture: Sync, Unpin);
// A fresh event reports unset; the second call checks that `try_wait` itself
// does not change the state.
#[test]
fn starts_unset() {
let event = ManualResetEvent::boxed();
assert!(!event.try_wait());
assert!(!event.try_wait());
}
// `set` makes `try_wait` return true.
#[test]
fn set_makes_is_set_true() {
let event = ManualResetEvent::boxed();
event.set();
assert!(event.try_wait());
}
// `reset` after `set` makes `try_wait` return false again.
#[test]
fn reset_after_set() {
let event = ManualResetEvent::boxed();
event.set();
event.reset();
assert!(!event.try_wait());
}
// A clone observes a `set` performed through the original handle.
#[test]
fn clone_shares_state() {
let a = ManualResetEvent::boxed();
let b = a.clone();
a.set();
assert!(b.try_wait());
}
// Awaiting an already-set event completes.
#[test]
fn wait_completes_when_already_set() {
futures::executor::block_on(async {
let event = ManualResetEvent::boxed();
event.set();
event.wait().await;
});
}
// A future that was pending becomes ready on the poll after `set`.
#[test]
fn wait_completes_after_set() {
let event = ManualResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(future.as_mut().poll(&mut cx).is_ready());
}
// One `set` releases all three registered waiters.
#[test]
fn multiple_waiters_all_released() {
let event = ManualResetEvent::boxed();
let mut f1 = Box::pin(event.wait());
let mut f2 = Box::pin(event.wait());
let mut f3 = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(f1.as_mut().poll(&mut cx).is_pending());
assert!(f2.as_mut().poll(&mut cx).is_pending());
assert!(f3.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(f1.as_mut().poll(&mut cx).is_ready());
assert!(f2.as_mut().poll(&mut cx).is_ready());
assert!(f3.as_mut().poll(&mut cx).is_ready());
}
// Dropping a never-polled future leaves the event usable.
#[test]
fn drop_future_while_waiting() {
futures::executor::block_on(async {
let event = ManualResetEvent::boxed();
{
let _future = event.wait();
}
event.set();
event.wait().await;
});
}
// Dropping a polled (registered) future leaves the event usable.
#[test]
fn drop_polled_future_while_waiting() {
let event = ManualResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
drop(future);
event.set();
let mut future2 = Box::pin(event.wait());
assert!(future2.as_mut().poll(&mut cx).is_ready());
}
// `set` invokes the waker stored by a pending poll.
#[test]
fn set_wakes_registered_waiter() {
use crate::test_helpers::AtomicWakeTracker;
let event = ManualResetEvent::boxed();
let tracker = AtomicWakeTracker::new();
let waker = unsafe { tracker.waker() };
let mut cx = task::Context::from_waker(&waker);
let mut future = Box::pin(event.wait());
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(tracker.was_woken());
}
// A `set` on another thread becomes visible to `try_wait` on this thread.
#[test]
fn set_from_another_thread() {
testing::with_watchdog(|| {
let event = ManualResetEvent::boxed();
let setter = event.clone();
let barrier = Arc::new(Barrier::new(2));
let b2 = Arc::clone(&barrier);
let handle = thread::spawn(move || {
b2.wait();
setter.set();
});
barrier.wait();
while !event.try_wait() {
std::hint::spin_loop();
}
handle.join().unwrap();
});
}
// Four threads spinning on `try_wait` all observe a single `set`.
#[test]
fn multiple_waiters_from_different_threads() {
testing::with_watchdog(|| {
let event = ManualResetEvent::boxed();
let waiter_count = 4;
let barrier = Arc::new(Barrier::new(waiter_count + 1));
let all_done = Arc::new(Barrier::new(waiter_count + 1));
let handles: Vec<_> = iter::repeat_with(|| {
let e = event.clone();
let b = Arc::clone(&barrier);
let done = Arc::clone(&all_done);
thread::spawn(move || {
b.wait();
while !e.try_wait() {
std::hint::spin_loop();
}
done.wait();
})
})
.take(waiter_count)
.collect();
barrier.wait();
event.set();
all_done.wait();
for h in handles {
h.join().unwrap();
}
});
}
// Concurrent `set` and `reset` calls from two threads run to completion;
// no final state is asserted.
#[test]
fn set_reset_race_across_threads() {
testing::with_watchdog(|| {
let event = ManualResetEvent::boxed();
let barrier = Arc::new(Barrier::new(3));
let setter = event.clone();
let b1 = Arc::clone(&barrier);
let h1 = thread::spawn(move || {
b1.wait();
for _ in 0..100 {
setter.set();
std::hint::spin_loop();
}
});
let resetter = event;
let b2 = Arc::clone(&barrier);
let h2 = thread::spawn(move || {
b2.wait();
for _ in 0..100 {
resetter.reset();
std::hint::spin_loop();
}
});
barrier.wait();
h1.join().unwrap();
h2.join().unwrap();
});
}
// Embedded variant: a `set` on another thread becomes visible here. The raw
// handle is `Copy`, so `setter` and `event` are two copies of the same handle.
#[test]
fn embedded_set_from_another_thread() {
testing::with_watchdog(|| {
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
let setter = event;
let barrier = Arc::new(Barrier::new(2));
let b2 = Arc::clone(&barrier);
let handle = thread::spawn(move || {
b2.wait();
setter.set();
});
barrier.wait();
while !event.try_wait() {
std::hint::spin_loop();
}
handle.join().unwrap();
});
}
// Embedded variant: awaiting an already-set event completes.
#[test]
fn embedded_set_and_wait() {
futures::executor::block_on(async {
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
event.set();
event.wait().await;
});
}
// Embedded variant: a copied handle observes a `set` made through the original.
#[test]
fn embedded_clone_shares_state() {
futures::executor::block_on(async {
let container = Box::pin(EmbeddedManualResetEvent::new());
let a = unsafe { ManualResetEvent::embedded(container.as_ref()) };
let b = a;
a.set();
assert!(b.try_wait());
b.wait().await;
});
}
// Embedded variant: `reset` after `set` makes `try_wait` return false.
#[test]
fn embedded_reset_after_set() {
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
event.set();
event.reset();
assert!(!event.try_wait());
}
// Embedded variant: dropping a never-polled future leaves the event usable.
#[test]
fn embedded_drop_future_while_waiting() {
futures::executor::block_on(async {
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
{
let _future = event.wait();
}
event.set();
event.wait().await;
});
}
// Embedded variant: pending before `set`, ready after.
#[test]
fn embedded_wait_registers_then_completes() {
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(future.as_mut().poll(&mut cx).is_ready());
}
// Embedded variant: dropping a registered future leaves the event usable.
#[test]
fn embedded_drop_registered_future() {
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
drop(future);
event.set();
let mut future2 = Box::pin(event.wait());
assert!(future2.as_mut().poll(&mut cx).is_ready());
}
// Embedded variant: one `set` releases all three registered waiters.
#[test]
fn embedded_multiple_waiters_released() {
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
let mut f1 = Box::pin(event.wait());
let mut f2 = Box::pin(event.wait());
let mut f3 = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(f1.as_mut().poll(&mut cx).is_pending());
assert!(f2.as_mut().poll(&mut cx).is_pending());
assert!(f3.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(f1.as_mut().poll(&mut cx).is_ready());
assert!(f2.as_mut().poll(&mut cx).is_ready());
assert!(f3.as_mut().poll(&mut cx).is_ready());
}
// `try_wait` returns true once the event is set.
#[test]
fn try_wait_returns_true_when_set() {
let event = ManualResetEvent::boxed();
event.set();
assert!(event.try_wait());
}
// Embedded variant: `try_wait` returns false on a fresh event.
#[test]
fn embedded_try_wait_returns_false_when_unset() {
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
assert!(!event.try_wait());
}
// Embedded variant: `try_wait` returns true once the event is set.
#[test]
fn embedded_try_wait_returns_true_when_set() {
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
event.set();
assert!(event.try_wait());
}
// Embedded variant: `set` invokes the waker stored by a pending poll.
#[test]
fn embedded_set_wakes_registered_waiter() {
use crate::test_helpers::AtomicWakeTracker;
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
let tracker = AtomicWakeTracker::new();
let waker = unsafe { tracker.waker() };
let mut cx = task::Context::from_waker(&waker);
let mut future = Box::pin(event.wait());
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(tracker.was_woken());
}
// A dropped registered future is not woken by `set`, while a still-registered
// one is.
#[test]
fn drop_unlinks_registered_waiter_from_list() {
use crate::test_helpers::AtomicWakeTracker;
let event = ManualResetEvent::boxed();
let tracker1 = AtomicWakeTracker::new();
let waker1 = unsafe { tracker1.waker() };
let mut cx1 = task::Context::from_waker(&waker1);
let tracker2 = AtomicWakeTracker::new();
let waker2 = unsafe { tracker2.waker() };
let mut cx2 = task::Context::from_waker(&waker2);
let mut future1 = Box::pin(event.wait());
assert!(future1.as_mut().poll(&mut cx1).is_pending());
let mut future2 = Box::pin(event.wait());
assert!(future2.as_mut().poll(&mut cx2).is_pending());
drop(future1);
event.set();
assert!(!tracker1.was_woken());
assert!(tracker2.was_woken());
}
// Embedded variant of the test above.
#[test]
fn embedded_drop_unlinks_registered_waiter_from_list() {
use crate::test_helpers::AtomicWakeTracker;
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
let tracker1 = AtomicWakeTracker::new();
let waker1 = unsafe { tracker1.waker() };
let mut cx1 = task::Context::from_waker(&waker1);
let tracker2 = AtomicWakeTracker::new();
let waker2 = unsafe { tracker2.waker() };
let mut cx2 = task::Context::from_waker(&waker2);
let mut future1 = Box::pin(event.wait());
assert!(future1.as_mut().poll(&mut cx1).is_pending());
let mut future2 = Box::pin(event.wait());
assert!(future2.as_mut().poll(&mut cx2).is_pending());
drop(future1);
event.set();
assert!(!tracker1.was_woken());
assert!(tracker2.was_woken());
}
// Calling `set` twice leaves the event set.
#[test]
fn set_when_already_set_is_noop() {
let event = ManualResetEvent::boxed();
event.set();
assert!(event.try_wait());
event.set();
assert!(event.try_wait());
}
// Embedded variant: calling `set` twice leaves the event set.
#[test]
fn embedded_set_when_already_set_is_noop() {
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
event.set();
assert!(event.try_wait());
event.set();
assert!(event.try_wait());
}
// Installs a hook that resets the event and re-polls the waiter (which stores
// a fresh waker) right after `set` wakes it. `set` must then stop looping
// instead of waking the same waiter forever; the watchdog guards against a hang.
//
// NOTE(review): the hook lives in a process-global static, so `set` calls made
// by other tests while it is installed will run it too, and it is not cleared
// if this test panics before the final assignment.
#[test]
fn set_terminates_when_reset_called_during_wake_loop() {
testing::with_watchdog(|| {
let event = ManualResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
let event_for_hook = event.clone();
let future_shared: Arc<Mutex<Pin<Box<ManualResetWaitFuture>>>> =
Arc::new(Mutex::new(future));
let future_for_hook = Arc::clone(&future_shared);
*HOOK_SET_AFTER_WAKE.lock().unwrap() = Some(Arc::new(move || {
event_for_hook.reset();
let w = Waker::noop();
let mut cx = task::Context::from_waker(w);
let _poll = future_for_hook.lock().unwrap().as_mut().poll(&mut cx);
}));
event.set();
// Uninstall the hook so later `set` calls are unaffected.
*HOOK_SET_AFTER_WAKE.lock().unwrap() = None;
});
}
}