use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_util::stream::Stream;
use log::error;
use pin_project::pin_project;
use crate::actor::{Actor, ActorContext, ActorState, AsyncContext, SpawnHandle};
use crate::fut::ActorFuture;
#[allow(unused_variables)]
pub trait StreamHandler<I>
where
Self: Actor,
{
fn handle(&mut self, item: I, ctx: &mut Self::Context);
fn started(&mut self, ctx: &mut Self::Context) {}
fn finished(&mut self, ctx: &mut Self::Context) {
ctx.stop()
}
fn add_stream<S>(fut: S, ctx: &mut Self::Context) -> SpawnHandle
where
Self::Context: AsyncContext<Self>,
S: Stream<Item = I> + 'static,
I: 'static,
{
if ctx.state() == ActorState::Stopped {
error!("Context::add_stream called for stopped actor.");
SpawnHandle::default()
} else {
ctx.spawn(ActorStream::new(fut))
}
}
}
#[pin_project]
pub(crate) struct ActorStream<A, M, S> {
#[pin]
stream: S,
started: bool,
act: PhantomData<A>,
msg: PhantomData<M>,
}
impl<A, M, S> ActorStream<A, M, S> {
pub fn new(fut: S) -> Self {
Self {
stream: fut,
started: false,
act: PhantomData,
msg: PhantomData,
}
}
}
impl<A, M, S> ActorFuture for ActorStream<A, M, S>
where
S: Stream<Item = M>,
A: Actor + StreamHandler<M>,
A::Context: AsyncContext<A>,
{
type Output = ();
type Actor = A;
fn poll(
self: Pin<&mut Self>,
act: &mut A,
ctx: &mut A::Context,
task: &mut Context<'_>,
) -> Poll<Self::Output> {
let mut this = self.project();
if !*this.started {
*this.started = true;
<A as StreamHandler<M>>::started(act, ctx);
}
match this.stream.as_mut().poll_next(task) {
Poll::Ready(Some(msg)) => {
A::handle(act, msg, ctx);
if !ctx.waiting() {
task.waker().wake_by_ref();
}
Poll::Pending
}
Poll::Ready(None) => {
A::finished(act, ctx);
Poll::Ready(())
}
Poll::Pending => Poll::Pending,
}
}
}