//! Message processors that drive the callbacks of [`Sink`] and
//! [`ParallelSink`] operators: operators that consume data from a single
//! input stream and produce no output streams.

use serde::Deserialize;
use std::{
collections::HashSet,
marker::PhantomData,
sync::{Arc, Mutex},
};
use crate::{
dataflow::{
context::{ParallelSinkContext, SetupContext, SinkContext},
deadlines::{ConditionContext, DeadlineEvent, DeadlineId},
operator::{OperatorConfig, ParallelSink, Sink},
stream::StreamId,
AppendableState, Data, Message, ReadStream, State, Timestamp,
},
node::{
operator_event::{OperatorEvent, OperatorType},
operator_executors::OneInMessageProcessorT,
},
Uuid,
};
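/// A message processor that executes the callbacks of a [`ParallelSink`]
/// operator on a single input stream.
///
/// The operator and its [`AppendableState`] are shared behind plain `Arc`s
/// (with no lock), and the generated events are tagged
/// [`OperatorType::Parallel`], so callbacks for different messages may run
/// concurrently. The state is only committed from watermark callbacks.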
pub struct ParallelSinkMessageProcessor<O, S, T, U>
where
O: 'static + ParallelSink<S, T, U>,
S: AppendableState<U>,
T: Data + for<'a> Deserialize<'a>,
U: 'static + Send + Sync,
{
config: OperatorConfig,
operator: Arc<O>,
state: Arc<S>,
state_ids: HashSet<Uuid>,
phantom_t: PhantomData<T>,
phantom_u: PhantomData<U>,
}
impl<O, S, T, U> ParallelSinkMessageProcessor<O, S, T, U>
where
O: 'static + ParallelSink<S, T, U>,
S: AppendableState<U>,
T: Data + for<'a> Deserialize<'a>,
U: 'static + Send + Sync,
{
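    /// Creates a new processor, invoking each factory closure exactly once
    /// and sharing the results behind `Arc`s.
    ///
    /// A minimal construction sketch; the `WordCounter` operator and
    /// `CountState` state type are hypothetical, not part of this crate:
    ///
    /// ```ignore
    /// let processor = ParallelSinkMessageProcessor::new(
    ///     config,                // an `OperatorConfig`
    ///     || WordCounter::new(), // hypothetical `ParallelSink` operator
    ///     CountState::new,       // hypothetical `AppendableState` factory
    /// );
    /// ```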
pub fn new(
config: OperatorConfig,
operator_fn: impl Fn() -> O + Send,
state_fn: impl Fn() -> S + Send,
) -> Self {
Self {
config,
operator: Arc::new(operator_fn()),
state: Arc::new(state_fn()),
state_ids: vec![Uuid::new_deterministic()].into_iter().collect(),
phantom_t: PhantomData,
phantom_u: PhantomData,
}
}
}
impl<O, S, T, U> OneInMessageProcessorT<S, T> for ParallelSinkMessageProcessor<O, S, T, U>
where
O: 'static + ParallelSink<S, T, U>,
S: AppendableState<U>,
T: Data + for<'a> Deserialize<'a>,
U: 'static + Send + Sync,
{
    fn execute_setup(&mut self, read_stream: &mut ReadStream<T>) -> SetupContext<S> {
        // A sink has read streams but no write streams.
        let mut setup_context = SetupContext::new(vec![read_stream.id()], vec![]);
        // `get_mut` succeeds as long as no callback event currently holds a
        // clone of the operator `Arc`, which holds before any events have
        // been generated.
        Arc::get_mut(&mut self.operator)
            .unwrap()
            .setup(&mut setup_context);
        setup_context
    }
fn execute_run(&mut self, read_stream: &mut ReadStream<T>) {
Arc::get_mut(&mut self.operator)
.unwrap()
.run(&self.config, read_stream);
}
fn execute_destroy(&mut self) {
Arc::get_mut(&mut self.operator).unwrap().destroy();
}
fn message_cb_event(&mut self, msg: Arc<Message<T>>) -> OperatorEvent {
        // Clone the `Arc`s and config so the event closure can share the
        // operator and state with other in-flight events.
        let operator = Arc::clone(&self.operator);
        let state = Arc::clone(&self.state);
        let time = msg.timestamp().clone();
        let config = self.config.clone();
        OperatorEvent::new(
            time.clone(),
            // Not a watermark callback; unlike the watermark event below, it
            // claims no state ids.
            false,
            0,
            HashSet::new(),
            HashSet::new(),
move || {
operator.on_data(
&ParallelSinkContext::new(time, config, &state),
msg.data().unwrap(),
)
},
OperatorType::Parallel,
)
}
fn watermark_cb_event(&mut self, timestamp: &Timestamp) -> OperatorEvent {
let operator = Arc::clone(&self.operator);
let state = Arc::clone(&self.state);
let time = timestamp.clone();
let config = self.config.clone();
        OperatorEvent::new(
            time.clone(),
            // A watermark callback that claims this processor's state ids,
            // since it commits the state below.
            true,
            0,
            HashSet::new(),
            self.state_ids.clone(),
            move || {
                // Invoke the watermark callback, then commit the shared
                // state up to this timestamp.
                operator.on_watermark(&mut ParallelSinkContext::new(time.clone(), config, &state));
                state.commit(&time);
            },
OperatorType::Parallel,
)
}
fn arm_deadlines(
&self,
setup_context: &mut SetupContext<S>,
read_stream_ids: Vec<StreamId>,
condition_context: &ConditionContext,
timestamp: Timestamp,
) -> Vec<DeadlineEvent> {
let mut deadline_events = Vec::new();
let state = Arc::clone(&self.state);
        for deadline in setup_context.deadlines() {
            // Arm the deadline only if it constrains every given read stream
            // and its start condition holds for this timestamp.
            if deadline
                .get_constrained_read_stream_ids()
                .is_superset(&read_stream_ids.iter().cloned().collect())
                && deadline.invoke_start_condition(&read_stream_ids, condition_context, &timestamp)
            {
                let deadline_duration = deadline.calculate_deadline(&state, &timestamp);
deadline_events.push(DeadlineEvent::new(
deadline.get_constrained_read_stream_ids().clone(),
deadline.get_constrained_write_stream_ids().clone(),
timestamp.clone(),
deadline_duration,
deadline.get_end_condition_fn(),
deadline.id(),
));
}
}
deadline_events
}
    fn disarm_deadline(&self, deadline_event: &DeadlineEvent) -> bool {
        // The deadline is satisfied once the state has been committed up to
        // the deadline's timestamp.
        self.state.last_committed_timestamp() >= deadline_event.timestamp
    }
fn invoke_handler(
&self,
setup_context: &mut SetupContext<S>,
deadline_id: DeadlineId,
timestamp: Timestamp,
) {
        setup_context.invoke_handler(deadline_id, &(*self.state), &timestamp);
}
}
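/// A message processor that executes the callbacks of a [`Sink`] operator on
/// a single input stream.
///
/// Unlike [`ParallelSinkMessageProcessor`], the operator and state are
/// wrapped in `Mutex`es and the generated events are tagged
/// [`OperatorType::Sequential`], so each callback takes exclusive, mutable
/// access to both.
///
/// A minimal sketch of a `Sink` this processor can drive; `Printer` and
/// `NoState` (assumed to implement [`State`] with a no-op `commit`) are
/// hypothetical illustration types:
///
/// ```ignore
/// struct Printer {}
///
/// impl Sink<NoState, String> for Printer {
///     fn on_data(&mut self, _ctx: &mut SinkContext<NoState>, data: &String) {
///         println!("received: {}", data);
///     }
///
///     fn on_watermark(&mut self, _ctx: &mut SinkContext<NoState>) {}
/// }
/// ```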
pub struct SinkMessageProcessor<O, S, T>
where
O: 'static + Sink<S, T>,
S: State,
T: Data + for<'a> Deserialize<'a>,
{
config: OperatorConfig,
operator: Arc<Mutex<O>>,
state: Arc<Mutex<S>>,
state_ids: HashSet<Uuid>,
phantom_t: PhantomData<T>,
}
impl<O, S, T> SinkMessageProcessor<O, S, T>
where
O: 'static + Sink<S, T>,
S: State,
T: Data + for<'a> Deserialize<'a>,
{
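    /// Creates a new processor, invoking each factory closure exactly once
    /// and wrapping the operator and state in `Arc<Mutex<_>>`s for use by
    /// the sequential callback events.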
pub fn new(
config: OperatorConfig,
operator_fn: impl Fn() -> O + Send,
state_fn: impl Fn() -> S + Send,
) -> Self {
Self {
config,
operator: Arc::new(Mutex::new(operator_fn())),
state: Arc::new(Mutex::new(state_fn())),
state_ids: vec![Uuid::new_deterministic()].into_iter().collect(),
phantom_t: PhantomData,
}
}
}
impl<O, S, T> OneInMessageProcessorT<S, T> for SinkMessageProcessor<O, S, T>
where
O: 'static + Sink<S, T>,
S: State,
T: Data + for<'a> Deserialize<'a>,
{
fn execute_setup(&mut self, read_stream: &mut ReadStream<T>) -> SetupContext<S> {
let mut setup_context = SetupContext::new(vec![read_stream.id()], vec![]);
self.operator.lock().unwrap().setup(&mut setup_context);
setup_context
}
fn execute_run(&mut self, read_stream: &mut ReadStream<T>) {
self.operator.lock().unwrap().run(&self.config, read_stream);
}
fn execute_destroy(&mut self) {
self.operator.lock().unwrap().destroy();
}
fn message_cb_event(&mut self, msg: Arc<Message<T>>) -> OperatorEvent {
let operator = Arc::clone(&self.operator);
let state = Arc::clone(&self.state);
let time = msg.timestamp().clone();
let config = self.config.clone();
OperatorEvent::new(
time.clone(),
false,
0,
HashSet::new(),
HashSet::new(),
            move || {
                // Take exclusive access to the operator and state for the
                // duration of the callback.
                let mut mutable_operator = operator.lock().unwrap();
                let mut mutable_state = state.lock().unwrap();
mutable_operator.on_data(
&mut SinkContext::new(time, config, &mut mutable_state),
msg.data().unwrap(),
)
},
OperatorType::Sequential,
)
}
fn watermark_cb_event(&mut self, timestamp: &Timestamp) -> OperatorEvent {
let operator = Arc::clone(&self.operator);
let state = Arc::clone(&self.state);
let config = self.config.clone();
let time = timestamp.clone();
OperatorEvent::new(
time.clone(),
true,
0,
HashSet::new(),
self.state_ids.clone(),
            move || {
                let mut mutable_operator = operator.lock().unwrap();
                let mut mutable_state = state.lock().unwrap();
                // Invoke the watermark callback, then commit the state up to
                // this timestamp.
                mutable_operator.on_watermark(&mut SinkContext::new(
                    time.clone(),
                    config,
                    &mut mutable_state,
                ));
                mutable_state.commit(&time);
},
OperatorType::Sequential,
)
}
fn arm_deadlines(
&self,
setup_context: &mut SetupContext<S>,
read_stream_ids: Vec<StreamId>,
condition_context: &ConditionContext,
timestamp: Timestamp,
) -> Vec<DeadlineEvent> {
let mut deadline_events = Vec::new();
let state = Arc::clone(&self.state);
for deadline in setup_context.deadlines() {
if deadline
.get_constrained_read_stream_ids()
.is_superset(&read_stream_ids.iter().cloned().collect())
                && deadline.invoke_start_condition(&read_stream_ids, condition_context, &timestamp)
            {
                let deadline_duration =
                    deadline.calculate_deadline(&(*state.lock().unwrap()), &timestamp);
deadline_events.push(DeadlineEvent::new(
deadline.get_constrained_read_stream_ids().clone(),
deadline.get_constrained_write_stream_ids().clone(),
timestamp.clone(),
deadline_duration,
deadline.get_end_condition_fn(),
deadline.id(),
));
}
}
deadline_events
}
fn disarm_deadline(&self, deadline_event: &DeadlineEvent) -> bool {
self.state.lock().unwrap().last_committed_timestamp() >= deadline_event.timestamp
}
fn invoke_handler(
&self,
setup_context: &mut SetupContext<S>,
deadline_id: DeadlineId,
timestamp: Timestamp,
) {
        setup_context.invoke_handler(deadline_id, &(*self.state.lock().unwrap()), &timestamp);
}
}