use std::any::type_name;
use std::fmt;
use std::future::Future;
use std::marker::PhantomPinned;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{self, Poll, Waker};
use awaiter_set::{Awaiter, AwaiterSet};
use crate::NEVER_POISONED;
#[derive(Clone)]
pub struct AutoResetEvent {
inner: Arc<EventInner>,
}
const IDLE: u8 = 0;
const SIGNALED: u8 = 0x1;
const HAS_WAITERS: u8 = 0x2;
struct EventInner {
state: AtomicU8,
slow: Mutex<AwaiterSet>,
}
impl EventInner {
fn set(&self) {
if self
.state
.compare_exchange(IDLE, SIGNALED, Ordering::Release, Ordering::Relaxed)
.is_ok()
{
return;
}
let prev = self.state.load(Ordering::Relaxed);
if prev & SIGNALED != 0 {
return;
}
#[cfg(test)]
crate::test_hooks::run(&crate::test_hooks::AUTO_SET_PRE_LOCK);
let waker: Option<Waker>;
{
let mut waiters = self.slow.lock().expect(NEVER_POISONED);
if let Some(w) = waiters.notify_one() {
if waiters.is_empty() {
self.state.fetch_and(!HAS_WAITERS, Ordering::Relaxed);
}
waker = Some(w);
} else {
self.state.store(SIGNALED, Ordering::Release);
waker = None;
}
}
if let Some(w) = waker {
w.wake();
}
}
fn try_wait(&self) -> bool {
self.state.fetch_and(!SIGNALED, Ordering::Acquire) & SIGNALED != 0
}
unsafe fn poll_wait(&self, awaiter: *mut Awaiter, waker: Waker) -> Poll<()> {
if self.try_wait() {
return Poll::Ready(());
}
let awaiter_ref = unsafe { &*awaiter };
if awaiter_ref.take_notification() {
return Poll::Ready(());
}
#[cfg(test)]
crate::test_hooks::run(&crate::test_hooks::AUTO_PRE_MUTEX);
let mut waiters = self.slow.lock().expect(NEVER_POISONED);
if awaiter_ref.take_notification() {
return Poll::Ready(());
}
#[cfg(test)]
crate::test_hooks::run(&crate::test_hooks::AUTO_PRE_TRY_WAIT);
if self.try_wait() {
return Poll::Ready(());
}
#[cfg(test)]
crate::test_hooks::run(&crate::test_hooks::AUTO_PRE_FETCH_OR);
self.state.fetch_or(HAS_WAITERS, Ordering::Relaxed);
if self.try_wait() {
self.state.fetch_and(!HAS_WAITERS, Ordering::Relaxed);
return Poll::Ready(());
}
let awaiter_mut = unsafe { &mut *awaiter };
let awaiter_mut = unsafe { Pin::new_unchecked(awaiter_mut) };
unsafe {
waiters.register(awaiter_mut, waker);
}
Poll::Pending
}
unsafe fn drop_wait(&self, awaiter: *mut Awaiter) {
let awaiter_ref = unsafe { &*awaiter };
if !awaiter_ref.is_registered() {
return;
}
let mut waiters = self.slow.lock().expect(NEVER_POISONED);
if awaiter_ref.is_notified() {
if let Some(waker) = waiters.notify_one() {
if waiters.is_empty() {
self.state.fetch_and(!HAS_WAITERS, Ordering::Relaxed);
}
drop(waiters);
waker.wake();
} else {
self.state.store(SIGNALED, Ordering::Release);
}
} else {
let awaiter_mut = unsafe { &mut *awaiter };
let awaiter_mut = unsafe { Pin::new_unchecked(awaiter_mut) };
unsafe {
waiters.unregister(awaiter_mut);
}
if waiters.is_empty() {
self.state.fetch_and(!HAS_WAITERS, Ordering::Relaxed);
}
}
}
}
impl AutoResetEvent {
#[must_use]
pub fn boxed() -> Self {
Self {
inner: Arc::new(EventInner {
state: AtomicU8::new(IDLE),
slow: Mutex::new(AwaiterSet::new()),
}),
}
}
#[must_use]
pub unsafe fn embedded(place: Pin<&EmbeddedAutoResetEvent>) -> EmbeddedAutoResetEventRef {
let inner = NonNull::from(&place.get_ref().inner);
EmbeddedAutoResetEventRef { inner }
}
#[inline]
#[cfg_attr(coverage_nightly, coverage(off))] pub fn set(&self) {
self.inner.set();
}
#[inline]
#[must_use]
#[cfg_attr(coverage_nightly, coverage(off))] pub fn try_wait(&self) -> bool {
self.inner.try_wait()
}
#[must_use]
pub fn wait(&self) -> AutoResetWaitFuture {
AutoResetWaitFuture {
inner: Arc::clone(&self.inner),
awaiter: Awaiter::new(),
}
}
}
pub struct AutoResetWaitFuture {
inner: Arc<EventInner>,
awaiter: Awaiter,
}
unsafe impl Send for AutoResetWaitFuture {}
impl UnwindSafe for AutoResetWaitFuture {}
impl RefUnwindSafe for AutoResetWaitFuture {}
impl Future for AutoResetWaitFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<()> {
let waker = cx.waker().clone();
let this = unsafe { self.get_unchecked_mut() };
let awaiter: *mut Awaiter = &raw mut this.awaiter;
unsafe { this.inner.poll_wait(awaiter, waker) }
}
}
impl Drop for AutoResetWaitFuture {
fn drop(&mut self) {
let awaiter: *mut Awaiter = &raw mut self.awaiter;
unsafe { self.inner.drop_wait(awaiter) }
}
}
#[cfg_attr(coverage_nightly, coverage(off))] impl fmt::Debug for AutoResetEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct(type_name::<Self>()).finish_non_exhaustive()
}
}
#[cfg_attr(coverage_nightly, coverage(off))] impl fmt::Debug for AutoResetWaitFuture {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct(type_name::<Self>())
.finish_non_exhaustive()
}
}
pub struct EmbeddedAutoResetEvent {
inner: EventInner,
_pinned: PhantomPinned,
}
impl EmbeddedAutoResetEvent {
#[must_use]
pub fn new() -> Self {
Self {
inner: EventInner {
state: AtomicU8::new(IDLE),
slow: Mutex::new(AwaiterSet::new()),
},
_pinned: PhantomPinned,
}
}
}
impl Default for EmbeddedAutoResetEvent {
#[cfg_attr(coverage_nightly, coverage(off))] fn default() -> Self {
Self::new()
}
}
#[derive(Clone, Copy)]
pub struct EmbeddedAutoResetEventRef {
inner: NonNull<EventInner>,
}
unsafe impl Send for EmbeddedAutoResetEventRef {}
unsafe impl Sync for EmbeddedAutoResetEventRef {}
impl UnwindSafe for EmbeddedAutoResetEventRef {}
impl RefUnwindSafe for EmbeddedAutoResetEventRef {}
impl EmbeddedAutoResetEventRef {
fn inner(&self) -> &EventInner {
unsafe { self.inner.as_ref() }
}
#[inline]
#[cfg_attr(coverage_nightly, coverage(off))] pub fn set(&self) {
self.inner().set();
}
#[inline]
#[must_use]
#[cfg_attr(coverage_nightly, coverage(off))] pub fn try_wait(&self) -> bool {
self.inner().try_wait()
}
#[must_use]
pub fn wait(&self) -> EmbeddedAutoResetWaitFuture {
EmbeddedAutoResetWaitFuture {
inner: self.inner,
awaiter: Awaiter::new(),
}
}
}
pub struct EmbeddedAutoResetWaitFuture {
inner: NonNull<EventInner>,
awaiter: Awaiter,
}
unsafe impl Send for EmbeddedAutoResetWaitFuture {}
impl UnwindSafe for EmbeddedAutoResetWaitFuture {}
impl RefUnwindSafe for EmbeddedAutoResetWaitFuture {}
impl Future for EmbeddedAutoResetWaitFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<()> {
let waker = cx.waker().clone();
let this = unsafe { self.get_unchecked_mut() };
let inner = unsafe { this.inner.as_ref() };
let awaiter: *mut Awaiter = &raw mut this.awaiter;
unsafe { inner.poll_wait(awaiter, waker) }
}
}
impl Drop for EmbeddedAutoResetWaitFuture {
fn drop(&mut self) {
let inner = unsafe { self.inner.as_ref() };
let awaiter: *mut Awaiter = &raw mut self.awaiter;
unsafe { inner.drop_wait(awaiter) }
}
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for EmbeddedAutoResetEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct(type_name::<Self>()).finish_non_exhaustive()
}
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for EmbeddedAutoResetEventRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct(type_name::<Self>()).finish_non_exhaustive()
}
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for EmbeddedAutoResetWaitFuture {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct(type_name::<Self>())
.finish_non_exhaustive()
}
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use std::sync::Barrier;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{iter, thread};
use static_assertions::{assert_impl_all, assert_not_impl_any};
use super::*;
use crate::test_hooks::BarrierHook;
assert_impl_all!(AutoResetEvent: Send, Sync, Clone, UnwindSafe, RefUnwindSafe);
assert_impl_all!(AutoResetWaitFuture: Send, UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(AutoResetWaitFuture: Sync, Unpin);
assert_impl_all!(EmbeddedAutoResetEvent: Send, Sync, UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(EmbeddedAutoResetEvent: Unpin);
assert_impl_all!(EmbeddedAutoResetEventRef: Send, Sync, Clone, Copy, UnwindSafe, RefUnwindSafe);
assert_impl_all!(EmbeddedAutoResetWaitFuture: Send, UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(EmbeddedAutoResetWaitFuture: Sync, Unpin);
#[test]
fn starts_unset() {
let event = AutoResetEvent::boxed();
assert!(!event.try_wait());
}
#[test]
fn set_then_try_wait() {
let event = AutoResetEvent::boxed();
event.set();
assert!(event.try_wait());
assert!(!event.try_wait());
}
#[test]
fn clone_shares_state() {
let a = AutoResetEvent::boxed();
let b = a.clone();
a.set();
assert!(b.try_wait());
}
#[test]
fn double_set_without_waiter_only_stores_one_signal() {
let event = AutoResetEvent::boxed();
event.set();
event.set();
assert!(event.try_wait());
assert!(!event.try_wait());
}
#[test]
fn wait_completes_when_already_set() {
futures::executor::block_on(async {
let event = AutoResetEvent::boxed();
event.set();
event.wait().await;
assert!(!event.try_wait());
});
}
#[test]
fn wait_completes_after_set() {
let event = AutoResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(future.as_mut().poll(&mut cx).is_ready());
}
#[test]
fn only_one_waiter_released_per_set() {
let event = AutoResetEvent::boxed();
let mut f1 = Box::pin(event.wait());
let mut f2 = Box::pin(event.wait());
let mut f3 = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(f1.as_mut().poll(&mut cx).is_pending());
assert!(f2.as_mut().poll(&mut cx).is_pending());
assert!(f3.as_mut().poll(&mut cx).is_pending());
event.set();
let mut ready_count = 0_u32;
for f in [f1.as_mut(), f2.as_mut(), f3.as_mut()] {
if f.poll(&mut cx).is_ready() {
ready_count = ready_count.checked_add(1).unwrap();
}
}
assert_eq!(ready_count, 1);
event.set();
event.set();
for f in [f1.as_mut(), f2.as_mut(), f3.as_mut()] {
if f.poll(&mut cx).is_ready() {
ready_count = ready_count.checked_add(1).unwrap();
}
}
assert_eq!(ready_count, 3);
}
#[test]
fn cancelled_waiter_forwards_notification() {
let event = AutoResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
drop(future);
event.set();
let mut future2 = Box::pin(event.wait());
assert!(future2.as_mut().poll(&mut cx).is_ready());
}
#[test]
fn drop_unpolled_future_is_safe() {
let event = AutoResetEvent::boxed();
{
let _future = event.wait();
}
event.set();
futures::executor::block_on(event.wait());
}
#[test]
fn set_from_another_thread() {
testing::with_watchdog(|| {
let event = AutoResetEvent::boxed();
let setter = event.clone();
let barrier = Arc::new(Barrier::new(2));
let b2 = Arc::clone(&barrier);
let handle = thread::spawn(move || {
b2.wait();
setter.set();
});
barrier.wait();
while !event.try_wait() {
std::hint::spin_loop();
}
handle.join().unwrap();
});
}
#[test]
fn only_one_thread_acquires_signal() {
testing::with_watchdog(|| {
let event = AutoResetEvent::boxed();
let waiter_count = 4;
let barrier = Arc::new(Barrier::new(waiter_count + 1));
let acquired_count = Arc::new(AtomicUsize::new(0));
let handles: Vec<_> = iter::repeat_with(|| {
let e = event.clone();
let b = Arc::clone(&barrier);
let count = Arc::clone(&acquired_count);
thread::spawn(move || {
b.wait();
for _ in 0..200 {
if e.try_wait() {
count.fetch_add(1, Ordering::Relaxed);
}
std::hint::spin_loop();
}
})
})
.take(waiter_count)
.collect();
event.set();
barrier.wait();
for h in handles {
h.join().unwrap();
}
assert_eq!(acquired_count.load(Ordering::Relaxed), 1);
});
}
#[test]
fn multiple_sets_release_multiple_threads() {
testing::with_watchdog(|| {
let event = AutoResetEvent::boxed();
let signal_count = 4;
let barrier = Arc::new(Barrier::new(signal_count + 1));
let acquired_count = Arc::new(AtomicUsize::new(0));
let handles: Vec<_> = iter::repeat_with(|| {
let e = event.clone();
let b = Arc::clone(&barrier);
let count = Arc::clone(&acquired_count);
thread::spawn(move || {
b.wait();
while !e.try_wait() {
std::hint::spin_loop();
}
count.fetch_add(1, Ordering::Relaxed);
})
})
.take(signal_count)
.collect();
barrier.wait();
while acquired_count.load(Ordering::Relaxed) < signal_count {
event.set();
std::hint::spin_loop();
}
for h in handles {
h.join().unwrap();
}
});
}
#[test]
fn poll_wait_post_mutex_take_notification_branch() {
testing::with_watchdog(|| {
let BarrierHook {
entered,
proceed,
hook,
} = crate::test_hooks::barrier_hook();
crate::test_hooks::with_hook(&crate::test_hooks::AUTO_PRE_MUTEX, hook, || {
let event = AutoResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
let producer = thread::spawn(move || {
crate::test_hooks::HOOK_PARTICIPANT.with(|c| c.set(true));
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
future.as_mut().poll(&mut cx)
});
entered.wait();
event.set();
proceed.wait();
assert!(producer.join().unwrap().is_ready());
});
});
}
#[test]
fn poll_wait_post_mutex_try_wait_branch() {
testing::with_watchdog(|| {
let BarrierHook {
entered,
proceed,
hook,
} = crate::test_hooks::barrier_hook();
crate::test_hooks::with_hook(&crate::test_hooks::AUTO_PRE_TRY_WAIT, hook, || {
let event = AutoResetEvent::boxed();
let producer = thread::spawn({
let event = event.clone();
move || {
crate::test_hooks::HOOK_PARTICIPANT.with(|c| c.set(true));
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
future.as_mut().poll(&mut cx)
}
});
entered.wait();
event.set();
proceed.wait();
assert!(producer.join().unwrap().is_ready());
assert!(!event.try_wait());
});
});
}
#[test]
fn poll_wait_post_fetch_or_try_wait_branch() {
testing::with_watchdog(|| {
let BarrierHook {
entered,
proceed,
hook,
} = crate::test_hooks::barrier_hook();
crate::test_hooks::with_hook(&crate::test_hooks::AUTO_PRE_FETCH_OR, hook, || {
let event = AutoResetEvent::boxed();
let producer = thread::spawn({
let event = event.clone();
move || {
crate::test_hooks::HOOK_PARTICIPANT.with(|c| c.set(true));
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
future.as_mut().poll(&mut cx)
}
});
entered.wait();
event.set();
proceed.wait();
assert!(producer.join().unwrap().is_ready());
assert!(!event.try_wait());
});
});
}
#[test]
fn set_no_waiters_despite_has_waiters_branch() {
testing::with_watchdog(|| {
let BarrierHook {
entered,
proceed,
hook,
} = crate::test_hooks::barrier_hook();
crate::test_hooks::with_hook(&crate::test_hooks::AUTO_SET_PRE_LOCK, hook, || {
let event = AutoResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
let producer = thread::spawn({
let event = event.clone();
move || {
crate::test_hooks::HOOK_PARTICIPANT.with(|c| c.set(true));
event.set();
}
});
entered.wait();
drop(future);
proceed.wait();
producer.join().unwrap();
assert!(event.try_wait());
});
});
}
#[test]
fn await_races_with_set_across_threads() {
testing::with_watchdog(|| {
const ITERATIONS: usize = 200;
let event = AutoResetEvent::boxed();
for _ in 0..ITERATIONS {
let barrier = Arc::new(Barrier::new(2));
let setter_handle = thread::spawn({
let event = event.clone();
let barrier = Arc::clone(&barrier);
move || {
barrier.wait();
event.set();
}
});
let waiter_handle = thread::spawn({
let event = event.clone();
let barrier = Arc::clone(&barrier);
move || {
barrier.wait();
futures::executor::block_on(event.wait());
}
});
setter_handle.join().unwrap();
waiter_handle.join().unwrap();
}
});
}
#[test]
fn embedded_set_from_another_thread() {
testing::with_watchdog(|| {
let container = Box::pin(EmbeddedAutoResetEvent::new());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
let setter = event;
let barrier = Arc::new(Barrier::new(2));
let b2 = Arc::clone(&barrier);
let handle = thread::spawn(move || {
b2.wait();
setter.set();
});
barrier.wait();
while !event.try_wait() {
std::hint::spin_loop();
}
handle.join().unwrap();
});
}
#[test]
fn embedded_set_and_wait() {
futures::executor::block_on(async {
let container = Box::pin(EmbeddedAutoResetEvent::new());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
event.set();
event.wait().await;
});
}
#[test]
fn embedded_clone_shares_state() {
let container = Box::pin(EmbeddedAutoResetEvent::new());
let a = unsafe { AutoResetEvent::embedded(container.as_ref()) };
let b = a;
a.set();
assert!(b.try_wait());
}
#[test]
fn embedded_signal_consumed() {
let container = Box::pin(EmbeddedAutoResetEvent::new());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
event.set();
assert!(event.try_wait());
assert!(!event.try_wait());
}
#[test]
fn embedded_drop_future_while_waiting() {
futures::executor::block_on(async {
let container = Box::pin(EmbeddedAutoResetEvent::new());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
{
let _future = event.wait();
}
event.set();
event.wait().await;
});
}
#[test]
fn notified_then_dropped_re_sets_event() {
let event = AutoResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
drop(future);
assert!(event.try_wait());
}
#[test]
fn notified_then_dropped_while_set_preserves_signal() {
let event = AutoResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
event.set();
drop(future);
assert!(event.try_wait());
}
#[test]
fn notified_then_dropped_forwards_to_next() {
let event = AutoResetEvent::boxed();
let mut future1 = Box::pin(event.wait());
let mut future2 = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future1.as_mut().poll(&mut cx).is_pending());
assert!(future2.as_mut().poll(&mut cx).is_pending());
event.set();
drop(future1);
assert!(future2.as_mut().poll(&mut cx).is_ready());
}
#[test]
fn set_wakes_registered_waiter() {
use crate::test_helpers::AtomicWakeTracker;
let event = AutoResetEvent::boxed();
let tracker = AtomicWakeTracker::new();
let waker = unsafe { tracker.waker() };
let mut cx = task::Context::from_waker(&waker);
let mut future = Box::pin(event.wait());
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(tracker.was_woken());
}
#[test]
fn embedded_wait_registers_then_completes() {
let container = Box::pin(EmbeddedAutoResetEvent::new());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(future.as_mut().poll(&mut cx).is_ready());
}
#[test]
fn embedded_drop_registered_future() {
let container = Box::pin(EmbeddedAutoResetEvent::new());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
drop(future);
event.set();
assert!(event.try_wait());
}
#[test]
fn embedded_notified_then_dropped_re_sets_event() {
let container = Box::pin(EmbeddedAutoResetEvent::new());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
drop(future);
assert!(event.try_wait());
}
#[test]
fn embedded_notified_then_dropped_forwards_to_next() {
let container = Box::pin(EmbeddedAutoResetEvent::new());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
let mut future1 = Box::pin(event.wait());
let mut future2 = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future1.as_mut().poll(&mut cx).is_pending());
assert!(future2.as_mut().poll(&mut cx).is_pending());
event.set();
drop(future1);
assert!(future2.as_mut().poll(&mut cx).is_ready());
}
#[test]
fn embedded_set_wakes_registered_waiter() {
use crate::test_helpers::AtomicWakeTracker;
let container = Box::pin(EmbeddedAutoResetEvent::new());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
let tracker = AtomicWakeTracker::new();
let waker = unsafe { tracker.waker() };
let mut cx = task::Context::from_waker(&waker);
let mut future = Box::pin(event.wait());
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(tracker.was_woken());
}
const WAITER_COUNT: usize = 100;
#[test]
fn many_sets_release_all_waiters() {
let event = AutoResetEvent::boxed();
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
let mut futures: Vec<_> = iter::repeat_with(|| Box::pin(event.wait()))
.take(WAITER_COUNT)
.collect();
for f in &mut futures {
assert!(f.as_mut().poll(&mut cx).is_pending());
}
for _ in 0..WAITER_COUNT {
event.set();
}
for f in &mut futures {
assert!(f.as_mut().poll(&mut cx).is_ready());
}
assert!(!event.try_wait());
}
#[test]
fn embedded_many_sets_release_all_waiters() {
let container = Box::pin(EmbeddedAutoResetEvent::new());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
let mut futures: Vec<_> = iter::repeat_with(|| Box::pin(event.wait()))
.take(WAITER_COUNT)
.collect();
for f in &mut futures {
assert!(f.as_mut().poll(&mut cx).is_pending());
}
for _ in 0..WAITER_COUNT {
event.set();
}
for f in &mut futures {
assert!(f.as_mut().poll(&mut cx).is_ready());
}
assert!(!event.try_wait());
}
#[test]
fn many_sets_without_waiters_coalesce() {
let event = AutoResetEvent::boxed();
for _ in 0..WAITER_COUNT {
event.set();
}
assert!(event.try_wait());
assert!(!event.try_wait());
}
#[test]
fn set_with_reentrant_waker_does_not_deadlock() {
use testing::ReentrantWakerData;
let event = AutoResetEvent::boxed();
let event_for_waker = event.clone();
let waker_data = ReentrantWakerData::new(move || {
event_for_waker.set();
});
let waker = unsafe { waker_data.waker() };
let mut cx = task::Context::from_waker(&waker);
let mut future = Box::pin(event.wait());
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(waker_data.was_woken());
assert!(event.try_wait());
}
#[test]
fn drop_forwarding_with_reentrant_waker_does_not_alias() {
use testing::ReentrantWakerData;
let event = AutoResetEvent::boxed();
let event_clone = event.clone();
let mut future1 = Box::pin(event.wait());
let noop_waker = Waker::noop();
let mut noop_cx = task::Context::from_waker(noop_waker);
assert!(future1.as_mut().poll(&mut noop_cx).is_pending());
let waker_data = ReentrantWakerData::new(move || {
event_clone.set();
});
let waker = unsafe { waker_data.waker() };
let mut reentrant_cx = task::Context::from_waker(&waker);
let mut future2 = Box::pin(event.wait());
assert!(future2.as_mut().poll(&mut reentrant_cx).is_pending());
event.set();
drop(future1);
assert!(waker_data.was_woken());
}
#[test]
fn embedded_default_creates_unset_event() {
let container = Box::pin(EmbeddedAutoResetEvent::default());
let event = unsafe { AutoResetEvent::embedded(container.as_ref()) };
assert!(!event.try_wait());
}
}