use crate::{
application::conf::logger::ArconLogger,
data::{ArconMessage, Epoch, NodeID, StateID, Watermark},
dataflow::dfg::GlobalNodeId,
error::*,
index::EMPTY_STATE_ID,
manager::snapshot::{Snapshot, SnapshotEvent, SnapshotManagerPort},
prelude::OperatorBuilder,
stream::operator::Operator,
};
#[cfg(feature = "metrics")]
use metrics::{gauge, register_gauge, register_histogram};
use arcon_state::Backend;
use fxhash::FxHashMap;
use kompact::{component::AbstractComponent, prelude::*};
use std::{collections::HashMap, sync::Arc};
/// A node as tracked by the [`NodeManager`]: a pair of
///
/// 1. a shared, type-erased Kompact component whose message type is
///    `ArconMessage<IN>`, and
/// 2. a required-side reference to that node's [`NodeManagerPort`].
pub type AbstractNode<IN> = (
Arc<dyn AbstractComponent<Message = ArconMessage<IN>>>,
RequiredRef<NodeManagerPort>,
);
/// Requests carried over [`NodeManagerPort`].
///
/// Every variant carries the `NodeID` of the node the event refers to.
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub enum NodeManagerEvent {
/// A watermark for the given node; the manager stores it per `NodeID`.
Watermark(NodeID, Watermark),
/// An epoch for the given node; the manager stores it per `NodeID`.
Epoch(NodeID, Epoch),
/// A snapshot event from the given node; the manager forwards it on its
/// snapshot manager port.
Checkpoint(NodeID, SnapshotEvent),
}
/// Kompact port type used to deliver [`NodeManagerEvent`]s to a `NodeManager`.
pub struct NodeManagerPort {}
impl Port for NodeManagerPort {
// The indication direction carries no information (unit type).
type Indication = ();
// The request direction carries the node events.
type Request = NodeManagerEvent;
}
/// Per-node bookkeeping kept by the `NodeManager`.
///
/// Both maps are keyed by `NodeID`; inserting for an existing key replaces
/// the previously stored value, so each map holds the most recently reported
/// value per node.
pub struct NodeManagerState {
/// Most recently reported watermark per node.
watermarks: HashMap<NodeID, Watermark>,
/// Most recently reported epoch per node.
epochs: HashMap<NodeID, Epoch>,
}
impl NodeManagerState {
    /// Builds a state object that has no watermark or epoch recorded for any node.
    fn new() -> Self {
        let watermarks = HashMap::default();
        let epochs = HashMap::default();
        Self { watermarks, epochs }
    }
}
/// Kompact component that receives [`NodeManagerEvent`]s for a set of nodes
/// running operator `OP` with state backend `B`.
///
/// It records reported watermarks and epochs per node and forwards checkpoint
/// events on its snapshot manager port.
#[allow(dead_code)]
#[derive(ComponentDefinition)]
pub struct NodeManager<OP, B>
where
OP: Operator + 'static,
B: Backend,
{
/// Kompact component context.
ctx: ComponentContext<Self>,
/// State identifier of this manager. When it differs from `EMPTY_STATE_ID`
/// the manager registers it with the snapshot manager on start.
pub(crate) state_id: StateID,
/// Provided port on which `NodeManagerEvent` requests arrive.
pub(crate) manager_port: ProvidedPort<NodeManagerPort>,
/// Required port used to trigger `SnapshotEvent`s (registration on start and
/// forwarded checkpoint events).
pub(crate) snapshot_manager_port: RequiredPort<SnapshotManagerPort>,
/// Kompact system handed in at construction.
// NOTE(review): not read in this part of the file; presumably the system
// that nodes are created on — confirm.
data_system: KompactSystem,
/// Initialised to `num_cpus::get()` in `new`.
node_parallelism: usize,
/// Initialised to `num_cpus::get() * 2` in `new`.
max_node_parallelism: usize,
/// Node identifiers handed in at construction.
// NOTE(review): presumably the IDs of the upstream channels — confirm.
in_channels: Vec<NodeID>,
/// Initialised to `0` in `new`.
// NOTE(review): looks like a counter for assigning node indices — confirm.
node_index: u32,
/// Nodes known to this manager, keyed by their global node id. With the
/// `metrics` feature enabled, its length is reported as the `nodes` gauge on start.
pub(crate) nodes: FxHashMap<GlobalNodeId, AbstractNode<OP::IN>>,
/// Watermark and epoch bookkeeping, updated from incoming node events.
manager_state: NodeManagerState,
/// Initialised to `None` in `new`.
// NOTE(review): not written in this part of the file; presumably the most
// recent snapshot — confirm.
latest_snapshot: Option<Snapshot>,
/// Shared operator builder handed in at construction.
builder: Arc<OperatorBuilder<OP, B>>,
/// Logger used for this component's log output.
logger: ArconLogger,
}
impl<OP, B> NodeManager<OP, B>
where
    OP: Operator + 'static,
    B: Backend,
{
    /// Creates a `NodeManager` with uninitialised ports, no known nodes and
    /// empty watermark/epoch bookkeeping.
    ///
    /// * `state_id` - state identifier; also used as the `node_manager` label
    ///   of the metrics registered here when the `metrics` feature is enabled.
    /// * `data_system` - Kompact system stored on the manager.
    /// * `in_channels` - node identifiers stored on the manager.
    /// * `logger` - logger used by the component.
    /// * `builder` - shared operator builder stored on the manager.
    ///
    /// Node parallelism defaults to the CPU count reported by `num_cpus`, and
    /// the maximum parallelism to twice that count.
    pub fn new(
        state_id: String,
        data_system: KompactSystem,
        in_channels: Vec<NodeID>,
        logger: ArconLogger,
        builder: Arc<OperatorBuilder<OP, B>>,
    ) -> Self {
        #[cfg(feature = "metrics")]
        {
            register_gauge!("nodes", "node_manager" => state_id.clone());
            register_histogram!("checkpoint_execution_time_ms", "node_manager" => state_id.clone());
            register_gauge!("last_checkpoint_size", "node_manager" => state_id.clone());
        }

        let node_parallelism = num_cpus::get();
        let max_node_parallelism = num_cpus::get() * 2;

        Self {
            ctx: ComponentContext::uninitialised(),
            state_id,
            manager_port: ProvidedPort::uninitialised(),
            snapshot_manager_port: RequiredPort::uninitialised(),
            data_system,
            node_parallelism,
            max_node_parallelism,
            node_index: 0,
            in_channels,
            nodes: FxHashMap::default(),
            manager_state: NodeManagerState::new(),
            latest_snapshot: None,
            logger,
            builder,
        }
    }

    /// Returns `true` unless the manager's state id equals `EMPTY_STATE_ID`.
    #[inline]
    fn has_snapshot_state(&self) -> bool {
        self.state_id != EMPTY_STATE_ID
    }

    /// Applies a single node event.
    ///
    /// Watermarks and epochs overwrite the value stored for the reporting
    /// node; checkpoint events are logged and passed on through the snapshot
    /// manager port. Every variant currently results in `Ok(())`.
    fn handle_node_event(&mut self, event: NodeManagerEvent) -> ArconResult<()> {
        match event {
            NodeManagerEvent::Watermark(node_id, watermark) => {
                self.manager_state.watermarks.insert(node_id, watermark);
            }
            NodeManagerEvent::Epoch(node_id, epoch) => {
                self.manager_state.epochs.insert(node_id, epoch);
            }
            NodeManagerEvent::Checkpoint(node_id, snapshot_event) => {
                debug!(
                    self.logger,
                    "Reporting Checkpoint from Node ID {:?}", node_id
                );
                self.snapshot_manager_port.trigger(snapshot_event);
            }
        }
        Ok(())
    }
}
impl<OP, B> ComponentLifecycle for NodeManager<OP, B>
where
    OP: Operator + 'static,
    B: Backend,
{
    /// Start hook: logs the start, publishes the current node count as the
    /// `nodes` gauge (only with the `metrics` feature), and registers the
    /// state id with the snapshot manager when the manager has snapshot state.
    fn on_start(&mut self) -> Handled {
        info!(self.logger, "Started NodeManager for {}", self.state_id);

        #[cfg(feature = "metrics")]
        gauge!("nodes", self.nodes.len() as f64, "node_manager" => self.state_id.clone());

        // Managers whose state id is the empty one have nothing to register.
        if !self.has_snapshot_state() {
            return Handled::Ok;
        }

        let registration = SnapshotEvent::Register(self.state_id.clone());
        self.snapshot_manager_port.trigger(registration);
        Handled::Ok
    }
}
impl<OP, B> Require<SnapshotManagerPort> for NodeManager<OP, B>
where
    OP: Operator + 'static,
    B: Backend,
{
    /// Handler for the required side of the snapshot manager port.
    ///
    /// The incoming type is `Never`, so this is not expected to be invoked;
    /// it panics if it ever is.
    fn handle(&mut self, _never: Never) -> Handled {
        unreachable!("can't be instantiated!");
    }
}
impl<OP, B> Provide<SnapshotManagerPort> for NodeManager<OP, B>
where
    OP: Operator + 'static,
    B: Backend,
{
    /// Accepts a `SnapshotEvent` on the provided side and discards it.
    fn handle(&mut self, _event: SnapshotEvent) -> Handled {
        Handled::Ok
    }
}
impl<OP, B> Provide<NodeManagerPort> for NodeManager<OP, B>
where
    OP: Operator + 'static,
    B: Backend,
{
    /// Entry point for events arriving on the node manager port.
    ///
    /// Delegates to `handle_node_event`. A failure is logged, not propagated,
    /// so the component always reports `Handled::Ok`.
    fn handle(&mut self, event: NodeManagerEvent) -> Handled {
        match self.handle_node_event(event) {
            Ok(()) => {}
            Err(err) => {
                error!(
                    self.logger,
                    "Failed to handle NodeManagerEvent {:?}",
                    err.to_string()
                );
            }
        }
        Handled::Ok
    }
}
impl<OP, B> Actor for NodeManager<OP, B>
where
    OP: Operator + 'static,
    B: Backend,
{
    /// The manager declares `Never` as its actor message type; it is driven
    /// through its ports instead.
    type Message = Never;

    /// Local messages are of type `Never`; this handler does nothing.
    fn receive_local(&mut self, _msg: Self::Message) -> Handled {
        Handled::Ok
    }

    /// Network messages are not expected; receiving one panics.
    fn receive_network(&mut self, _msg: NetMessage) -> Handled {
        unreachable!();
    }
}