use std::any::Any;
use std::cell::RefCell;
use std::marker::PhantomData;
use std::rc::Rc;
use crate::dsl::builder::InternalStreamsBuilder;
use crate::dsl::config::Materialized;
use crate::dsl::graph::{GraphNodeKind, LowerState, NodeId};
use crate::dsl::ktable::KTable;
use crate::dsl::names;
use crate::dsl::processors::aggregate::{KStreamAggregateProcessor, KStreamReduceProcessor};
use crate::dsl::processors::tuple_forwarder::TupleForwarder;
use crate::processor::serde::{DefaultSerde, I64Serde, Serde};
use crate::topology::NodeHandle;
/// Boxed, one-shot callback that lowers a repartition step into the topology.
///
/// Besides the mutable [`LowerState`], the callback receives four owned strings.
/// In the closure built by [`repartition_lower`] these are, in order: the parent
/// node name, the sink name, the source name and the repartition topic name.
/// The callback is `FnOnce`, so it can be invoked at most once, and it must be `Send`.
pub(crate) type RepartitionLowerFn =
Box<dyn FnOnce(&mut LowerState, String, String, String, String) + Send>;
/// Builds the [`RepartitionLowerFn`] for a `K`/`V` stream using the given serdes.
///
/// When invoked, the returned callback performs three steps on `state.topology`, in order:
/// 1. adds a sink named `sink_name` that writes to `topic`, fed by the node called
///    `parent_name`, configured with clones of both serdes;
/// 2. registers `topic` as a repartition topic;
/// 3. adds a source named `source_name` reading `topic`, taking ownership of the serdes.
pub(crate) fn repartition_lower<K, V, KS, VS>(key_serde: KS, value_serde: VS) -> RepartitionLowerFn
where
    K: Any + Send + Clone,
    V: Any + Send + Clone,
    KS: Serde<K> + Clone + 'static,
    VS: Serde<V> + Clone + 'static,
{
    Box::new(
        move |lowering: &mut LowerState,
              upstream_name: String,
              sink: String,
              source: String,
              repartition_topic: String| {
            // Write side: the upstream node feeds a sink on the repartition topic.
            let upstream = NodeHandle::<K, V>::from_name(upstream_name);
            let sink_topic = repartition_topic.clone();
            let produced =
                crate::processor::serde::Produced::with(key_serde.clone(), value_serde.clone());
            lowering
                .topology
                .add_sink_explicit::<K, V, KS, VS, _, _>(sink, sink_topic, [upstream], produced);

            // Flag the topic as a repartition topic before anything reads from it.
            let registered_topic = repartition_topic.clone();
            lowering.topology.add_repartition_topic(registered_topic);

            // Read side: a source on the same topic; the serdes are moved here (last use).
            let consumed = crate::processor::serde::Consumed::with(key_serde, value_serde);
            lowering
                .topology
                .add_source_explicit::<K, V, KS, VS>(source, [repartition_topic], consumed);
        },
    )
}
/// A stream of `(K, V)` records that has been grouped by key and is waiting for an
/// aggregation (`count`, `reduce`, `aggregate`), a windowing step or a cogroup.
///
/// The value holds no records itself; it carries the bookkeeping needed to add the
/// aggregation node to the builder graph.
pub struct KGroupedStream<K, V> {
/// Shared, interior-mutable handle to the DSL builder that owns the graph.
builder: Rc<RefCell<InternalStreamsBuilder>>,
/// Graph node this grouped stream hangs off.
parent: NodeId,
/// When `true`, `record_repartition` inserts a repartition node in front of the aggregation.
key_changing_upstream: bool,
/// Optional name supplied at grouping time; in this file it is only forwarded to the
/// windowed grouped-stream constructors.
// NOTE(review): the field is read by `windowed_by*`, so this `allow` may be stale — confirm.
#[allow(dead_code)]
grouped_name: Option<String>,
/// One-shot repartition lowering callback; `None` once it has been taken.
repartition_lower: Option<RepartitionLowerFn>,
/// Optional topic name set through `with_source_topic` and handed on by `into_cogroup_parts`.
pub(crate) source_topic: Option<String>,
/// Ties the struct to `K` and `V` without storing values of either type.
_pd: PhantomData<fn() -> (K, V)>,
}
impl<K, V> KGroupedStream<K, V>
where
K: Any + Send + Sync + Clone,
V: Any + Send + Clone,
{
pub(crate) fn new(
builder: Rc<RefCell<InternalStreamsBuilder>>,
parent: NodeId,
key_changing_upstream: bool,
grouped_name: Option<String>,
repartition_lower: RepartitionLowerFn,
) -> Self {
Self {
builder,
parent,
key_changing_upstream,
grouped_name,
repartition_lower: Some(repartition_lower),
source_topic: None,
_pd: PhantomData,
}
}
pub(crate) fn with_source_topic(mut self, topic: Option<String>) -> Self {
self.source_topic = topic;
self
}
/// Counts the records seen for each key, using the serdes and store settings in `materialized`.
///
/// The per-key aggregate starts at `0` and grows by one for every record; the record's
/// key and value are not inspected. Naming of the backing store is handled by
/// `aggregate_inner` with the `names::AGGREGATE_STORE` prefix.
pub fn count_explicit<KS, VS>(
    self,
    materialized: impl Into<Materialized<KS, VS>>,
) -> KTable<K, i64, KS, VS>
where
    KS: Serde<K> + Clone + 'static,
    VS: Serde<i64> + Clone + 'static,
{
    let start_at_zero = || 0i64;
    let add_one = |_key: &K, _value: &V, so_far: i64| so_far + 1;
    self.aggregate_inner(
        materialized.into(),
        names::AGGREGATE_STORE,
        start_at_zero,
        add_one,
    )
}
/// Combines the values of each key with `reducer`, using the serdes and store settings
/// in `materialized`.
///
/// The store name comes from `materialized` when it carries one; otherwise a name with
/// the `names::REDUCE_STORE` prefix is minted from the builder.
pub fn reduce_explicit<KS, VS, R>(
    self,
    reducer: R,
    materialized: impl Into<Materialized<KS, VS>>,
) -> KTable<K, V, KS, VS>
where
    KS: Serde<K> + Clone + 'static,
    VS: Serde<V> + Clone + 'static,
    R: Fn(&V, &V) -> V + Clone + Send + Sync + 'static,
{
    let config: Materialized<KS, VS> = materialized.into();
    let reduce_store = mint_store_name(&self.builder, &config, names::REDUCE_STORE);
    self.lower_reduce(config, reduce_store, reducer)
}
/// Aggregates the records of each key into a value of type `VA`.
///
/// `init` supplies the starting aggregate and `agg` folds one record (key and value)
/// into the current aggregate. Serdes and store settings come from `materialized`;
/// the work is delegated to `aggregate_inner` with the `names::AGGREGATE_STORE` prefix.
pub fn aggregate_explicit<KS, VS, VA, I, A>(
    self,
    init: I,
    agg: A,
    materialized: impl Into<Materialized<KS, VS>>,
) -> KTable<K, VA, KS, VS>
where
    VA: Any + Send + Clone,
    KS: Serde<K> + Clone + 'static,
    VS: Serde<VA> + Clone + 'static,
    I: Fn() -> VA + Clone + Send + Sync + 'static,
    A: Fn(&K, &V, VA) -> VA + Clone + Send + Sync + 'static,
{
    let config: Materialized<KS, VS> = materialized.into();
    self.aggregate_inner(config, names::AGGREGATE_STORE, init, agg)
}
/// Counts records per key, using the default serde of `K` for keys and [`I64Serde`]
/// for the counts.
///
/// The result is materialized under `store_name`; see [`Self::count_explicit`] for the
/// variant that takes explicit serdes.
pub fn count(
    self,
    store_name: impl Into<String>,
) -> KTable<K, i64, <K as DefaultSerde>::Serde, I64Serde>
where
    K: DefaultSerde,
    <K as DefaultSerde>::Serde: Serde<K> + Clone,
{
    let key_serde = <K as DefaultSerde>::Serde::default();
    let materialized = Materialized::with(key_serde, I64Serde).as_store(store_name);
    self.count_explicit(materialized)
}
/// Combines the values of each key with `reducer`, using the default serdes of `K` and `V`.
///
/// The result is materialized under `store_name`; see [`Self::reduce_explicit`] for the
/// variant that takes explicit serdes.
pub fn reduce<R>(
    self,
    reducer: R,
    store_name: impl Into<String>,
) -> KTable<K, V, <K as DefaultSerde>::Serde, <V as DefaultSerde>::Serde>
where
    K: DefaultSerde,
    V: DefaultSerde,
    <K as DefaultSerde>::Serde: Serde<K> + Clone,
    <V as DefaultSerde>::Serde: Serde<V> + Clone,
    R: Fn(&V, &V) -> V + Clone + Send + Sync + 'static,
{
    let key_serde = <K as DefaultSerde>::Serde::default();
    let value_serde = <V as DefaultSerde>::Serde::default();
    let materialized = Materialized::with(key_serde, value_serde).as_store(store_name);
    self.reduce_explicit(reducer, materialized)
}
/// Aggregates the records of each key into a `VA`, using the default serdes of `K` and `VA`.
///
/// `init` supplies the starting aggregate and `agg` folds one record into it. The result
/// is materialized under `store_name`; see [`Self::aggregate_explicit`] for the variant
/// that takes explicit serdes.
pub fn aggregate<VA, I, A>(
    self,
    init: I,
    agg: A,
    store_name: impl Into<String>,
) -> KTable<K, VA, <K as DefaultSerde>::Serde, <VA as DefaultSerde>::Serde>
where
    VA: DefaultSerde + Any + Send + Clone,
    K: DefaultSerde,
    <K as DefaultSerde>::Serde: Serde<K> + Clone,
    <VA as DefaultSerde>::Serde: Serde<VA> + Clone,
    I: Fn() -> VA + Clone + Send + Sync + 'static,
    A: Fn(&K, &V, VA) -> VA + Clone + Send + Sync + 'static,
{
    let key_serde = <K as DefaultSerde>::Serde::default();
    let aggregate_serde = <VA as DefaultSerde>::Serde::default();
    let materialized = Materialized::with(key_serde, aggregate_serde).as_store(store_name);
    self.aggregate_explicit(init, agg, materialized)
}
/// Converts this grouped stream into a time-windowed grouped stream over `windows`.
///
/// The builder handle, parent node, key-changing flag, grouped name and repartition
/// callback are handed to the windowed stream. `source_topic` is not forwarded.
#[must_use]
pub fn windowed_by(
    mut self,
    windows: crate::dsl::windows::TimeWindows,
) -> crate::dsl::windowed_kgrouped::TimeWindowedKGroupedStream<K, V> {
    let builder = Rc::clone(&self.builder);
    let parent = self.parent;
    let key_changing_upstream = self.key_changing_upstream;
    let grouped_name = self.grouped_name.take();
    let repartition_lower = self.repartition_lower.take();
    crate::dsl::windowed_kgrouped::TimeWindowedKGroupedStream::new(
        builder,
        parent,
        key_changing_upstream,
        grouped_name,
        repartition_lower,
        windows,
    )
}
/// Converts this grouped stream into a session-windowed grouped stream over `windows`.
///
/// The builder handle, parent node, key-changing flag, grouped name and repartition
/// callback are handed to the windowed stream. `source_topic` is not forwarded.
#[must_use]
pub fn windowed_by_session(
    mut self,
    windows: crate::dsl::windows::SessionWindows,
) -> crate::dsl::session_windowed_kgrouped::SessionWindowedKGroupedStream<K, V>
where
    V: Sync,
{
    let builder = Rc::clone(&self.builder);
    let parent = self.parent;
    let key_changing_upstream = self.key_changing_upstream;
    let grouped_name = self.grouped_name.take();
    let repartition_lower = self.repartition_lower.take();
    crate::dsl::session_windowed_kgrouped::SessionWindowedKGroupedStream::new(
        builder,
        parent,
        key_changing_upstream,
        grouped_name,
        repartition_lower,
        windows,
    )
}
/// Converts this grouped stream into a sliding-windowed grouped stream over `windows`.
///
/// The builder handle, parent node, key-changing flag, grouped name and repartition
/// callback are handed to the windowed stream. `source_topic` is not forwarded.
#[must_use]
pub fn windowed_by_sliding(
    mut self,
    windows: crate::dsl::windows::SlidingWindows,
) -> crate::dsl::sliding_windowed_kgrouped::SlidingWindowedKGroupedStream<K, V>
where
    V: Sync,
{
    let builder = Rc::clone(&self.builder);
    let parent = self.parent;
    let key_changing_upstream = self.key_changing_upstream;
    let grouped_name = self.grouped_name.take();
    let repartition_lower = self.repartition_lower.take();
    crate::dsl::sliding_windowed_kgrouped::SlidingWindowedKGroupedStream::new(
        builder,
        parent,
        key_changing_upstream,
        grouped_name,
        repartition_lower,
        windows,
    )
}
/// Starts a cogroup with this grouped stream as its first (and so far only) input.
///
/// `agg` folds one record of this stream into the shared aggregate of type `VOut`.
/// The stream is broken into its parts with `into_cogroup_parts` and wrapped in a
/// single `CogroupInput`.
#[must_use]
pub fn cogroup<VOut, A>(self, agg: A) -> crate::dsl::cogrouped::CogroupedKStream<K, VOut>
where
    V: Sync,
    VOut: Any + Send + Sync + Clone,
    A: Fn(&K, &V, VOut) -> VOut + Send + Sync + 'static,
{
    // Clone the builder handle first: `into_cogroup_parts` consumes `self`.
    let builder = Rc::clone(&self.builder);
    let (parent, key_changing_upstream, repartition_lower, source_topic) =
        self.into_cogroup_parts();
    let make_agg = crate::dsl::cogrouped::make_agg_for_input::<K, V, VOut, A>(agg);
    let first_input = crate::dsl::cogrouped::CogroupInput {
        parent,
        key_changing_upstream,
        repartition_lower,
        make_agg,
        source_topic,
    };
    crate::dsl::cogrouped::CogroupedKStream::new(builder, vec![first_input])
}
pub(crate) fn into_cogroup_parts(
mut self,
) -> (NodeId, bool, Option<RepartitionLowerFn>, Option<String>) {
(
self.parent,
self.key_changing_upstream,
self.repartition_lower.take(),
self.source_topic,
)
}
/// Shared entry point for `count_explicit` and `aggregate_explicit`.
///
/// Resolves the store name (the one in `materialized`, or a freshly minted name
/// starting with `store_prefix`) and then hands everything to `lower_aggregate`.
fn aggregate_inner<KS, VS, VA, I, A>(
    self,
    materialized: Materialized<KS, VS>,
    store_prefix: &'static str,
    init: I,
    agg: A,
) -> KTable<K, VA, KS, VS>
where
    VA: Any + Send + Clone,
    KS: Serde<K> + Clone + 'static,
    VS: Serde<VA> + Clone + 'static,
    I: Fn() -> VA + Clone + Send + Sync + 'static,
    A: Fn(&K, &V, VA) -> VA + Clone + Send + Sync + 'static,
{
    let resolved_store = mint_store_name(&self.builder, &materialized, store_prefix);
    self.lower_aggregate(materialized, resolved_store, init, agg)
}
/// Registers the aggregation node for this grouped stream and returns the resulting table.
///
/// Steps, in order:
/// 1. split `materialized` into its serdes and the `logging` / `caching` flags;
/// 2. let `record_repartition` decide which node the aggregation hangs off
///    (the original parent, or a freshly added repartition node);
/// 3. add a `GraphNodeKind::Aggregate` node to the builder graph and attach a deferred
///    lowering closure that wires the processor and its state store into the topology;
/// 4. wrap the new node in a [`KTable`] carrying `store_name`, the serdes and a
///    suppress factory.
///
/// `init` produces the starting aggregate and `agg` folds one record into it; both are
/// cloned into every processor instance created by the lowering closure.
fn lower_aggregate<KS, VS, VA, I, A>(
mut self,
materialized: Materialized<KS, VS>,
store_name: String,
init: I,
agg: A,
) -> KTable<K, VA, KS, VS>
where
VA: Any + Send + Clone,
KS: Serde<K> + Clone + 'static,
VS: Serde<VA> + Clone + 'static,
I: Fn() -> VA + Clone + Send + Sync + 'static,
A: Fn(&K, &V, VA) -> VA + Clone + Send + Sync + 'static,
{
// Only the serdes and the two flags are used; any other field is ignored here.
let Materialized {
key_serde,
value_serde,
logging,
caching,
..
} = materialized;
// Built from serde clones so the originals can still be moved into the KTable below.
let suppress_factory = crate::dsl::ktable::kv_suppress_factory::<K, VA, KS, VS>(
key_serde.clone(),
value_serde.clone(),
);
let parent = self.parent;
let key_changing = self.key_changing_upstream;
// Take the one-shot repartition callback out of `self`, leaving `None` behind.
let rp_lower = self.repartition_lower.take();
// Mutable borrow of the shared builder; released by the explicit `drop(g)` below.
let mut g = self.builder.borrow_mut();
// Either `parent` itself or the id of a newly added repartition node.
let agg_parent =
Self::record_repartition(&mut g, &store_name, parent, key_changing, rp_lower);
let agg_name = g.new_processor_name(names::AGGREGATE);
let agg_id = g.graph.add(
agg_name.clone(),
GraphNodeKind::Aggregate {
store_name: store_name.clone(),
changelog: logging,
},
vec![agg_parent],
);
// Owned copies for the lowering closure; the originals are still needed afterwards.
let store_for_thunk = store_name.clone();
let key_serde_for_lower = key_serde.clone();
let value_serde_for_lower = value_serde.clone();
g.graph.nodes[agg_id].lower = Some(Box::new(move |state: &mut LowerState| {
// Look up the topology name recorded for the parent node.
// NOTE(review): indexing presumably panics if the parent has no entry yet — confirm.
let parent = NodeHandle::<K, V>::from_name(state.handle_name[&agg_parent].clone());
let store_for_proc = store_for_thunk.clone();
let h = state
.topology
.add_processor::<K, V, K, crate::dsl::processors::change::Change<VA>, _, _, _>(
agg_name.clone(),
// Factory: each call yields a fresh processor with its own clones.
move || KStreamAggregateProcessor {
store_name: store_for_proc.clone(),
init: init.clone(),
agg: agg.clone(),
forwarder: TupleForwarder::default(),
_pd: PhantomData,
},
[parent],
);
// `logging` selects which store-registration call is used; only the first
// variant is given the processor's name.
if logging {
state.topology.add_state_store::<K, VA, KS, VS>(
store_for_thunk.clone(),
key_serde_for_lower.clone(),
value_serde_for_lower.clone(),
[h.name().to_string()],
);
} else {
state
.topology
.add_state_store_no_changelog::<K, VA, KS, VS>(
store_for_thunk.clone(),
key_serde_for_lower.clone(),
value_serde_for_lower.clone(),
);
}
state.topology.mark_store_caching(&store_for_thunk, caching);
// Record this node's topology name so downstream nodes can resolve it.
state.handle_name.insert(agg_id, h.name().to_string());
}));
// Release the mutable borrow of the builder before constructing the KTable.
drop(g);
KTable::new(
Rc::clone(&self.builder),
agg_id,
Some(store_name),
None,
key_serde,
value_serde,
)
.with_suppress_factory(Some(suppress_factory))
}
/// Registers the reduce node for this grouped stream and returns the resulting table.
///
/// Steps, in order:
/// 1. split `materialized` into its serdes and the `logging` / `caching` flags;
/// 2. let `record_repartition` decide which node the reduce hangs off
///    (the original parent, or a freshly added repartition node);
/// 3. add a `GraphNodeKind::Aggregate` node (named with the `names::REDUCE` prefix) to
///    the builder graph and attach a deferred lowering closure that wires the processor
///    and its state store into the topology;
/// 4. wrap the new node in a [`KTable`] carrying `store_name`, the serdes and a
///    suppress factory.
///
/// `reducer` is cloned into every processor instance created by the lowering closure.
fn lower_reduce<KS, VS, R>(
mut self,
materialized: Materialized<KS, VS>,
store_name: String,
reducer: R,
) -> KTable<K, V, KS, VS>
where
KS: Serde<K> + Clone + 'static,
VS: Serde<V> + Clone + 'static,
R: Fn(&V, &V) -> V + Clone + Send + Sync + 'static,
{
// Only the serdes and the two flags are used; any other field is ignored here.
let Materialized {
key_serde,
value_serde,
logging,
caching,
..
} = materialized;
// Built from serde clones so the originals can still be moved into the KTable below.
let suppress_factory = crate::dsl::ktable::kv_suppress_factory::<K, V, KS, VS>(
key_serde.clone(),
value_serde.clone(),
);
let parent = self.parent;
let key_changing = self.key_changing_upstream;
// Take the one-shot repartition callback out of `self`, leaving `None` behind.
let rp_lower = self.repartition_lower.take();
// Mutable borrow of the shared builder; released by the explicit `drop(g)` below.
let mut g = self.builder.borrow_mut();
// Either `parent` itself or the id of a newly added repartition node.
let agg_parent =
Self::record_repartition(&mut g, &store_name, parent, key_changing, rp_lower);
let red_name = g.new_processor_name(names::REDUCE);
let red_id = g.graph.add(
red_name.clone(),
GraphNodeKind::Aggregate {
store_name: store_name.clone(),
changelog: logging,
},
vec![agg_parent],
);
// Owned copies for the lowering closure; the originals are still needed afterwards.
let store_for_thunk = store_name.clone();
let key_serde_for_lower = key_serde.clone();
let value_serde_for_lower = value_serde.clone();
g.graph.nodes[red_id].lower = Some(Box::new(move |state: &mut LowerState| {
// Look up the topology name recorded for the parent node.
// NOTE(review): indexing presumably panics if the parent has no entry yet — confirm.
let parent = NodeHandle::<K, V>::from_name(state.handle_name[&agg_parent].clone());
let store_for_proc = store_for_thunk.clone();
let reducer = reducer.clone();
let h = state
.topology
.add_processor::<K, V, K, crate::dsl::processors::change::Change<V>, _, _, _>(
red_name.clone(),
// Factory: each call yields a fresh processor with its own clones.
move || KStreamReduceProcessor {
store_name: store_for_proc.clone(),
reducer: reducer.clone(),
forwarder: TupleForwarder::default(),
_pd: PhantomData,
},
[parent],
);
// `logging` selects which store-registration call is used; only the first
// variant is given the processor's name.
if logging {
state.topology.add_state_store::<K, V, KS, VS>(
store_for_thunk.clone(),
key_serde_for_lower.clone(),
value_serde_for_lower.clone(),
[h.name().to_string()],
);
} else {
state.topology.add_state_store_no_changelog::<K, V, KS, VS>(
store_for_thunk.clone(),
key_serde_for_lower.clone(),
value_serde_for_lower.clone(),
);
}
state.topology.mark_store_caching(&store_for_thunk, caching);
// Record this node's topology name so downstream nodes can resolve it.
state.handle_name.insert(red_id, h.name().to_string());
}));
// Release the mutable borrow of the builder before constructing the KTable.
drop(g);
KTable::new(
Rc::clone(&self.builder),
red_id,
Some(store_name),
None,
key_serde,
value_serde,
)
.with_suppress_factory(Some(suppress_factory))
}
/// Inserts a repartition node in front of an aggregation when the key was changed upstream.
///
/// Returns `parent` untouched when `key_changing_upstream` is `false`. Otherwise a
/// `GraphNodeKind::Repartition` node is added below `parent`, its deferred lowering is
/// set to invoke `repartition_lower`, and the id of the new node is returned.
///
/// The graph node stores the topic as `<store_name><REPARTITION_SUFFIX>`; at lowering
/// time the actual topic is `<app_id>-<store_name><REPARTITION_SUFFIX>`.
///
/// # Panics
///
/// Panics if a repartition is required but `repartition_lower` is `None`.
pub(crate) fn record_repartition(
    g: &mut InternalStreamsBuilder,
    store_name: &str,
    parent: NodeId,
    key_changing_upstream: bool,
    repartition_lower: Option<RepartitionLowerFn>,
) -> NodeId {
    if key_changing_upstream {
        // Three names are minted in a fixed order: filter, sink, source. The filter name
        // is never used. NOTE(review): presumably it only advances the builder's name
        // counter — confirm before removing it.
        let _filter_name = g.new_processor_name(names::FILTER);
        let sink_name = g.new_processor_name(names::SINK);
        let source_name = g.new_processor_name(names::SOURCE);

        let topic_store = store_name.to_string();
        let lower = repartition_lower.expect("repartition_lower already consumed");

        let kind = GraphNodeKind::Repartition {
            topic: format!("{topic_store}{}", names::REPARTITION_SUFFIX),
            partitions: None,
        };
        let rp_id = g.graph.add(source_name.clone(), kind, vec![parent]);

        g.graph.nodes[rp_id].lower = Some(Box::new(move |state: &mut LowerState| {
            let parent_name = state.handle_name[&parent].clone();
            // The real topic name is prefixed with the application id.
            let topic = format!(
                "{}-{topic_store}{}",
                state.app_id,
                names::REPARTITION_SUFFIX
            );
            let sink = sink_name.clone();
            let source = source_name.clone();
            lower(state, parent_name, sink, source, topic);
            // Downstream nodes attach to the repartition source.
            state.handle_name.insert(rp_id, source_name.clone());
        }));
        rp_id
    } else {
        parent
    }
}
}
/// Picks the state-store name for a materialization.
///
/// Returns a copy of `materialized.store_name` when one is set; otherwise asks the
/// builder for a new generated name starting with `prefix`. The builder is only
/// borrowed (mutably) in the second case.
pub(crate) fn mint_store_name<KS, VS>(
    builder: &Rc<RefCell<InternalStreamsBuilder>>,
    materialized: &Materialized<KS, VS>,
    prefix: &str,
) -> String {
    materialized
        .store_name
        .clone()
        .unwrap_or_else(|| builder.borrow_mut().new_processor_name(prefix))
}