mod one_in_one_out_executor;
mod one_in_two_out_executor;
mod sink_executor;
mod source_executor;
mod two_in_one_out_executor;
pub use one_in_one_out_executor::*;
pub use one_in_two_out_executor::*;
pub use sink_executor::*;
pub use source_executor::*;
pub use two_in_one_out_executor::*;
use std::{cmp, collections::HashMap, future::Future, pin::Pin, sync::Arc, time::Duration};
use futures_delay_queue::{delay_queue, DelayHandle, DelayQueue, Receiver};
use futures_intrusive::buffer::GrowingHeapBuf;
use serde::Deserialize;
use tokio::{
self,
sync::{broadcast, mpsc},
};
use crate::{
dataflow::{
context::SetupContext,
deadlines::{ConditionContext, DeadlineEvent, DeadlineId},
operator::OperatorConfig,
stream::StreamId,
Data, Message, ReadStream, Timestamp,
},
node::{
lattice::ExecutionLattice,
operator_event::OperatorEvent,
worker::{EventNotification, OperatorExecutorNotification, WorkerNotification},
},
OperatorId,
};
/// Internal trait implemented by every operator executor so that the worker
/// can manage heterogeneous executors through a single object-safe interface.
pub(crate) trait OperatorExecutorT: Send {
    /// Returns a boxed future that drives the operator to completion.
    ///
    /// * `channel_from_worker`: control notifications (e.g. shutdown) from the worker.
    /// * `channel_to_worker`: lifecycle notifications (e.g. operator destroyed) to the worker.
    /// * `channel_to_event_runners`: wakes the event runners when new events are added.
    fn execute<'a>(
        &'a mut self,
        channel_from_worker: broadcast::Receiver<OperatorExecutorNotification>,
        channel_to_worker: mpsc::UnboundedSender<WorkerNotification>,
        channel_to_event_runners: broadcast::Sender<EventNotification>,
    ) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>;
    /// Returns the lattice into which this executor inserts operator events.
    fn lattice(&self) -> Arc<ExecutionLattice>;
    /// Returns the ID of the operator driven by this executor.
    fn operator_id(&self) -> OperatorId;
}
/// Callback adapter for operators that read from a single input stream.
///
/// Implementations translate the user-defined operator callbacks into
/// `OperatorEvent`s that the executor inserts into the event lattice.
pub trait OneInMessageProcessorT<S, T>: Send + Sync
where
    T: Data + for<'a> Deserialize<'a>,
{
    /// Runs the operator's setup logic and returns the resulting `SetupContext`.
    fn execute_setup(&mut self, read_stream: &mut ReadStream<T>) -> SetupContext<S>;
    /// Runs the operator's `run` logic (invoked once, before message processing starts).
    fn execute_run(&mut self, read_stream: &mut ReadStream<T>);
    /// Runs the operator's teardown logic.
    fn execute_destroy(&mut self);
    /// Builds the event that executes the message callback for `msg`.
    fn message_cb_event(&mut self, msg: Arc<Message<T>>) -> OperatorEvent;
    /// Builds the event that executes the watermark callback for `timestamp`.
    fn watermark_cb_event(&mut self, timestamp: &Timestamp) -> OperatorEvent;
    /// Returns the deadline events that should be armed for `timestamp`,
    /// given the current state recorded in `condition_context`.
    fn arm_deadlines(
        &self,
        setup_context: &mut SetupContext<S>,
        read_stream_ids: Vec<StreamId>,
        condition_context: &ConditionContext,
        timestamp: Timestamp,
    ) -> Vec<DeadlineEvent>;
    /// Returns `true` when the fired deadline no longer needs its handler
    /// invoked (i.e. the deadline was satisfied in time).
    fn disarm_deadline(&self, deadline_event: &DeadlineEvent) -> bool;
    /// Optional hook invoked after the operator has been destroyed; no-op by default.
    fn cleanup(&mut self) {}
    /// Invokes the handler of the deadline identified by `deadline_id` at `timestamp`.
    fn invoke_handler(
        &self,
        setup_context: &mut SetupContext<S>,
        deadline_id: DeadlineId,
        timestamp: Timestamp,
    );
}
/// Callback adapter for operators that read from two input streams.
///
/// Implementations translate the user-defined operator callbacks into
/// `OperatorEvent`s that the executor inserts into the event lattice.
pub trait TwoInMessageProcessorT<S, T, U>: Send + Sync
where
    T: Data + for<'a> Deserialize<'a>,
    U: Data + for<'a> Deserialize<'a>,
{
    /// Runs the operator's setup logic and returns the resulting `SetupContext`.
    fn execute_setup(
        &mut self,
        left_read_stream: &mut ReadStream<T>,
        right_read_stream: &mut ReadStream<U>,
    ) -> SetupContext<S>;
    /// Runs the operator's `run` logic (invoked once, before message processing starts).
    fn execute_run(
        &mut self,
        left_read_stream: &mut ReadStream<T>,
        right_read_stream: &mut ReadStream<U>,
    );
    /// Runs the operator's teardown logic.
    fn execute_destroy(&mut self);
    /// Builds the event that executes the message callback for a left-stream message.
    fn left_message_cb_event(&mut self, msg: Arc<Message<T>>) -> OperatorEvent;
    /// Builds the event that executes the message callback for a right-stream message.
    fn right_message_cb_event(&mut self, msg: Arc<Message<U>>) -> OperatorEvent;
    /// Builds the event that executes the watermark callback for `timestamp`.
    fn watermark_cb_event(&mut self, timestamp: &Timestamp) -> OperatorEvent;
    /// Returns the deadline events that should be armed for `timestamp`,
    /// given the current state recorded in `condition_context`.
    fn arm_deadlines(
        &self,
        setup_context: &mut SetupContext<S>,
        read_stream_ids: Vec<StreamId>,
        condition_context: &ConditionContext,
        timestamp: Timestamp,
    ) -> Vec<DeadlineEvent>;
    /// Returns `true` when the fired deadline no longer needs its handler
    /// invoked (i.e. the deadline was satisfied in time).
    fn disarm_deadline(&self, deadline_event: &DeadlineEvent) -> bool;
    /// Optional hook invoked after the operator has been destroyed; no-op by default.
    fn cleanup(&mut self) {}
    /// Invokes the handler of the deadline identified by `deadline_id` at `timestamp`.
    fn invoke_handler(
        &self,
        setup_context: &mut SetupContext<S>,
        deadline_id: DeadlineId,
        timestamp: Timestamp,
    );
}
/// Executor for operators that consume a single input stream.
///
/// Pairs the operator-specific callbacks (`processor`) with the stream they
/// consume and the shared executor machinery (`helper`).
pub struct OneInExecutor<S, T>
where
    T: Data + for<'a> Deserialize<'a>,
{
    // Configuration of the operator being executed.
    config: OperatorConfig,
    // Operator-specific message/watermark/deadline callbacks.
    processor: Box<dyn OneInMessageProcessorT<S, T>>,
    // Shared machinery: event lattice and deadline queue.
    helper: OperatorExecutorHelper,
    // Taken (`Option::take`) when `execute` starts; `None` afterwards.
    read_stream: Option<ReadStream<T>>,
}
impl<S, T> OneInExecutor<S, T>
where
    T: Data + for<'a> Deserialize<'a>,
{
    /// Creates an executor for the operator described by `config`, reading
    /// messages from `read_stream` and dispatching them to `processor`.
    pub fn new(
        config: OperatorConfig,
        processor: Box<dyn OneInMessageProcessorT<S, T>>,
        read_stream: ReadStream<T>,
    ) -> Self {
        let operator_id = config.id;
        Self {
            config,
            processor,
            read_stream: Some(read_stream),
            helper: OperatorExecutorHelper::new(operator_id),
        }
    }
    /// Drives the operator through its lifecycle: setup, run, the message
    /// processing loop, and finally destroy, notifying the worker when done.
    pub(crate) async fn execute(
        &mut self,
        mut channel_from_worker: broadcast::Receiver<OperatorExecutorNotification>,
        channel_to_worker: mpsc::UnboundedSender<WorkerNotification>,
        channel_to_event_runners: broadcast::Sender<EventNotification>,
    ) {
        // Wait for the rest of the node to come up (currently a fixed sleep).
        self.helper.synchronize().await;
        // `execute` is run at most once per executor, so the stream is present here.
        let mut read_stream: ReadStream<T> = self.read_stream.take().unwrap();
        // User-provided callbacks may block; `block_in_place` keeps the tokio
        // worker thread from stalling other tasks while they run.
        let mut setup_context =
            tokio::task::block_in_place(|| self.processor.execute_setup(&mut read_stream));
        tracing::debug!(
            "Node {}: Running Operator {}",
            self.config.node_id,
            self.config.get_name()
        );
        tokio::task::block_in_place(|| {
            self.processor.execute_run(&mut read_stream);
        });
        // Race the message/deadline processing loop against notifications from
        // the worker (e.g. a shutdown request).
        let process_stream_fut = self.helper.process_stream(
            read_stream,
            &mut (*self.processor),
            &channel_to_event_runners,
            &mut setup_context,
        );
        // NOTE: every `select!` arm breaks, so this loop body runs at most once;
        // that is also what makes moving `process_stream_fut` into `select!` legal.
        loop {
            tokio::select! {
                _ = process_stream_fut => break,
                notification_result = channel_from_worker.recv() => {
                    match notification_result {
                        Ok(notification) => {
                            match notification {
                                // The worker asked us to stop processing.
                                OperatorExecutorNotification::Shutdown => { break; }
                            }
                        }
                        Err(e) => {
                            tracing::error!(
                                "OneInExecutor {}: Error receiving notifications {:?}",
                                self.operator_id(),
                                e
                            );
                            break;
                        }
                    }
                }
            }
        }
        // Tear the operator down and tell the worker it has been destroyed.
        tokio::task::block_in_place(|| self.processor.execute_destroy());
        self.processor.cleanup();
        channel_to_worker
            .send(WorkerNotification::DestroyedOperator(self.operator_id()))
            .unwrap();
    }
}
impl<S, T> OperatorExecutorT for OneInExecutor<S, T>
where
    T: Data + for<'a> Deserialize<'a>,
{
    /// Returns the ID of the operator driven by this executor.
    fn operator_id(&self) -> OperatorId {
        self.config.id
    }

    /// Returns the lattice into which this executor inserts operator events.
    fn lattice(&self) -> Arc<ExecutionLattice> {
        self.helper.get_lattice()
    }

    /// Adapts the inherent `execute` into the object-safe trait interface by
    /// boxing and pinning the future it produces.
    fn execute<'a>(
        &'a mut self,
        worker_rx: broadcast::Receiver<OperatorExecutorNotification>,
        worker_tx: mpsc::UnboundedSender<WorkerNotification>,
        event_runner_tx: broadcast::Sender<EventNotification>,
    ) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>> {
        let execution = self.execute(worker_rx, worker_tx, event_runner_tx);
        Box::pin(execution)
    }
}
/// Executor for operators that consume two input streams.
///
/// Pairs the operator-specific callbacks (`processor`) with the two streams
/// they consume and the shared executor machinery (`helper`).
pub struct TwoInExecutor<S, T, U>
where
    T: Data + for<'a> Deserialize<'a>,
    U: Data + for<'a> Deserialize<'a>,
{
    // Configuration of the operator being executed.
    config: OperatorConfig,
    // Operator-specific message/watermark/deadline callbacks.
    processor: Box<dyn TwoInMessageProcessorT<S, T, U>>,
    // Shared machinery: event lattice and deadline queue.
    helper: OperatorExecutorHelper,
    // Taken (`Option::take`) when `execute` starts; `None` afterwards.
    left_read_stream: Option<ReadStream<T>>,
    right_read_stream: Option<ReadStream<U>>,
}
impl<S, T, U> TwoInExecutor<S, T, U>
where
    T: Data + for<'a> Deserialize<'a>,
    U: Data + for<'a> Deserialize<'a>,
{
    /// Creates an executor for the operator described by `config`, reading
    /// from the two given streams and dispatching to `processor`.
    pub fn new(
        config: OperatorConfig,
        processor: Box<dyn TwoInMessageProcessorT<S, T, U>>,
        left_read_stream: ReadStream<T>,
        right_read_stream: ReadStream<U>,
    ) -> Self {
        let operator_id = config.id;
        Self {
            config,
            processor,
            left_read_stream: Some(left_read_stream),
            right_read_stream: Some(right_read_stream),
            helper: OperatorExecutorHelper::new(operator_id),
        }
    }
    /// Drives the operator through its lifecycle: setup, run, the message
    /// processing loop, and finally destroy, notifying the worker when done.
    pub(crate) async fn execute(
        &mut self,
        mut channel_from_worker: broadcast::Receiver<OperatorExecutorNotification>,
        channel_to_worker: mpsc::UnboundedSender<WorkerNotification>,
        channel_to_event_runners: broadcast::Sender<EventNotification>,
    ) {
        // Wait for the rest of the node to come up (currently a fixed sleep).
        self.helper.synchronize().await;
        // `execute` is run at most once per executor, so both streams are present here.
        let mut left_read_stream: ReadStream<T> = self.left_read_stream.take().unwrap();
        let mut right_read_stream: ReadStream<U> = self.right_read_stream.take().unwrap();
        // User-provided callbacks may block; `block_in_place` keeps the tokio
        // worker thread from stalling other tasks while they run.
        let mut setup_context = tokio::task::block_in_place(|| {
            self.processor
                .execute_setup(&mut left_read_stream, &mut right_read_stream)
        });
        tracing::debug!(
            "Node {}: Running Operator {}",
            self.config.node_id,
            self.config.get_name()
        );
        tokio::task::block_in_place(|| {
            self.processor
                .execute_run(&mut left_read_stream, &mut right_read_stream);
        });
        // Race the message/deadline processing loop against notifications from
        // the worker (e.g. a shutdown request).
        let process_stream_fut = self.helper.process_two_streams(
            left_read_stream,
            right_read_stream,
            &mut (*self.processor),
            &channel_to_event_runners,
            &mut setup_context,
        );
        // NOTE: every `select!` arm breaks, so this loop body runs at most once;
        // that is also what makes moving `process_stream_fut` into `select!` legal.
        loop {
            tokio::select! {
                _ = process_stream_fut => break,
                notification_result = channel_from_worker.recv() => {
                    match notification_result {
                        Ok(notification) => {
                            match notification {
                                // The worker asked us to stop processing.
                                OperatorExecutorNotification::Shutdown => { break; }
                            }
                        }
                        Err(e) => {
                            tracing::error!(
                                "TwoInExecutor {}: Error receiving notifications {:?}",
                                self.operator_id(),
                                e
                            );
                            break;
                        }
                    }
                }
            }
        }
        // Tear the operator down and tell the worker it has been destroyed.
        tokio::task::block_in_place(|| self.processor.execute_destroy());
        self.processor.cleanup();
        channel_to_worker
            .send(WorkerNotification::DestroyedOperator(self.operator_id()))
            .unwrap();
    }
}
impl<S, T, U> OperatorExecutorT for TwoInExecutor<S, T, U>
where
    T: Data + for<'a> Deserialize<'a>,
    U: Data + for<'a> Deserialize<'a>,
{
    /// Returns the ID of the operator driven by this executor.
    fn operator_id(&self) -> OperatorId {
        self.config.id
    }

    /// Returns the lattice into which this executor inserts operator events.
    fn lattice(&self) -> Arc<ExecutionLattice> {
        self.helper.get_lattice()
    }

    /// Adapts the inherent `execute` into the object-safe trait interface by
    /// boxing and pinning the future it produces.
    fn execute<'a>(
        &'a mut self,
        worker_rx: broadcast::Receiver<OperatorExecutorNotification>,
        worker_tx: mpsc::UnboundedSender<WorkerNotification>,
        event_runner_tx: broadcast::Sender<EventNotification>,
    ) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>> {
        let execution = self.execute(worker_rx, worker_tx, event_runner_tx);
        Box::pin(execution)
    }
}
/// State shared by the stream-processing loops of the operator executors:
/// the event lattice plus the queue of armed deadlines.
pub struct OperatorExecutorHelper {
    // ID of the operator this helper serves; attached to event notifications.
    operator_id: OperatorId,
    // Lattice that orders the operator's events for the event runners.
    lattice: Arc<ExecutionLattice>,
    // Queue whose entries fire as `DeadlineEvent`s after their configured duration.
    deadline_queue: DelayQueue<DeadlineEvent, GrowingHeapBuf<DeadlineEvent>>,
    // Receiving end of `deadline_queue`; polled by the processing loops.
    deadline_queue_rx: Receiver<DeadlineEvent>,
    // Maps armed deadline IDs to the handles returned by the queue; used in
    // this module to avoid arming the same deadline twice.
    deadline_to_key_map: HashMap<DeadlineId, DelayHandle>,
}
impl OperatorExecutorHelper {
    /// Creates helper state for the operator with the given ID: a fresh event
    /// lattice and an empty deadline queue.
    pub(crate) fn new(operator_id: OperatorId) -> Self {
        let (deadline_queue, deadline_queue_rx) = delay_queue();
        OperatorExecutorHelper {
            operator_id,
            lattice: Arc::new(ExecutionLattice::new()),
            deadline_queue,
            deadline_queue_rx,
            deadline_to_key_map: HashMap::new(),
        }
    }

