use std::cell::{Cell, UnsafeCell};
use std::fmt;
use std::future::Future;
use std::marker::{PhantomData, PhantomPinned};
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::ptr::NonNull;
use std::rc::Rc;
use std::task::{self, Poll, Waker};
use crate::waiter_list::{WaiterList, WaiterNode};
/// Single-threaded manual-reset event handle backed by reference-counted shared state.
///
/// Once set, the event stays set until `reset` is called. Cloning the handle clones
/// the `Rc`, so every clone observes and controls the same underlying state.
#[derive(Clone)]
pub struct LocalManualResetEvent {
// Shared flag and waiter list; all clones and all wait futures point at the same `Inner`.
inner: Rc<Inner>,
}
/// State shared by every handle and wait future of one event.
struct Inner {
// Whether the event is currently set.
is_set: Cell<bool>,
// Waiter nodes registered by pending wait futures; accessed through raw pointers
// obtained from the `UnsafeCell`.
waiters: UnsafeCell<WaiterList>,
// Raw-pointer marker that keeps this type from being `Send` or `Sync`.
_not_send: PhantomData<*const ()>,
}
// Manual opt-in to the unwind-safety marker traits. `UnsafeCell` is not
// `RefUnwindSafe`, so the compiler would not derive this for `Inner` on its own.
// NOTE(review): this is an assertion by the author, not something the compiler checks.
impl UnwindSafe for Inner {}
impl RefUnwindSafe for Inner {}
// Test-only, per-thread hook. When populated, `Inner::set` calls it after each
// waker it wakes, which lets a test run code (for example `reset` or a re-poll)
// in the middle of the wake loop. Starts out as `None`.
#[cfg(test)]
thread_local! {
static HOOK_SET_AFTER_WAKE: std::cell::RefCell<Option<Box<dyn Fn()>>> =
const { std::cell::RefCell::new(None) };
}
// Core operations shared by the `Rc`-backed and the raw-pointer-backed handles.
impl Inner {
/// Sets the event and wakes the wakers stored in the waiter list.
///
/// Does nothing if the event is already set. Otherwise the flag is raised first and
/// then wakers are taken and woken one at a time. Nodes are left in the list; only
/// their `waker` slot is emptied. The loop ends when no node holds a waker any more,
/// or when the flag is found cleared after a wake.
#[cfg_attr(test, mutants::skip)]
fn set(&self) {
// Already set: nothing to wake.
if self.is_set.get() {
return;
}
// Raise the flag before waking so that anything polled from inside a waker
// already observes the event as set.
self.is_set.set(true);
let waiters_ptr = self.waiters.get();
loop {
// Find the first node that still holds a waker. The scan restarts from the head
// on every outer iteration and `cursor` does not outlive this inner block, so no
// pointer into the list is held across the `wake()` call below.
let waker = {
// SAFETY (review): `waiters_ptr` comes from this `Inner`'s own `UnsafeCell`, and
// the type is neither `Send` nor `Sync`, so no other thread can touch the list.
let mut cursor = unsafe { (*waiters_ptr).head() };
loop {
if cursor.is_null() {
// Reached the end without finding a stored waker.
break None;
}
// SAFETY (review): assumes every node reachable from the list is still alive;
// the wait futures unlink their node in `Drop` while registered.
let w = unsafe { (*cursor).waker.take() };
if w.is_some() {
break w;
}
// SAFETY (review): same liveness assumption as above for following `next`.
cursor = unsafe { (*cursor).next };
}
};
// No waker left anywhere in the list: done.
let Some(w) = waker else { break };
// May run arbitrary code, including calls back into this event.
w.wake();
// Test-only: run the installed hook (if any) right after each wake.
#[cfg(test)]
HOOK_SET_AFTER_WAKE.with(|hook| {
if let Some(f) = hook.borrow().as_ref() {
f();
}
});
// If the event was reset during the wake (or the test hook), stop waking. Without
// this check, a waiter that re-registers a waker after such a reset would be found
// again by the next scan.
if !self.is_set.get() {
break;
}
}
}
/// Clears the flag. Does not touch the waiter list.
fn reset(&self) {
self.is_set.set(false);
}
/// Returns whether the event is currently set, without registering anything.
fn try_wait(&self) -> bool {
self.is_set.get()
}
/// Poll step shared by both wait future types.
///
/// If the event is set, unlinks `node` when it was registered, clears `registered`
/// and returns `Poll::Ready(())`. Otherwise stores `waker` in the node, links the node
/// at the back of the list if it is not linked yet, and returns `Poll::Pending`.
///
/// # Safety
///
/// The address of `node` is stored in the waiter list while `registered` is true, so
/// the caller must keep the node at a stable address and alive until it has been
/// unlinked (by a later call that returns `Ready`, or by `drop_wait`). `registered`
/// must accurately say whether this node is currently linked into this event's list.
/// NOTE(review): the precise requirements of `WaiterList::push_back` / `remove` are
/// defined in `crate::waiter_list` — confirm against that module.
unsafe fn poll_wait(
&self,
node: &UnsafeCell<WaiterNode>,
registered: &mut bool,
waker: Waker,
) -> Poll<()> {
let node_ptr = node.get();
if self.is_set.get() {
// Event is set: make sure the node is no longer linked, then complete.
if *registered {
// SAFETY (review): single-threaded type; the reference is created and used
// only within this branch.
let waiters = unsafe { &mut *self.waiters.get() };
// SAFETY (review): `registered` is true, so the node is expected to be in this list.
unsafe {
waiters.remove(node_ptr);
}
*registered = false;
}
return Poll::Ready(());
}
// Not set: remember the most recent waker (replaces any previously stored one).
// SAFETY (review): `node_ptr` was just obtained from a live `UnsafeCell` reference.
unsafe {
(*node_ptr).waker = Some(waker);
}
if !*registered {
// First pending poll: link the node at the back of the list.
// SAFETY (review): single-threaded type; the reference is short-lived.
let waiters = unsafe { &mut *self.waiters.get() };
// SAFETY (review): relies on the caller keeping the node pinned and alive while linked.
unsafe {
waiters.push_back(node_ptr);
}
*registered = true;
}
Poll::Pending
}
/// Drop step shared by both wait future types: unlinks `node` if it is registered.
///
/// # Safety
///
/// `registered` must accurately say whether `node` is currently linked into this
/// event's waiter list.
unsafe fn drop_wait(&self, node: &UnsafeCell<WaiterNode>, registered: bool) {
if registered {
// SAFETY (review): single-threaded type; the reference is short-lived.
let waiters = unsafe { &mut *self.waiters.get() };
// SAFETY (review): the node is still alive here because its owner is only now
// being dropped.
unsafe {
waiters.remove(node.get());
}
}
}
}
impl LocalManualResetEvent {
#[must_use]
pub fn boxed() -> Self {
Self {
inner: Rc::new(Inner {
is_set: Cell::new(false),
waiters: UnsafeCell::new(WaiterList::new()),
_not_send: PhantomData,
}),
}
}
#[must_use]
pub unsafe fn embedded(place: Pin<&EmbeddedLocalManualResetEvent>) -> RawLocalManualResetEvent {
let inner = NonNull::from(&place.get_ref().inner);
RawLocalManualResetEvent { inner }
}
#[cfg_attr(coverage_nightly, coverage(off))]
#[cfg_attr(test, mutants::skip)]
pub fn set(&self) {
self.inner.set();
}
#[cfg_attr(coverage_nightly, coverage(off))]
pub fn reset(&self) {
self.inner.reset();
}
#[cfg_attr(coverage_nightly, coverage(off))]
#[must_use]
pub fn try_wait(&self) -> bool {
self.inner.try_wait()
}
#[must_use]
pub fn wait(&self) -> LocalManualResetWaitFuture {
LocalManualResetWaitFuture {
inner: Rc::clone(&self.inner),
node: UnsafeCell::new(WaiterNode::new()),
registered: false,
_pinned: PhantomPinned,
}
}
}
/// Future returned by `LocalManualResetEvent::wait`.
///
/// Becomes ready when it is polled while the event is set.
pub struct LocalManualResetWaitFuture {
// Own reference to the shared event state.
inner: Rc<Inner>,
// This future's waiter node; its address is handed to the waiter list while registered.
node: UnsafeCell<WaiterNode>,
// True while `node` is linked into the event's waiter list.
registered: bool,
// Opts the future out of `Unpin`.
_pinned: PhantomPinned,
}
// Manual opt-in to the unwind-safety marker traits.
// NOTE(review): asserted by hand; the type contains `UnsafeCell` state.
impl UnwindSafe for LocalManualResetWaitFuture {}
impl RefUnwindSafe for LocalManualResetWaitFuture {}
impl Future for LocalManualResetWaitFuture {
    type Output = ();

