use std::cell::UnsafeCell;
use std::fmt;
use std::future::Future;
use std::marker::{PhantomData, PhantomPinned};
use std::mem;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::ptr::NonNull;
use std::rc::Rc;
use std::task::{self, Poll, Waker};
use crate::waiter_list::{WaiterList, WaiterNode};
/// Single-threaded auto-reset event whose state is shared through an [`Rc`].
///
/// Cloning yields another handle to the same underlying state. A call to
/// `set` is either handed to the waiter at the front of the waiter list or,
/// when nobody is registered, latched until a later `wait`/`try_wait`
/// consumes it.
#[derive(Clone)]
pub struct LocalAutoResetEvent {
    /// Reference-counted state shared by every clone and every wait future.
    inner: Rc<Inner>,
}
/// The two states an event can be in.
enum InnerState {
    /// No signal is latched. The list holds the nodes of the wait futures
    /// that are currently registered (it may be empty).
    Unset(WaiterList),
    /// A signal is latched; the next `try_wait` or poll consumes it and
    /// moves the event back to `Unset`.
    Set,
}
/// State shared by all handles and wait futures of one event.
struct Inner {
    /// Current state; mutated through `&self`, hence the `UnsafeCell`.
    state: UnsafeCell<InnerState>,
    /// Raw-pointer marker that keeps the type `!Send` and `!Sync`, so the
    /// unsynchronized accesses to `state` stay on a single thread.
    _not_send: PhantomData<*const ()>,
}

// `UnsafeCell` opts out of `RefUnwindSafe`, so the unwind-safety markers are
// asserted by hand.
// NOTE(review): the justification (a panic cannot leave `state` half-updated)
// is not spelled out in this file — confirm.
impl UnwindSafe for Inner {}
impl RefUnwindSafe for Inner {}
impl Inner {
    /// Signals the event.
    ///
    /// * Already `Set`: nothing changes (signals coalesce).
    /// * `Unset` with a registered waiter: the node at the front of the list
    ///   is unlinked, flagged as notified, and its waker is invoked.
    /// * `Unset` without waiters: the state becomes `Set`.
    ///
    /// The waker is invoked only after the borrow of `state` has ended, so a
    /// waker that calls back into this event does not alias that borrow.
    #[cfg_attr(test, mutants::skip)]
    fn set(&self) {
        let waker = {
            // SAFETY: `Inner` is `!Send`/`!Sync` (see `_not_send`), so only
            // the current thread can touch `state`. The exclusive borrow is
            // confined to this block; this assumes `WaiterList` operations do
            // not call back into the event.
            let state = unsafe { &mut *self.state.get() };
            match state {
                InnerState::Set => None,
                InnerState::Unset(waiters) => {
                    // SAFETY: assumes every pointer stored in the list refers
                    // to a live `WaiterNode`. The wait futures unlink their
                    // node in `drop_wait` before it is freed and are `!Unpin`,
                    // so a linked node neither moves nor dangles.
                    if let Some(node_ptr) = unsafe { waiters.pop_front() } {
                        // SAFETY: `node_ptr` was just unlinked from the list
                        // and is live per the invariant above. `take` moves
                        // the waker out without dropping it.
                        unsafe {
                            (*node_ptr).notified = true;
                            (*node_ptr).waker.take()
                        }
                    } else {
                        *state = InnerState::Set;
                        None
                    }
                }
            }
        };

        if let Some(w) = waker {
            w.wake();
        }
    }

    /// Consumes a latched signal without waiting.
    ///
    /// Returns `true` and resets the event to `Unset` when it was `Set`;
    /// returns `false` and leaves the state untouched otherwise.
    fn try_wait(&self) -> bool {
        // SAFETY: single-threaded access (see `_not_send`); the borrow does
        // not outlive this function and no waker runs while it is alive.
        let state = unsafe { &mut *self.state.get() };
        match state {
            InnerState::Set => {
                *state = InnerState::Unset(WaiterList::new());
                true
            }
            InnerState::Unset(_) => false,
        }
    }