    /// Returns a handle to the lattice into which operator events are inserted.
    pub(crate) fn get_lattice(&self) -> Arc<ExecutionLattice> {
        Arc::clone(&self.lattice)
    }

    /// Waits for the rest of the node to come up.
    // NOTE(review): a fixed 1-second sleep is a crude startup barrier;
    // presumably a placeholder for a proper handshake — TODO confirm.
    pub(crate) async fn synchronize(&self) {
        tokio::time::sleep(Duration::from_secs(1)).await;
    }

    /// Arms the given deadline events in the deadline queue, skipping any
    /// deadline whose ID is already armed. The stored `DelayHandle`s are only
    /// used within this module for duplicate detection and logging.
    fn manage_deadlines(&mut self, deadlines: Vec<DeadlineEvent>) {
        use std::collections::hash_map::Entry;
        for event in deadlines {
            // Entry API: a single hash lookup instead of the previous
            // `contains_key` + `insert` pair on the same key.
            if let Entry::Vacant(slot) = self.deadline_to_key_map.entry(event.id) {
                let event_duration = event.duration;
                let deadline_id = event.id;
                // The event is moved into the queue and fires after `event_duration`.
                let queue_key: DelayHandle = self.deadline_queue.insert(event, event_duration);
                tracing::debug!(
                    "Installed a deadline handler for the Deadline {} with the DelayHandle: {:?}",
                    deadline_id,
                    queue_key,
                );
                slot.insert(queue_key);
            }
        }
    }

