use serde::Deserialize;
use crate::dataflow::{
context::TwoInOneOutContext,
message::Message,
operator::{OperatorConfig, TwoInOneOut},
state::TimeVersionedState,
stream::{OperatorStream, Stream, WriteStreamT},
Data,
};
/// Operator that joins a left stream of `T` with a right stream of `U` by timestamp.
///
/// The operator itself carries no fields; the items it buffers for joining are kept in
/// the `TimeVersionedState<(Vec<T>, Vec<U>)>` that the dataflow context hands to its
/// callbacks.
#[derive(Debug, Default, Clone, Copy)]
pub struct TimestampJoinOperator {}

impl TimestampJoinOperator {
    /// Creates a new `TimestampJoinOperator`.
    ///
    /// Equivalent to `TimestampJoinOperator::default()`. Kept as a plain `fn() -> Self`
    /// so it can be passed as an operator constructor.
    pub fn new() -> Self {
        Self::default()
    }
}
/// Joins left (`T`) and right (`U`) items that land in the same state entry.
///
/// Buffered items live in a `TimeVersionedState<(Vec<T>, Vec<U>)>`: left items in `.0`,
/// right items in `.1`. When an item arrives on one side, it is appended to its own buffer
/// and paired with every item already buffered on the other side. Within one state entry,
/// each `(left, right)` combination is therefore emitted once, by whichever of the two
/// arrived later. Every output message is stamped with `ctx.timestamp()`.
impl<T, U> TwoInOneOut<TimeVersionedState<(Vec<T>, Vec<U>)>, T, U, (T, U)> for TimestampJoinOperator
where
    T: Data + for<'a> Deserialize<'a>,
    U: Data + for<'a> Deserialize<'a>,
{
    /// Buffers a left item and emits `(data, right)` for every buffered right item.
    fn on_left_data(
        &mut self,
        ctx: &mut TwoInOneOutContext<TimeVersionedState<(Vec<T>, Vec<U>)>, (T, U)>,
        data: &T,
    ) {
        let timestamp = ctx.timestamp().clone();
        // NOTE(review): presumably `current_state()` yields the buffers for
        // `ctx.timestamp()`; it panics if no state is available — confirm invariant.
        let (left_items, right_items) = ctx.current_state().unwrap();
        left_items.push(data.clone());
        // Build all output pairs while the state is borrowed. Sending needs another
        // mutable borrow of `ctx`, so it cannot overlap with iterating the state.
        let joined: Vec<(T, U)> = right_items
            .iter()
            .map(|right_item| (data.clone(), right_item.clone()))
            .collect();
        for pair in joined {
            ctx.write_stream()
                .send(Message::new_message(timestamp.clone(), pair))
                .unwrap();
        }
    }

    /// Buffers a right item and emits `(left, data)` for every buffered left item.
    fn on_right_data(
        &mut self,
        ctx: &mut TwoInOneOutContext<TimeVersionedState<(Vec<T>, Vec<U>)>, (T, U)>,
        data: &U,
    ) {
        let timestamp = ctx.timestamp().clone();
        // NOTE(review): same `current_state()` assumption as in `on_left_data`.
        let (left_items, right_items) = ctx.current_state().unwrap();
        right_items.push(data.clone());
        // Same borrow split as in `on_left_data`: collect first, then send.
        let joined: Vec<(T, U)> = left_items
            .iter()
            .map(|left_item| (left_item.clone(), data.clone()))
            .collect();
        for pair in joined {
            ctx.write_stream()
                .send(Message::new_message(timestamp.clone(), pair))
                .unwrap();
        }
    }

    /// Releases buffered state up to the watermark's timestamp via `evict_until`.
    ///
    /// NOTE(review): whether the eviction bound is inclusive is defined by
    /// `TimeVersionedState::evict_until` — confirm there.
    fn on_watermark(
        &mut self,
        ctx: &mut TwoInOneOutContext<TimeVersionedState<(Vec<T>, Vec<U>)>, (T, U)>,
    ) {
        let timestamp = ctx.timestamp().clone();
        ctx.state_mut().evict_until(&timestamp);
    }
}
/// Extension trait that adds a timestamp-based join to a stream of `T`.
///
/// Every `Stream<T>` receives this trait through the blanket implementation in this
/// module.
pub trait Join<T: Data + for<'a> Deserialize<'a>, U: Data + for<'a> Deserialize<'a>> {
    /// Joins `self` (left side, items of `T`) with `other` (right side, items of `U`)
    /// by timestamp, returning the stream of joined `(T, U)` pairs.
    fn timestamp_join(&self, other: &dyn Stream<U>) -> OperatorStream<(T, U)>;
}
/// Blanket implementation: any `Stream<T>` can be timestamp-joined with a `Stream<U>`.
impl<S, T, U> Join<T, U> for S
where
    S: Stream<T>,
    T: Data + for<'a> Deserialize<'a>,
    U: Data + for<'a> Deserialize<'a>,
{
    /// Wires a `TimestampJoinOperator` between `self` (left input) and `other`
    /// (right input). Its state starts from `TimeVersionedState::new`, and the operator
    /// is named after both input streams.
    fn timestamp_join(&self, other: &dyn Stream<U>) -> OperatorStream<(T, U)> {
        let (left_name, right_name) = (self.name(), other.name());
        let op_name = format!("TimestampJoinOp_{}_{}", left_name, right_name);
        let config = OperatorConfig::new().name(&op_name);
        crate::connect_two_in_one_out(
            TimestampJoinOperator::new,
            TimeVersionedState::new,
            config,
            self,
            other,
        )
    }
}