use super::epoch::EpochEvent;
#[cfg(feature = "metrics")]
use metrics::{gauge, register_gauge};
use crate::{
application::conf::logger::ArconLogger,
data::StateID,
stream::{node::source::SourceEvent, time::ArconTime},
};
use arcon_state::Backend;
use kompact::{component::AbstractComponent, prelude::*};
use std::sync::Arc;
/// Requests that can be sent to a `SourceManager` through `SourceManagerPort`.
#[derive(Debug, Clone)]
pub enum SourceManagerEvent {
/// Handled by the manager by sending `EpochEvent::Halt` to its epoch manager.
End,
}
/// Kompact port on which a `SourceManager` receives `SourceManagerEvent` requests.
pub struct SourceManagerPort;
impl Port for SourceManagerPort {
// `Never` as the indication type: nothing is sent back out over this port.
type Indication = Never;
// Requests travelling towards the providing component.
type Request = SourceManagerEvent;
}
/// Component that fans `SourceEvent`s out to a set of source components and
/// periodically tells every source to emit a watermark.
#[derive(ComponentDefinition)]
pub(crate) struct SourceManager<B: Backend> {
// Kompact component context (timers, logging, lifecycle plumbing).
ctx: ComponentContext<Self>,
// Port on which `SourceManagerEvent` requests arrive.
manager_port: ProvidedPort<SourceManagerPort>,
// Time setting attached to every `SourceEvent::Watermark` sent to the sources.
arcon_time: ArconTime,
// Period of the watermark timer, interpreted as milliseconds when the timer is scheduled.
watermark_interval: u64,
// Handle of the periodic watermark timer; `None` until `SourceEvent::Start`
// has been received and again after `on_stop` has cancelled it.
watermark_timeout: Option<ScheduledTimer>,
// Identifier used in the start-up log line and as the metrics label.
state_id: StateID,
// Source components registered through `add_source`.
pub(crate) sources: Vec<Arc<dyn AbstractComponent<Message = SourceEvent>>>,
// Strong actor references to the sources; `add_source` pushes one per component.
pub source_refs: Vec<ActorRefStrong<SourceEvent>>,
// State backend handle; stored but not read anywhere in the code visible here.
_backend: Arc<B>,
// Receives `EpochEvent::Halt` when `SourceManagerEvent::End` is handled.
epoch_manager: ActorRefStrong<EpochEvent>,
// Logger used for the start-up message.
logger: ArconLogger,
}
/// Construction, source registration and watermark-timer handling.
impl<B: Backend> SourceManager<B> {
    /// Builds a manager with no registered sources and no running watermark timer.
    ///
    /// * `state_id` - identifier used for logging and as the metrics label.
    /// * `arcon_time` - time setting forwarded with every watermark request.
    /// * `watermark_interval` - watermark period in milliseconds.
    /// * `epoch_manager` - recipient of `EpochEvent::Halt` on `SourceManagerEvent::End`.
    /// * `backend` - state backend handle kept alive by the manager.
    /// * `logger` - logger used for lifecycle messages.
    pub fn new(
        state_id: StateID,
        arcon_time: ArconTime,
        watermark_interval: u64,
        epoch_manager: ActorRefStrong<EpochEvent>,
        backend: Arc<B>,
        logger: ArconLogger,
    ) -> Self {
        // The gauge has to be registered before `state_id` is moved into the struct.
        #[cfg(feature = "metrics")]
        register_gauge!("sources", "source_manager" => state_id.clone());

        Self {
            state_id,
            arcon_time,
            watermark_interval,
            epoch_manager,
            logger,
            _backend: backend,
            watermark_timeout: None,
            sources: Vec::new(),
            source_refs: Vec::new(),
            ctx: ComponentContext::uninitialised(),
            manager_port: ProvidedPort::uninitialised(),
        }
    }