    /// Processing loop for operators with a single input stream: concurrently
    /// awaits fired deadlines and incoming messages, converts them into
    /// `OperatorEvent`s, inserts the events into the lattice, and notifies
    /// the event runners. Returns when the stream closes.
    pub(crate) async fn process_stream<S, T>(
        &mut self,
        mut read_stream: ReadStream<T>,
        message_processor: &mut dyn OneInMessageProcessorT<S, T>,
        notifier_tx: &tokio::sync::broadcast::Sender<EventNotification>,
        setup_context: &mut SetupContext<S>,
    ) where
        T: Data + for<'a> Deserialize<'a>,
    {
        let mut condition_context = ConditionContext::new();
        loop {
            tokio::select! {
                // A deadline fired.
                Some(deadline_event) = self.deadline_queue_rx.receive() => {
                    // `disarm_deadline` returns true when the handler should
                    // not run; invoke it only for missed deadlines.
                    if !message_processor.disarm_deadline(&deadline_event) {
                        message_processor.invoke_handler(
                            setup_context,
                            deadline_event.id,
                            deadline_event.timestamp.clone(),
                        );
                    }
                    // Drop the bookkeeping entry so the deadline can be re-armed.
                    match self.deadline_to_key_map.remove(&deadline_event.id) {
                        None => {
                            tracing::warn!(
                                "Could not find a key corresponding to the Deadline ID: {}",
                                deadline_event.id,
                            );
                        }
                        Some(key) => {
                            tracing::debug!(
                                "Finished invoking the deadline handler for the DelayHandle: {:?} \
                                corresponding to the Deadline ID: {}",
                                key,
                                deadline_event.id,
                            );
                        }
                    }
                    // Clear per-stream condition state up to the deadline's timestamp.
                    for stream_id in deadline_event.read_stream_ids {
                        condition_context.clear_state(stream_id, deadline_event.timestamp.clone());
                    }
                },
                // A message or watermark arrived on the input stream.
                Ok(msg) = read_stream.async_read() => {
                    let events = match msg.data() {
                        // Data message: generate a message-callback event.
                        Some(_) => {
                            condition_context.increment_msg_count(
                                read_stream.id(),
                                msg.timestamp().clone(),
                            );
                            let msg_ref = Arc::clone(&msg);
                            let data_event = message_processor.message_cb_event(
                                msg_ref,
                            );
                            vec![data_event]
                        },
                        // Watermark: generate a watermark-callback event.
                        None => {
                            condition_context.notify_watermark_arrival(
                                read_stream.id(),
                                msg.timestamp().clone(),
                            );
                            let watermark_event = message_processor.watermark_cb_event(
                                msg.timestamp());
                            vec![watermark_event]
                        }
                    };
                    // Arm any deadlines applicable at this message's timestamp.
                    let deadline_events = message_processor.arm_deadlines(
                        setup_context,
                        vec![read_stream.id()],
                        &condition_context,
                        msg.timestamp().clone()
                    );
                    self.manage_deadlines(deadline_events);
                    self.lattice.add_events(events).await;
                    notifier_tx
                        .send(EventNotification::AddedEvents(self.operator_id))
                        .unwrap();
                },
                // Stream closed and no deadline pending: we are done.
                else => break,
            }
        }
    }