    /// Completes when the event is set; otherwise stores the current waker and stays pending.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<()> {
        // SAFETY: nothing is moved out of the pinned future; its fields are only
        // borrowed in place below.
        let fut = unsafe { self.get_unchecked_mut() };
        let waker = cx.waker().clone();
        // SAFETY (review): `node` lives inside this pinned future, `registered` is only
        // ever updated by `poll_wait`, and `Drop` unlinks the node while it is registered.
        unsafe { fut.inner.poll_wait(&fut.node, &mut fut.registered, waker) }
    }
}
impl Drop for LocalManualResetWaitFuture {
    /// Unlinks this future's node from the event's waiter list if it is still registered.
    fn drop(&mut self) {
        let was_registered = self.registered;
        // SAFETY (review): `registered` mirrors whether `node` is linked, and the node
        // is still alive because the future is only now being dropped.
        unsafe { self.inner.drop_wait(&self.node, was_registered) };
    }
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for LocalManualResetEvent {
    /// Formats the handle as a struct showing only the current `is_set` flag.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let is_set = self.inner.is_set.get();
        let mut out = f.debug_struct("LocalManualResetEvent");
        out.field("is_set", &is_set);
        out.finish()
    }
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for LocalManualResetWaitFuture {
    /// Formats the future showing only `registered`; the other fields are elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("LocalManualResetWaitFuture");
        out.field("registered", &self.registered);
        out.finish_non_exhaustive()
    }
}
/// Storage for an event that lives inline in caller-owned memory instead of behind an `Rc`.
///
/// Handles to it are created with `LocalManualResetEvent::embedded`, which takes the
/// storage as a pinned reference.
pub struct EmbeddedLocalManualResetEvent {
// The event state that raw handles point at.
inner: Inner,
// Opts the storage out of `Unpin`.
_pinned: PhantomPinned,
}
impl EmbeddedLocalManualResetEvent {
    /// Creates embedded event storage in the unset state with an empty waiter list.
    #[must_use]
    pub fn new() -> Self {
        let inner = Inner {
            is_set: Cell::new(false),
            waiters: UnsafeCell::new(WaiterList::new()),
            _not_send: PhantomData,
        };
        Self {
            inner,
            _pinned: PhantomPinned,
        }
    }
}
impl Default for EmbeddedLocalManualResetEvent {
#[cfg_attr(coverage_nightly, coverage(off))] fn default() -> Self {
Self::new()
}
}
// Manual opt-in to the unwind-safety marker traits.
// NOTE(review): asserted by hand; the type contains `UnsafeCell` state via `Inner`.
impl UnwindSafe for EmbeddedLocalManualResetEvent {}
impl RefUnwindSafe for EmbeddedLocalManualResetEvent {}
/// Copyable handle to an event stored in an `EmbeddedLocalManualResetEvent`.
///
/// Holds a raw pointer with no lifetime tracking; validity of the pointer is the
/// responsibility of whoever called the unsafe `LocalManualResetEvent::embedded`.
#[derive(Clone, Copy)]
pub struct RawLocalManualResetEvent {
// Points at the `Inner` inside the embedded storage.
inner: NonNull<Inner>,
}
// Manual opt-in to the unwind-safety marker traits.
// NOTE(review): asserted by hand; the handle points at state containing `UnsafeCell`.
impl UnwindSafe for RawLocalManualResetEvent {}
impl RefUnwindSafe for RawLocalManualResetEvent {}
impl RawLocalManualResetEvent {
fn inner(&self) -> &Inner {
unsafe { self.inner.as_ref() }
}
#[cfg_attr(coverage_nightly, coverage(off))]
#[cfg_attr(test, mutants::skip)]
pub fn set(&self) {
self.inner().set();
}
#[cfg_attr(coverage_nightly, coverage(off))]
pub fn reset(&self) {
self.inner().reset();
}
#[cfg_attr(coverage_nightly, coverage(off))]
#[must_use]
pub fn try_wait(&self) -> bool {
self.inner().try_wait()
}
#[must_use]
pub fn wait(&self) -> RawLocalManualResetWaitFuture {
RawLocalManualResetWaitFuture {
inner: self.inner,
node: UnsafeCell::new(WaiterNode::new()),
registered: false,
_pinned: PhantomPinned,
}
}
}
/// Future returned by `RawLocalManualResetEvent::wait`.
///
/// Becomes ready when it is polled while the event is set. Refers to the event state
/// through a raw pointer rather than an owning reference.
pub struct RawLocalManualResetWaitFuture {
// Raw pointer to the embedded event state; dereferenced in `poll` and `drop`.
inner: NonNull<Inner>,
// This future's waiter node; its address is handed to the waiter list while registered.
node: UnsafeCell<WaiterNode>,
// True while `node` is linked into the event's waiter list.
registered: bool,
// Opts the future out of `Unpin`.
_pinned: PhantomPinned,
}
// Manual opt-in to the unwind-safety marker traits.
// NOTE(review): asserted by hand; the type contains `UnsafeCell` state.
impl UnwindSafe for RawLocalManualResetWaitFuture {}
impl RefUnwindSafe for RawLocalManualResetWaitFuture {}
impl Future for RawLocalManualResetWaitFuture {
    type Output = ();

