pub mod common;
pub mod debug;
pub mod source;
pub(crate) mod timer;
#[cfg(feature = "metrics")]
use metrics::{
gauge, histogram, increment_counter, register_counter, register_gauge, register_histogram,
};
#[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
use perf_event::{Builder, Group};
use crate::application::conf::logger::ArconLogger;
use crate::{
data::{flight_serde::reliable_remote::ReliableSerde, RawArconMessage, *},
dataflow::builder::KeyBuilder,
dataflow::dfg::GlobalNodeId,
error::{ArconResult, *},
index::{AppenderIndex, ArconState, EagerAppender, IndexOps},
manager::epoch::EpochEvent,
manager::node::*,
manager::snapshot::{Snapshot, SnapshotEvent},
reportable_error,
stream::{
channel::strategy::ChannelStrategy,
operator::{Operator, OperatorContext},
},
};
use arcon_macros::ArconState;
use arcon_state::Backend;
use fxhash::*;
use kompact::prelude::*;
use std::{
cell::{RefCell, UnsafeCell},
sync::Arc,
};
use self::timer::Timer;
pub type NodeDescriptor = String;
#[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
use crate::metrics::perf_event::PerfEvents;
#[cfg(feature = "metrics")]
use crate::metrics::runtime_metrics::NodeMetrics;
#[cfg(feature = "metrics")]
use std::time::Instant;
/// Per-node state managed through the [`ArconState`] derive.
///
/// `message_buffer` is backed by the state backend `B`; the remaining fields
/// are marked `#[ephemeral]`, which presumably excludes them from
/// checkpointing so they are rebuilt at startup — confirm against the
/// `ArconState` derive macro.
#[derive(ArconState)]
pub struct NodeState<OP: Operator + 'static, B: Backend> {
    // Buffer for messages arriving on channels that are blocked during
    // epoch alignment; drained once the epoch completes.
    message_buffer: EagerAppender<RawArconMessage<OP::IN>, B>,
    // Last watermark observed per upstream sender.
    #[ephemeral]
    watermarks: FxHashMap<NodeID, Watermark>,
    // Senders currently blocked while this node aligns on an epoch.
    #[ephemeral]
    blocked_channels: FxHashSet<NodeID>,
    // The node-wide (minimum) watermark forwarded downstream so far.
    #[ephemeral]
    current_watermark: Watermark,
    // Epoch the node is currently aligning on.
    #[ephemeral]
    current_epoch: Epoch,
    // The set of valid upstream sender ids.
    #[ephemeral]
    in_channels: Vec<NodeID>,
    // This node's id.
    #[ephemeral]
    id: NodeID,
}
impl<OP: Operator + 'static, B: Backend> NodeState<OP, B> {
    /// Creates a fresh `NodeState` for node `id` fed by `in_channels`,
    /// with its message buffer stored in `backend`.
    pub fn new(id: NodeID, in_channels: Vec<NodeID>, backend: Arc<B>) -> Self {
        // Every known upstream sender starts out at watermark 0.
        let watermarks: FxHashMap<NodeID, Watermark> = in_channels
            .iter()
            .map(|sender| (*sender, Watermark::new(0)))
            .collect();

        Self {
            message_buffer: EagerAppender::new("_messagebuffer", backend),
            watermarks,
            blocked_channels: FxHashSet::default(),
            current_watermark: Watermark::new(0),
            current_epoch: Epoch::new(0),
            in_channels,
            id,
        }
    }
}
/// A kompact component that drives an [`Operator`]: it receives Arcon
/// messages, filters late/invalid input, and handles watermark, epoch,
/// and death events around the operator's element processing.
#[derive(ComponentDefinition)]
pub struct Node<OP, B>
where
    OP: Operator + 'static,
    B: Backend,
{
    // Kompact component context.
    ctx: ComponentContext<Self>,
    // Port used to signal the NodeManager (e.g. checkpoint events).
    pub(crate) node_manager_port: RequiredPort<NodeManagerPort>,
    // Human-readable identifier, also used as the metrics "node" label.
    descriptor: NodeDescriptor,
    // Outgoing channel strategy; accessed mutably through &self in
    // add_outgoing_event via this UnsafeCell.
    channel_strategy: UnsafeCell<ChannelStrategy<OP::OUT>>,
    // The user-defined operator this node executes.
    operator: OP,
    // Operator context (timer + operator state), borrowed mutably per call.
    operator_context: RefCell<OperatorContext<OP::TimerState, OP::OperatorState>>,
    // Backend-managed node state (buffer, watermarks, epoch bookkeeping).
    node_state: NodeState<OP, B>,
    // State backend used for timers, state, and checkpoints.
    backend: Arc<B>,
    logger: ArconLogger,
    // Epoch manager actor; receives Register/Ack events from this node.
    epoch_manager: ActorRefStrong<EpochEvent>,
    #[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
    perf_events: PerfEvents,
    #[cfg(feature = "metrics")]
    node_metrics: NodeMetrics,
    pub node_id: GlobalNodeId,
    // Extracts a key from incoming elements; None means a non-keyed stream.
    in_key_builder: Option<KeyBuilder<OP::IN>>,
}
impl<OP, B> Node<OP, B>
where
    OP: Operator + 'static,
    B: Backend,
{
    /// Constructs a new `Node`, wiring up its timer, operator context,
    /// and (when enabled) metric registrations.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        descriptor: NodeDescriptor,
        channel_strategy: ChannelStrategy<OP::OUT>,
        operator: OP,
        operator_state: OP::OperatorState,
        node_state: NodeState<OP, B>,
        backend: Arc<B>,
        logger: ArconLogger,
        epoch_manager: ActorRefStrong<EpochEvent>,
        #[cfg(all(feature = "hardware_counters", target_os = "linux"))]
        #[cfg(not(test))]
        perf_events: PerfEvents,
        node_id: GlobalNodeId,
        in_key_builder: Option<KeyBuilder<OP::IN>>,
    ) -> Self {
        // Each node owns a backend-backed timer namespaced by its descriptor.
        let timer_id = format!("_{}_timer", descriptor);
        let timer = Timer::new(timer_id, backend.clone());
        let operator_context = OperatorContext::new(
            Box::new(timer),
            operator_state,
            logger.clone(),
            #[cfg(feature = "metrics")]
            descriptor.clone(),
        );
        // Register all per-node metrics up front, labelled by descriptor.
        #[cfg(feature = "metrics")]
        {
            register_gauge!("inbound_throughput", "node" => descriptor.clone());
            register_gauge!("last_watermark_timestamp", "node" => descriptor.clone());
            register_counter!("epoch_counter", "node" => descriptor.clone());
            register_counter!("watermark_counter", "node" => descriptor.clone());
            register_histogram!("batch_execution_time","execution time per events batch","node" => descriptor.clone());
        }
        // One histogram per configured hardware counter.
        #[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
        {
            for value in perf_events.counters.iter() {
                register_histogram!(value.to_string(),"node" => descriptor.clone());
            }
        }
        Node {
            ctx: ComponentContext::uninitialised(),
            node_manager_port: RequiredPort::uninitialised(),
            descriptor,
            channel_strategy: UnsafeCell::new(channel_strategy),
            operator,
            operator_context: RefCell::new(operator_context),
            node_state,
            backend,
            logger,
            epoch_manager,
            #[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
            perf_events,
            #[cfg(feature = "metrics")]
            node_metrics: NodeMetrics::new(),
            node_id,
            in_key_builder,
        }
    }

    /// Entry point for every inbound message (local or deserialised remote).
    ///
    /// Messages from unknown senders are logged and dropped; messages from
    /// senders blocked during epoch alignment are buffered for later replay.
    /// Otherwise the contained events are dispatched to `handle_events`,
    /// optionally wrapped in timing/perf-counter instrumentation.
    #[inline]
    fn handle_message(&mut self, message: MessageContainer<OP::IN>) -> ArconResult<()> {
        #[cfg(feature = "metrics")]
        self.node_metrics
            .inbound_throughput
            .mark_n(message.total_events());
        // Reject messages whose sender is not a configured in-channel.
        if !self.node_state.in_channels.contains(message.sender()) {
            error!(
                self.logger,
                "Message from invalid sender id {:?}",
                message.sender()
            );
            return Ok(());
        }
        // Blocked sender: buffer the raw message until the epoch completes
        // (drained in handle_epoch).
        if self.sender_blocked(message.sender()) {
            self.node_state.message_buffer().append(message.raw())?;
            return Ok(());
        }
        // Build a fresh perf-event group per batch so counter deltas cover
        // exactly this batch's processing.
        #[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
        let (mut group, counters) = {
            let mut group = Group::new()?;
            let mut counters = Vec::with_capacity(self.perf_events.counters.len());
            for hardware_counter in self.perf_events.counters.iter() {
                let counter = Builder::new()
                    .group(&mut group)
                    .kind(hardware_counter.get_hardware_kind())
                    .build()?;
                counters.push((hardware_counter.to_string(), counter));
            }
            (group, counters)
        };
        #[cfg(feature = "metrics")]
        let start_time = Instant::now();
        #[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
        group.enable()?;
        match message {
            MessageContainer::Raw(r) => self.handle_events(r.sender, r.events)?,
            MessageContainer::Local(l) => self.handle_events(l.sender, l.events)?,
        }
        #[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
        group.disable()?;
        #[cfg(feature = "metrics")]
        {
            let elapsed = start_time.elapsed();
            histogram!("batch_execution_time", elapsed.as_micros() as f64,"node" => self.descriptor.clone());
        }
        #[cfg(feature = "metrics")]
        gauge!("inbound_throughput", self.node_metrics.inbound_throughput.get_one_min_rate(), "node" => self.descriptor.clone());
        #[cfg(all(feature = "hardware_counters", target_os = "linux", not(test)))]
        {
            let counts = group.read()?;
            for (metric_name, counter) in counters.iter() {
                histogram!(String::from(metric_name), counts[counter] as f64, "node" => self.descriptor.clone());
            }
        }
        Ok(())
    }

    /// Returns true if `sender` is currently blocked for epoch alignment.
    #[inline(always)]
    fn sender_blocked(&mut self, sender: &NodeID) -> bool {
        self.node_state.blocked_channels().contains(sender)
    }

    /// Dispatches a batch of events from `sender` to the appropriate handler.
    #[inline]
    fn handle_events<I>(&mut self, sender: NodeID, events: I) -> ArconResult<()>
    where
        I: IntoIterator<Item = ArconEventWrapper<OP::IN>>,
    {
        'event_loop: for event in events.into_iter() {
            match event.unwrap() {
                ArconEvent::Element(e) => {
                    let watermark = match self.node_state.watermarks().get(&sender) {
                        Some(wm) => wm,
                        None => return reportable_error!("Uninitialised watermark"),
                    };
                    // Late elements (at or before the sender's watermark)
                    // are silently dropped.
                    if e.timestamp <= watermark.timestamp {
                        continue 'event_loop;
                    }
                    self.handle_element(e)?;
                }
                ArconEvent::Watermark(w) => {
                    self.handle_watermark(w, sender)?;
                }
                ArconEvent::Epoch(e) => {
                    self.handle_epoch(e, sender)?;
                }
                ArconEvent::Death(s) => {
                    // Propagate the death marker downstream, then terminate
                    // this component.
                    self.add_outgoing_event(ArconEvent::Death(s))?;
                    self.ctx.suicide();
                }
            }
        }
        Ok(())
    }

    /// Extracts the key of an incoming element; 0 when no key builder was
    /// configured (non-keyed stream).
    fn get_in_key(&self, e: &OP::IN) -> u64 {
        if let Some(key_builder) = &self.in_key_builder {
            key_builder.get_key(e)
        } else {
            0
        }
    }

    /// Runs the operator on a single element and forwards any produced
    /// elements downstream.
    #[inline(always)]
    fn handle_element(&mut self, e: ArconElement<OP::IN>) -> ArconResult<()> {
        // Set the current key so keyed operator state resolves correctly.
        self.set_context(self.get_in_key(&e.data));
        for elem in self
            .operator
            .handle_element(e, &mut self.operator_context.borrow_mut())?
        {
            self.add_outgoing_event(ArconEvent::Element(elem))?;
        }
        Ok(())
    }

    /// Sets the current key on the operator context.
    #[inline]
    fn set_context(&mut self, key: u64) {
        let mut context = self.operator_context.borrow_mut();
        context.current_key = key;
    }

    /// Handles a watermark from `sender`: records it and, if the node-wide
    /// minimum watermark advanced, fires due timers and forwards the new
    /// watermark downstream.
    #[inline]
    fn handle_watermark(&mut self, w: Watermark, sender: NodeID) -> ArconResult<()> {
        let watermark = match self.node_state.watermarks().get(&sender) {
            Some(wm) => wm,
            None => return reportable_error!("Uninitialised watermark"),
        };
        // Ignore watermarks that do not advance this sender.
        if w <= *watermark {
            return Ok(());
        }
        if let Some(old) = self.node_state.watermarks().insert(sender, w) {
            // NOTE(review): `old` equals the `watermark` read above, so this
            // early-return fires when this sender was already ahead of the
            // node-wide watermark (another sender holds the minimum) —
            // confirm this is the intended fast path.
            if old > self.node_state.current_watermark {
                return Ok(());
            }
        }
        if w <= self.node_state.current_watermark {
            return Ok(());
        }
        // The node-wide watermark is the minimum across all senders.
        let new_watermark = *self.node_state.watermarks().values().min().unwrap();
        if new_watermark.timestamp > self.node_state.current_watermark.timestamp {
            #[cfg(feature = "metrics")]
            gauge!("last_watermark_timestamp", new_watermark.timestamp as f64, "node" => self.descriptor.clone());
            self.node_state.current_watermark = new_watermark;
            // Fire all timers that became due, before forwarding the watermark.
            let timeouts = self
                .operator_context
                .borrow_mut()
                .timer
                .advance_to(new_watermark.timestamp)?;
            for timer_entry in timeouts {
                self.set_context(timer_entry.key());
                if let Some(elems) = self
                    .operator
                    .handle_timeout(timer_entry.value(), &mut self.operator_context.borrow_mut())?
                {
                    for elem in elems {
                        self.add_outgoing_event(ArconEvent::Element(elem))?;
                    }
                }
            }
            #[cfg(feature = "metrics")]
            increment_counter!("watermark_counter", "node" => self.descriptor.clone());
            self.add_outgoing_event(ArconEvent::Watermark(new_watermark))?;
        }
        Ok(())
    }

    /// Handles an epoch marker from `sender`. The sender is blocked until all
    /// in-channels have delivered the marker; once aligned, the node forwards
    /// the epoch, persists state, checkpoints, acks the epoch manager, and
    /// replays any messages buffered from blocked senders.
    fn handle_epoch(&mut self, e: Epoch, sender: NodeID) -> ArconResult<()> {
        debug!(self.logger, "Got Epoch {:?}", e);
        // Ignore epoch markers from the past.
        if e < self.node_state.current_epoch {
            return Ok(());
        }
        self.node_state.blocked_channels().insert(sender);
        // All in-channels aligned: complete the epoch.
        if self.node_state.blocked_channels().len() == self.node_state.in_channels.len() {
            self.add_outgoing_event(ArconEvent::Epoch(self.node_state.current_epoch))?;
            self.node_state.persist()?;
            self.operator_context.borrow_mut().state.persist()?;
            self.checkpoint()?;
            self.epoch_manager.tell(EpochEvent::Ack(
                self.descriptor.clone(),
                self.node_state.current_epoch,
            ));
            self.node_state.current_epoch.epoch += 1;
            #[cfg(feature = "metrics")]
            increment_counter!("epoch_counter", "node" => self.descriptor.clone());
            self.node_state.blocked_channels().clear();
            // Replay messages that were buffered while their senders were blocked.
            for message in self.node_state.message_buffer().consume()? {
                self.handle_events(message.sender, message.events)?;
            }
        }
        Ok(())
    }

    /// Pushes an event into this node's outgoing channel strategy.
    #[inline]
    fn add_outgoing_event(&self, event: ArconEvent<OP::OUT>) -> ArconResult<()> {
        // SAFETY(review): mutable access through the UnsafeCell from `&self`;
        // presumably sound because a kompact component executes on a single
        // thread at a time — confirm `common::add_outgoing_event` cannot
        // re-enter this method.
        let strategy = unsafe { &mut *self.channel_strategy.get() };
        common::add_outgoing_event(event, strategy, self)
    }

    /// Checkpoints the backend for the current epoch and notifies the
    /// NodeManager with the resulting [`Snapshot`].
    fn checkpoint(&mut self) -> ArconResult<()> {
        if let Some(base_dir) = &self.ctx.config()["checkpoint_dir"].as_string() {
            let checkpoint_dir = format!(
                "{}/{}/checkpoint_{id}_{epoch}",
                base_dir,
                self.descriptor,
                // NOTE(review): `id` is bound to the descriptor, so the
                // descriptor appears twice in the path; looks like this may
                // have been meant to be the node id — confirm.
                id = self.descriptor,
                epoch = self.node_state.current_epoch.epoch,
            );
            #[cfg(feature = "metrics")]
            let start_time = Instant::now();
            self.backend.checkpoint(checkpoint_dir.as_ref())?;
            #[cfg(feature = "metrics")]
            {
                let elapsed = start_time.elapsed();
                histogram!("checkpoint_execution_time_ms", elapsed.as_millis() as f64, "node" => self.descriptor.clone());
            }
            #[cfg(feature = "metrics")]
            {
                let metadata = std::fs::metadata(checkpoint_dir.clone())?;
                gauge!("last_checkpoint_size", metadata.len() as f64, "node" => self.descriptor.clone());
            }
            let snapshot = Snapshot::new(
                std::any::type_name::<B>().to_string(),
                self.node_state.current_epoch.epoch,
                checkpoint_dir.clone(),
            );
            self.node_manager_port.trigger(NodeManagerEvent::Checkpoint(
                self.node_state.id,
                SnapshotEvent::Snapshot(self.descriptor.clone(), snapshot),
            ));
            debug!(
                self.logger,
                "Completed a Checkpoint to path {}", checkpoint_dir
            );
        } else {
            return reportable_error!("Failed to fetch checkpoint_dir from Config");
        }
        Ok(())
    }
}
impl<OP, B> ComponentLifecycle for Node<OP, B>
where
    OP: Operator + 'static,
    B: Backend,
{
    /// Startup hook: prepares the checkpoint directory, registers with the
    /// epoch manager, and runs the operator's own startup logic.
    fn on_start(&mut self) -> Handled {
        debug!(
            self.logger,
            "Started Arcon Node {} with Node ID {:?}", self.descriptor, self.node_state.id
        );

        // Make sure this node's checkpoint directory exists up front.
        if let Some(base_dir) = self.ctx.config()["checkpoint_dir"].as_string() {
            let node_dir = format!("{}/{}", base_dir, self.descriptor);
            std::fs::create_dir_all(node_dir).unwrap();
        } else {
            error!(self.logger, "Failed to locate checkpoint_dir config");
        }

        // Announce this node to the epoch manager before processing anything.
        self.epoch_manager
            .tell(EpochEvent::Register(self.descriptor.clone()));

        // Give the operator a chance to run its startup code; failures are
        // logged but do not abort the component.
        let startup = self.operator.on_start(&mut self.operator_context.borrow_mut());
        if startup.is_err() {
            error!(self.logger, "Failed to run startup code");
        }

        Handled::Ok
    }
}
// `Node` requires a `NodeManagerPort` (shared with its NodeManager); the
// indication type is `()` here, so there is nothing to react to.
impl<OP, B> Require<NodeManagerPort> for Node<OP, B>
where
    OP: Operator + 'static,
    B: Backend,
{
    fn handle(&mut self, _: ()) -> Handled {
        Handled::Ok
    }
}
// Provider side of the `NodeManagerPort`: node-manager events are not acted
// upon by the node itself, only traced.
impl<OP, B> Provide<NodeManagerPort> for Node<OP, B>
where
    OP: Operator + 'static,
    B: Backend,
{
    fn handle(&mut self, e: NodeManagerEvent) -> Handled {
        trace!(self.logger, "Ignoring node event: {:?}", e);
        Handled::Ok
    }
}
impl<OP, B> Actor for Node<OP, B>
where
OP: Operator + 'static,
B: Backend,
{
type Message = ArconMessage<OP::IN>;
fn receive_local(&mut self, msg: Self::Message) -> Handled {
if let Err(err) = self.handle_message(MessageContainer::Local(msg)) {
error!(self.logger, "Failed to handle message: {}", err);
}
Handled::Ok
}
fn receive_network(&mut self, msg: NetMessage) -> Handled {
let arcon_msg = match *msg.ser_id() {
id if id == OP::IN::RELIABLE_SER_ID => msg
.try_deserialise::<RawArconMessage<OP::IN>, ReliableSerde<OP::IN>>()
.map_err(|e| Error::Unsupported {
msg: format!("Failed to unpack reliable ArconMessage with err {:?}", e),
}),
id => reportable_error!("Unexpected deserialiser with id {}", id),
};
match arcon_msg {
Ok(m) => {
if let Err(err) = self.handle_message(MessageContainer::Raw(m)) {
error!(self.logger, "Failed to handle node message: {}", err);
}
}
Err(e) => error!(self.logger, "Error ArconNetworkMessage: {:?}", e),
}
Handled::Ok
}
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(all(feature = "hardware_counters", target_os = "linux"))]
#[cfg(not(test))]
use crate::metrics::perf_event::HardwareCounter;
use crate::{
application::Application,
dataflow::builder::OperatorBuilder,
index::EmptyState,
stream::{
channel::{strategy::forward::Forward, Channel},
node::debug::DebugNode,
operator::function::Filter,
},
};
use std::{sync::Arc, thread, time};
fn node_test_setup() -> (ActorRef<ArconMessage<i32>>, Arc<Component<DebugNode<i32>>>) {
fn filter_fn(x: &i32) -> bool {
*x >= 0
}
let builder = OperatorBuilder::<_> {
operator: Arc::new(|| Filter::new(&filter_fn)),
state: Arc::new(|_backend| EmptyState),
conf: Default::default(),
};
fn setup<OP: Operator<IN = i32, OUT = i32> + 'static, B: Backend>(
builder: OperatorBuilder<OP, B>,
) -> (ActorRef<ArconMessage<i32>>, Arc<Component<DebugNode<i32>>>) {
let app = Application::default();
let pool_info = app.get_pool_info();
let epoch_manager_ref = app.epoch_manager();
let sink = app.data_system().create(DebugNode::<i32>::new);
app.data_system()
.start_notify(&sink)
.wait_timeout(std::time::Duration::from_millis(1000))
.expect("started");
let actor_ref: ActorRefStrong<ArconMessage<i32>> =
sink.actor_ref().hold().expect("Failed to fetch");
let channel = Channel::Local(actor_ref);
let channel_strategy: ChannelStrategy<i32> =
ChannelStrategy::Forward(Forward::new(channel, NodeID::new(0), pool_info));
let backend = Arc::new(crate::test_utils::temp_backend::<B>());
let descriptor = String::from("node_");
let in_channels = vec![1.into(), 2.into(), 3.into()];
let operator = builder.operator.clone();
let operator_state = builder.state.clone();
#[cfg(not(test))]
let mut perf_events = PerfEvents::new();
let nm = NodeManager::<OP, B>::new(
descriptor.clone(),
app.data_system().clone(),
in_channels.clone(),
app.arcon_logger.clone(),
Arc::new(builder),
);
let node_manager_comp = app.ctrl_system().create(|| nm);
app.ctrl_system()
.start_notify(&node_manager_comp)
.wait_timeout(std::time::Duration::from_millis(1000))
.expect("started");
let node = Node::<OP, _>::new(
descriptor,
channel_strategy,
operator(),
operator_state(backend.clone()),
NodeState::new(NodeID::new(0), in_channels, backend.clone()),
backend,
app.arcon_logger.clone(),
epoch_manager_ref,
#[cfg(not(test))]
perf_events,
GlobalNodeId::null(),
None,
);
let filter_comp = app.data_system().create(|| node);
let required_ref = filter_comp.on_definition(|cd| cd.node_manager_port.share());
biconnect_components::<NodeManagerPort, _, _>(&node_manager_comp, &filter_comp)
.expect("connection");
app.data_system()
.start_notify(&filter_comp)
.wait_timeout(std::time::Duration::from_millis(1000))
.expect("started");
let filter_ref = filter_comp.actor_ref();
node_manager_comp.on_definition(|cd| {
cd.nodes
.insert(GlobalNodeId::null(), (filter_comp, required_ref));
});
(filter_ref, sink)
}
setup(builder)
}
fn watermark(time: u64, sender: u32) -> ArconMessage<i32> {
ArconMessage::watermark(time, sender.into())
}
fn element(data: i32, time: u64, sender: u32) -> ArconMessage<i32> {
ArconMessage::element(data, time, sender.into())
}
fn epoch(epoch: u64, sender: u32) -> ArconMessage<i32> {
ArconMessage::epoch(epoch, sender.into())
}
fn death(sender: u32) -> ArconMessage<i32> {
ArconMessage::death(String::from("die"), sender.into())
}
fn wait(time: u64) {
thread::sleep(time::Duration::from_secs(time));
}
#[test]
fn node_no_watermark() {
let (node_ref, sink) = node_test_setup();
node_ref.tell(watermark(1, 1));
wait(1);
sink.on_definition(|cd| {
let data_len = cd.data.len();
let watermark_len = cd.watermarks.len();
assert_eq!(watermark_len, 0);
assert_eq!(data_len, 0);
});
}
#[test]
fn node_one_watermark() {
let (node_ref, sink) = node_test_setup();
node_ref.tell(watermark(1, 1));
node_ref.tell(watermark(1, 2));
node_ref.tell(watermark(1, 3));
wait(1);
sink.on_definition(|cd| {
let data_len = cd.data.len();
let watermark_len = cd.watermarks.len();
assert_eq!(watermark_len, 1);
assert_eq!(data_len, 0);
});
}
#[test]
fn node_outoforder_watermarks() {
let (node_ref, sink) = node_test_setup();
node_ref.tell(watermark(1, 1));
node_ref.tell(watermark(3, 1));
node_ref.tell(watermark(1, 2));
node_ref.tell(watermark(2, 2));
node_ref.tell(watermark(4, 3));
wait(1);
sink.on_definition(|cd| {
let watermark_len = cd.watermarks.len();
assert_eq!(watermark_len, 1);
assert_eq!(cd.watermarks[0].timestamp, 2u64);
});
}
#[test]
fn node_epoch_block() {
let (node_ref, sink) = node_test_setup();
node_ref.tell(element(1, 1, 1));
node_ref.tell(epoch(3, 1));
node_ref.tell(element(2, 1, 1));
node_ref.tell(element(3, 1, 2));
node_ref.tell(death(2));
wait(1);
sink.on_definition(|cd| {
let data_len = cd.data.len();
let epoch_len = cd.epochs.len();
assert_eq!(epoch_len, 0);
assert_eq!(cd.data[0].data, 1i32);
assert_eq!(cd.data[1].data, 3i32);
assert_eq!(data_len, 2);
});
}
#[test]
fn node_epoch_no_continue() {
let (node_ref, sink) = node_test_setup();
node_ref.tell(element(11, 1, 1)); node_ref.tell(epoch(1, 1)); node_ref.tell(element(12, 1, 1)); node_ref.tell(element(21, 1, 2)); node_ref.tell(epoch(2, 1)); node_ref.tell(epoch(1, 2)); node_ref.tell(epoch(2, 2)); node_ref.tell(element(23, 1, 2)); node_ref.tell(element(31, 1, 3));
node_ref.tell(death(3)); wait(1);
sink.on_definition(|cd| {
let data_len = cd.data.len();
let epoch_len = cd.epochs.len();
assert_eq!(epoch_len, 0); assert_eq!(cd.data[0].data, 11i32);
assert_eq!(cd.data[1].data, 21i32);
assert_eq!(cd.data[2].data, 31i32);
assert_eq!(data_len, 3);
});
}
#[test]
fn node_epoch_continue() {
let (node_ref, sink) = node_test_setup();
node_ref.tell(element(11, 1, 1)); node_ref.tell(epoch(1, 1)); node_ref.tell(element(12, 1, 1)); node_ref.tell(element(21, 1, 2)); node_ref.tell(epoch(2, 1)); node_ref.tell(element(13, 1, 1)); node_ref.tell(epoch(1, 2)); node_ref.tell(epoch(2, 2)); node_ref.tell(element(22, 1, 2)); node_ref.tell(element(31, 1, 3)); node_ref.tell(epoch(1, 3)); node_ref.tell(epoch(2, 3));
wait(3);
node_ref.tell(death(3)); wait(3);
sink.on_definition(|cd| {
let data_len = cd.data.len();
let epoch_len = cd.epochs.len();
assert_eq!(epoch_len, 2); assert_eq!(cd.data[0].data, 11i32);
assert_eq!(cd.data[1].data, 21i32);
assert_eq!(cd.data[2].data, 31i32);
assert_eq!(cd.data[3].data, 12i32); assert_eq!(cd.data[4].data, 13i32); assert_eq!(cd.data[5].data, 22i32); assert_eq!(data_len, 6);
});
}
}