    /// Processing loop for operators with two input streams: concurrently
    /// awaits fired deadlines and messages on either stream, converts them
    /// into `OperatorEvent`s, inserts the events into the lattice, and
    /// notifies the event runners. Returns when both streams close.
    ///
    /// Watermark callbacks are generated from the operator's *low* watermark —
    /// the minimum of the two streams' watermarks — and only when it advances.
    pub(crate) async fn process_two_streams<S, T, U>(
        &mut self,
        mut left_read_stream: ReadStream<T>,
        mut right_read_stream: ReadStream<U>,
        message_processor: &mut dyn TwoInMessageProcessorT<S, T, U>,
        notifier_tx: &tokio::sync::broadcast::Sender<EventNotification>,
        setup_context: &mut SetupContext<S>,
    ) where
        T: Data + for<'a> Deserialize<'a>,
        U: Data + for<'a> Deserialize<'a>,
    {
        let mut condition_context = ConditionContext::new();
        // Per-stream watermarks, and the operator's low watermark (their minimum).
        let mut left_watermark = Timestamp::Bottom;
        let mut right_watermark = Timestamp::Bottom;
        let mut min_watermark = cmp::min(&left_watermark, &right_watermark).clone();
        loop {
            tokio::select! {
                // A deadline fired.
                Some(deadline_event) = self.deadline_queue_rx.receive() => {
                    // `disarm_deadline` returns true when the handler should
                    // not run; invoke it only for missed deadlines.
                    if !message_processor.disarm_deadline(&deadline_event) {
                        message_processor.invoke_handler(
                            setup_context,
                            deadline_event.id,
                            deadline_event.timestamp.clone(),
                        );
                    }
                    // Drop the bookkeeping entry so the deadline can be re-armed.
                    match self.deadline_to_key_map.remove(&deadline_event.id) {
                        None => {
                            tracing::warn!(
                                "Could not find a key corresponding to the Deadline ID: {}",
                                deadline_event.id,
                            );
                        }
                        Some(key) => {
                            tracing::debug!(
                                "Finished invoking the deadline handler for the DelayHandle: {:?} \
                                corresponding to the Deadline ID: {}",
                                key,
                                deadline_event.id,
                            );
                        }
                    }
                    // Clear per-stream condition state up to the deadline's timestamp.
                    for stream_id in deadline_event.read_stream_ids {
                        condition_context.clear_state(stream_id, deadline_event.timestamp.clone());
                    }
                },
                // A message or watermark arrived on the left stream.
                Ok(left_msg) = left_read_stream.async_read() => {
                    let events = match left_msg.data() {
                        Some(_) => {
                            condition_context.increment_msg_count(
                                left_read_stream.id(),
                                left_msg.timestamp().clone(),
                            );
                            let msg_ref = Arc::clone(&left_msg);
                            let data_event = message_processor.left_message_cb_event(msg_ref);
                            vec![data_event]
                        },
                        None => {
                            condition_context.notify_watermark_arrival(
                                left_read_stream.id(),
                                left_msg.timestamp().clone(),
                            );
                            left_watermark = left_msg.timestamp().clone();
                            // Recompute the low watermark across both streams.
                            let new_min_watermark =
                                cmp::min(&left_watermark, &right_watermark).clone();
                            if new_min_watermark > min_watermark {
                                min_watermark = new_min_watermark;
                                // BUGFIX: previously `min_watermark` was set to
                                // `left_watermark` and the callback fired with the
                                // left stream's timestamp, which may run ahead of
                                // the right stream's watermark. The callback must
                                // observe the joint low watermark instead.
                                vec![message_processor.watermark_cb_event(&min_watermark)]
                            } else {
                                Vec::new()
                            }
                        }
                    };
                    // Arm any deadlines applicable at this message's timestamp.
                    let deadline_events = message_processor.arm_deadlines(
                        setup_context,
                        vec![left_read_stream.id(), right_read_stream.id()],
                        &condition_context,
                        left_msg.timestamp().clone()
                    );
                    self.manage_deadlines(deadline_events);
                    self.lattice.add_events(events).await;
                    notifier_tx
                        .send(EventNotification::AddedEvents(self.operator_id))
                        .unwrap();
                },
                // A message or watermark arrived on the right stream.
                Ok(right_msg) = right_read_stream.async_read() => {
                    let events = match right_msg.data() {
                        Some(_) => {
                            condition_context.increment_msg_count(
                                right_read_stream.id(),
                                right_msg.timestamp().clone(),
                            );
                            let msg_ref = Arc::clone(&right_msg);
                            let data_event = message_processor.right_message_cb_event(msg_ref);
                            vec![data_event]
                        },
                        None => {
                            condition_context.notify_watermark_arrival(
                                right_read_stream.id(),
                                right_msg.timestamp().clone(),
                            );
                            right_watermark = right_msg.timestamp().clone();
                            // Recompute the low watermark across both streams.
                            let new_min_watermark =
                                cmp::min(&left_watermark, &right_watermark).clone();
                            if new_min_watermark > min_watermark {
                                min_watermark = new_min_watermark;
                                // BUGFIX: as in the left arm, fire the callback with
                                // the joint low watermark, not this stream's timestamp.
                                vec![message_processor.watermark_cb_event(&min_watermark)]
                            } else {
                                Vec::new()
                            }
                        }
                    };
                    // Arm any deadlines applicable at this message's timestamp.
                    let deadline_events = message_processor.arm_deadlines(
                        setup_context,
                        vec![left_read_stream.id(), right_read_stream.id()],
                        &condition_context,
                        right_msg.timestamp().clone()
                    );
                    self.manage_deadlines(deadline_events);
                    self.lattice.add_events(events).await;
                    notifier_tx
                        .send(EventNotification::AddedEvents(self.operator_id))
                        .unwrap();
                }
                // Both streams closed and no deadline pending: we are done.
                else => break,
            };
        }
    }
}