use futures::{Async, Poll, Stream};
use std::marker::PhantomData;
use actor::{Actor, ActorContext, ActorState, AsyncContext, Running, SpawnHandle};
use fut::ActorFuture;
/// Callbacks an actor implements to consume a stream yielding items of type
/// `I` and errors of type `E`.
///
/// A stream is attached to the actor's context with [`add_stream`]; the
/// context then drives the stream and invokes the callbacks below.
///
/// [`add_stream`]: StreamHandler::add_stream
// The default method bodies deliberately ignore some of their parameters.
#[allow(unused_variables)]
pub trait StreamHandler<I, E>
where
    Self: Actor,
{
    /// Called for every item the stream produces.
    fn handle(&mut self, item: I, ctx: &mut Self::Context);

    /// Called once, before the stream is polled for the first time.
    ///
    /// The default implementation does nothing.
    fn started(&mut self, ctx: &mut Self::Context) {}

    /// Called for every error the stream produces.
    ///
    /// Returning `Running::Stop` ends stream processing, after which
    /// `finished` is invoked. The default implementation returns
    /// `Running::Stop`.
    fn error(&mut self, err: E, ctx: &mut Self::Context) -> Running {
        Running::Stop
    }

    /// Called when the stream is exhausted, or when `error` asked to stop.
    ///
    /// The default implementation stops the actor's context.
    fn finished(&mut self, ctx: &mut Self::Context) {
        ctx.stop();
    }

    /// Spawns `fut` on `ctx` so that its items and errors are dispatched to
    /// this handler, returning the handle of the spawned stream.
    ///
    /// If the actor is already in the `ActorState::Stopped` state the stream
    /// is not spawned: an error is logged and a default `SpawnHandle` is
    /// returned instead.
    fn add_stream<S>(fut: S, ctx: &mut Self::Context) -> SpawnHandle
    where
        Self::Context: AsyncContext<Self>,
        S: Stream<Item = I, Error = E> + 'static,
        I: 'static,
        E: 'static,
    {
        // Guard clause: a stopped actor cannot accept new streams.
        if ctx.state() == ActorState::Stopped {
            error!("Context::add_stream called for stopped actor.");
            return SpawnHandle::default();
        }

        ctx.spawn(ActorStream::new(fut))
    }
}
/// Adapter that wraps a stream `S` so it can be driven as an `ActorFuture`
/// for actor `A`, dispatching items of type `M` and errors of type `E` to the
/// actor's `StreamHandler` callbacks.
pub(crate) struct ActorStream<A, M, E, S> {
// The wrapped stream being polled.
stream: S,
// Whether the handler's `started` callback has already been invoked.
started: bool,
// Zero-sized markers tying the otherwise unused type parameters
// `A`, `M` and `E` to this struct.
act: PhantomData<A>,
msg: PhantomData<M>,
error: PhantomData<E>,
}
impl<A, M, E, S> ActorStream<A, M, E, S> {
pub fn new(fut: S) -> Self {
ActorStream {
stream: fut,
started: false,
act: PhantomData,
msg: PhantomData,
error: PhantomData,
}
}
}
/// Drives the wrapped stream and forwards its output to the actor's
/// `StreamHandler` implementation.
impl<A, M, E, S> ActorFuture for ActorStream<A, M, E, S>
where
    S: Stream<Item = M, Error = E>,
    A: Actor + StreamHandler<M, E>,
    A::Context: AsyncContext<A>,
{
    type Item = ();
    type Error = ();
    type Actor = A;

    /// Polls the wrapped stream, dispatching everything that is ready.
    ///
    /// * On the first call, `StreamHandler::started` is invoked once.
    /// * Each item is passed to `StreamHandler::handle`.
    /// * Each error is passed to `StreamHandler::error`; if that returns
    ///   `Running::Stop`, `finished` is called and the future completes.
    /// * When the stream ends, `finished` is called and the future completes.
    /// * After any callback that leaves the context in the waiting state,
    ///   polling is suspended by returning `Async::NotReady`.
    ///
    /// This future never resolves to an error.
    fn poll(
        &mut self, act: &mut A, ctx: &mut A::Context,
    ) -> Poll<Self::Item, Self::Error> {
        if !self.started {
            self.started = true;
            <A as StreamHandler<M, E>>::started(act, ctx);
        }

        loop {
            match self.stream.poll() {
                Ok(Async::Ready(Some(msg))) => {
                    A::handle(act, msg, ctx);
                    // The handler may have put the context into the waiting
                    // state; stop draining the stream until that is over.
                    if ctx.waiting() {
                        return Ok(Async::NotReady);
                    }
                }
                Err(err) => {
                    if A::error(act, err, ctx) == Running::Stop {
                        A::finished(act, ctx);
                        return Ok(Async::Ready(()));
                    }
                    // Same rule as for items: if the error handler left the
                    // context waiting, do not deliver anything further yet.
                    if ctx.waiting() {
                        return Ok(Async::NotReady);
                    }
                }
                Ok(Async::Ready(None)) => {
                    A::finished(act, ctx);
                    return Ok(Async::Ready(()));
                }
                Ok(Async::NotReady) => return Ok(Async::NotReady),
            }
        }
    }
}