use crate::stream::{StreamMessage, Streamable, CloneableStreamable};
use actix::prelude::*;
use actix_rt; use tokio::sync::Semaphore; use futures::future::BoxFuture;
use std::marker::PhantomData;
use std::sync::Arc;
#[derive(Message, Debug)]
#[rtype(result = "()")]
pub(crate) struct ParMapResultMessage<Item: CloneableStreamable> {
item: Item,
}
#[derive(Debug)]
pub(crate) struct ParMapUnorderedActor<In, Out, F>
where
In: Streamable,
Out: CloneableStreamable,
F: Fn(In) -> BoxFuture<'static, Out> + Send + Sync + 'static + Clone + Unpin,
{
map_fn: F,
downstream: Recipient<StreamMessage<Out>>,
#[allow(dead_code)]
parallelism: usize,
semaphore: Arc<Semaphore>, in_flight_tasks: usize,
upstream_ended: bool,
_phantom_in: PhantomData<In>,
}
impl<In, Out, F> ParMapUnorderedActor<In, Out, F>
where
In: Streamable,
Out: CloneableStreamable,
F: Fn(In) -> BoxFuture<'static, Out> + Send + Sync + 'static + Clone + Unpin,
{
pub(crate) fn new(map_fn: F, downstream: Recipient<StreamMessage<Out>>, parallelism: usize) -> Self {
let effective_parallelism = parallelism.max(1);
ParMapUnorderedActor {
map_fn,
downstream,
parallelism: effective_parallelism,
semaphore: Arc::new(Semaphore::new(effective_parallelism)), in_flight_tasks: 0,
upstream_ended: false,
_phantom_in: PhantomData,
}
}
fn check_completion_and_stop(&mut self, ctx: &mut Context<Self>) {
if self.upstream_ended && self.in_flight_tasks == 0 {
let _ = self.downstream.try_send(StreamMessage::End);
ctx.stop();
}
}
}
impl<In, Out, F> Actor for ParMapUnorderedActor<In, Out, F>
where
In: Streamable,
Out: CloneableStreamable,
F: Fn(In) -> BoxFuture<'static, Out> + Send + Sync + 'static + Clone + Unpin,
{
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {}
fn stopped(&mut self, _ctx: &mut Context<Self>) {
if !(self.upstream_ended && self.in_flight_tasks == 0) {
let _ = self.downstream.try_send(StreamMessage::End);
}
}
}
impl<In, Out, F> Handler<StreamMessage<In>> for ParMapUnorderedActor<In, Out, F>
where
In: Streamable,
Out: CloneableStreamable,
F: Fn(In) -> BoxFuture<'static, Out> + Send + Sync + 'static + Clone + Unpin,
{
type Result = ();
fn handle(&mut self, msg: StreamMessage<In>, ctx: &mut Context<Self>) {
match msg {
StreamMessage::Element(elem) => {
self.in_flight_tasks += 1;
let fut_map_fn = self.map_fn.clone();
let semaphore_clone = self.semaphore.clone();
let self_addr = ctx.address();
actix_rt::spawn(async move { match semaphore_clone.acquire().await { Ok(_permit) => {
let mapped_item = fut_map_fn(elem).await;
self_addr.do_send(ParMapResultMessage { item: mapped_item });
}
Err(_e) => {
}
}
});
}
StreamMessage::End => {
self.upstream_ended = true;
self.check_completion_and_stop(ctx);
}
}
}
}
impl<In, Out, F> Handler<ParMapResultMessage<Out>> for ParMapUnorderedActor<In, Out, F>
where
In: Streamable,
Out: CloneableStreamable,
F: Fn(In) -> BoxFuture<'static, Out> + Send + Sync + 'static + Clone + Unpin,
{
type Result = ();
fn handle(&mut self, msg: ParMapResultMessage<Out>, ctx: &mut Context<Self>) {
self.in_flight_tasks -= 1;
if self.downstream.try_send(StreamMessage::Element(msg.item)).is_err() {
self.upstream_ended = true;
ctx.stop();
return;
}
self.check_completion_and_stop(ctx);
}
}