#[cfg(not(feature = "log"))]
use crate::log;
use crate::{
actor::{Actor, ActorState, Future, Handle},
context::Context,
reactor::{inc_poll_budget, pending_polled},
};
use core::{
any::Any,
marker::PhantomData,
pin::Pin,
task::{Context as CoreContext, Poll},
};
use futures_core::stream;
use pin_project_lite::pin_project;
#[derive(PartialEq, Debug, Clone, Copy)]
pub enum StreamingState {
Abort,
Continue,
Pause,
Resume,
}
#[derive(PartialEq, Debug, Clone, Copy)]
pub enum StreamState {
Created,
Started,
Running,
Aborted,
Paused,
Resumed,
Finished,
}
pub trait Stream<Item>
where
Self: Actor,
{
fn started(&mut self, _: &mut Context<Self>) {}
fn action(&mut self, msg: Item, ctx: &mut Context<Self>);
fn state(&mut self, _: &mut Context<Self>) -> StreamingState {
StreamingState::Continue
}
fn spawn_stream<S>(&mut self, ctx: &mut Context<Self>, stream: S) -> Handle
where
S: stream::Stream + 'static,
Self: Stream<S::Item>,
{
if ctx.state() == ActorState::Stopped {
log::error!("Actor Stopped and Unable to add a stream");
Handle::default()
} else {
ctx.spawn(Streaming::new(stream))
}
}
fn aborted(&mut self, _: &mut Context<Self>) {}
fn paused(&mut self, _: &mut Context<Self>) {}
fn resumed(&mut self, _: &mut Context<Self>) {}
fn finished(&mut self, _: &mut Context<Self>) {}
}
pin_project! {
pub struct Streaming<A: Actor, S: stream::Stream>
{
#[pin]
stream: S,
state: StreamState,
handle: Handle,
_marker: PhantomData::<A>,
}
}
impl<A: Actor, S: stream::Stream> Streaming<A, S> {
pub fn new(stream: S) -> Self {
Self {
stream,
state: StreamState::Created,
handle: Handle::new(),
_marker: PhantomData::<A>,
}
}
pub fn handle(&self) -> Handle {
self.handle
}
pub fn state(&self) -> StreamState {
self.state
}
pub fn set_state(&mut self, state: StreamState) {
self.state = state;
}
}
impl<A, S> Future<A> for Streaming<A, S>
where
S: stream::Stream + 'static,
A: Actor + Stream<S::Item>,
{
type Output = ();
fn poll(
self: Pin<&mut Self>,
act: &mut A,
ctx: &mut Context<A>,
cx: &mut CoreContext<'_>,
) -> Poll<Self::Output> {
let mut this = self.project();
match *this.state {
StreamState::Created => {
let state = <A as Stream<S::Item>>::state(act, ctx);
if *this.state != StreamState::Created {
log::debug!("Stream state changed");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
match state {
StreamingState::Abort => {
*this.state = StreamState::Aborted;
log::debug!("Stream is Aborting");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
StreamingState::Pause => {
*this.state = StreamState::Paused;
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
_ => {}
}
*this.state = StreamState::Started;
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
StreamState::Started => {
let state = <A as Stream<S::Item>>::state(act, ctx);
if *this.state != StreamState::Started {
log::debug!("Stream state changed");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
match state {
StreamingState::Abort => {
*this.state = StreamState::Aborted;
log::debug!("Stream is Aborting");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
StreamingState::Pause => {
*this.state = StreamState::Paused;
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
_ => {}
}
<A as Stream<S::Item>>::started(act, ctx);
if *this.state != StreamState::Started {
log::debug!("Stream state changed");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
log::debug!("Stream has successfully started");
*this.state = StreamState::Running;
log::debug!("Actor Streaming");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
StreamState::Running => {
let state = <A as Stream<S::Item>>::state(act, ctx);
if *this.state != StreamState::Running {
log::debug!("Stream state changed");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
match state {
StreamingState::Abort => {
*this.state = StreamState::Aborted;
log::debug!("Stream is Aborting");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
StreamingState::Pause => {
*this.state = StreamState::Paused;
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
_ => {}
}
match this.stream.as_mut().poll_next(cx) {
Poll::Ready(Some(msg)) => {
<A as Stream<S::Item>>::action(act, msg, ctx);
if *this.state != StreamState::Running {
log::debug!("Stream state changed");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
Poll::Pending
}
Poll::Ready(None) => {
*this.state = StreamState::Finished;
cx.waker().wake_by_ref();
inc_poll_budget(2);
Poll::Pending
}
Poll::Pending => {
pending_polled();
Poll::Pending
}
}
}
StreamState::Aborted => {
ctx.abort_future(*this.handle);
<A as Stream<S::Item>>::aborted(act, ctx);
if *this.state != StreamState::Aborted {
log::debug!("Stream state changed");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
log::debug!("Stream is successfully Aborted");
Poll::Ready(())
}
StreamState::Paused => {
<A as Stream<S::Item>>::paused(act, ctx);
log::debug!("Stream is successfully Paused");
let state = <A as Stream<S::Item>>::state(act, ctx);
if *this.state != StreamState::Paused {
log::debug!("Stream state changed");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
if let StreamingState::Resume = state {
*this.state = StreamState::Resumed;
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
Poll::Pending
}
StreamState::Resumed => {
<A as Stream<S::Item>>::resumed(act, ctx);
if *this.state != StreamState::Resumed {
log::debug!("Stream state changed");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
log::debug!("Stream is successfully Resumed");
*this.state = StreamState::Running;
cx.waker().wake_by_ref();
inc_poll_budget(2);
Poll::Pending
}
StreamState::Finished => {
<A as Stream<S::Item>>::finished(act, ctx);
if *this.state != StreamState::Finished {
log::debug!("Stream state changed");
cx.waker().wake_by_ref();
inc_poll_budget(2);
return Poll::Pending;
}
log::debug!("Stream successfully finished");
Poll::Ready(())
}
}
}
fn downcast_ref(&self) -> Option<&dyn Any> {
Some(self)
}
fn downcast_mut(self: Pin<&mut Self>) -> Option<Pin<&mut dyn Any>> {
Some(self)
}
}