    /// Poll step shared by both wait-future types.
    ///
    /// * `node` already notified: clears `*registered` and returns `Ready`.
    /// * Event `Set`: consumes the signal (state becomes `Unset`) and returns
    ///   `Ready`.
    /// * Otherwise: stores `waker` in the node, links the node at the back of
    ///   the waiter list if it is not linked yet, and returns `Pending`.
    ///
    /// # Safety
    ///
    /// Inferred from how the two futures in this file call it:
    ///
    /// * `node` and `registered` must belong to the same wait future and must
    ///   only ever be used with this `Inner`.
    /// * Once this returns `Pending`, `node` must stay at the same address
    ///   and stay alive until `drop_wait` has been called for it, because the
    ///   list keeps a raw pointer to it.
    unsafe fn poll_wait(
        &self,
        node: &UnsafeCell<WaiterNode>,
        registered: &mut bool,
        waker: Waker,
    ) -> Poll<()> {
        let node_ptr = node.get();

        // SAFETY: `node_ptr` comes from a live `UnsafeCell` reference and no
        // reference into the node is held across this read.
        if unsafe { (*node_ptr).notified } {
            *registered = false;
            return Poll::Ready(());
        }

        let replaced_waker = {
            // SAFETY: single-threaded access (see `_not_send`); the borrow is
            // confined to this block and no waker is invoked or dropped while
            // it is alive.
            let state = unsafe { &mut *self.state.get() };
            match state {
                InnerState::Set => {
                    debug_assert!(
                        !*registered,
                        "Set state is exclusive with registered waiters"
                    );
                    *state = InnerState::Unset(WaiterList::new());
                    return Poll::Ready(());
                }
                InnerState::Unset(waiters) => {
                    // Move the previous waker out rather than overwriting it
                    // in place: dropping a waker may run arbitrary code, and
                    // that must not happen while `state` is still borrowed.
                    // SAFETY: `node_ptr` is valid as argued above.
                    let previous = unsafe { mem::replace(&mut (*node_ptr).waker, Some(waker)) };
                    if !*registered {
                        // SAFETY: the caller guarantees the node stays pinned
                        // and alive until `drop_wait` has run for it.
                        unsafe {
                            waiters.push_back(node_ptr);
                        }
                        *registered = true;
                    }
                    previous
                }
            }
        };

        // The borrow of `state` has ended, so a waker whose drop re-enters
        // the event cannot alias it.
        drop(replaced_waker);
        Poll::Pending
    }

    /// Cleanup step shared by the `Drop` impls of both wait-future types.
    ///
    /// * Not registered: nothing to do.
    /// * Registered and already notified: the future took a signal it will
    ///   never observe, so the signal is passed on via [`Inner::set`] (to the
    ///   next waiter, or latched if none is registered).
    /// * Registered and not notified: the node is unlinked from the list.
    ///
    /// # Safety
    ///
    /// `node` and `registered` must be the same pair that was passed to
    /// `poll_wait` on this `Inner`, and `node` must still be alive.
    unsafe fn drop_wait(&self, node: &UnsafeCell<WaiterNode>, registered: bool) {
        if !registered {
            return;
        }

        let node_ptr = node.get();

        // SAFETY: `node_ptr` comes from a live `UnsafeCell` reference and no
        // reference into the node is held across this read.
        if unsafe { (*node_ptr).notified } {
            // The node was already unlinked by whoever notified it. Passing
            // the signal on is exactly what `set` does, including waking the
            // next waiter only after the state borrow has ended.
            self.set();
            return;
        }

        // SAFETY: single-threaded access (see `_not_send`); the borrow does
        // not outlive this function and no waker runs while it is alive.
        let state = unsafe { &mut *self.state.get() };
        match state {
            InnerState::Unset(waiters) => {
                // SAFETY: a registered node that has not been notified is
                // still linked into this list (it was linked by `poll_wait`
                // and only `set` unlinks nodes, flagging them as notified).
                unsafe {
                    waiters.remove(node_ptr);
                }
            }
            InnerState::Set => {
                debug_assert!(false, "registered non-notified node requires Unset state");
            }
        }
    }
}
impl LocalAutoResetEvent {
    /// Creates an unset event whose state is allocated behind an [`Rc`].
    #[must_use]
    pub fn boxed() -> Self {
        let inner = Inner {
            state: UnsafeCell::new(InnerState::Unset(WaiterList::new())),
            _not_send: PhantomData,
        };
        Self {
            inner: Rc::new(inner),
        }
    }

