use crate::{atomic::*, base::{lot::AsyncLotSignature, signal::{ValidityMarker, ValidityState}}, yielder::Yield};
use core::{
ops::ControlFlow,
pin::{Pin, pin},
task::{Context, Poll, Waker},
};
use core::future::Future;
use std::sync::Arc;
/// Common interface of the event primitives defined in this module.
///
/// The lifetime `'a` ties a [`Waiter`](EventSetter::Waiter) to the borrow of
/// the event it was obtained from.
///
/// The per-method notes describe what the `RawEvent` implementation in this
/// file does; other implementors are presumably expected to match it.
pub trait EventSetter<'a> {
    /// Future handed out by [`wait`](EventSetter::wait); it resolves to `()`.
    type Waiter: Future<Output = ()> + Unpin + 'a;

    /// Creates an event whose signal flag starts out clear.
    fn new() -> Self;

    /// Creates an event whose signal flag starts out set.
    fn new_set() -> Self;

    /// Returns a future that waits on this event.
    fn wait(&'a self) -> Self::Waiter;

    /// Signals a single waiter.
    ///
    /// Returns `true` when a parked waiter was released. When nobody was
    /// parked the signal flag is stored instead and `false` is returned.
    fn set_one(&self) -> bool;

    /// Releases every parked waiter, invoking `functor` once per released
    /// waiter. When no waiter was released the signal flag is stored instead.
    fn set_all<F: FnMut()>(&self, functor: F);

    /// Attempts to consume a stored signal without waiting.
    ///
    /// Returns `true` only if there were no waiters and the flag was set.
    fn try_wait(&self) -> bool;

    /// Reports whether any waiter is currently parked on the event.
    fn has_waiters(&self) -> bool;
}
/// Storage abstraction for the one-byte signal flag of an event.
///
/// NOTE(review): the method shapes mirror the atomic integer API
/// (`compare_exchange_weak` / `store`); implementors are presumably thin
/// wrappers around an atomic `u8` — confirm against the implementors.
pub(crate) trait EventState {
    /// Builds a state cell whose initial value is `state`.
    fn new(state: u8) -> Self;

    /// Compare-and-exchange on the flag: replaces `current` with `new`.
    ///
    /// Returns `Ok` with the previous value on success and `Err` with the
    /// observed value on failure. As the name says this is the *weak* form;
    /// callers in this file treat `Err(current)` as a spurious failure.
    fn cmpxchng_weak(
        &self,
        current: u8,
        new: u8,
        success: Ordering,
        failure: Ordering,
    ) -> Result<u8, u8>;

    /// Unconditionally writes `value` to the flag with the given `ordering`.
    fn store(&self, value: u8, ordering: Ordering);
}
/// Shared core of an event: a signal flag plus the lot of parked waiters.
///
/// `S` stores the flag and `L` is the lot waiters are parked in and released
/// from.
pub(crate) struct RawEvent<S, L> {
    /// Signal flag; this file only ever writes `SIGNAL_FREE` or `SIGNAL_SET`.
    state: S,
    /// Lot that waiters are parked in and released from.
    lot: L,
}
/// Progress of a single `RawEventAwait` future.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
enum RawEventAwaitState {
    /// Freshly created; the waiter has not been parked yet.
    Init = 0,
    /// The waiter has been parked in the lot and is waiting to be released.
    Waiting = 1,
    /// The future has produced `Poll::Ready`; later polls return immediately.
    Completed = 2,
}
/// Number of spin-and-retry rounds a waiter makes after a failed flag exchange
/// before it arms its yielder instead.
const BACKOFF_MAX: usize = 3;
/// Flag value: the event is not signalled.
const SIGNAL_FREE: u8 = 0b0000_0000;
/// Flag value: a signal is stored and can be consumed by one waiter.
const SIGNAL_SET: u8 = 0b0000_0010;
/// Shareable slot holding the [`Waker`] of a parked waiter.
///
/// Cloning yields another handle to the *same* slot, so a replacement made
/// through one handle ([`check_and_swap`](EventWaker::check_and_swap)) or a
/// reset ([`set_null`](EventWaker::set_null)) is observed by every other
/// handle.
///
/// The slot is a mutex-protected `Option<Waker>`. Ownership of the stored
/// waker therefore belongs to the slot itself rather than to each handle:
/// replacing or clearing it drops the previous waker exactly once, and the
/// final waker is dropped when the last handle goes away. A hand-counted raw
/// `Arc` pointer cannot give that guarantee once several handles exist,
/// because a swap changes the allocation while the per-handle counts stay
/// attached to the old one.
#[derive(Clone)]
pub(crate) struct EventWaker {
    cell: Arc<std::sync::Mutex<Option<Waker>>>,
}