    /// Registers a source component and stores a strong actor reference to it.
    ///
    /// # Panics
    ///
    /// Panics with "failed to fetch ref" if the strong reference cannot be obtained.
    pub(crate) fn add_source(&mut self, source: Arc<dyn AbstractComponent<Message = SourceEvent>>) {
        // Resolve the reference first so nothing is stored if this fails.
        let strong_ref = source.actor_ref().hold().expect("failed to fetch ref");
        self.source_refs.push(strong_ref);
        self.sources.push(source);
    }

    /// Timer callback: asks every registered source to emit a watermark.
    ///
    /// Only the currently stored timer triggers watermarks. A timeout from a
    /// different (outdated) timer is ignored, and a timeout arriving while no
    /// timer is stored is logged as unexpected.
    fn handle_watermark_timeout(&mut self, timeout_id: ScheduledTimer) -> Handled {
        match self.watermark_timeout.as_ref() {
            None => {
                warn!(self.log(), "Got unexpected timeout: {:?}", timeout_id);
            }
            // Outdated timer: nothing to do.
            Some(active) if *active != timeout_id => {}
            Some(_) => {
                let time = self.arcon_time;
                self.sources.iter().for_each(|source| {
                    source.actor_ref().tell(SourceEvent::Watermark(time));
                });
            }
        }
        Handled::Ok
    }
}
/// Lifecycle hooks: log on start, cancel the watermark timer on stop.
impl<B: Backend> ComponentLifecycle for SourceManager<B> {
    /// Logs that the manager for this `state_id` has started.
    fn on_start(&mut self) -> Handled {
        info!(self.logger, "Started SourceManager for {}", self.state_id);
        Handled::Ok
    }

    /// Cancels the periodic watermark timer, if one is stored, and clears the handle.
    fn on_stop(&mut self) -> Handled {
        // `take` leaves `None` behind, so later timeouts are reported as unexpected.
        let pending = self.watermark_timeout.take();
        if let Some(timer) = pending {
            self.cancel_timer(timer);
        }
        Handled::Ok
    }
}
/// Actor interface: every local `SourceEvent` is forwarded to all registered
/// sources; `SourceEvent::Start` additionally (re)starts the watermark timer.
impl<B: Backend> Actor for SourceManager<B> {
    type Message = SourceEvent;

    /// Handles a local message.
    ///
    /// On `SourceEvent::Start` the periodic watermark timer is scheduled with
    /// `watermark_interval` milliseconds as both initial delay and period.
    /// Any timer left over from an earlier `Start` is cancelled first, so a
    /// repeated `Start` cannot leave an orphaned timer firing forever.
    /// Afterwards the message is cloned to every entry in `source_refs`.
    fn receive_local(&mut self, msg: Self::Message) -> Handled {
        if SourceEvent::Start == msg {
            #[cfg(feature = "metrics")]
            gauge!("sources", self.sources.len() as f64, "source_manager" => self.state_id.clone());

            // Overwriting the stored handle without cancelling would leak the
            // old periodic timer: nothing could cancel it afterwards.
            if let Some(stale) = self.watermark_timeout.take() {
                self.cancel_timer(stale);
            }

            let duration = std::time::Duration::from_millis(self.watermark_interval);
            let timeout =
                self.schedule_periodic(duration, duration, Self::handle_watermark_timeout);
            self.watermark_timeout = Some(timeout);
        }

        for source_ref in &self.source_refs {
            source_ref.tell(msg.clone());
        }
        Handled::Ok
    }

    /// Network messages are not supported by this component.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!`.
    fn receive_network(&mut self, _: NetMessage) -> Handled {
        unimplemented!();
    }
}
/// Handles requests arriving on `SourceManagerPort`.
impl<B: Backend> Provide<SourceManagerPort> for SourceManager<B> {
    /// Reacts to a `SourceManagerEvent`.
    ///
    /// `End` is translated into `EpochEvent::Halt` for the epoch manager.
    fn handle(&mut self, event: SourceManagerEvent) -> Handled {
        // Exhaustive match on purpose: a new variant must be handled explicitly.
        match event {
            SourceManagerEvent::End => {
                self.epoch_manager.tell(EpochEvent::Halt);
                Handled::Ok
            }
        }
    }
}