    /// Creates a raw handle to an event embedded in caller-owned storage.
    ///
    /// # Safety
    ///
    /// The returned handle stores a raw pointer into `place` with no lifetime
    /// attached, and dereferences it on every operation. The caller must
    /// ensure the [`EmbeddedLocalAutoResetEvent`] outlives the handle, all of
    /// its copies, and every wait future created from them.
    #[must_use]
    pub unsafe fn embedded(place: Pin<&EmbeddedLocalAutoResetEvent>) -> RawLocalAutoResetEvent {
        let container: &EmbeddedLocalAutoResetEvent = place.get_ref();
        RawLocalAutoResetEvent {
            inner: NonNull::from(&container.inner),
        }
    }

    /// Signals the event: wakes the waiter at the front of the waiter list,
    /// or latches the signal if no waiter is registered.
    #[cfg_attr(test, mutants::skip)]
    #[cfg_attr(coverage_nightly, coverage(off))]
    pub fn set(&self) {
        self.inner.set();
    }

    /// Consumes a latched signal without waiting.
    ///
    /// Returns `true` if the event was set (it is reset as a side effect),
    /// `false` otherwise.
    #[must_use]
    #[cfg_attr(coverage_nightly, coverage(off))]
    pub fn try_wait(&self) -> bool {
        self.inner.try_wait()
    }

    /// Returns a future that completes once this waiter receives a signal.
    ///
    /// The future starts unregistered; it only joins the waiter list when it
    /// is first polled while the event is unset.
    #[must_use]
    pub fn wait(&self) -> LocalAutoResetWaitFuture {
        let inner = Rc::clone(&self.inner);
        let node = UnsafeCell::new(WaiterNode::new());
        LocalAutoResetWaitFuture {
            inner,
            node,
            registered: false,
            _pinned: PhantomPinned,
        }
    }
}
/// Future returned by [`LocalAutoResetEvent::wait`].
pub struct LocalAutoResetWaitFuture {
    /// Keeps the shared event state alive for as long as the future exists.
    inner: Rc<Inner>,
    /// Waiter-list node; the event holds a raw pointer to it while it is
    /// linked into the list.
    node: UnsafeCell<WaiterNode>,
    /// Set when a poll links `node` into the waiter list and cleared when a
    /// later poll observes the notification.
    registered: bool,
    /// Makes the future `!Unpin`, since the waiter list points at `node`.
    _pinned: PhantomPinned,
}

// Unwind-safety markers asserted by hand (`UnsafeCell` opts out of
// `RefUnwindSafe`).
// NOTE(review): the justification is not spelled out in this file — confirm.
impl UnwindSafe for LocalAutoResetWaitFuture {}
impl RefUnwindSafe for LocalAutoResetWaitFuture {}
impl Future for LocalAutoResetWaitFuture {
    type Output = ();