impl EventWaker {
    /// Creates a slot that initially holds a clone of `waker`.
    pub fn new(waker: &Waker) -> Self {
        Self {
            cell: Arc::new(std::sync::Mutex::new(Some(waker.clone()))),
        }
    }

    /// Wakes the stored waker, if any; does nothing when the slot is empty.
    pub fn wake_by_ref(&self) {
        // Clone under the lock and wake after releasing it, so a waker that
        // re-enters this slot synchronously cannot deadlock on the mutex.
        let snapshot = (*self.slot()).clone();
        if let Some(waker) = snapshot {
            waker.wake();
        }
    }

    /// Returns `true` when the slot currently holds no waker.
    pub fn is_null(&self) -> bool {
        self.slot().is_none()
    }

    /// Empties the slot for every handle and drops the waker it held.
    pub fn set_null(&self) {
        let stale = self.slot().take();
        // The guard is already released here; drop the old waker outside it.
        drop(stale);
    }

    /// Releases an `Arc<Waker>` that was previously leaked with
    /// `Arc::into_raw`. A null pointer is ignored.
    ///
    /// Nothing in this type needs the helper any more; it is kept so existing
    /// callers keep compiling.
    ///
    /// # Safety
    ///
    /// `ptr` must be null or a pointer obtained from `Arc::<Waker>::into_raw`
    /// whose strong count has not yet been released by the caller.
    #[allow(dead_code)] // retained for interface compatibility only
    pub unsafe fn drop_old_ptr(ptr: *mut Waker) {
        if !ptr.is_null() {
            // SAFETY: guaranteed by the caller, see `# Safety` above.
            drop(unsafe { Arc::from_raw(ptr.cast_const()) });
        }
    }

    /// Replaces the stored waker with a clone of `other` unless the stored
    /// one would already wake the same task.
    ///
    /// An empty slot stays empty: a waiter that has been cleared is not
    /// re-armed by a later poll.
    pub fn check_and_swap(&self, other: &Waker) {
        let mut slot = self.slot();
        let stale = match slot.as_mut() {
            Some(current) if !current.will_wake(other) => {
                Some(std::mem::replace(current, other.clone()))
            }
            _ => None,
        };
        // Release the lock before running the old waker's destructor.
        drop(slot);
        drop(stale);
    }

