use std::any::type_name;
use std::cell::UnsafeCell;
use std::fmt;
use std::future::Future;
use std::marker::{PhantomData, PhantomPinned};
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::ptr::NonNull;
use std::rc::Rc;
use std::task::{self, Poll, Waker};
use awaiter_set::{Awaiter, AwaiterSet};
#[derive(Clone)]
pub struct LocalAutoResetEvent {
inner: Rc<Inner>,
}
enum InnerState {
Unset(AwaiterSet),
Set,
}
struct Inner {
state: UnsafeCell<InnerState>,
_not_send: PhantomData<*const ()>,
}
impl UnwindSafe for Inner {}
impl RefUnwindSafe for Inner {}
impl Inner {
fn set(&self) {
let state_ptr = self.state.get();
let waker = {
let state = unsafe { &mut *state_ptr };
match state {
InnerState::Set => None,
InnerState::Unset(waiters) => {
if let Some(w) = waiters.notify_one() {
Some(w)
} else {
*state = InnerState::Set;
None
}
}
}
};
if let Some(w) = waker {
w.wake();
}
}
fn try_wait(&self) -> bool {
let state = unsafe { &mut *self.state.get() };
if matches!(state, InnerState::Set) {
*state = InnerState::Unset(AwaiterSet::new());
true
} else {
false
}
}
unsafe fn poll_wait(&self, mut awaiter: Pin<&mut Awaiter>, waker: Waker) -> Poll<()> {
if awaiter.as_ref().take_notification() {
return Poll::Ready(());
}
let state = unsafe { &mut *self.state.get() };
match state {
InnerState::Set => {
debug_assert!(
!awaiter.is_registered(),
"Set state is exclusive with registered waiters"
);
*state = InnerState::Unset(AwaiterSet::new());
Poll::Ready(())
}
InnerState::Unset(waiters) => {
unsafe {
waiters.register(awaiter.as_mut(), waker);
}
Poll::Pending
}
}
}
unsafe fn drop_wait(&self, mut awaiter: Pin<&mut Awaiter>) {
if !awaiter.is_registered() {
return;
}
if awaiter.as_ref().is_notified() {
let state = unsafe { &mut *self.state.get() };
let waker = match state {
InnerState::Unset(waiters) => match waiters.notify_one() {
Some(w) => Some(w),
None => {
*state = InnerState::Set;
None
}
},
InnerState::Set => None,
};
if let Some(w) = waker {
w.wake();
}
} else {
let state = unsafe { &mut *self.state.get() };
match state {
InnerState::Unset(waiters) => {
unsafe {
waiters.unregister(awaiter.as_mut());
}
}
InnerState::Set => {
debug_assert!(false, "registered non-notified node requires Unset state");
}
}
}
}
}
impl LocalAutoResetEvent {
#[must_use]
pub fn boxed() -> Self {
Self {
inner: Rc::new(Inner {
state: UnsafeCell::new(InnerState::Unset(AwaiterSet::new())),
_not_send: PhantomData,
}),
}
}
#[must_use]
pub unsafe fn embedded(
place: Pin<&EmbeddedLocalAutoResetEvent>,
) -> EmbeddedLocalAutoResetEventRef {
let inner = NonNull::from(&place.get_ref().inner);
EmbeddedLocalAutoResetEventRef { inner }
}
#[cfg_attr(coverage_nightly, coverage(off))]
pub fn set(&self) {
self.inner.set();
}
#[must_use]
#[cfg_attr(coverage_nightly, coverage(off))]
pub fn try_wait(&self) -> bool {
self.inner.try_wait()
}
#[must_use]
pub fn wait(&self) -> LocalAutoResetWaitFuture {
LocalAutoResetWaitFuture {
inner: Rc::clone(&self.inner),
awaiter: Awaiter::new(),
}
}
}
pub struct LocalAutoResetWaitFuture {
inner: Rc<Inner>,
awaiter: Awaiter,
}
impl UnwindSafe for LocalAutoResetWaitFuture {}
impl RefUnwindSafe for LocalAutoResetWaitFuture {}
impl Future for LocalAutoResetWaitFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<()> {
let waker = cx.waker().clone();
let this = unsafe { self.get_unchecked_mut() };
let awaiter = unsafe { Pin::new_unchecked(&mut this.awaiter) };
unsafe { this.inner.poll_wait(awaiter, waker) }
}
}
impl Drop for LocalAutoResetWaitFuture {
fn drop(&mut self) {
let awaiter = unsafe { Pin::new_unchecked(&mut self.awaiter) };
unsafe { self.inner.drop_wait(awaiter) }
}
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for LocalAutoResetEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct(type_name::<Self>()).finish_non_exhaustive()
}
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for LocalAutoResetWaitFuture {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct(type_name::<Self>())
.finish_non_exhaustive()
}
}
pub struct EmbeddedLocalAutoResetEvent {
inner: Inner,
_pinned: PhantomPinned,
}
impl EmbeddedLocalAutoResetEvent {
#[must_use]
pub fn new() -> Self {
Self {
inner: Inner {
state: UnsafeCell::new(InnerState::Unset(AwaiterSet::new())),
_not_send: PhantomData,
},
_pinned: PhantomPinned,
}
}
}
impl Default for EmbeddedLocalAutoResetEvent {
#[cfg_attr(coverage_nightly, coverage(off))] fn default() -> Self {
Self::new()
}
}
impl UnwindSafe for EmbeddedLocalAutoResetEvent {}
impl RefUnwindSafe for EmbeddedLocalAutoResetEvent {}
#[derive(Clone, Copy)]
pub struct EmbeddedLocalAutoResetEventRef {
inner: NonNull<Inner>,
}
impl UnwindSafe for EmbeddedLocalAutoResetEventRef {}
impl RefUnwindSafe for EmbeddedLocalAutoResetEventRef {}
impl EmbeddedLocalAutoResetEventRef {
fn inner(&self) -> &Inner {
unsafe { self.inner.as_ref() }
}
#[cfg_attr(coverage_nightly, coverage(off))]
pub fn set(&self) {
self.inner().set();
}
#[must_use]
#[cfg_attr(coverage_nightly, coverage(off))]
pub fn try_wait(&self) -> bool {
self.inner().try_wait()
}
#[must_use]
pub fn wait(&self) -> EmbeddedLocalAutoResetWaitFuture {
EmbeddedLocalAutoResetWaitFuture {
inner: self.inner,
awaiter: Awaiter::new(),
}
}
}
pub struct EmbeddedLocalAutoResetWaitFuture {
inner: NonNull<Inner>,
awaiter: Awaiter,
}
impl UnwindSafe for EmbeddedLocalAutoResetWaitFuture {}
impl RefUnwindSafe for EmbeddedLocalAutoResetWaitFuture {}
impl Future for EmbeddedLocalAutoResetWaitFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<()> {
let waker = cx.waker().clone();
let this = unsafe { self.get_unchecked_mut() };
let inner = unsafe { this.inner.as_ref() };
let awaiter = unsafe { Pin::new_unchecked(&mut this.awaiter) };
unsafe { inner.poll_wait(awaiter, waker) }
}
}
impl Drop for EmbeddedLocalAutoResetWaitFuture {
fn drop(&mut self) {
let inner = unsafe { self.inner.as_ref() };
let awaiter = unsafe { Pin::new_unchecked(&mut self.awaiter) };
unsafe { inner.drop_wait(awaiter) }
}
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for EmbeddedLocalAutoResetEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct(type_name::<Self>()).finish_non_exhaustive()
}
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for EmbeddedLocalAutoResetEventRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct(type_name::<Self>()).finish_non_exhaustive()
}
}
#[cfg_attr(coverage_nightly, coverage(off))]
impl fmt::Debug for EmbeddedLocalAutoResetWaitFuture {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct(type_name::<Self>())
.finish_non_exhaustive()
}
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use std::iter;
use std::task::Waker;
use static_assertions::{assert_impl_all, assert_not_impl_any};
use super::*;
assert_impl_all!(LocalAutoResetEvent: Clone, UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(LocalAutoResetEvent: Send, Sync);
assert_impl_all!(LocalAutoResetWaitFuture: UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(LocalAutoResetWaitFuture: Send, Sync, Unpin);
assert_impl_all!(EmbeddedLocalAutoResetEvent: UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(EmbeddedLocalAutoResetEvent: Send, Sync, Unpin);
assert_impl_all!(EmbeddedLocalAutoResetEventRef: Clone, Copy, UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(EmbeddedLocalAutoResetEventRef: Send, Sync);
assert_impl_all!(EmbeddedLocalAutoResetWaitFuture: UnwindSafe, RefUnwindSafe);
assert_not_impl_any!(EmbeddedLocalAutoResetWaitFuture: Send, Sync, Unpin);
#[test]
fn starts_unset() {
let event = LocalAutoResetEvent::boxed();
assert!(!event.try_wait());
}
#[test]
fn set_then_try_wait() {
let event = LocalAutoResetEvent::boxed();
event.set();
assert!(event.try_wait());
assert!(!event.try_wait());
}
#[test]
fn clone_shares_state() {
let a = LocalAutoResetEvent::boxed();
let b = a.clone();
a.set();
assert!(b.try_wait());
}
#[test]
fn wait_completes_when_already_set() {
futures::executor::block_on(async {
let event = LocalAutoResetEvent::boxed();
event.set();
event.wait().await;
assert!(!event.try_wait());
});
}
#[test]
fn wait_completes_after_set() {
futures::executor::block_on(async {
let event = LocalAutoResetEvent::boxed();
let future = event.wait();
event.set();
future.await;
});
}
#[test]
fn drop_future_while_waiting() {
futures::executor::block_on(async {
let event = LocalAutoResetEvent::boxed();
{
let _f = event.wait();
}
event.set();
event.wait().await;
});
}
#[test]
fn embedded_set_and_wait() {
futures::executor::block_on(async {
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
event.set();
event.wait().await;
});
}
#[test]
fn embedded_clone_shares_state() {
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let a = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
let b = a;
a.set();
assert!(b.try_wait());
}
#[test]
fn embedded_signal_consumed() {
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
event.set();
assert!(event.try_wait());
assert!(!event.try_wait());
}
#[test]
fn embedded_drop_future_while_waiting() {
futures::executor::block_on(async {
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
{
let _future = event.wait();
}
event.set();
event.wait().await;
});
}
#[test]
fn wait_registers_then_completes() {
let event = LocalAutoResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(future.as_mut().poll(&mut cx).is_ready());
}
#[test]
fn drop_registered_future() {
let event = LocalAutoResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
drop(future);
event.set();
assert!(event.try_wait());
}
#[test]
fn notified_then_dropped_re_sets_event() {
let event = LocalAutoResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
drop(future);
assert!(event.try_wait());
}
#[test]
fn dropping_notified_future_while_state_already_set_preserves_signal() {
let event = LocalAutoResetEvent::boxed();
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
event.set();
drop(future);
assert!(event.try_wait());
assert!(!event.try_wait());
}
#[test]
fn notified_then_dropped_forwards_to_next() {
let event = LocalAutoResetEvent::boxed();
let mut future1 = Box::pin(event.wait());
let mut future2 = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future1.as_mut().poll(&mut cx).is_pending());
assert!(future2.as_mut().poll(&mut cx).is_pending());
event.set();
drop(future1);
assert!(future2.as_mut().poll(&mut cx).is_ready());
}
#[test]
fn embedded_wait_registers_then_completes() {
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(future.as_mut().poll(&mut cx).is_ready());
}
#[test]
fn embedded_drop_registered_future() {
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
drop(future);
event.set();
assert!(event.try_wait());
}
#[test]
fn embedded_notified_then_dropped_re_sets_event() {
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
let mut future = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
drop(future);
assert!(event.try_wait());
}
#[test]
fn embedded_notified_then_dropped_forwards_to_next() {
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
let mut future1 = Box::pin(event.wait());
let mut future2 = Box::pin(event.wait());
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
assert!(future1.as_mut().poll(&mut cx).is_pending());
assert!(future2.as_mut().poll(&mut cx).is_pending());
event.set();
drop(future1);
assert!(future2.as_mut().poll(&mut cx).is_ready());
}
#[test]
fn set_with_reentrant_waker_does_not_alias() {
use testing::ReentrantWakerData;
let event = LocalAutoResetEvent::boxed();
let event_clone = event.clone();
let waker_data = ReentrantWakerData::new(move || {
event_clone.set();
});
let waker = unsafe { waker_data.waker() };
let mut cx = task::Context::from_waker(&waker);
let mut future = Box::pin(event.wait());
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(waker_data.was_woken());
}
#[test]
fn drop_forwarding_with_reentrant_waker_does_not_alias() {
use testing::ReentrantWakerData;
let event = LocalAutoResetEvent::boxed();
let event_clone = event.clone();
let mut future1 = Box::pin(event.wait());
let noop_waker = Waker::noop();
let mut noop_cx = task::Context::from_waker(noop_waker);
assert!(future1.as_mut().poll(&mut noop_cx).is_pending());
let waker_data = ReentrantWakerData::new(move || {
event_clone.set();
});
let waker = unsafe { waker_data.waker() };
let mut reentrant_cx = task::Context::from_waker(&waker);
let mut future2 = Box::pin(event.wait());
assert!(future2.as_mut().poll(&mut reentrant_cx).is_pending());
event.set();
drop(future1);
assert!(waker_data.was_woken());
}
#[test]
fn embedded_set_with_reentrant_waker_does_not_alias() {
use testing::ReentrantWakerData;
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
let waker_data = ReentrantWakerData::new(move || {
event.set();
});
let waker = unsafe { waker_data.waker() };
let mut cx = task::Context::from_waker(&waker);
let mut future = Box::pin(event.wait());
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(waker_data.was_woken());
}
#[test]
fn embedded_drop_forwarding_with_reentrant_waker_does_not_alias() {
use testing::ReentrantWakerData;
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
let mut future1 = Box::pin(event.wait());
let noop_waker = Waker::noop();
let mut noop_cx = task::Context::from_waker(noop_waker);
assert!(future1.as_mut().poll(&mut noop_cx).is_pending());
let waker_data = ReentrantWakerData::new(move || {
event.set();
});
let waker = unsafe { waker_data.waker() };
let mut reentrant_cx = task::Context::from_waker(&waker);
let mut future2 = Box::pin(event.wait());
assert!(future2.as_mut().poll(&mut reentrant_cx).is_pending());
event.set();
drop(future1);
assert!(waker_data.was_woken());
}
#[test]
fn embedded_set_wakes_registered_waiter() {
use testing::ReentrantWakerData;
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
let waker_data = ReentrantWakerData::new(|| {});
let waker = unsafe { waker_data.waker() };
let mut cx = task::Context::from_waker(&waker);
let mut future = Box::pin(event.wait());
assert!(future.as_mut().poll(&mut cx).is_pending());
event.set();
assert!(waker_data.was_woken());
}
const WAITER_COUNT: usize = 100;
#[test]
fn many_sets_release_all_waiters() {
let event = LocalAutoResetEvent::boxed();
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
let mut futures: Vec<_> = iter::repeat_with(|| Box::pin(event.wait()))
.take(WAITER_COUNT)
.collect();
for f in &mut futures {
assert!(f.as_mut().poll(&mut cx).is_pending());
}
for _ in 0..WAITER_COUNT {
event.set();
}
for f in &mut futures {
assert!(f.as_mut().poll(&mut cx).is_ready());
}
assert!(!event.try_wait());
}
#[test]
fn embedded_many_sets_release_all_waiters() {
let container = Box::pin(EmbeddedLocalAutoResetEvent::new());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
let waker = Waker::noop();
let mut cx = task::Context::from_waker(waker);
let mut futures: Vec<_> = iter::repeat_with(|| Box::pin(event.wait()))
.take(WAITER_COUNT)
.collect();
for f in &mut futures {
assert!(f.as_mut().poll(&mut cx).is_pending());
}
for _ in 0..WAITER_COUNT {
event.set();
}
for f in &mut futures {
assert!(f.as_mut().poll(&mut cx).is_ready());
}
assert!(!event.try_wait());
}
#[test]
fn many_sets_without_waiters_coalesce() {
let event = LocalAutoResetEvent::boxed();
for _ in 0..WAITER_COUNT {
event.set();
}
assert!(event.try_wait());
assert!(!event.try_wait());
}
#[test]
fn embedded_default_creates_unset_event() {
let container = Box::pin(EmbeddedLocalAutoResetEvent::default());
let event = unsafe { LocalAutoResetEvent::embedded(container.as_ref()) };
assert!(!event.try_wait());
}
}