use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::{ready, stream::Stream};
use log::error;
use pin_project_lite::pin_project;
use crate::actor::{Actor, ActorContext, ActorState, AsyncContext, SpawnHandle};
use crate::fut::ActorFuture;
#[allow(unused_variables)]
pub trait StreamHandler<I>
where
Self: Actor,
{
fn handle(&mut self, item: I, ctx: &mut Self::Context);
fn started(&mut self, ctx: &mut Self::Context) {}
fn finished(&mut self, ctx: &mut Self::Context) {
ctx.stop()
}
fn add_stream<S>(stream: S, ctx: &mut Self::Context) -> SpawnHandle
where
S: Stream + 'static,
Self: StreamHandler<S::Item>,
Self::Context: AsyncContext<Self>,
{
if ctx.state() == ActorState::Stopped {
error!("Context::add_stream called for stopped actor.");
SpawnHandle::default()
} else {
ctx.spawn(ActorStream::new(stream))
}
}
}
pin_project! {
pub(crate) struct ActorStream<S> {
#[pin]
stream: S,
started: bool,
}
}
impl<S> ActorStream<S> {
pub fn new(fut: S) -> Self {
Self {
stream: fut,
started: false,
}
}
}
impl<A, S> ActorFuture<A> for ActorStream<S>
where
S: Stream,
A: Actor + StreamHandler<S::Item>,
A::Context: AsyncContext<A>,
{
type Output = ();
fn poll(
self: Pin<&mut Self>,
act: &mut A,
ctx: &mut A::Context,
task: &mut Context<'_>,
) -> Poll<Self::Output> {
let mut this = self.project();
if !*this.started {
*this.started = true;
<A as StreamHandler<S::Item>>::started(act, ctx);
}
let mut polled = 0;
while let Some(msg) = ready!(this.stream.as_mut().poll_next(task)) {
A::handle(act, msg, ctx);
polled += 1;
if ctx.waiting() {
return Poll::Pending;
} else if polled == 16 {
task.waker().wake_by_ref();
return Poll::Pending;
}
}
A::finished(act, ctx);
Poll::Ready(())
}
}