use crate::stream::{CloneableStreamable, StreamMessage, Streamable};
use actix::prelude::*;
/// Actor for a "drop while" stream stage.
///
/// It starts out in a dropping phase (`is_dropping == true`) and holds the
/// predicate used to decide when that phase ends, together with the
/// recipient that receives the messages this stage lets through.
#[derive(Debug)]
pub(crate) struct DropWhileActor<Out, P>
where
    Out: Streamable,
    P: FnMut(&Out) -> bool + Send + Unpin + 'static,
{
    /// Test applied to incoming elements while the dropping phase is active.
    predicate: P,
    /// Next stage of the pipeline; receives forwarded elements and the end marker.
    downstream: Recipient<StreamMessage<Out>>,
    /// `true` while elements are still being discarded.
    is_dropping: bool,
}
impl<Out, P> DropWhileActor<Out, P>
where
Out: Streamable,
P: FnMut(&Out) -> bool + Send + 'static + Unpin,
{
pub(crate) fn new(predicate: P, downstream: Recipient<StreamMessage<Out>>) -> Self {
Self {
predicate,
downstream,
is_dropping: true,
}
}
}
impl<Out, P> Actor for DropWhileActor<Out, P>
where
    Out: Streamable,
    P: FnMut(&Out) -> bool + Send + Unpin + 'static,
{
    type Context = Context<Self>;

    /// Nothing to set up when the actor starts.
    fn started(&mut self, _: &mut Self::Context) {}

    /// Tells the downstream recipient that the stream has ended.
    ///
    /// The send is best-effort: a failed `try_send` is deliberately ignored.
    // NOTE(review): if `try_send` fails here the end marker is lost and the
    // downstream stage is never told the stream finished — confirm that this
    // is acceptable for the pipeline.
    fn stopped(&mut self, _: &mut Self::Context) {
        let end_marker = StreamMessage::End;
        let _ = self.downstream.try_send(end_marker);
    }
}
/// Message handling for the drop-while stage.
///
/// * `Element`: while the dropping phase is active, the element is tested
///   against the predicate and discarded if it matches. The first element
///   that does not match ends the dropping phase and is forwarded; every
///   later element is forwarded without consulting the predicate again.
/// * `End`: stops this actor (its `stopped` hook then notifies downstream).
///
/// If forwarding an element fails, the actor stops itself.
impl<Out, P> Handler<StreamMessage<Out>> for DropWhileActor<Out, P>
where
    // The bound is kept as-is for interface compatibility, even though the
    // handler no longer needs to clone elements.
    Out: CloneableStreamable,
    P: FnMut(&Out) -> bool + Send + 'static + Unpin,
{
    type Result = ();

    fn handle(&mut self, msg: StreamMessage<Out>, ctx: &mut Context<Self>) {
        // Ignore anything delivered once shutdown has begun. Because of this
        // guard the actor is known not to be stopping/stopped for the rest of
        // the call, so `ctx.stop()` below needs no further state checks.
        if matches!(ctx.state(), ActorState::Stopping | ActorState::Stopped) {
            return;
        }

        match msg {
            StreamMessage::Element(item) => {
                if self.is_dropping {
                    if (self.predicate)(&item) {
                        // Still inside the leading run of matching elements.
                        return;
                    }
                    // First non-matching element: leave the dropping phase
                    // for good and fall through to forward it.
                    self.is_dropping = false;
                }

                // `item` is owned and not used afterwards, so it is moved
                // into the outgoing message instead of being cloned.
                if self
                    .downstream
                    .try_send(StreamMessage::Element(item))
                    .is_err()
                {
                    // Downstream could not accept the element; shut down.
                    ctx.stop();
                }
            }
            StreamMessage::End => ctx.stop(),
        }
    }
}