#[cfg(not(feature = "log"))]
use crate::log;
#[cfg(feature = "time")]
use crate::time::Timing;
use crate::{
actor::{Actor, Future, Handle},
context::Context,
reactor::{inc_poll_budget, pending_polled},
};
use alloc::boxed::Box;
use core::{
any::Any,
pin::Pin,
task::{Context as CoreContext, Poll},
};
#[derive(PartialEq, Debug, Clone, Copy)]
pub enum BlockingState {
Continue,
Abort,
}
#[derive(PartialEq, Debug, Clone, Copy)]
pub enum BlockerState {
Created,
Started,
Running,
Aborted,
Finished,
}
pub trait Blocking<A>
where
A: Actor,
{
fn started(&mut self, _: &mut Context<A>) {}
fn state(&mut self, _: &mut Context<A>) -> BlockingState {
BlockingState::Continue
}
fn aborted(&mut self, _: &mut Context<A>) {}
fn finished(&mut self, _: &mut Context<A>) {}
}
pub struct Blocker<A> {
inner: Pin<Box<dyn Future<A, Output = ()>>>,
handle: Handle,
state: BlockerState,
}
impl<A: Actor> Blocker<A> {
pub fn new<F>(fut: F) -> Self
where
F: Future<A, Output = ()> + 'static,
{
Self {
inner: Box::pin(fut),
handle: Handle::new(),
state: BlockerState::Created,
}
}
pub fn handle(&self) -> Handle {
self.handle
}
#[allow(dead_code)]
pub fn state(&self) -> BlockerState {
self.state
}
#[allow(dead_code)]
pub fn set_state(&mut self, state: BlockerState) {
self.state = state;
}
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
pub fn from_duration<T: Timing + 'static>(dur: core::time::Duration) -> Self {
let deadline = T::now() + dur;
let inner = move |_: &mut A, _: &mut Context<A>, _: &mut CoreContext<'_>| {
if T::now() > deadline {
Poll::Ready(())
} else {
Poll::Pending
}
};
Self {
inner: Box::pin(inner),
handle: Handle::new(),
state: BlockerState::Created,
}
}
}
impl<A> Future<A> for Blocker<A>
where
A: Actor + Blocking<A>,
{
type Output = ();
fn poll(
mut self: Pin<&mut Self>,
act: &mut A,
ctx: &mut Context<A>,
cx: &mut CoreContext<'_>,
) -> Poll<()> {
match self.state {
BlockerState::Created => {
let state = <A as Blocking<A>>::state(act, ctx);
if self.state != BlockerState::Created {
log::debug!("Blocker state changed");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
match state {
BlockingState::Abort => {
self.state = BlockerState::Aborted;
log::debug!("Blocker Aborted when Created");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
_ => {}
}
self.state = BlockerState::Started;
log::debug!("Blocker has successfully Started");
cx.waker().wake_by_ref();
inc_poll_budget(2);
Poll::Pending
}
BlockerState::Started => {
let state = <A as Blocking<A>>::state(act, ctx);
if self.state != BlockerState::Started {
log::debug!("Blocker state changed");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
match state {
BlockingState::Abort => {
self.state = BlockerState::Aborted;
log::debug!("Blocker Aborted when Started");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
_ => {}
}
self.state = BlockerState::Running;
log::debug!("Blocker is Running");
<A as Blocking<A>>::started(act, ctx);
if self.state != BlockerState::Started {
log::debug!("Blocker state changed");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
cx.waker().wake_by_ref();
inc_poll_budget(2);
Poll::Pending
}
BlockerState::Running => {
let state = <A as Blocking<A>>::state(act, ctx);
if self.state != BlockerState::Running {
log::debug!("Blocker state changed");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
match state {
BlockingState::Continue => match self.inner.as_mut().poll(act, ctx, cx) {
Poll::Pending => {
pending_polled();
Poll::Pending
}
Poll::Ready(_) => {
self.state = BlockerState::Finished;
cx.waker().wake_by_ref();
inc_poll_budget(2);
Poll::Pending
}
},
BlockingState::Abort => {
self.state = BlockerState::Aborted;
cx.waker().wake_by_ref();
inc_poll_budget(2);
Poll::Pending
}
}
}
BlockerState::Aborted => {
ctx.abort_future(self.handle);
<A as Blocking<A>>::aborted(act, ctx);
if self.state != BlockerState::Aborted {
log::debug!("Blocker state changed");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
log::debug!("Blocker is successfully Aborted");
Poll::Ready(())
}
BlockerState::Finished => {
<A as Blocking<A>>::finished(act, ctx);
if self.state != BlockerState::Finished {
log::debug!("Blocker state changed");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
log::debug!("Blcoker is successfully Finished");
Poll::Ready(())
}
}
}
fn downcast_ref(&self) -> Option<&dyn Any> {
Some(self)
}
fn downcast_mut(self: Pin<&mut Self>) -> Option<Pin<&mut dyn Any>> {
Some(self)
}
}