use std::any::type_name;
use std::fmt;
use std::future::Future;
use std::marker::PhantomPinned;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{self, Poll, Waker};
use awaiter_set::{Awaiter, AwaiterSet};
use crate::NEVER_POISONED;
/// An event that, once set, stays set until `reset` is called.
///
/// The state lives behind an `Arc`, so cloning a handle yields another handle
/// to the same event rather than an independent copy.
#[derive(Clone)]
pub struct ManualResetEvent {
/// Reference-counted state shared by all clones and by the futures they create.
inner: Arc<EventInner>,
}
// Bit flags stored in `EventInner::state`.
/// Initial state: neither `IS_SET` nor `HAS_WAITERS` is present.
const IDLE: u8 = 0;
/// Turned on by `EventInner::set` and turned off by `EventInner::reset`.
const IS_SET: u8 = 0x1;
/// Turned on by `EventInner::poll_wait` right before it registers an awaiter;
/// turned off again when the awaiter set is observed to be empty.
const HAS_WAITERS: u8 = 0x2;
/// State shared by every handle and every wait future of one event.
struct EventInner {
/// Combination of the `IS_SET` and `HAS_WAITERS` bits (`IDLE` when neither is on).
state: AtomicU8,
/// Set of registered awaiters; `set`, `poll_wait` and `drop_wait` lock this
/// mutex before registering, unregistering or notifying.
slow: Mutex<AwaiterSet>,
}
impl EventInner {
/// Turns on `IS_SET` and, if waiters were recorded, notifies and wakes them.
///
/// When the state observed by the `fetch_or` carries no `HAS_WAITERS` bit the
/// function returns immediately without taking the mutex.
fn set(&self) {
// `fetch_or` returns the state as it was *before* `IS_SET` was added.
let prev = self.state.fetch_or(IS_SET, Ordering::Release);
if prev & HAS_WAITERS == 0 {
return;
}
// NOTE(review): presumably this marks the awaiters registered so far as a
// "prior generation", so that awaiters registered while the loop below is
// running are not notified by this call — confirm against `AwaiterSet`.
self.slow.lock().expect(NEVER_POISONED).advance_generation();
loop {
// The lock is re-acquired on every iteration because it is released
// before each wake-up below.
let mut waiters = self.slow.lock().expect(NEVER_POISONED);
let waker = waiters.notify_one_prior_generation();
// Nothing is registered any more, so later `set` calls may use the fast path.
if waiters.is_empty() {
self.state.fetch_and(!HAS_WAITERS, Ordering::Relaxed);
}
// `None` ends the loop; the guard is then released at the end of the scope.
let Some(w) = waker else { break };
// Release the lock before running the waker: the waker may call back into
// this event (the tests use wakers that reset the event and poll new
// futures), and `slow` is a non-reentrant `std::sync::Mutex`.
drop(waiters);
w.wake();
}
}
/// Turns off `IS_SET`. Registered awaiters and `HAS_WAITERS` are left untouched.
fn reset(&self) {
self.state.fetch_and(!IS_SET, Ordering::Release);
}
/// Returns `true` if `IS_SET` is currently on. Never blocks and never registers anything.
fn try_wait(&self) -> bool {
self.state.load(Ordering::Acquire) & IS_SET != 0
}
/// Performs one poll of a wait operation.
///
/// Returns `Poll::Ready(())` when the event is observed set or when
/// `awaiter.take_notification()` reports `true`. Otherwise the awaiter is
/// registered together with `waker` and `Poll::Pending` is returned.
///
/// # Safety
///
/// `awaiter` is dereferenced and then treated as pinned
/// (`Pin::new_unchecked`), so it must point to a live `Awaiter` that will not
/// be moved while it may be registered.
/// NOTE(review): the precise requirements come from `AwaiterSet::register`,
/// which is not visible here — confirm.
unsafe fn poll_wait(&self, awaiter: *mut Awaiter, waker: Waker) -> Poll<()> {
// Fast path: the event is already set, no lock needed.
if self.state.load(Ordering::Acquire) & IS_SET != 0 {
return Poll::Ready(());
}
let awaiter_ref = unsafe { &*awaiter };
// Checked even though `IS_SET` is off: presumably a `set` notified this
// awaiter and the event was reset afterwards — confirm against `Awaiter`.
if awaiter_ref.take_notification() {
return Poll::Ready(());
}
// Test-only pause point located just before the mutex is taken.
#[cfg(test)]
crate::test_hooks::run(&crate::test_hooks::MANUAL_PRE_MUTEX);
let mut waiters = self.slow.lock().expect(NEVER_POISONED);
// Re-check under the lock: a notification may have arrived while this
// thread was waiting for the mutex.
if awaiter_ref.take_notification() {
return Poll::Ready(());
}
// Test-only pause point located just before the second state load.
#[cfg(test)]
crate::test_hooks::run(&crate::test_hooks::MANUAL_PRE_LOAD);
if self.state.load(Ordering::Acquire) & IS_SET != 0 {
return Poll::Ready(());
}
// Test-only pause point located just before `HAS_WAITERS` is published.
#[cfg(test)]
crate::test_hooks::run(&crate::test_hooks::MANUAL_PRE_FETCH_OR);
// Publish `HAS_WAITERS` first and only then look at `IS_SET` again: a `set`
// that ran before the flag became visible took its fast path and will not
// wake anyone, so it has to be detected here.
self.state.fetch_or(HAS_WAITERS, Ordering::Relaxed);
if self.state.load(Ordering::Acquire) & IS_SET != 0 {
self.state.fetch_and(!HAS_WAITERS, Ordering::Relaxed);
return Poll::Ready(());
}
let awaiter_mut = unsafe { &mut *awaiter };
let awaiter_mut = unsafe { Pin::new_unchecked(awaiter_mut) };
// The mutex guard is still held here, so registration cannot interleave
// with the notification loop in `set`.
unsafe {
waiters.register(awaiter_mut, waker);
}
Poll::Pending
}
/// Removes `awaiter` from the awaiter set when its future is dropped.
///
/// Returns without locking if the awaiter reports that it is not registered.
/// After unregistering, `HAS_WAITERS` is turned off if the set became empty.
///
/// # Safety
///
/// `awaiter` is dereferenced and treated as pinned, so it must point to a live
/// `Awaiter`.
/// NOTE(review): the precise requirements come from `AwaiterSet::unregister`,
/// which is not visible here — confirm.
unsafe fn drop_wait(&self, awaiter: *mut Awaiter) {
let awaiter_ref = unsafe { &*awaiter };
if !awaiter_ref.is_registered() {
return;
}
let mut waiters = self.slow.lock().expect(NEVER_POISONED);
let awaiter_mut = unsafe { &mut *awaiter };
let awaiter_mut = unsafe { Pin::new_unchecked(awaiter_mut) };
unsafe {
waiters.unregister(awaiter_mut);
}
if waiters.is_empty() {
self.state.fetch_and(!HAS_WAITERS, Ordering::Relaxed);
}
}
}
impl ManualResetEvent {
#[must_use]
pub fn boxed() -> Self {
Self {
inner: Arc::new(EventInner {
state: AtomicU8::new(IDLE),
slow: Mutex::new(AwaiterSet::new()),
}),
}
}
#[must_use]
pub unsafe fn embedded(place: Pin<&EmbeddedManualResetEvent>) -> EmbeddedManualResetEventRef {
let inner = NonNull::from(&place.get_ref().inner);
EmbeddedManualResetEventRef { inner }
}
#[cfg_attr(coverage_nightly, coverage(off))] pub fn set(&self) {
self.inner.set();
}
#[cfg_attr(coverage_nightly, coverage(off))] pub fn reset(&self) {
self.inner.reset();
}
#[must_use]
#[cfg_attr(coverage_nightly, coverage(off))] pub fn try_wait(&self) -> bool {
self.inner.try_wait()
}
#[must_use]
pub fn wait(&self) -> ManualResetWaitFuture {
ManualResetWaitFuture {
inner: Arc::clone(&self.inner),
awaiter: Awaiter::new(),
}
}
}
/// Future returned by `ManualResetEvent::wait`.
///
/// It keeps the event state alive through its own `Arc` clone and unregisters
/// its awaiter when dropped.
pub struct ManualResetWaitFuture {
/// The event this future is waiting on.
inner: Arc<EventInner>,
/// Per-future registration record handed to `EventInner::poll_wait` and `EventInner::drop_wait`.
awaiter: Awaiter,
}
// SAFETY: NOTE(review) — reasoning reconstructed from this file only; confirm
// against `Awaiter`. The manual impl is presumably needed because `Awaiter` is
// not automatically `Send`. Registration and unregistration of the awaiter are
// performed while holding `EventInner::slow`.
unsafe impl Send for ManualResetWaitFuture {}
// Explicitly marks the future as unwind-safe, both by value and by reference.
impl UnwindSafe for ManualResetWaitFuture {}
impl RefUnwindSafe for ManualResetWaitFuture {}
impl Future for ManualResetWaitFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<()> {
let waker = cx.waker().clone();
let this = unsafe { self.get_unchecked_mut() };
let awaiter: *mut Awaiter = &raw mut this.awaiter;
unsafe { this.inner.poll_wait(awaiter, waker) }
}
}
impl Drop for ManualResetWaitFuture {
fn drop(&mut self) {
let awaiter: *mut Awaiter = &raw mut self.awaiter;
unsafe { self.inner.drop_wait(awaiter) }
}
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for ManualResetEvent {
    /// Prints the type name together with the current `is_set` state.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let currently_set = self.try_wait();
        let mut builder = f.debug_struct(type_name::<Self>());
        builder.field("is_set", &currently_set);
        builder.finish()
    }
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for ManualResetWaitFuture {
    /// Prints only the type name; the fields are intentionally omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct(type_name::<Self>());
        builder.finish_non_exhaustive()
    }
}
/// Inline storage for an event's state, for use without an `Arc` allocation.
///
/// Handles are obtained through `ManualResetEvent::embedded`, which requires
/// the storage to be pinned.
pub struct EmbeddedManualResetEvent {
/// The event state that handles and futures point to.
inner: EventInner,
/// Makes the type `!Unpin`, since handles hold raw pointers into it.
_pinned: PhantomPinned,
}
impl EmbeddedManualResetEvent {
    /// Creates storage for an event that starts out unset with no waiters.
    #[must_use]
    pub fn new() -> Self {
        let inner = EventInner {
            state: AtomicU8::new(IDLE),
            slow: Mutex::new(AwaiterSet::new()),
        };
        Self {
            inner,
            _pinned: PhantomPinned,
        }
    }
}
impl Default for EmbeddedManualResetEvent {
#[cfg_attr(coverage_nightly, coverage(off))] fn default() -> Self {
Self::new()
}
}
/// Copyable handle to an `EmbeddedManualResetEvent`.
///
/// The handle carries no lifetime: it is a raw pointer to the event state and
/// relies on the contract of `ManualResetEvent::embedded` for validity.
#[derive(Clone, Copy)]
pub struct EmbeddedManualResetEventRef {
/// Raw pointer to the `EventInner` stored inside the embedded event.
inner: NonNull<EventInner>,
}
// SAFETY: NOTE(review) — reasoning reconstructed from this file only; confirm.
// `NonNull` is neither `Send` nor `Sync`, hence the manual impls. The handle
// only ever hands out `&EventInner`, whose fields are an `AtomicU8` and a
// `Mutex`; the tests in this file assert that `EmbeddedManualResetEvent`
// (which wraps `EventInner`) is `Send + Sync`.
unsafe impl Send for EmbeddedManualResetEventRef {}
unsafe impl Sync for EmbeddedManualResetEventRef {}
// Explicitly marks the handle as unwind-safe, both by value and by reference.
impl UnwindSafe for EmbeddedManualResetEventRef {}
impl RefUnwindSafe for EmbeddedManualResetEventRef {}
impl EmbeddedManualResetEventRef {
fn inner(&self) -> &EventInner {
unsafe { self.inner.as_ref() }
}
#[cfg_attr(coverage_nightly, coverage(off))] pub fn set(&self) {
self.inner().set();
}
#[cfg_attr(coverage_nightly, coverage(off))] pub fn reset(&self) {
self.inner().reset();
}
#[must_use]
#[cfg_attr(coverage_nightly, coverage(off))] pub fn try_wait(&self) -> bool {
self.inner().try_wait()
}
#[must_use]
pub fn wait(&self) -> EmbeddedManualResetWaitFuture {
EmbeddedManualResetWaitFuture {
inner: self.inner,
awaiter: Awaiter::new(),
}
}
}
/// Future returned by `EmbeddedManualResetEventRef::wait`.
///
/// Unlike `ManualResetWaitFuture` it does not keep the event alive; it only
/// stores a raw pointer to the event state.
pub struct EmbeddedManualResetWaitFuture {
/// Raw pointer to the state of the event being waited on.
inner: NonNull<EventInner>,
/// Per-future registration record handed to `EventInner::poll_wait` and `EventInner::drop_wait`.
awaiter: Awaiter,
}
// SAFETY: NOTE(review) — reasoning reconstructed from this file only; confirm
// against `Awaiter`. The manual impl is needed at least because `NonNull` is
// not `Send`. Registration and unregistration of the awaiter are performed
// while holding `EventInner::slow`.
unsafe impl Send for EmbeddedManualResetWaitFuture {}
// Explicitly marks the future as unwind-safe, both by value and by reference.
impl UnwindSafe for EmbeddedManualResetWaitFuture {}
impl RefUnwindSafe for EmbeddedManualResetWaitFuture {}
impl Future for EmbeddedManualResetWaitFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<()> {
let waker = cx.waker().clone();
let this = unsafe { self.get_unchecked_mut() };
let inner = unsafe { this.inner.as_ref() };
let awaiter: *mut Awaiter = &raw mut this.awaiter;
unsafe { inner.poll_wait(awaiter, waker) }
}
}
impl Drop for EmbeddedManualResetWaitFuture {
fn drop(&mut self) {
let inner = unsafe { self.inner.as_ref() };
let awaiter: *mut Awaiter = &raw mut self.awaiter;
unsafe { inner.drop_wait(awaiter) }
}
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for EmbeddedManualResetEvent {
    /// Prints only the type name; the fields are intentionally omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct(type_name::<Self>());
        builder.finish_non_exhaustive()
    }
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for EmbeddedManualResetEventRef {
    /// Prints the type name together with the current `is_set` state.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let currently_set = self.try_wait();
        let mut builder = f.debug_struct(type_name::<Self>());
        builder.field("is_set", &currently_set);
        builder.finish()
    }
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for EmbeddedManualResetWaitFuture {
    /// Prints only the type name; the fields are intentionally omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct(type_name::<Self>());
        builder.finish_non_exhaustive()
    }
}
// Unit tests for the boxed and embedded event variants.
//
// Most tests drive futures by hand with a no-op or tracking waker so that the
// exact poll/set/reset order is under the test's control.
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use std::sync::Barrier;
use std::task::Waker;
use std::{iter, thread};
use static_assertions::{assert_impl_all, assert_not_impl_any};
use super::*;
use crate::test_hooks::BarrierHook;
// Compile-time checks of the auto-trait surface of every public type.
assert_impl_all!(ManualResetEvent: Send, Sync, Clone, UnwindSafe, RefUnwindSafe);
assert_impl_all!(ManualResetWaitFuture: Send, UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(ManualResetWaitFuture: Sync, Unpin);
assert_impl_all!(EmbeddedManualResetEvent: Send, Sync, UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(EmbeddedManualResetEvent: Unpin);
assert_impl_all!(
EmbeddedManualResetEventRef: Send, Sync, Clone, Copy, UnwindSafe, RefUnwindSafe
);
assert_impl_all!(EmbeddedManualResetWaitFuture: Send, UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(EmbeddedManualResetWaitFuture: Sync, Unpin);
// A fresh event is unset, and querying it twice does not change that.
#[test]
fn starts_unset() {
let event = ManualResetEvent::boxed();
assert!(!event.try_wait());
assert!(!event.try_wait());
}
// `set` makes `try_wait` report true.
#[test]
fn set_makes_is_set_true() {
let event = ManualResetEvent::boxed();
event.set();
assert!(event.try_wait());
}
// `reset` after `set` returns the event to the unset state.
#[test]
fn reset_after_set() {
let event = ManualResetEvent::boxed();
event.set();
event.reset();
assert!(!event.try_wait());
}
// Setting through one clone is visible through another clone.
#[test]
fn clone_shares_state() {
let a = ManualResetEvent::boxed();
let b = a.clone();
a.set();
assert!(b.try_wait());
}
// Awaiting an already-set event completes.
#[test]
fn wait_completes_when_already_set() {
futures::executor::block_on(async {
let event = ManualResetEvent::boxed();
event.set();
event.wait().await;
});
}
// A future that was pending becomes ready after `set`.
#[test]
fn wait_completes_after_set() {
let event = ManualResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(future.as_mut().poll(&mut cx).is_ready());
}
// A single `set` releases every pending future.
#[test]
fn multiple_waiters_all_released() {
let event = ManualResetEvent::boxed();
let mut f1 = Box::pin(event.wait());
let mut f2 = Box::pin(event.wait());
let mut f3 = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(f1.as_mut().poll(&mut cx).is_pending());
assert!(f2.as_mut().poll(&mut cx).is_pending());
assert!(f3.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(f1.as_mut().poll(&mut cx).is_ready());
assert!(f2.as_mut().poll(&mut cx).is_ready());
assert!(f3.as_mut().poll(&mut cx).is_ready());
}
// Dropping a future that was never polled leaves the event usable.
#[test]
fn drop_future_while_waiting() {
futures::executor::block_on(async {
let event = ManualResetEvent::boxed();
{
let _future = event.wait();
}
event.set();
event.wait().await;
});
}
// Dropping a future that was polled (and is therefore registered) leaves the event usable.
#[test]
fn drop_polled_future_while_waiting() {
let event = ManualResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
drop(future);
event.set();
let mut future2 = Box::pin(event.wait());
assert!(future2.as_mut().poll(&mut cx).is_ready());
}
// `set` invokes the waker of a registered future.
#[test]
fn set_wakes_registered_waiter() {
use crate::test_helpers::AtomicWakeTracker;
let event = ManualResetEvent::boxed();
let tracker = AtomicWakeTracker::new();
let waker = unsafe { tracker.waker() };
let mut cx = task::Context::from_waker(&waker);
let mut future = Box::pin(event.wait());
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(tracker.was_woken());
}
// A `set` on another thread becomes visible to a thread spinning on `try_wait`.
#[test]
fn set_from_another_thread() {
testing::with_watchdog(|| {
let event = ManualResetEvent::boxed();
let setter = event.clone();
let barrier = Arc::new(Barrier::new(2));
let b2 = Arc::clone(&barrier);
let handle = thread::spawn(move || {
b2.wait();
setter.set();
});
barrier.wait();
while !event.try_wait() {
std::hint::spin_loop();
}
handle.join().unwrap();
});
}
// Four threads spin on `try_wait` until the main thread sets the event.
#[test]
fn multiple_waiters_from_different_threads() {
testing::with_watchdog(|| {
let event = ManualResetEvent::boxed();
let waiter_count = 4;
let barrier = Arc::new(Barrier::new(waiter_count + 1));
let all_done = Arc::new(Barrier::new(waiter_count + 1));
let handles: Vec<_> = iter::repeat_with(|| {
let e = event.clone();
let b = Arc::clone(&barrier);
let done = Arc::clone(&all_done);
thread::spawn(move || {
b.wait();
while !e.try_wait() {
std::hint::spin_loop();
}
done.wait();
})
})
.take(waiter_count)
.collect();
barrier.wait();
event.set();
all_done.wait();
for h in handles {
h.join().unwrap();
}
});
}
// Runs 100 `set` calls and 100 `reset` calls concurrently. There is no
// assertion on the final state; the test only requires both threads to finish.
#[test]
fn set_reset_race_across_threads() {
testing::with_watchdog(|| {
let event = ManualResetEvent::boxed();
let barrier = Arc::new(Barrier::new(3));
let setter = event.clone();
let b1 = Arc::clone(&barrier);
let h1 = thread::spawn(move || {
b1.wait();
for _ in 0..100 {
setter.set();
std::hint::spin_loop();
}
});
let resetter = event;
let b2 = Arc::clone(&barrier);
let h2 = thread::spawn(move || {
b2.wait();
for _ in 0..100 {
resetter.reset();
std::hint::spin_loop();
}
});
barrier.wait();
h1.join().unwrap();
h2.join().unwrap();
});
}
// Exercises the `take_notification` check that follows the mutex acquisition in
// `poll_wait`: an already registered future is re-polled on another thread,
// that poll is held at the `MANUAL_PRE_MUTEX` hook while the main thread calls
// `set`, and the resumed poll must report ready.
#[test]
fn poll_wait_post_mutex_take_notification_branch() {
testing::with_watchdog(|| {
let BarrierHook {
entered,
proceed,
hook,
} = crate::test_hooks::barrier_hook();
crate::test_hooks::with_hook(&crate::test_hooks::MANUAL_PRE_MUTEX, hook, || {
let event = ManualResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
let producer = thread::spawn(move || {
crate::test_hooks::HOOK_PARTICIPANT.with(|c| c.set(true));
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
future.as_mut().poll(&mut cx)
});
entered.wait();
event.set();
proceed.wait();
assert!(producer.join().unwrap().is_ready());
});
});
}
// Exercises the state load that follows the mutex acquisition in `poll_wait`:
// the first poll is held at the `MANUAL_PRE_LOAD` hook while the main thread
// calls `set`, and the resumed poll must report ready.
#[test]
fn poll_wait_post_mutex_load_branch() {
testing::with_watchdog(|| {
let BarrierHook {
entered,
proceed,
hook,
} = crate::test_hooks::barrier_hook();
crate::test_hooks::with_hook(&crate::test_hooks::MANUAL_PRE_LOAD, hook, || {
let event = ManualResetEvent::boxed();
let producer = thread::spawn({
let event = event.clone();
move || {
crate::test_hooks::HOOK_PARTICIPANT.with(|c| c.set(true));
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
future.as_mut().poll(&mut cx)
}
});
entered.wait();
event.set();
proceed.wait();
assert!(producer.join().unwrap().is_ready());
assert!(event.try_wait());
});
});
}
// Exercises the state load that follows the `HAS_WAITERS` publication in
// `poll_wait`: the first poll is held at the `MANUAL_PRE_FETCH_OR` hook while
// the main thread calls `set`, and the resumed poll must report ready.
#[test]
fn poll_wait_post_fetch_or_load_branch() {
testing::with_watchdog(|| {
let BarrierHook {
entered,
proceed,
hook,
} = crate::test_hooks::barrier_hook();
crate::test_hooks::with_hook(&crate::test_hooks::MANUAL_PRE_FETCH_OR, hook, || {
let event = ManualResetEvent::boxed();
let producer = thread::spawn({
let event = event.clone();
move || {
crate::test_hooks::HOOK_PARTICIPANT.with(|c| c.set(true));
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
future.as_mut().poll(&mut cx)
}
});
entered.wait();
event.set();
proceed.wait();
assert!(producer.join().unwrap().is_ready());
assert!(event.try_wait());
});
});
}
// Embedded variant of `set_from_another_thread`; the handle is `Copy`, so it is
// both moved into the thread and still used on the main thread.
#[test]
fn embedded_set_from_another_thread() {
testing::with_watchdog(|| {
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
let setter = event;
let barrier = Arc::new(Barrier::new(2));
let b2 = Arc::clone(&barrier);
let handle = thread::spawn(move || {
b2.wait();
setter.set();
});
barrier.wait();
while !event.try_wait() {
std::hint::spin_loop();
}
handle.join().unwrap();
});
}
// Awaiting an already-set embedded event completes.
#[test]
fn embedded_set_and_wait() {
futures::executor::block_on(async {
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
event.set();
event.wait().await;
});
}
// Copies of an embedded handle observe the same state.
#[test]
fn embedded_clone_shares_state() {
futures::executor::block_on(async {
let container = Box::pin(EmbeddedManualResetEvent::new());
let a = unsafe { ManualResetEvent::embedded(container.as_ref()) };
let b = a;
a.set();
assert!(b.try_wait());
b.wait().await;
});
}
// Embedded variant of `reset_after_set`.
#[test]
fn embedded_reset_after_set() {
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
event.set();
event.reset();
assert!(!event.try_wait());
}
// Embedded variant of `drop_future_while_waiting`.
#[test]
fn embedded_drop_future_while_waiting() {
futures::executor::block_on(async {
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
{
let _future = event.wait();
}
event.set();
event.wait().await;
});
}
// Embedded variant of `wait_completes_after_set`.
#[test]
fn embedded_wait_registers_then_completes() {
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(future.as_mut().poll(&mut cx).is_ready());
}
// Embedded variant of `drop_polled_future_while_waiting`.
#[test]
fn embedded_drop_registered_future() {
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
drop(future);
event.set();
let mut future2 = Box::pin(event.wait());
assert!(future2.as_mut().poll(&mut cx).is_ready());
}
// Embedded variant of `multiple_waiters_all_released`.
#[test]
fn embedded_multiple_waiters_released() {
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
let mut f1 = Box::pin(event.wait());
let mut f2 = Box::pin(event.wait());
let mut f3 = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(f1.as_mut().poll(&mut cx).is_pending());
assert!(f2.as_mut().poll(&mut cx).is_pending());
assert!(f3.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(f1.as_mut().poll(&mut cx).is_ready());
assert!(f2.as_mut().poll(&mut cx).is_ready());
assert!(f3.as_mut().poll(&mut cx).is_ready());
}
// `try_wait` reports true once the event is set.
#[test]
fn try_wait_returns_true_when_set() {
let event = ManualResetEvent::boxed();
event.set();
assert!(event.try_wait());
}
// A fresh embedded event reports false from `try_wait`.
#[test]
fn embedded_try_wait_returns_false_when_unset() {
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
assert!(!event.try_wait());
}
// Embedded variant of `try_wait_returns_true_when_set`.
#[test]
fn embedded_try_wait_returns_true_when_set() {
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
event.set();
assert!(event.try_wait());
}
// Embedded variant of `set_wakes_registered_waiter`.
#[test]
fn embedded_set_wakes_registered_waiter() {
use crate::test_helpers::AtomicWakeTracker;
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
let tracker = AtomicWakeTracker::new();
let waker = unsafe { tracker.waker() };
let mut cx = task::Context::from_waker(&waker);
let mut future = Box::pin(event.wait());
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(tracker.was_woken());
}
// A dropped future's waker is not invoked by a later `set`, while the waker of
// a future that is still registered is.
#[test]
fn drop_unregisters_waiter() {
use crate::test_helpers::AtomicWakeTracker;
let event = ManualResetEvent::boxed();
let tracker1 = AtomicWakeTracker::new();
let waker1 = unsafe { tracker1.waker() };
let mut cx1 = task::Context::from_waker(&waker1);
let tracker2 = AtomicWakeTracker::new();
let waker2 = unsafe { tracker2.waker() };
let mut cx2 = task::Context::from_waker(&waker2);
let mut future1 = Box::pin(event.wait());
assert!(future1.as_mut().poll(&mut cx1).is_pending());
let mut future2 = Box::pin(event.wait());
assert!(future2.as_mut().poll(&mut cx2).is_pending());
drop(future1);
event.set();
assert!(!tracker1.was_woken());
assert!(tracker2.was_woken());
}
// Embedded variant of `drop_unregisters_waiter`.
#[test]
fn embedded_drop_unregisters_waiter() {
use crate::test_helpers::AtomicWakeTracker;
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
let tracker1 = AtomicWakeTracker::new();
let waker1 = unsafe { tracker1.waker() };
let mut cx1 = task::Context::from_waker(&waker1);
let tracker2 = AtomicWakeTracker::new();
let waker2 = unsafe { tracker2.waker() };
let mut cx2 = task::Context::from_waker(&waker2);
let mut future1 = Box::pin(event.wait());
assert!(future1.as_mut().poll(&mut cx1).is_pending());
let mut future2 = Box::pin(event.wait());
assert!(future2.as_mut().poll(&mut cx2).is_pending());
drop(future1);
event.set();
assert!(!tracker1.was_woken());
assert!(tracker2.was_woken());
}
// Calling `set` on an already-set event keeps it set.
#[test]
fn set_when_already_set_is_noop() {
let event = ManualResetEvent::boxed();
event.set();
assert!(event.try_wait());
event.set();
assert!(event.try_wait());
}
// Embedded variant of `set_when_already_set_is_noop`.
#[test]
fn embedded_set_when_already_set_is_noop() {
let container = Box::pin(EmbeddedManualResetEvent::new());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
event.set();
assert!(event.try_wait());
event.set();
assert!(event.try_wait());
}
// `reset` on an unset event does not release a registered future; a later `set` does.
#[test]
fn reset_while_waiters_registered() {
let event = ManualResetEvent::boxed();
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
let mut future = Box::pin(event.wait());
assert!(future.as_mut().poll(&mut cx).is_pending());
event.reset();
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(future.as_mut().poll(&mut cx).is_ready());
}
// `Default` produces an unset embedded event.
#[test]
fn embedded_default_creates_unset_event() {
let container = Box::pin(EmbeddedManualResetEvent::default());
let event = unsafe { ManualResetEvent::embedded(container.as_ref()) };
assert!(!event.try_wait());
}
// The waker invoked by `set` resets the event and polls a brand-new future on
// the same event; `set` must tolerate this re-entrancy.
#[test]
fn set_with_reentrant_waker_does_not_alias() {
use testing::ReentrantWakerData;
let event = ManualResetEvent::boxed();
let event_clone = event.clone();
let waker_data = ReentrantWakerData::new(move || {
event_clone.reset();
let mut new_future = Box::pin(event_clone.wait());
let noop = Waker::noop();
let mut cx = task::Context::from_waker(noop);
assert!(new_future.as_mut().poll(&mut cx).is_pending());
});
let waker = unsafe { waker_data.waker() };
let mut cx = task::Context::from_waker(&waker);
let mut future = Box::pin(event.wait());
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(waker_data.was_woken());
}
// Future A's waker resets the event from inside `set`; future B, registered
// before the `set`, must still complete.
#[test]
fn reentrant_reset_does_not_skip_awaiters() {
use testing::ReentrantWakerData;
let event = ManualResetEvent::boxed();
let event_for_waker = event.clone();
let waker_data_a = ReentrantWakerData::new(move || {
event_for_waker.reset();
});
let waker_a = unsafe { waker_data_a.waker() };
let mut cx_a = task::Context::from_waker(&waker_a);
let noop = Waker::noop();
let mut cx_b = task::Context::from_waker(noop);
let mut future_a = Box::pin(event.wait());
assert!(future_a.as_mut().poll(&mut cx_a).is_pending());
let mut future_b = Box::pin(event.wait());
assert!(future_b.as_mut().poll(&mut cx_b).is_pending());
event.set();
assert!(waker_data_a.was_woken());
assert!(future_b.as_mut().poll(&mut cx_b).is_ready());
}
// Future A's waker drops future B (polled second of three) and resets the
// event from inside `set`; future C must still complete.
#[test]
fn reentrant_drop_of_middle_sibling_does_not_skip_others() {
use std::cell::RefCell;
use std::rc::Rc;
use testing::ReentrantWakerData;
let event = ManualResetEvent::boxed();
let event_for_waker = event.clone();
let future_b_holder: Rc<RefCell<Option<Pin<Box<ManualResetWaitFuture>>>>> =
Rc::new(RefCell::new(None));
let holder_for_waker = Rc::clone(&future_b_holder);
let waker_data_a = ReentrantWakerData::new(move || {
drop(holder_for_waker.borrow_mut().take());
event_for_waker.reset();
});
let waker_a = unsafe { waker_data_a.waker() };
let mut cx_a = task::Context::from_waker(&waker_a);
let noop = Waker::noop();
let mut cx_noop = task::Context::from_waker(noop);
let mut future_a = Box::pin(event.wait());
assert!(future_a.as_mut().poll(&mut cx_a).is_pending());
let mut future_b = Box::pin(event.wait());
assert!(future_b.as_mut().poll(&mut cx_noop).is_pending());
*future_b_holder.borrow_mut() = Some(future_b);
let mut future_c = Box::pin(event.wait());
assert!(future_c.as_mut().poll(&mut cx_noop).is_pending());
event.set();
assert!(waker_data_a.was_woken());
assert!(future_b_holder.borrow().is_none());
assert!(future_c.as_mut().poll(&mut cx_noop).is_ready());
}
// Future A's waker drops future C (polled last of three) and resets the event
// from inside `set`; future B must still complete.
#[test]
fn reentrant_drop_of_tail_sibling_does_not_skip_others() {
use std::cell::RefCell;
use std::rc::Rc;
use testing::ReentrantWakerData;
let event = ManualResetEvent::boxed();
let event_for_waker = event.clone();
let future_c_holder: Rc<RefCell<Option<Pin<Box<ManualResetWaitFuture>>>>> =
Rc::new(RefCell::new(None));
let holder_for_waker = Rc::clone(&future_c_holder);
let waker_data_a = ReentrantWakerData::new(move || {
drop(holder_for_waker.borrow_mut().take());
event_for_waker.reset();
});
let waker_a = unsafe { waker_data_a.waker() };
let mut cx_a = task::Context::from_waker(&waker_a);
let noop = Waker::noop();
let mut cx_noop = task::Context::from_waker(noop);
let mut future_a = Box::pin(event.wait());
assert!(future_a.as_mut().poll(&mut cx_a).is_pending());
let mut future_b = Box::pin(event.wait());
assert!(future_b.as_mut().poll(&mut cx_noop).is_pending());
let future_c = Box::pin(event.wait());
let mut future_c = future_c;
assert!(future_c.as_mut().poll(&mut cx_noop).is_pending());
*future_c_holder.borrow_mut() = Some(future_c);
event.set();
assert!(waker_data_a.was_woken());
assert!(future_c_holder.borrow().is_none());
assert!(future_b.as_mut().poll(&mut cx_noop).is_ready());
}
// Future A's waker resets the event and registers a new future from inside
// `set`. That late future must stay pending after the first `set` and complete
// only after a second `set`.
#[test]
fn reentrant_register_during_set_does_not_notify_new_awaiter() {
use std::cell::RefCell;
use std::rc::Rc;
use testing::ReentrantWakerData;
let event = ManualResetEvent::boxed();
let event_for_waker = event.clone();
let late_future_holder: Rc<RefCell<Option<Pin<Box<ManualResetWaitFuture>>>>> =
Rc::new(RefCell::new(None));
let holder_for_waker = Rc::clone(&late_future_holder);
let waker_data_a = ReentrantWakerData::new(move || {
event_for_waker.reset();
let noop = Waker::noop();
let mut cx_noop = task::Context::from_waker(noop);
let mut new_future = Box::pin(event_for_waker.wait());
assert!(new_future.as_mut().poll(&mut cx_noop).is_pending());
*holder_for_waker.borrow_mut() = Some(new_future);
});
let waker_a = unsafe { waker_data_a.waker() };
let mut cx_a = task::Context::from_waker(&waker_a);
let mut future_a = Box::pin(event.wait());
assert!(future_a.as_mut().poll(&mut cx_a).is_pending());
event.set();
assert!(waker_data_a.was_woken());
let noop = Waker::noop();
let mut cx_noop = task::Context::from_waker(noop);
let mut late_future = late_future_holder
.borrow_mut()
.take()
.expect("reentrant waker registers the late future");
assert!(late_future.as_mut().poll(&mut cx_noop).is_pending());
event.set();
assert!(late_future.as_mut().poll(&mut cx_noop).is_ready());
}
}