use crate::stream::{StreamMessage, Streamable, CloneableStreamable};
use actix::prelude::*; use std::marker::PhantomData;
/// Actor that turns a stream of `In` elements into a stream of `Out` elements.
///
/// Each incoming `StreamMessage::Element` is passed through `map_fn` and the
/// result is forwarded to `downstream`; see the `Handler` and `Actor` impls
/// for the end-of-stream behaviour.
#[derive(Debug)]
pub(crate) struct MappingActor<In, Out, F>
where
    In: Streamable,
    Out: CloneableStreamable,
    F: FnMut(In) -> Out + Send + 'static + Clone + Unpin,
{
    /// Transformation applied to every incoming element.
    map_fn: F,
    /// Receiver of the mapped elements and of the final `StreamMessage::End`.
    downstream: Recipient<StreamMessage<Out>>,
    /// Zero-sized marker: `In` appears only in `F`'s signature, not in any stored value.
    _phantom_in: PhantomData<In>,
}
impl<In, Out, F> MappingActor<In, Out, F>
where
    In: Streamable,
    Out: CloneableStreamable,
    F: FnMut(In) -> Out + Send + 'static + Clone + Unpin,
{
    /// Creates a mapping actor.
    ///
    /// * `map_fn` - function applied to each incoming element.
    /// * `downstream` - recipient that receives the mapped stream messages.
    pub(crate) fn new(map_fn: F, downstream: Recipient<StreamMessage<Out>>) -> Self {
        Self {
            map_fn,
            downstream,
            _phantom_in: PhantomData,
        }
    }
}
impl<In, Out, F> Actor for MappingActor<In, Out, F>
where
    In: Streamable,
    Out: CloneableStreamable,
    F: FnMut(In) -> Out + Send + 'static + Clone + Unpin,
{
    type Context = Context<Self>;

    // `started` is intentionally not overridden: this actor has no start-up work.

    /// Lifecycle hook run when the actor has stopped: tells the downstream
    /// recipient that the stream is finished.
    fn stopped(&mut self, _ctx: &mut Self::Context) {
        // Best effort only: if the notification cannot be delivered there is
        // nothing further this actor can do, so the send result is discarded.
        let end_of_stream = StreamMessage::End;
        let _ = self.downstream.try_send(end_of_stream);
    }
}
impl<In, Out, F> Handler<StreamMessage<In>> for MappingActor<In, Out, F>
where
    In: Streamable,
    Out: CloneableStreamable,
    F: FnMut(In) -> Out + Send + 'static + Clone + Unpin,
{
    type Result = ();

    /// Processes one upstream stream message.
    ///
    /// * `StreamMessage::Element` - the element is mapped with `map_fn` and
    ///   forwarded downstream via `try_send`; if that send fails the actor
    ///   stops itself.
    /// * `StreamMessage::End` - the actor stops itself.
    ///
    /// Messages that arrive while the actor is already stopping or stopped
    /// are ignored.
    fn handle(&mut self, msg: StreamMessage<In>, ctx: &mut Context<Self>) {
        // Drop late messages once shutdown has begun. Past this guard the
        // state cannot be `Stopping`/`Stopped` until this handler itself
        // calls `ctx.stop()`, so no further state checks are required below.
        if matches!(ctx.state(), ActorState::Stopping | ActorState::Stopped) {
            return;
        }

        match msg {
            StreamMessage::Element(elem) => {
                let mapped = (self.map_fn)(elem);
                // A failed downstream send ends this stage: the mapped
                // element is lost and the actor shuts down.
                if self
                    .downstream
                    .try_send(StreamMessage::Element(mapped))
                    .is_err()
                {
                    ctx.stop();
                }
            }
            // Upstream is finished; the `stopped` hook is responsible for
            // propagating the end-of-stream marker downstream.
            StreamMessage::End => ctx.stop(),
        }
    }
}