use serde::Deserialize;
use crate::{
dataflow::{context::*, AppendableState, Data, ReadStream, State, WriteStream},
node::NodeId,
OperatorId,
};
/// Operator trait whose `run` hook is handed only a `WriteStream<T>`; no read
/// stream appears in any of its signatures.
///
/// `T` must implement `Data` and be deserializable for every lifetime
/// (`for<'a> Deserialize<'a>`). Implementors must be `Send + Sync`.
/// Both methods have empty default bodies, so overriding either one is optional.
// The empty default bodies leave their parameters unused, hence this lint exemption.
#[allow(unused_variables)]
pub trait Source<T>: Send + Sync
where
T: Data + for<'a> Deserialize<'a>,
{
/// Receives the operator configuration and the write stream; does nothing by default.
// NOTE(review): presumably invoked by the runtime when the operator starts — confirm against the executor.
fn run(&mut self, config: &OperatorConfig, write_stream: &mut WriteStream<T>) {}
/// Takes no arguments besides `&mut self`; does nothing by default.
// NOTE(review): presumably a teardown hook; the call site is not visible in this file — confirm.
fn destroy(&mut self) {}
}
/// Operator trait that consumes a `ReadStream<T>`; no write stream appears in
/// any of its signatures.
///
/// Type parameters, as used by the signatures below:
/// * `S` — state type, bounded by `AppendableState<U>`.
/// * `T` — type carried by the read stream (`Data`).
/// * `U` — type argument of `AppendableState` and of `ParallelSinkContext`.
///
/// `setup`, `run` and `destroy` have empty default bodies; `on_data` and
/// `on_watermark` must be supplied by the implementor.
// The empty default bodies leave their parameters unused, hence this lint exemption.
#[allow(unused_variables)]
pub trait ParallelSink<S: AppendableState<U>, T: Data, U>: Send + Sync {
/// Receives a mutable `SetupContext<S>`; does nothing by default.
fn setup(&mut self, setup_context: &mut SetupContext<S>) {}
/// Receives the operator configuration and the read stream; does nothing by default.
fn run(&mut self, config: &OperatorConfig, read_stream: &mut ReadStream<T>) {}
/// Takes no arguments besides `&mut self`; does nothing by default.
fn destroy(&mut self) {}
/// Required callback taking a borrowed `T`. Both `self` and the context are
/// shared references, so any mutation must go through interior mutability.
// NOTE(review): presumably invoked once per message received on the read stream — confirm.
fn on_data(&self, ctx: &ParallelSinkContext<S, U>, data: &T);
/// Required callback with no payload. `self` is a shared reference, but the
/// context is borrowed mutably here.
// NOTE(review): presumably invoked when a watermark arrives on the read stream — confirm.
fn on_watermark(&self, ctx: &mut ParallelSinkContext<S, U>);
}
/// Operator trait that consumes a `ReadStream<T>`; no write stream appears in
/// any of its signatures.
///
/// Type parameters, as used by the signatures below:
/// * `S` — state type, bounded by `State`.
/// * `T` — type carried by the read stream (`Data`).
///
/// `setup`, `run` and `destroy` have empty default bodies; `on_data` and
/// `on_watermark` must be supplied by the implementor. Both callbacks take
/// `&mut self` and a mutable `SinkContext<S>`.
// The empty default bodies leave their parameters unused, hence this lint exemption.
#[allow(unused_variables)]
pub trait Sink<S: State, T: Data>: Send + Sync {
/// Receives a mutable `SetupContext<S>`; does nothing by default.
fn setup(&mut self, setup_context: &mut SetupContext<S>) {}
/// Receives the operator configuration and the read stream; does nothing by default.
fn run(&mut self, config: &OperatorConfig, read_stream: &mut ReadStream<T>) {}
/// Takes no arguments besides `&mut self`; does nothing by default.
fn destroy(&mut self) {}
/// Required callback taking a borrowed `T` together with mutable access to
/// the operator and its context.
// NOTE(review): presumably invoked once per message received on the read stream — confirm.
fn on_data(&mut self, ctx: &mut SinkContext<S>, data: &T);
/// Required callback with no payload; has mutable access to the operator and its context.
// NOTE(review): presumably invoked when a watermark arrives on the read stream — confirm.
fn on_watermark(&mut self, ctx: &mut SinkContext<S>);
}
/// Operator trait with one `ReadStream<T>` and one `WriteStream<U>`.
///
/// Type parameters, as used by the signatures below:
/// * `S` — state type, bounded by `AppendableState<V>`.
/// * `T` — type carried by the read stream.
/// * `U` — type carried by the write stream.
/// * `V` — type argument of `AppendableState` and of `ParallelOneInOneOutContext`.
///
/// `T` and `U` must implement `Data` and `for<'a> Deserialize<'a>`.
/// `setup`, `run` and `destroy` have empty default bodies; `on_data` and
/// `on_watermark` must be supplied by the implementor.
// The empty default bodies leave their parameters unused, hence this lint exemption.
#[allow(unused_variables)]
pub trait ParallelOneInOneOut<S, T, U, V>: Send + Sync
where
S: AppendableState<V>,
T: Data + for<'a> Deserialize<'a>,
U: Data + for<'a> Deserialize<'a>,
{
/// Receives a mutable `SetupContext<S>`; does nothing by default.
fn setup(&mut self, setup_context: &mut SetupContext<S>) {}
/// Receives the operator configuration plus the read and write streams;
/// does nothing by default.
fn run(
&mut self,
config: &OperatorConfig,
read_stream: &mut ReadStream<T>,
write_stream: &mut WriteStream<U>,
) {
}
/// Takes no arguments besides `&mut self`; does nothing by default.
fn destroy(&mut self) {}
/// Required callback taking a borrowed `T`. Both `self` and the context are
/// shared references, so any mutation must go through interior mutability.
// NOTE(review): presumably invoked once per message received on the read stream — confirm.
fn on_data(&self, ctx: &ParallelOneInOneOutContext<S, U, V>, data: &T);
/// Required callback with no payload. `self` is a shared reference, but the
/// context is borrowed mutably here.
// NOTE(review): presumably invoked when a watermark arrives on the read stream — confirm.
fn on_watermark(&self, ctx: &mut ParallelOneInOneOutContext<S, U, V>);
}
/// Operator trait with one `ReadStream<T>` and one `WriteStream<U>`.
///
/// Type parameters, as used by the signatures below:
/// * `S` — state type, bounded by `State`.
/// * `T` — type carried by the read stream.
/// * `U` — type carried by the write stream.
///
/// `T` and `U` must implement `Data` and `for<'a> Deserialize<'a>`.
/// `setup`, `run` and `destroy` have empty default bodies; `on_data` and
/// `on_watermark` must be supplied by the implementor. Both callbacks take
/// `&mut self` and a mutable `OneInOneOutContext<S, U>`.
// The empty default bodies leave their parameters unused, hence this lint exemption.
#[allow(unused_variables)]
pub trait OneInOneOut<S, T, U>: Send + Sync
where
S: State,
T: Data + for<'a> Deserialize<'a>,
U: Data + for<'a> Deserialize<'a>,
{
/// Receives a mutable `SetupContext<S>`; does nothing by default.
fn setup(&mut self, setup_context: &mut SetupContext<S>) {}
/// Receives the operator configuration plus the read and write streams;
/// does nothing by default.
fn run(
&mut self,
config: &OperatorConfig,
read_stream: &mut ReadStream<T>,
write_stream: &mut WriteStream<U>,
) {
}
/// Takes no arguments besides `&mut self`; does nothing by default.
fn destroy(&mut self) {}
/// Required callback taking a borrowed `T` together with mutable access to
/// the operator and its context.
// NOTE(review): presumably invoked once per message received on the read stream — confirm.
fn on_data(&mut self, ctx: &mut OneInOneOutContext<S, U>, data: &T);
/// Required callback with no payload; has mutable access to the operator and its context.
// NOTE(review): presumably invoked when a watermark arrives on the read stream — confirm.
fn on_watermark(&mut self, ctx: &mut OneInOneOutContext<S, U>);
}
/// Operator trait with two read streams (`ReadStream<T>` and `ReadStream<U>`)
/// and one `WriteStream<V>`.
///
/// Type parameters, as used by the signatures below:
/// * `S` — state type, bounded by `AppendableState<W>`.
/// * `T` — type carried by the left read stream.
/// * `U` — type carried by the right read stream.
/// * `V` — type carried by the write stream.
/// * `W` — type argument of `AppendableState` and of `ParallelTwoInOneOutContext`.
///
/// `T`, `U` and `V` must implement `Data` and `for<'a> Deserialize<'a>`.
/// `setup`, `run` and `destroy` have empty default bodies; `on_left_data`,
/// `on_right_data` and `on_watermark` must be supplied by the implementor.
// The empty default bodies leave their parameters unused, hence this lint exemption.
#[allow(unused_variables)]
pub trait ParallelTwoInOneOut<S, T, U, V, W>: Send + Sync
where
S: AppendableState<W>,
T: Data + for<'a> Deserialize<'a>,
U: Data + for<'a> Deserialize<'a>,
V: Data + for<'a> Deserialize<'a>,
{
/// Receives a mutable `SetupContext<S>`; does nothing by default.
fn setup(&mut self, setup_context: &mut SetupContext<S>) {}
/// Receives the operator configuration, both read streams and the write
/// stream; does nothing by default.
fn run(
&mut self,
config: &OperatorConfig,
left_read_stream: &mut ReadStream<T>,
right_read_stream: &mut ReadStream<U>,
write_stream: &mut WriteStream<V>,
) {
}
/// Takes no arguments besides `&mut self`; does nothing by default.
fn destroy(&mut self) {}
/// Required callback taking a borrowed `T`. Both `self` and the context are
/// shared references, so any mutation must go through interior mutability.
// NOTE(review): presumably invoked once per message received on the left read stream — confirm.
fn on_left_data(&self, ctx: &ParallelTwoInOneOutContext<S, V, W>, data: &T);
/// Required callback taking a borrowed `U`. Both `self` and the context are
/// shared references, so any mutation must go through interior mutability.
// NOTE(review): presumably invoked once per message received on the right read stream — confirm.
fn on_right_data(&self, ctx: &ParallelTwoInOneOutContext<S, V, W>, data: &U);
/// Required callback with no payload. `self` is a shared reference, but the
/// context is borrowed mutably here.
// NOTE(review): how watermarks from the two read streams are combined before this call is not visible here — confirm.
fn on_watermark(&self, ctx: &mut ParallelTwoInOneOutContext<S, V, W>);
}
/// Operator trait with two read streams (`ReadStream<T>` and `ReadStream<U>`)
/// and one `WriteStream<V>`.
///
/// Type parameters, as used by the signatures below:
/// * `S` — state type, bounded by `State`.
/// * `T` — type carried by the left read stream.
/// * `U` — type carried by the right read stream.
/// * `V` — type carried by the write stream.
///
/// `T`, `U` and `V` must implement `Data` and `for<'a> Deserialize<'a>`.
/// `setup`, `run` and `destroy` have empty default bodies; `on_left_data`,
/// `on_right_data` and `on_watermark` must be supplied by the implementor.
/// All three callbacks take `&mut self` and a mutable `TwoInOneOutContext<S, V>`.
// The empty default bodies leave their parameters unused, hence this lint exemption.
#[allow(unused_variables)]
pub trait TwoInOneOut<S, T, U, V>: Send + Sync
where
S: State,
T: Data + for<'a> Deserialize<'a>,
U: Data + for<'a> Deserialize<'a>,
V: Data + for<'a> Deserialize<'a>,
{
/// Receives a mutable `SetupContext<S>`; does nothing by default.
fn setup(&mut self, setup_context: &mut SetupContext<S>) {}
/// Receives the operator configuration, both read streams and the write
/// stream; does nothing by default.
fn run(
&mut self,
config: &OperatorConfig,
left_read_stream: &mut ReadStream<T>,
right_read_stream: &mut ReadStream<U>,
write_stream: &mut WriteStream<V>,
) {
}
/// Takes no arguments besides `&mut self`; does nothing by default.
fn destroy(&mut self) {}
/// Required callback taking a borrowed `T` together with mutable access to
/// the operator and its context.
// NOTE(review): presumably invoked once per message received on the left read stream — confirm.
fn on_left_data(&mut self, ctx: &mut TwoInOneOutContext<S, V>, data: &T);
/// Required callback taking a borrowed `U` together with mutable access to
/// the operator and its context.
// NOTE(review): presumably invoked once per message received on the right read stream — confirm.
fn on_right_data(&mut self, ctx: &mut TwoInOneOutContext<S, V>, data: &U);
/// Required callback with no payload; has mutable access to the operator and its context.
// NOTE(review): how watermarks from the two read streams are combined before this call is not visible here — confirm.
fn on_watermark(&mut self, ctx: &mut TwoInOneOutContext<S, V>);
}
/// Operator trait with one `ReadStream<T>` and two write streams
/// (`WriteStream<U>` and `WriteStream<V>`).
///
/// Type parameters, as used by the signatures below:
/// * `S` — state type, bounded by `AppendableState<W>`.
/// * `T` — type carried by the read stream.
/// * `U` — type carried by the left write stream.
/// * `V` — type carried by the right write stream.
/// * `W` — type argument of `AppendableState` and of `ParallelOneInTwoOutContext`.
///
/// `T`, `U` and `V` must implement `Data` and `for<'a> Deserialize<'a>`.
/// `setup`, `run` and `destroy` have empty default bodies; `on_data` and
/// `on_watermark` must be supplied by the implementor.
// The empty default bodies leave their parameters unused, hence this lint exemption.
#[allow(unused_variables)]
pub trait ParallelOneInTwoOut<S, T, U, V, W>: Send + Sync
where
S: AppendableState<W>,
T: Data + for<'a> Deserialize<'a>,
U: Data + for<'a> Deserialize<'a>,
V: Data + for<'a> Deserialize<'a>,
{
/// Receives a mutable `SetupContext<S>`; does nothing by default.
fn setup(&mut self, setup_context: &mut SetupContext<S>) {}
/// Receives the operator configuration, the read stream and both write
/// streams; does nothing by default.
fn run(
&mut self,
config: &OperatorConfig,
read_stream: &mut ReadStream<T>,
left_write_stream: &mut WriteStream<U>,
right_write_stream: &mut WriteStream<V>,
) {
}
/// Takes no arguments besides `&mut self`; does nothing by default.
fn destroy(&mut self) {}
/// Required callback taking a borrowed `T`. Both `self` and the context are
/// shared references, so any mutation must go through interior mutability.
// NOTE(review): presumably invoked once per message received on the read stream — confirm.
fn on_data(&self, ctx: &ParallelOneInTwoOutContext<S, U, V, W>, data: &T);
/// Required callback with no payload. `self` is a shared reference, but the
/// context is borrowed mutably here.
// NOTE(review): presumably invoked when a watermark arrives on the read stream — confirm.
fn on_watermark(&self, ctx: &mut ParallelOneInTwoOutContext<S, U, V, W>);
}
/// Operator trait with one `ReadStream<T>` and two write streams
/// (`WriteStream<U>` and `WriteStream<V>`).
///
/// Type parameters, as used by the signatures below:
/// * `S` — state type, bounded by `State`.
/// * `T` — type carried by the read stream.
/// * `U` — type carried by the left write stream.
/// * `V` — type carried by the right write stream.
///
/// `T`, `U` and `V` must implement `Data` and `for<'a> Deserialize<'a>`.
/// `setup`, `run` and `destroy` have empty default bodies; `on_data` and
/// `on_watermark` must be supplied by the implementor. Both callbacks take
/// `&mut self` and a mutable `OneInTwoOutContext<S, U, V>`.
// The empty default bodies leave their parameters unused, hence this lint exemption.
#[allow(unused_variables)]
pub trait OneInTwoOut<S, T, U, V>: Send + Sync
where
S: State,
T: Data + for<'a> Deserialize<'a>,
U: Data + for<'a> Deserialize<'a>,
V: Data + for<'a> Deserialize<'a>,
{
/// Receives a mutable `SetupContext<S>`; does nothing by default.
fn setup(&mut self, setup_context: &mut SetupContext<S>) {}
/// Receives the operator configuration, the read stream and both write
/// streams; does nothing by default.
fn run(
&mut self,
config: &OperatorConfig,
read_stream: &mut ReadStream<T>,
left_write_stream: &mut WriteStream<U>,
right_write_stream: &mut WriteStream<V>,
) {
}
/// Takes no arguments besides `&mut self`; does nothing by default.
fn destroy(&mut self) {}
/// Required callback taking a borrowed `T` together with mutable access to
/// the operator and its context.
// NOTE(review): presumably invoked once per message received on the read stream — confirm.
fn on_data(&mut self, ctx: &mut OneInTwoOutContext<S, U, V>, data: &T);
/// Required callback with no payload; has mutable access to the operator and its context.
// NOTE(review): presumably invoked when a watermark arrives on the read stream — confirm.
fn on_watermark(&mut self, ctx: &mut OneInTwoOutContext<S, U, V>);
}
/// Per-operator configuration; passed by shared reference to the `run` hook of
/// every operator trait in this file.
///
/// All fields are public. The builder-style methods on this type (`name`,
/// `flow_watermarks`, `node`) each set the field of the same name (`node`
/// sets `node_id`).
#[derive(Clone)]
pub struct OperatorConfig {
/// Optional name. When `None`, `get_name` falls back to the formatted `id`.
pub name: Option<String>,
/// Operator identifier; `new` initializes it to `OperatorId::nil()`.
// NOTE(review): no method in this file assigns a non-nil id — presumably the runtime sets it; confirm.
pub id: OperatorId,
/// Initialized to `true` by `new`.
// NOTE(review): presumably controls whether watermarks are forwarded automatically from inputs to outputs — confirm.
pub flow_watermarks: bool,
/// Node identifier; `new` initializes it to `0`.
// NOTE(review): presumably the node on which the operator is placed — confirm.
pub node_id: NodeId,
}
impl OperatorConfig {
    /// Creates a configuration with no name, a nil operator id, watermark
    /// flowing enabled, and node id `0`.
    pub fn new() -> Self {
        let id = OperatorId::nil();
        Self {
            name: None,
            id,
            flow_watermarks: true,
            node_id: 0,
        }
    }

    /// Builder step: stores an owned copy of `name` and returns the updated
    /// configuration. Any previously set name is replaced.
    pub fn name(self, name: &str) -> Self {
        Self {
            name: Some(String::from(name)),
            ..self
        }
    }

    /// Builder step: overwrites the `flow_watermarks` flag and returns the
    /// updated configuration.
    pub fn flow_watermarks(self, flow_watermarks: bool) -> Self {
        Self {
            flow_watermarks,
            ..self
        }
    }

    /// Builder step: overwrites `node_id` and returns the updated
    /// configuration.
    pub fn node(self, node_id: NodeId) -> Self {
        Self { node_id, ..self }
    }

    /// Returns a freshly allocated display name: a copy of `name` when one is
    /// set, otherwise the `Display` rendering of `id`.
    pub fn get_name(&self) -> String {
        match &self.name {
            Some(explicit) => explicit.clone(),
            None => self.id.to_string(),
        }
    }
}
impl Default for OperatorConfig {
fn default() -> Self {
Self::new()
}
}