use serde::Deserialize;
use crate::{
dataflow::{
context::TwoInOneOutContext,
operator::TwoInOneOut,
stream::{OperatorStream, WriteStreamT},
Data, Message, Stream,
},
OperatorConfig,
};
#[derive(Default)]
pub struct ConcatOperator {}
impl ConcatOperator {
pub fn new() -> Self {
Self {}
}
}
impl<D: Data> TwoInOneOut<(), D, D, D> for ConcatOperator
where
for<'a> D: Data + Deserialize<'a>,
{
fn on_left_data(&mut self, ctx: &mut TwoInOneOutContext<(), D>, data: &D) {
let msg = Message::new_message(ctx.timestamp().clone(), data.clone());
ctx.write_stream().send(msg).unwrap();
}
fn on_right_data(&mut self, ctx: &mut TwoInOneOutContext<(), D>, data: &D) {
let msg = Message::new_message(ctx.timestamp().clone(), data.clone());
ctx.write_stream().send(msg).unwrap();
}
fn on_watermark(&mut self, _ctx: &mut TwoInOneOutContext<(), D>) {}
}
pub trait Concat<D>
where
D: Data + for<'a> Deserialize<'a>,
{
fn concat(&self, other: &dyn Stream<D>) -> OperatorStream<D>;
}
impl<S, D> Concat<D> for S
where
S: Stream<D>,
D: Data + for<'a> Deserialize<'a>,
{
fn concat(&self, other: &dyn Stream<D>) -> OperatorStream<D> {
let name = format!("ConcatOp_{}_{}", self.name(), other.name());
crate::connect_two_in_one_out(
ConcatOperator::new,
|| {},
OperatorConfig::new().name(&name),
self,
other,
)
}
}