use crate::stream::{CloneableStreamable, StreamMessage};
use actix::prelude::*;
use std::fmt::Debug;
/// Actor state for merging two upstream streams into a single downstream recipient.
///
/// Elements received from either upstream are forwarded as they arrive; the
/// downstream is told the stream has ended once both upstreams have reported
/// `StreamMessage::End`.
#[derive(Debug)]
pub(crate) struct Merge2Actor<T: CloneableStreamable> {
    /// Identifier taken from a counter in `new`; in this file it is only used
    /// to tag log lines.
    actor_id: usize,
    /// Recipient that receives every forwarded element and the final `End`.
    downstream: Recipient<StreamMessage<T>>,
    /// Number of upstreams that have not yet sent `End` (initialised to 2 in `new`).
    active_upstreams: usize,
    /// Set once an `End` has been attempted towards `downstream`, so it is
    /// never attempted a second time.
    downstream_signaled_end: bool,
}
impl<T: CloneableStreamable> Merge2Actor<T> {
    /// Creates a merge actor that forwards to `downstream` and expects two upstreams.
    ///
    /// Each instance gets the next value of a function-local static counter as
    /// its `actor_id`.
    pub(crate) fn new(downstream: Recipient<StreamMessage<T>>) -> Self {
        use std::sync::atomic::{AtomicUsize, Ordering};

        static ACTOR_COUNTER: AtomicUsize = AtomicUsize::new(0);

        let actor_id = ACTOR_COUNTER.fetch_add(1, Ordering::Relaxed);
        log::debug!("[Merge2Actor-{}] Creating new instance.", actor_id);

        Self {
            actor_id,
            downstream,
            active_upstreams: 2,
            downstream_signaled_end: false,
        }
    }

    /// Sends `End` downstream at most once, then stops the actor unless it is
    /// already stopping or stopped.
    ///
    /// A failed `End` delivery is only logged; the end-of-stream flag is set
    /// regardless, so the send is never retried.
    fn try_send_end_to_downstream_and_stop(&mut self, ctx: &mut Context<Self>) {
        if self.downstream_signaled_end {
            log::trace!("[Merge2Actor-{}] try_send_end_to_downstream_and_stop: Downstream already signaled end.", self.actor_id);
        } else {
            log::debug!("[Merge2Actor-{}] try_send_end_to_downstream_and_stop: Signaling End to downstream.", self.actor_id);
            let delivered = self.downstream.try_send(StreamMessage::End).is_ok();
            if !delivered {
                log::warn!("[Merge2Actor-{}] try_send_end_to_downstream_and_stop: Failed to send End to downstream (already closed).", self.actor_id);
            }
            self.downstream_signaled_end = true;
        }

        // Avoid calling `stop()` again when shutdown is already in progress
        // (this helper is also invoked from `stopping`).
        let shutting_down = matches!(ctx.state(), ActorState::Stopping | ActorState::Stopped);
        if !shutting_down {
            log::debug!("[Merge2Actor-{}] try_send_end_to_downstream_and_stop: Stopping actor.", self.actor_id);
            ctx.stop();
        }
    }
}
impl<T: CloneableStreamable> Actor for Merge2Actor<T> {
    type Context = Context<Self>;

    /// Logs start-up and shuts down immediately if there is no upstream to wait for.
    fn started(&mut self, ctx: &mut Self::Context) {
        log::debug!("[Merge2Actor-{}] Actor started. Expecting 2 upstreams. Active upstreams: {}", self.actor_id, self.active_upstreams);

        if self.active_upstreams != 0 {
            return;
        }

        log::warn!("[Merge2Actor-{}] Actor started with 0 active upstreams. Stopping.", self.actor_id);
        self.try_send_end_to_downstream_and_stop(ctx);
    }

    /// Makes sure the downstream has been sent `End` before the actor goes
    /// away, then confirms the stop.
    fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
        log::debug!("[Merge2Actor-{}] Actor stopping. Active upstreams: {}, Downstream signaled end: {}", self.actor_id, self.active_upstreams, self.downstream_signaled_end);
        self.try_send_end_to_downstream_and_stop(ctx);
        Running::Stop
    }

    /// Logs that the actor has fully stopped.
    fn stopped(&mut self, _ctx: &mut Self::Context) {
        log::debug!("[Merge2Actor-{}] Actor stopped.", self.actor_id);
    }
}
impl<T> Handler<StreamMessage<T>> for Merge2Actor<T>
where
    T: CloneableStreamable,
{
    type Result = ();

    /// Handles one message from either upstream.
    ///
    /// * `Element` is forwarded to the downstream recipient. If the send
    ///   fails, the failure is logged, `End` is attempted downstream and the
    ///   actor stops.
    /// * `End` decrements the active-upstream count; when it reaches zero,
    ///   `End` is sent downstream and the actor stops.
    ///
    /// Any message that arrives after the downstream has been signaled `End`
    /// is ignored.
    fn handle(&mut self, msg: StreamMessage<T>, ctx: &mut Context<Self>) {
        if self.downstream_signaled_end {
            log::trace!("[Merge2Actor-{}] Downstream already ended. Ignoring message: {:?}", self.actor_id, msg);
            return;
        }

        match msg {
            StreamMessage::Element(item) => {
                log::trace!("[Merge2Actor-{}] Received element: {:?}. Forwarding to downstream.", self.actor_id, item);

                // `try_send` returns the rejected message inside the error, so
                // the element is moved rather than cloned and only recovered
                // for logging on the failure path.
                if let Err(err) = self.downstream.try_send(StreamMessage::Element(item)) {
                    // The error distinguishes a full mailbox from a closed
                    // one; report which it was instead of assuming "closed".
                    // NOTE(review): a full mailbox still terminates the stream
                    // here, exactly as before — confirm that is the intended
                    // backpressure policy.
                    let reason = err.to_string();
                    log::warn!("[Merge2Actor-{}] Failed to forward {:?} to downstream ({}). Stopping.", self.actor_id, err.into_inner(), reason);
                    self.try_send_end_to_downstream_and_stop(ctx);
                }
            }
            StreamMessage::End => {
                log::debug!("[Merge2Actor-{}] Received End from one upstream. Active upstreams before decrement: {}.", self.actor_id, self.active_upstreams);
                // Saturating so that a surplus `End` cannot underflow the counter.
                self.active_upstreams = self.active_upstreams.saturating_sub(1);
                log::debug!("[Merge2Actor-{}] Active upstreams after decrement: {}.", self.actor_id, self.active_upstreams);

                if self.active_upstreams == 0 {
                    log::debug!("[Merge2Actor-{}] Both upstreams ended. Signaling End downstream and stopping.", self.actor_id);
                    self.try_send_end_to_downstream_and_stop(ctx);
                }
            }
        }
    }
}