use std::sync::Arc;
use serde::Deserialize;
use crate::dataflow::{
context::OneInOneOutContext,
message::Message,
operator::{OneInOneOut, OperatorConfig},
stream::{OperatorStream, Stream, WriteStreamT},
Data,
};
pub struct FlatMapOperator<D, I>
where
D: Data + for<'a> Deserialize<'a>,
I: IntoIterator,
I::Item: Data + for<'a> Deserialize<'a>,
{
flat_map_fn: Arc<dyn Fn(&D) -> I + Send + Sync>,
}
impl<D, I> FlatMapOperator<D, I>
where
D: Data + for<'a> Deserialize<'a>,
I: IntoIterator,
I::Item: Data + for<'a> Deserialize<'a>,
{
pub fn new<F>(flat_map_fn: F) -> Self
where
F: 'static + Fn(&D) -> I + Send + Sync,
{
Self {
flat_map_fn: Arc::new(flat_map_fn),
}
}
}
impl<D, I> OneInOneOut<(), D, I::Item> for FlatMapOperator<D, I>
where
D: Data + for<'a> Deserialize<'a>,
I: IntoIterator,
I::Item: Data + for<'a> Deserialize<'a>,
{
fn on_data(&mut self, ctx: &mut OneInOneOutContext<(), I::Item>, data: &D) {
for item in (self.flat_map_fn)(data).into_iter() {
tracing::trace!(
"{} @ {:?}: received {:?} and sending {:?}",
ctx.operator_config().get_name(),
ctx.timestamp(),
data,
item,
);
let timestamp = ctx.timestamp().clone();
let msg = Message::new_message(timestamp, item);
ctx.write_stream().send(msg).unwrap();
}
}
fn on_watermark(&mut self, _ctx: &mut OneInOneOutContext<(), I::Item>) {}
}
pub trait Map<D1, D2>
where
D1: Data + for<'a> Deserialize<'a>,
D2: Data + for<'a> Deserialize<'a>,
{
fn map<F>(&self, map_fn: F) -> OperatorStream<D2>
where
F: 'static + Fn(&D1) -> D2 + Send + Sync + Clone;
fn flat_map<F, I>(&self, flat_map_fn: F) -> OperatorStream<D2>
where
F: 'static + Fn(&D1) -> I + Send + Sync + Clone,
I: 'static + IntoIterator<Item = D2>;
}
impl<S, D1, D2> Map<D1, D2> for S
where
S: Stream<D1>,
D1: Data + for<'a> Deserialize<'a>,
D2: Data + for<'a> Deserialize<'a>,
{
fn map<F>(&self, map_fn: F) -> OperatorStream<D2>
where
F: 'static + Fn(&D1) -> D2 + Send + Sync + Clone,
{
let op_name = format!("MapOp_{}", self.id());
crate::connect_one_in_one_out(
move || -> FlatMapOperator<D1, _> {
let map_fn = map_fn.clone();
FlatMapOperator::new(move |x| std::iter::once(map_fn(x)))
},
|| {},
OperatorConfig::new().name(&op_name),
self,
)
}
fn flat_map<F, I>(&self, flat_map_fn: F) -> OperatorStream<D2>
where
F: 'static + Fn(&D1) -> I + Send + Sync + Clone,
I: 'static + IntoIterator<Item = D2>,
{
let op_name = format!("FlatMapOp_{}", self.id());
crate::connect_one_in_one_out(
move || -> FlatMapOperator<D1, _> { FlatMapOperator::new(flat_map_fn.clone()) },
|| {},
OperatorConfig::new().name(&op_name),
self,
)
}
}