use crate::application::ApplicationBuilder;
use crate::data::ArconType;
use crate::dataflow::dfg::ChannelKind;
use crate::dataflow::stream::Stream;
use crate::stream::operator::sink::measure::MeasureSink;
use crate::{
dataflow::stream::OperatorExt,
dataflow::{
builder::OperatorBuilder,
conf::{OperatorConf, ParallelismStrategy},
},
index::EmptyState,
};
use std::sync::Arc;
/// Extension methods that terminate a stream of `A` by turning it into a [`Sink`].
///
/// Every method consumes the receiver and hands back a [`Sink`], which is the
/// value [`ToBuilderExt::builder`] is implemented for.
pub trait ToSinkExt<A>
where
    A: ArconType,
{
    /// Terminates the stream with the `ChannelKind::Console` channel kind.
    fn print(self) -> Sink<A>;
    /// Terminates the stream with the `ChannelKind::Forward` channel kind and
    /// flags the resulting [`Sink`] as a debug sink.
    fn debug(self) -> Sink<A>;
    /// Terminates the stream with the `ChannelKind::Mute` channel kind.
    fn ignore(self) -> Sink<A>;
    /// Appends a `MeasureSink` operator configured with `log_freq` and then
    /// terminates the stream with the `ChannelKind::Mute` channel kind.
    fn measure(self, log_freq: u64) -> Sink<A>;
}
/// A terminated stream: the underlying [`Stream`] plus the debug flag that is
/// later passed to `ApplicationBuilder::new`.
pub struct Sink<A>
where
    A: ArconType,
{
    /// The stream this sink was created from.
    stream: Stream<A>,
    /// `true` only for sinks created through `ToSinkExt::debug`.
    debug: bool,
}
impl<A: ArconType> ToSinkExt<A> for Stream<A> {
fn print(mut self) -> Sink<A> {
self.set_channel_kind(ChannelKind::Console);
Sink {
stream: self,
debug: false,
}
}
fn debug(mut self) -> Sink<A> {
self.set_channel_kind(ChannelKind::Forward);
Sink {
stream: self,
debug: true,
}
}
fn ignore(mut self) -> Sink<A> {
self.set_channel_kind(ChannelKind::Mute);
Sink {
stream: self,
debug: false,
}
}
fn measure(self, log_freq: u64) -> Sink<A> {
let mut stream = self.operator(OperatorBuilder {
operator: Arc::new(move || MeasureSink::new(log_freq)),
state: Arc::new(|_| EmptyState),
conf: OperatorConf {
parallelism_strategy: ParallelismStrategy::Static(1),
..Default::default()
},
});
stream.set_channel_kind(ChannelKind::Mute);
Sink {
stream,
debug: false,
}
}
}
mod private {
use super::*;
pub trait Sealed {}
impl<A: ArconType> Sealed for Sink<A> {}
}
/// Converts a finished sink into an [`ApplicationBuilder`].
///
/// The `private::Sealed` supertrait lives in a non-public module, so only
/// types that implement `Sealed` inside this module can implement this trait.
pub trait ToBuilderExt: private::Sealed {
/// Consumes `self` and returns the [`ApplicationBuilder`] built from it.
fn builder(self) -> ApplicationBuilder;
}
impl<T: ArconType> ToBuilderExt for Sink<T> {
fn builder(mut self) -> ApplicationBuilder {
self.stream.move_last_node();
ApplicationBuilder::new(self.stream.ctx, self.debug)
}
}