    /// Completes when the event is set; otherwise stores the current waker and stays pending.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<()> {
        // SAFETY: nothing is moved out of the pinned future; its fields are only
        // borrowed in place below.
        let fut = unsafe { self.get_unchecked_mut() };
        // SAFETY (review): relies on the contract of `LocalManualResetEvent::embedded`,
        // whose caller must keep the embedded storage alive while futures exist.
        let state = unsafe { fut.inner.as_ref() };
        let waker = cx.waker().clone();
        // SAFETY (review): `node` lives inside this pinned future, `registered` is only
        // ever updated by `poll_wait`, and `Drop` unlinks the node while it is registered.
        unsafe { state.poll_wait(&fut.node, &mut fut.registered, waker) }
    }
}
impl Drop for RawLocalManualResetWaitFuture {
    /// Unlinks this future's node from the event's waiter list if it is still registered.
    fn drop(&mut self) {
        // SAFETY (review): relies on the contract of `LocalManualResetEvent::embedded`,
        // whose caller must keep the embedded storage alive while futures exist.
        let state = unsafe { self.inner.as_ref() };
        let was_registered = self.registered;
        // SAFETY (review): `registered` mirrors whether `node` is linked, and the node
        // is still alive because the future is only now being dropped.
        unsafe { state.drop_wait(&self.node, was_registered) };
    }
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for EmbeddedLocalManualResetEvent {
    /// Formats only the type name; all fields are elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("EmbeddedLocalManualResetEvent");
        out.finish_non_exhaustive()
    }
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for RawLocalManualResetEvent {
    /// Formats the handle as a struct showing only the current `is_set` flag.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let currently_set = self.try_wait();
        let mut out = f.debug_struct("RawLocalManualResetEvent");
        out.field("is_set", &currently_set);
        out.finish()
    }
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for RawLocalManualResetWaitFuture {
    /// Formats the future showing only `registered`; the other fields are elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("RawLocalManualResetWaitFuture");
        out.field("registered", &self.registered);
        out.finish_non_exhaustive()
    }
}
// Unit tests for the `Rc`-backed and the embedded/raw event variants.
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use std::task::Waker;
use static_assertions::{assert_impl_all, assert_not_impl_any};
use super::*;
// Compile-time checks of the trait surface: every type is unwind safe and none is
// `Send`/`Sync`; the futures and the embedded storage are additionally not `Unpin`.
assert_impl_all!(LocalManualResetEvent: Clone, UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(LocalManualResetEvent: Send, Sync);
assert_impl_all!(LocalManualResetWaitFuture: UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(LocalManualResetWaitFuture: Send, Sync, Unpin);
assert_impl_all!(EmbeddedLocalManualResetEvent: UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(EmbeddedLocalManualResetEvent: Send, Sync, Unpin);
assert_impl_all!(RawLocalManualResetEvent: Clone, Copy, UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(RawLocalManualResetEvent: Send, Sync);
assert_impl_all!(RawLocalManualResetWaitFuture: UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(RawLocalManualResetWaitFuture: Send, Sync, Unpin);
// A freshly created event is unset.
#[test]
fn starts_unset() {
let event = LocalManualResetEvent::boxed();
assert!(!event.try_wait());
}
// `set` raises the flag and `reset` clears it again.
#[test]
fn set_and_reset() {
let event = LocalManualResetEvent::boxed();
event.set();
assert!(event.try_wait());
event.reset();
assert!(!event.try_wait());
}
// Setting through one clone is visible through another.
#[test]
fn clone_shares_state() {
let a = LocalManualResetEvent::boxed();
let b = a.clone();
a.set();
assert!(b.try_wait());
}
// Awaiting an already-set event completes.
#[test]
fn wait_completes_when_already_set() {
futures::executor::block_on(async {
let event = LocalManualResetEvent::boxed();
event.set();
event.wait().await;
});
}
// A future created before `set` completes when awaited after `set`.
#[test]
fn wait_completes_after_set() {
futures::executor::block_on(async {
let event = LocalManualResetEvent::boxed();
let future = event.wait();
event.set();
future.await;
});
}
// Dropping a never-polled future does not disturb later set/wait.
#[test]
fn drop_future_while_waiting() {
futures::executor::block_on(async {
let event = LocalManualResetEvent::boxed();
{
let _f = event.wait();
}
event.set();
event.wait().await;
});
}
// Embedded variant: set followed by wait completes.
#[test]
fn embedded_set_and_wait() {
futures::executor::block_on(async {
let container = Box::pin(EmbeddedLocalManualResetEvent::new());
let event = unsafe { LocalManualResetEvent::embedded(container.as_ref()) };
event.set();
event.wait().await;
});
}
// Embedded variant: a copy of the raw handle shares state with the original.
#[test]
fn embedded_clone_shares_state() {
futures::executor::block_on(async {
let container = Box::pin(EmbeddedLocalManualResetEvent::new());
let a = unsafe { LocalManualResetEvent::embedded(container.as_ref()) };
// `RawLocalManualResetEvent` is `Copy`, so `a` stays usable after this.
let b = a;
a.set();
assert!(b.try_wait());
b.wait().await;
});
}
// Embedded variant: reset after set leaves the event unset.
#[test]
fn embedded_reset_after_set() {
let container = Box::pin(EmbeddedLocalManualResetEvent::new());
let event = unsafe { LocalManualResetEvent::embedded(container.as_ref()) };
event.set();
event.reset();
assert!(!event.try_wait());
}
// Embedded variant: dropping a never-polled future does not disturb later set/wait.
#[test]
fn embedded_drop_future_while_waiting() {
futures::executor::block_on(async {
let container = Box::pin(EmbeddedLocalManualResetEvent::new());
let event = unsafe { LocalManualResetEvent::embedded(container.as_ref()) };
{
let _future = event.wait();
}
event.set();
event.wait().await;
});
}
// First poll is pending (registers the waiter); after `set` the next poll is ready.
#[test]
fn wait_registers_then_completes() {
let event = LocalManualResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(future.as_mut().poll(&mut cx).is_ready());
}
// Dropping a registered future must leave the event usable: a later set and a
// fresh wait still work.
#[test]
fn drop_registered_future() {
let event = LocalManualResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
drop(future);
event.set();
let mut future2 = Box::pin(event.wait());
assert!(future2.as_mut().poll(&mut cx).is_ready());
}
// Embedded variant of `wait_registers_then_completes`.
#[test]
fn embedded_wait_registers_then_completes() {
let container = Box::pin(EmbeddedLocalManualResetEvent::new());
let event = unsafe { LocalManualResetEvent::embedded(container.as_ref()) };
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(future.as_mut().poll(&mut cx).is_ready());
}
// Embedded variant of `drop_registered_future`.
#[test]
fn embedded_drop_registered_future() {
let container = Box::pin(EmbeddedLocalManualResetEvent::new());
let event = unsafe { LocalManualResetEvent::embedded(container.as_ref()) };
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
drop(future);
event.set();
let mut future2 = Box::pin(event.wait());
assert!(future2.as_mut().poll(&mut cx).is_ready());
}
// Embedded variant: one `set` releases all three registered waiters.
#[test]
fn embedded_multiple_waiters_released() {
let container = Box::pin(EmbeddedLocalManualResetEvent::new());
let event = unsafe { LocalManualResetEvent::embedded(container.as_ref()) };
let mut f1 = Box::pin(event.wait());
let mut f2 = Box::pin(event.wait());
let mut f3 = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(f1.as_mut().poll(&mut cx).is_pending());
assert!(f2.as_mut().poll(&mut cx).is_pending());
assert!(f3.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(f1.as_mut().poll(&mut cx).is_ready());
assert!(f2.as_mut().poll(&mut cx).is_ready());
assert!(f3.as_mut().poll(&mut cx).is_ready());
}
// A waker that re-enters the event (reset plus a new registered waiter) while `set`
// is waking must not break `set`.
// NOTE(review): `ReentrantWakerData` comes from `crate::test_helpers`; presumably it
// runs the closure when its waker is woken — confirm against that module.
#[test]
fn set_with_reentrant_waker_does_not_alias() {
use crate::test_helpers::ReentrantWakerData;
let event = LocalManualResetEvent::boxed();
let event_clone = event.clone();
let waker_data = ReentrantWakerData::new(move || {
event_clone.reset();
let mut new_future = Box::pin(event_clone.wait());
let noop = Waker::noop();
let mut cx = task::Context::from_waker(noop);
assert!(new_future.as_mut().poll(&mut cx).is_pending());
});
let waker = unsafe { waker_data.waker() };
let mut cx = task::Context::from_waker(&waker);
let mut future = Box::pin(event.wait());
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(waker_data.was_woken());
}
// Embedded variant of `set_with_reentrant_waker_does_not_alias`.
#[test]
fn embedded_set_with_reentrant_waker_does_not_alias() {
use crate::test_helpers::ReentrantWakerData;
let container = Box::pin(EmbeddedLocalManualResetEvent::new());
let event = unsafe { LocalManualResetEvent::embedded(container.as_ref()) };
let waker_data = ReentrantWakerData::new(move || {
event.reset();
let mut new_future = Box::pin(event.wait());
let noop = Waker::noop();
let mut cx = task::Context::from_waker(noop);
assert!(new_future.as_mut().poll(&mut cx).is_pending());
});
let waker = unsafe { waker_data.waker() };
let mut cx = task::Context::from_waker(&waker);
let mut future = Box::pin(event.wait());
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(waker_data.was_woken());
}
// `try_wait` is false on an unset event.
#[test]
fn try_wait_returns_false_when_unset() {
let event = LocalManualResetEvent::boxed();
assert!(!event.try_wait());
}
// `try_wait` is true on a set event.
#[test]
fn try_wait_returns_true_when_set() {
let event = LocalManualResetEvent::boxed();
event.set();
assert!(event.try_wait());
}
// Embedded variant: `try_wait` is false on an unset event.
#[test]
fn embedded_try_wait_returns_false_when_unset() {
let container = Box::pin(EmbeddedLocalManualResetEvent::new());
let event = unsafe { LocalManualResetEvent::embedded(container.as_ref()) };
assert!(!event.try_wait());
}
// Embedded variant: `try_wait` is true on a set event.
#[test]
fn embedded_try_wait_returns_true_when_set() {
let container = Box::pin(EmbeddedLocalManualResetEvent::new());
let event = unsafe { LocalManualResetEvent::embedded(container.as_ref()) };
event.set();
assert!(event.try_wait());
}
// Embedded variant: `set` wakes the waker of a registered waiter.
#[test]
fn embedded_set_wakes_registered_waiter() {
use crate::test_helpers::ReentrantWakerData;
let container = Box::pin(EmbeddedLocalManualResetEvent::new());
let event = unsafe { LocalManualResetEvent::embedded(container.as_ref()) };
let waker_data = ReentrantWakerData::new(|| {});
let waker = unsafe { waker_data.waker() };
let mut cx = task::Context::from_waker(&waker);
let mut future = Box::pin(event.wait());
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(waker_data.was_woken());
}
// After a registered future is dropped, `set` must not wake its waker but must
// still wake the remaining waiter.
#[test]
fn drop_unlinks_registered_waiter_from_list() {
use crate::test_helpers::ReentrantWakerData;
let event = LocalManualResetEvent::boxed();
let tracker1 = ReentrantWakerData::new(|| {});
let waker1 = unsafe { tracker1.waker() };
let mut cx1 = task::Context::from_waker(&waker1);
let tracker2 = ReentrantWakerData::new(|| {});
let waker2 = unsafe { tracker2.waker() };
let mut cx2 = task::Context::from_waker(&waker2);
let mut future1 = Box::pin(event.wait());
assert!(future1.as_mut().poll(&mut cx1).is_pending());
let mut future2 = Box::pin(event.wait());
assert!(future2.as_mut().poll(&mut cx2).is_pending());
drop(future1);
event.set();
assert!(!tracker1.was_woken());
assert!(tracker2.was_woken());
}
// Embedded variant of `drop_unlinks_registered_waiter_from_list`.
#[test]
fn embedded_drop_unlinks_registered_waiter_from_list() {
use crate::test_helpers::ReentrantWakerData;
let container = Box::pin(EmbeddedLocalManualResetEvent::new());
let event = unsafe { LocalManualResetEvent::embedded(container.as_ref()) };
let tracker1 = ReentrantWakerData::new(|| {});
let waker1 = unsafe { tracker1.waker() };
let mut cx1 = task::Context::from_waker(&waker1);
let tracker2 = ReentrantWakerData::new(|| {});
let waker2 = unsafe { tracker2.waker() };
let mut cx2 = task::Context::from_waker(&waker2);
let mut future1 = Box::pin(event.wait());
assert!(future1.as_mut().poll(&mut cx1).is_pending());
let mut future2 = Box::pin(event.wait());
assert!(future2.as_mut().poll(&mut cx2).is_pending());
drop(future1);
event.set();
assert!(!tracker1.was_woken());
assert!(tracker2.was_woken());
}
// Calling `set` twice leaves the event set.
#[test]
fn set_when_already_set_is_noop() {
let event = LocalManualResetEvent::boxed();
event.set();
assert!(event.try_wait());
event.set();
assert!(event.try_wait());
}
// Embedded variant of `set_when_already_set_is_noop`.
#[test]
fn embedded_set_when_already_set_is_noop() {
let container = Box::pin(EmbeddedLocalManualResetEvent::new());
let event = unsafe { LocalManualResetEvent::embedded(container.as_ref()) };
event.set();
assert!(event.try_wait());
event.set();
assert!(event.try_wait());
}
// Uses the test hook to reset the event and re-poll the waiter (storing a fresh
// waker) after every wake. `set` must still return instead of waking that waiter
// over and over.
// NOTE(review): `testing::with_watchdog` is defined outside this file; presumably it
// fails the test if the closure does not finish in time — confirm.
#[test]
fn set_terminates_when_reset_called_during_wake_loop() {
use std::cell::RefCell;
use std::rc::Rc;
testing::with_watchdog(|| {
let event = LocalManualResetEvent::boxed();
let future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
// Shared ownership of the pinned future so the hook closure can poll it too.
let future_cell: Rc<RefCell<Pin<Box<LocalManualResetWaitFuture>>>> =
Rc::new(RefCell::new(future));
assert!(future_cell.borrow_mut().as_mut().poll(&mut cx).is_pending());
let event_for_hook = event.clone();
let future_for_hook = Rc::clone(&future_cell);
// Install the hook: reset, then re-poll so the waiter stores a waker again.
HOOK_SET_AFTER_WAKE.with(|hook| {
*hook.borrow_mut() = Some(Box::new(move || {
event_for_hook.reset();
let w = Waker::noop();
let mut cx = task::Context::from_waker(w);
let _poll = future_for_hook.borrow_mut().as_mut().poll(&mut cx);
}));
});
event.set();
// Remove the hook again so it does not stay installed on this thread.
HOOK_SET_AFTER_WAKE.with(|hook| {
*hook.borrow_mut() = None;
});
});
}
}