    /// Delegates to `Inner::poll_wait` with a clone of the task's waker.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<()> {
        // SAFETY: the future is never moved out of the pinned reference;
        // only its fields are accessed in place.
        let fut = unsafe { self.get_unchecked_mut() };
        let waker = cx.waker().clone();
        // SAFETY: `node` and `registered` belong to this future and are only
        // ever used with `fut.inner`; the future is pinned, and its `Drop`
        // impl calls `drop_wait` before the node is freed.
        unsafe { fut.inner.poll_wait(&fut.node, &mut fut.registered, waker) }
    }
}
impl Drop for LocalAutoResetWaitFuture {
    /// Unlinks the node, or passes on an unconsumed notification, before the
    /// node's storage goes away.
    fn drop(&mut self) {
        let was_registered = self.registered;
        // SAFETY: `node` and `registered` are the pair this future passed to
        // `poll_wait` on the same `Inner`, and the node is still alive here.
        unsafe { self.inner.drop_wait(&self.node, was_registered) }
    }
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for LocalAutoResetEvent {
    /// Prints only the type name; the internal state is deliberately elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("LocalAutoResetEvent");
        builder.finish_non_exhaustive()
    }
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for LocalAutoResetWaitFuture {
    /// Prints the type name and the `registered` flag; other fields are
    /// elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("LocalAutoResetWaitFuture");
        builder.field("registered", &self.registered);
        builder.finish_non_exhaustive()
    }
}
/// Inline storage for an auto-reset event that is not reference-counted.
///
/// Pin a value of this type and pass it to `LocalAutoResetEvent::embedded`
/// to obtain a [`RawLocalAutoResetEvent`] handle that points at it.
pub struct EmbeddedLocalAutoResetEvent {
    /// The event state that raw handles and their wait futures point at.
    inner: Inner,
    /// Makes the container `!Unpin`, since handles hold its address.
    _pinned: PhantomPinned,
}
impl EmbeddedLocalAutoResetEvent {
    /// Creates storage for an event that starts out unset with no waiters.
    #[must_use]
    pub fn new() -> Self {
        let inner = Inner {
            state: UnsafeCell::new(InnerState::Unset(WaiterList::new())),
            _not_send: PhantomData,
        };
        Self {
            inner,
            _pinned: PhantomPinned,
        }
    }
}
impl Default for EmbeddedLocalAutoResetEvent {
#[cfg_attr(coverage_nightly, coverage(off))] fn default() -> Self {
Self::new()
}
}
impl UnwindSafe for EmbeddedLocalAutoResetEvent {}
impl RefUnwindSafe for EmbeddedLocalAutoResetEvent {}
/// Copyable handle to an event stored in an [`EmbeddedLocalAutoResetEvent`].
///
/// The handle is a bare pointer with no lifetime; it is created by the
/// `unsafe` constructor `LocalAutoResetEvent::embedded`, whose caller is
/// responsible for keeping the pointed-to storage alive.
#[derive(Clone, Copy)]
pub struct RawLocalAutoResetEvent {
    /// Address of the `Inner` inside the embedded container.
    inner: NonNull<Inner>,
}

// Unwind-safety markers asserted by hand.
// NOTE(review): the justification is not spelled out in this file — confirm.
impl UnwindSafe for RawLocalAutoResetEvent {}
impl RefUnwindSafe for RawLocalAutoResetEvent {}
impl RawLocalAutoResetEvent {
    /// Borrows the event state this handle points at.
    fn inner(&self) -> &Inner {
        // SAFETY: whoever created this handle through the `unsafe`
        // constructor `LocalAutoResetEvent::embedded` is responsible for the
        // pointee still being alive; only a shared reference is produced.
        unsafe { self.inner.as_ref() }
    }

    /// Signals the event: wakes the waiter at the front of the waiter list,
    /// or latches the signal if no waiter is registered.
    #[cfg_attr(test, mutants::skip)]
    #[cfg_attr(coverage_nightly, coverage(off))]
    pub fn set(&self) {
        let event = self.inner();
        event.set();
    }

    /// Consumes a latched signal without waiting.
    ///
    /// Returns `true` if the event was set (it is reset as a side effect),
    /// `false` otherwise.
    #[must_use]
    #[cfg_attr(coverage_nightly, coverage(off))]
    pub fn try_wait(&self) -> bool {
        let event = self.inner();
        event.try_wait()
    }

    /// Returns a future that completes once this waiter receives a signal.
    ///
    /// The future copies the raw pointer, so it is subject to the same
    /// liveness requirement as the handle itself.
    #[must_use]
    pub fn wait(&self) -> RawLocalAutoResetWaitFuture {
        let inner = self.inner;
        let node = UnsafeCell::new(WaiterNode::new());
        RawLocalAutoResetWaitFuture {
            inner,
            node,
            registered: false,
            _pinned: PhantomPinned,
        }
    }
}
/// Future returned by [`RawLocalAutoResetEvent::wait`].
pub struct RawLocalAutoResetWaitFuture {
    /// Raw pointer to the event state inside the embedded container.
    inner: NonNull<Inner>,
    /// Waiter-list node; the event holds a raw pointer to it while it is
    /// linked into the list.
    node: UnsafeCell<WaiterNode>,
    /// Set when a poll links `node` into the waiter list and cleared when a
    /// later poll observes the notification.
    registered: bool,
    /// Makes the future `!Unpin`, since the waiter list points at `node`.
    _pinned: PhantomPinned,
}