    /// Locks the slot. A poisoned mutex is recovered because the protected
    /// `Option<Waker>` has no invariant a panic could leave half-updated.
    fn slot(&self) -> std::sync::MutexGuard<'_, Option<Waker>> {
        self.cell
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}
/// Future returned by `RawEvent::wait`; resolves once the waiter has either
/// consumed a stored signal or been released from the lot.
pub(crate) struct RawEventAwait<'a, S, L: AsyncLotSignature> {
    /// Event this waiter belongs to.
    event: &'a RawEvent<S, L>,
    /// Where the waiter is in its life cycle.
    state: RawEventAwaitState,
    /// Number of retries made so far after a failed flag exchange.
    backoff: usize,
    /// Yielder armed once `backoff` exceeds `BACKOFF_MAX`; polled before
    /// anything else while present.
    yielder: Option<Yield>,
    /// Waker cell kept by this future; refreshed when a later poll arrives
    /// with a different task waker.
    waker: Option<EventWaker>,
    /// Marker returned by the lot when the waiter was parked; read to learn
    /// whether the waiter should complete and written when it is dropped.
    wait_link: Option<L::ValidityMarker>,
}
impl<'a, S, L> RawEventAwait<'a, S, L>
where
    S: EventState,
    L: AsyncLotSignature,
    Self: Unpin,
{
    /// Parks this waiter in the event's lot.
    ///
    /// Marks the future as `Waiting`, wraps the task waker from `cx` in a new
    /// `EventWaker`, keeps one handle in `self.waker` and hands the other to
    /// the lot. Returns the marker produced by the lot.
    #[inline]
    fn park_waker(&mut self, cx: &Context<'_>) -> L::ValidityMarker {
        self.state = RawEventAwaitState::Waiting;
        let parked = EventWaker::new(cx.waker());
        self.waker = Some(parked.clone());
        self.event.lot.park(&parked)
    }

    /// Retries `poll` after a failed flag exchange.
    ///
    /// The first `BACKOFF_MAX` retries only emit a spin-loop hint; after that
    /// a yielder is armed so the retried poll goes through it first.
    #[inline]
    fn backoff(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        self.backoff += 1;
        if self.backoff > BACKOFF_MAX {
            self.yielder = Some(Yield::default());
        } else {
            core::hint::spin_loop();
        }
        self.poll(cx)
    }

    /// Makes sure `self.waker` refers to the task that is polling right now:
    /// creates the cell on first use, otherwise lets the existing cell replace
    /// its waker if the task changed.
    #[inline]
    fn perform_waker_swap(&mut self, cx: &mut Context<'_>) {
        if let Some(existing) = &self.waker {
            existing.check_and_swap(cx.waker());
        } else {
            self.waker = Some(EventWaker::new(cx.waker()));
        }
    }

    /// Returns `true` only for a `Waiting` future whose lot marker reads
    /// `ShouldWake`.
    #[inline]
    fn should_ready(&mut self) -> bool {
        if self.state != RawEventAwaitState::Waiting {
            return false;
        }
        let Some(link) = self.wait_link.as_ref() else {
            return false;
        };
        match link.get() {
            ValidityState::ShouldWake => true,
            ValidityState::Idle | ValidityState::Killed => false,
        }
    }

    /// Drives the yielder, if one is armed.
    ///
    /// Returns `ControlFlow::Continue` while the yielder is still pending (it
    /// is kept for the next poll) and `ControlFlow::Break` when there is no
    /// yielder or it has just finished (it is then discarded).
    #[inline]
    fn try_yield(&mut self, cx: &mut Context<'_>) -> ControlFlow<()> {
        let Some(mut armed) = self.yielder.take() else {
            return ControlFlow::Break(());
        };
        if pin!(&mut armed).poll(cx).is_pending() {
            self.yielder = Some(armed);
            ControlFlow::Continue(())
        } else {
            ControlFlow::Break(())
        }
    }
}
impl<'a, S, L> Future for RawEventAwait<'a, S, L>
where
    S: EventState,
    L: AsyncLotSignature,
    Self: Unpin,
{
    type Output = ();

    /// Polls the waiter.
    ///
    /// 1. A completed future stays ready.
    /// 2. While an armed yielder is pending, only the waker is refreshed.
    /// 3. Otherwise the signal flag is exchanged `SIGNAL_SET -> SIGNAL_FREE`:
    ///    * `Err(SIGNAL_SET)` is a spurious failure and goes through `backoff`;
    ///    * a parked waiter completes when its marker says `ShouldWake`, and
    ///      otherwise just refreshes its waker;
    ///    * a waiter that is not parked yet and consumed the flag completes at
    ///      once when the lot had nobody to release; in every other case it
    ///      is parked.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.state == RawEventAwaitState::Completed {
            return Poll::Ready(());
        }

        if self.try_yield(cx).is_continue() {
            self.perform_waker_swap(cx);
            return Poll::Pending;
        }

        let consumed_signal = match self
            .event
            .state
            .cmpxchng_weak(SIGNAL_SET, SIGNAL_FREE, Acquire, Acquire)
        {
            Ok(_) => true,
            Err(SIGNAL_FREE) => false,
            // The flag was set but the weak exchange failed: retry.
            Err(SIGNAL_SET) => return self.backoff(cx),
            // SAFETY: this file only ever stores `SIGNAL_FREE` or `SIGNAL_SET`
            // in the flag. Assumes every `EventState` implementor returns only
            // values that were stored through it — TODO confirm.
            Err(_) => unsafe { core::hint::unreachable_unchecked() },
        };

        if self.should_ready() {
            return release_complete(&mut *self);
        }

        if self.state == RawEventAwaitState::Waiting {
            // NOTE(review): when `consumed_signal` is true the flag has already
            // been cleared here, yet the waiter stays pending — confirm that a
            // parked waiter is meant to swallow a stored signal.
            self.perform_waker_swap(cx);
            return Poll::Pending;
        }

        // Not parked yet. Having taken the flag, pass the signal on to an
        // already parked waiter if there is one; only when the lot is empty
        // does this waiter complete straight away.
        if consumed_signal && !self.event.lot.unpark_one() {
            self.state = RawEventAwaitState::Completed;
            return Poll::Ready(());
        }

        self.wait_link = Some(self.park_waker(cx));
        Poll::Pending
    }
}
impl<'a, S, L> EventSetter<'a> for RawEvent<S, L>
where
    S: EventState,
    L: AsyncLotSignature,
    Self: 'a,
{
    type Waiter = RawEventAwait<'a, S, L>;

    /// Creates an event with an empty lot and a clear signal flag.
    #[inline]
    fn new() -> Self {
        Self {
            lot: L::default(),
            state: S::new(SIGNAL_FREE),
        }
    }

    /// Creates an event with an empty lot and the signal flag already set.
    #[inline]
    fn new_set() -> Self {
        Self {
            lot: L::default(),
            state: S::new(SIGNAL_SET),
        }
    }

    /// Releases every parked waiter, calling `functor` once per released
    /// waiter. If no waiter was released the signal flag is stored so the
    /// next waiter can consume it.
    #[inline]
    fn set_all<F: FnMut()>(&self, mut functor: F) {
        let mut did_signal = false;
        while self.lot.unpark_one() {
            did_signal = true;
            functor();
        }
        if !did_signal {
            self.state.store(SIGNAL_SET, Release);
        }
    }

    /// Releases one parked waiter and returns `true`; if nobody was parked,
    /// stores the signal flag and returns `false`.
    #[inline]
    fn set_one(&self) -> bool {
        if self.lot.unpark_one() {
            return true;
        }
        self.state.store(SIGNAL_SET, Release);
        false
    }

    /// Consumes a stored signal without waiting.
    ///
    /// Returns `false` when waiters are parked or the flag is clear. The
    /// exchange is the weak form, which may fail although the flag is set;
    /// that case (`Err(SIGNAL_SET)`) is retried so a signalled event is never
    /// reported as unsignalled by accident.
    #[inline]
    fn try_wait(&self) -> bool {
        if self.has_waiters() {
            return false;
        }
        loop {
            match self
                .state
                .cmpxchng_weak(SIGNAL_SET, SIGNAL_FREE, Acquire, Relaxed)
            {
                Ok(_) => return true,
                // Spurious failure: the flag still reads as set, try again.
                Err(SIGNAL_SET) => core::hint::spin_loop(),
                Err(_) => return false,
            }
        }
    }

    /// Creates a waiter future in its initial, not yet parked, state.
    #[inline]
    fn wait(&'a self) -> Self::Waiter {
        RawEventAwait {
            backoff: 0,
            event: self,
            state: RawEventAwaitState::Init,
            wait_link: None,
            yielder: None,
            waker: None,
        }
    }

    /// Reports whether the lot currently has parked waiters.
    #[inline]
    fn has_waiters(&self) -> bool {
        self.lot.has_waiters()
    }
}
/// Completes a parked waiter that the lot has released: reports the release
/// back to the lot, then marks the future as completed.
///
/// Always returns `Poll::Ready(())`.
#[inline(always)]
fn release_complete<S, L: AsyncLotSignature>(waiter: &mut RawEventAwait<'_, S, L>) -> Poll<()> {
    let lot = &waiter.event.lot;
    lot.notify_wait_release();
    release_partial(waiter)
}
/// Marks the future as completed without notifying the lot.
///
/// Always returns `Poll::Ready(())`.
#[inline(always)]
fn release_partial<S, L: AsyncLotSignature>(waiter: &mut RawEventAwait<'_, S, L>) -> Poll<()> {
    waiter.state = RawEventAwaitState::Completed;
    Poll::Ready(())
}
impl<'a, S, L> Drop for RawEventAwait<'a, S, L>
where
    L: AsyncLotSignature,
{
    /// Withdraws a waiter that is dropped while still parked.
    ///
    /// Only the `Waiting` state needs work: the lot marker is set to `Killed`
    /// and the lot is told that the waiter is gone. Futures that were never
    /// parked or have already completed have nothing to undo.
    fn drop(&mut self) {
        if self.state != RawEventAwaitState::Waiting {
            return;
        }
        if let Some(link) = self.wait_link.as_ref() {
            link.set(ValidityState::Killed);
            self.event.lot.notify_wait_release();
        }
    }
}
// Generates a public event type and its waiter future on top of
// `RawEvent` / `RawEventAwait`.
//
// Arguments:
// * optional attributes, forwarded to the generated event struct;
// * `name`       - identifier of the generated event struct;
// * `waitername` - identifier of the generated waiter future;
// * `lot`        - lot type plugged into `RawEvent`;
// * `state`      - state type plugged into `RawEvent`.
//
// NOTE(review): the expansion names `Future` and calls `RawEvent::new` etc.
// without a trait path, so the invoking module presumably has to have
// `Future` and `EventSetter` in scope, and the `pin_project` crate must be
// available — confirm at the call sites.
macro_rules! impl_async_event {
(
$(#[$($attrss:tt)*])*
name = $name:ident,
waitername = $waitername:ident,
lot = $lot:ident,
state = $state:ident
) => {
// Event type: a newtype around the raw event.
$(#[$($attrss)*])*
pub struct $name(crate::base::event::RawEvent<$state, $lot>);
// Waiter type: a newtype around the raw waiter, pinned structurally.
#[pin_project::pin_project]
pub struct $waitername<'a>(#[pin] crate::base::event::RawEventAwait<'a, $state, $lot>);
// Polling simply forwards to the wrapped raw waiter.
impl<'a> Future for $waitername<'a> {
type Output = ();
fn poll(
self: core::pin::Pin<&mut Self>,
ctx: &mut core::task::Context<'_>
) -> core::task::Poll<()> {
self
.project()
.0
.poll(ctx)
}
}
// Every `EventSetter` method delegates to the wrapped `RawEvent`.
impl<'a> crate::base::event::EventSetter<'a> for $name {
type Waiter = $waitername<'a>;
#[inline]
fn new() -> Self {
Self(crate::base::event::RawEvent::new())
}
#[inline]
fn new_set() -> Self {
Self(crate::base::event::RawEvent::new_set())
}
#[inline]
fn wait(&'a self) -> $waitername<'a> {
$waitername(crate::base::event::RawEvent::wait(&self.0))
}
#[inline]
fn try_wait(&self) -> bool {
crate::base::event::RawEvent::try_wait(&self.0)
}
#[inline]
fn set_one(&self) -> bool {
crate::base::event::RawEvent::set_one(&self.0)
}
#[inline]
fn set_all<F: FnMut()>(&self, functor: F) {
crate::base::event::RawEvent::set_all(&self.0, functor)
}
#[inline]
fn has_waiters(&self) -> bool {
crate::base::event::RawEvent::has_waiters(&self.0)
}
}
// `Default` builds the event through `EventSetter::new`.
// NOTE(review): this path goes through `crate::EventSetter`, whereas the impl
// above names `crate::base::event::EventSetter` — presumably a crate-root
// re-export of the same trait; confirm.
impl Default for $name {
fn default() -> Self {
<Self as crate::EventSetter>::new()
}
}
};
}
pub(crate) use impl_async_event;