// Unwind-safety markers asserted by hand (`UnsafeCell` opts out of
// `RefUnwindSafe`).
// NOTE(review): the justification is not spelled out in this file — confirm.
impl UnwindSafe for RawLocalAutoResetWaitFuture {}
impl RefUnwindSafe for RawLocalAutoResetWaitFuture {}
impl Future for RawLocalAutoResetWaitFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<()> {
let this = unsafe { self.get_unchecked_mut() };
let inner = unsafe { this.inner.as_ref() };
unsafe { inner.poll_wait(&this.node, &mut this.registered, cx.waker().clone()) }
}
}
impl Drop for RawLocalAutoResetWaitFuture {
fn drop(&mut self) {
let inner = unsafe { self.inner.as_ref() };
unsafe { inner.drop_wait(&self.node, self.registered) }
}
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for EmbeddedLocalAutoResetEvent {
    /// Prints only the type name; the internal state is deliberately elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("EmbeddedLocalAutoResetEvent");
        builder.finish_non_exhaustive()
    }
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for RawLocalAutoResetEvent {
    /// Prints only the type name; the pointer is not dereferenced.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("RawLocalAutoResetEvent");
        builder.finish_non_exhaustive()
    }
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for RawLocalAutoResetWaitFuture {
    /// Prints the type name and the `registered` flag; other fields are
    /// elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("RawLocalAutoResetWaitFuture");
        builder.field("registered", &self.registered);
        builder.finish_non_exhaustive()
    }
}
// Unit tests for the `Rc`-backed event and the embedded/raw variant. Most
// scenarios exist twice: once for `LocalAutoResetEvent::boxed()` and once
// (prefixed `embedded_`) for a handle obtained from a pinned
// `EmbeddedLocalAutoResetEvent`.
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use std::iter;
use std::task::Waker;
use static_assertions::{assert_impl_all, assert_not_impl_any};
use super::*;
// Compile-time checks of the auto-trait surface: handles are never
// Send/Sync, wait futures and the embedded container are additionally
// !Unpin, and every public type is unwind-safe.
assert_impl_all!(LocalAutoResetEvent: Clone, UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(LocalAutoResetEvent: Send, Sync);
assert_impl_all!(LocalAutoResetWaitFuture: UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(LocalAutoResetWaitFuture: Send, Sync, Unpin);
assert_impl_all!(EmbeddedLocalAutoResetEvent: UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(EmbeddedLocalAutoResetEvent: Send, Sync, Unpin);
assert_impl_all!(RawLocalAutoResetEvent: Clone, Copy, UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(RawLocalAutoResetEvent: Send, Sync);
assert_impl_all!(RawLocalAutoResetWaitFuture: UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(RawLocalAutoResetWaitFuture: Send, Sync, Unpin);
// A freshly created event has no latched signal.
#[test]
fn starts_unset() {
let event = LocalAutoResetEvent::boxed();
assert!(!event.try_wait());
}
// A latched signal is consumed by exactly one `try_wait`.
#[test]
fn set_then_try_wait() {
let event = LocalAutoResetEvent::boxed();
event.set();
assert!(event.try_wait());
assert!(!event.try_wait());
}
// A signal set through one clone is visible through another.
#[test]
fn clone_shares_state() {
let a = LocalAutoResetEvent::boxed();
let b = a.clone();
a.set();
assert!(b.try_wait());
}
// Awaiting an already-set event completes and consumes the signal.
#[test]
fn wait_completes_when_already_set() {
futures::executor::block_on(async {
let event = LocalAutoResetEvent::boxed();
event.set();
event.wait().await;
assert!(!event.try_wait());
});
}
// A future created before `set` (but not yet polled) completes once awaited.
#[test]
fn wait_completes_after_set() {
futures::executor::block_on(async {
let event = LocalAutoResetEvent::boxed();
let future = event.wait();
event.set();
future.await;
});
}
// Dropping a never-polled future leaves the event usable.
#[test]
fn drop_future_while_waiting() {
futures::executor::block_on(async {
let event = LocalAutoResetEvent::boxed();
{
let _f = event.wait();
}
event.set();
event.wait().await;
});
}
// Embedded variant: awaiting an already-set event completes.
#[test]
fn embedded_set_and_wait() {
futures::executor::block_on(async {
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
event.set();
event.wait().await;
});
}
// Embedded variant: copies of the raw handle share state (`b = a` is a copy,
// so `a` stays usable).
#[test]
fn embedded_clone_shares_state() {
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let a = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
let b = a;
a.set();
assert!(b.try_wait());
}
// Embedded variant: a latched signal is consumed by exactly one `try_wait`.
#[test]
fn embedded_signal_consumed() {
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
event.set();
assert!(event.try_wait());
assert!(!event.try_wait());
}
// Embedded variant: dropping a never-polled future leaves the event usable.
#[test]
fn embedded_drop_future_while_waiting() {
futures::executor::block_on(async {
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
{
let _future = event.wait();
}
event.set();
event.wait().await;
});
}
// First poll registers the waiter (Pending); after `set` the next poll is Ready.
#[test]
fn wait_registers_then_completes() {
let event = LocalAutoResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(future.as_mut().poll(&mut cx).is_ready());
}
// Dropping a registered, un-notified future unlinks it, so a later `set`
// latches the signal instead of handing it to the dropped waiter.
#[test]
fn drop_registered_future() {
let event = LocalAutoResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
drop(future);
event.set();
assert!(event.try_wait());
}
// A future dropped after being notified (but before being polled again) gives
// the signal back: with no other waiter the event ends up set.
#[test]
fn notified_then_dropped_re_sets_event() {
let event = LocalAutoResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
drop(future);
assert!(event.try_wait());
}
// A future dropped after being notified passes the signal to the next
// registered waiter.
#[test]
fn notified_then_dropped_forwards_to_next() {
let event = LocalAutoResetEvent::boxed();
let mut future1 = Box::pin(event.wait());
let mut future2 = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future1.as_mut().poll(&mut cx).is_pending());
assert!(future2.as_mut().poll(&mut cx).is_pending());
event.set();
drop(future1);
assert!(future2.as_mut().poll(&mut cx).is_ready());
}
// Embedded variant of `wait_registers_then_completes`.
#[test]
fn embedded_wait_registers_then_completes() {
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(future.as_mut().poll(&mut cx).is_ready());
}
// Embedded variant of `drop_registered_future`.
#[test]
fn embedded_drop_registered_future() {
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
drop(future);
event.set();
assert!(event.try_wait());
}
// Embedded variant of `notified_then_dropped_re_sets_event`.
#[test]
fn embedded_notified_then_dropped_re_sets_event() {
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
drop(future);
assert!(event.try_wait());
}
// Embedded variant of `notified_then_dropped_forwards_to_next`.
#[test]
fn embedded_notified_then_dropped_forwards_to_next() {
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
let mut future1 = Box::pin(event.wait());
let mut future2 = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future1.as_mut().poll(&mut cx).is_pending());
assert!(future2.as_mut().poll(&mut cx).is_pending());
event.set();
drop(future1);
assert!(future2.as_mut().poll(&mut cx).is_ready());
}
// The registered waker itself calls `set()` on the event when woken; `set`
// must therefore not hold a borrow of the state while invoking the waker.
#[test]
fn set_with_reentrant_waker_does_not_alias() {
use crate::test_helpers::ReentrantWakerData;
let event = LocalAutoResetEvent::boxed();
let event_clone = event.clone();
let waker_data = ReentrantWakerData::new(move || {
event_clone.set();
});
let waker = unsafe { waker_data.waker() };
let mut cx = task::Context::from_waker(&waker);
let mut future = Box::pin(event.wait());
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(waker_data.was_woken());
}
// Same re-entrancy concern, but the re-entrant waker is invoked from the
// drop path that forwards a notification from `future1` to `future2`.
#[test]
fn drop_forwarding_with_reentrant_waker_does_not_alias() {
use crate::test_helpers::ReentrantWakerData;
let event = LocalAutoResetEvent::boxed();
let event_clone = event.clone();
let mut future1 = Box::pin(event.wait());
let noop_waker = Waker::noop();
let mut noop_cx = task::Context::from_waker(noop_waker);
assert!(future1.as_mut().poll(&mut noop_cx).is_pending());
let waker_data = ReentrantWakerData::new(move || {
event_clone.set();
});
let waker = unsafe { waker_data.waker() };
let mut reentrant_cx = task::Context::from_waker(&waker);
let mut future2 = Box::pin(event.wait());
assert!(future2.as_mut().poll(&mut reentrant_cx).is_pending());
event.set();
drop(future1);
assert!(waker_data.was_woken());
}
// Embedded variant of `set_with_reentrant_waker_does_not_alias`; the raw
// handle is `Copy`, so it can be moved into the closure and still used here.
#[test]
fn embedded_set_with_reentrant_waker_does_not_alias() {
use crate::test_helpers::ReentrantWakerData;
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
let waker_data = ReentrantWakerData::new(move || {
event.set();
});
let waker = unsafe { waker_data.waker() };
let mut cx = task::Context::from_waker(&waker);
let mut future = Box::pin(event.wait());
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(waker_data.was_woken());
}
// Embedded variant of `drop_forwarding_with_reentrant_waker_does_not_alias`.
#[test]
fn embedded_drop_forwarding_with_reentrant_waker_does_not_alias() {
use crate::test_helpers::ReentrantWakerData;
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
let mut future1 = Box::pin(event.wait());
let noop_waker = Waker::noop();
let mut noop_cx = task::Context::from_waker(noop_waker);
assert!(future1.as_mut().poll(&mut noop_cx).is_pending());
let waker_data = ReentrantWakerData::new(move || {
event.set();
});
let waker = unsafe { waker_data.waker() };
let mut reentrant_cx = task::Context::from_waker(&waker);
let mut future2 = Box::pin(event.wait());
assert!(future2.as_mut().poll(&mut reentrant_cx).is_pending());
event.set();
drop(future1);
assert!(waker_data.was_woken());
}
// Embedded variant: `set` invokes the waker of a registered waiter (the
// waker's callback is a no-op here; only the "was woken" flag is checked).
#[test]
fn embedded_set_wakes_registered_waiter() {
use crate::test_helpers::ReentrantWakerData;
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
let waker_data = ReentrantWakerData::new(|| {});
let waker = unsafe { waker_data.waker() };
let mut cx = task::Context::from_waker(&waker);
let mut future = Box::pin(event.wait());
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(waker_data.was_woken());
}
// Number of waiters / repeated `set` calls used by the bulk tests below.
const WAITER_COUNT: usize = 100;
// With many registered waiters, each `set` releases exactly the next future
// in registration order, and no signal is left over at the end.
#[test]
fn many_sets_release_all_waiters() {
let event = LocalAutoResetEvent::boxed();
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
let mut futures: Vec<_> = iter::repeat_with(|| Box::pin(event.wait()))
.take(WAITER_COUNT)
.collect();
for f in &mut futures {
assert!(f.as_mut().poll(&mut cx).is_pending());
}
for f in &mut futures {
event.set();
assert!(f.as_mut().poll(&mut cx).is_ready());
}
assert!(!event.try_wait());
}
// Embedded variant of `many_sets_release_all_waiters`.
#[test]
fn embedded_many_sets_release_all_waiters() {
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
let mut futures: Vec<_> = iter::repeat_with(|| Box::pin(event.wait()))
.take(WAITER_COUNT)
.collect();
for f in &mut futures {
assert!(f.as_mut().poll(&mut cx).is_pending());
}
for f in &mut futures {
event.set();
assert!(f.as_mut().poll(&mut cx).is_ready());
}
assert!(!event.try_wait());
}
// Repeated `set` calls with no waiter coalesce into a single latched signal.
#[test]
fn many_sets_without_waiters_coalesce() {
let event = LocalAutoResetEvent::boxed();
for _ in 0..WAITER_COUNT {
event.set();
}
assert!(event.try_wait());
assert!(!event.try_wait());
}
}