use std::any::Any;
use std::cell::RefCell;
use std::marker::PhantomData;
use std::rc::Rc;
use crate::dsl::builder::InternalStreamsBuilder;
use crate::dsl::config::Materialized;
use crate::dsl::emit::EmitStrategy;
use crate::dsl::graph::{GraphNodeKind, LowerState, NodeId};
use crate::dsl::kgrouped::{KGroupedStream, RepartitionLowerFn, mint_store_name};
use crate::dsl::ktable::KTable;
use crate::dsl::ktable::SuppressStoreFactory;
use crate::dsl::names;
use crate::dsl::processors::session_aggregate::{
KStreamSessionAggregateProcessor, KStreamSessionReduceProcessor,
};
use crate::dsl::windows::{SessionWindowedSerde, SessionWindows, Windowed};
use crate::processor::serde::{DefaultSerde, Serde};
use crate::topology::NodeHandle;
/// A grouped stream that has been windowed by session and is waiting for an
/// aggregation (`count`, `reduce`, `aggregate` or their `_explicit` variants)
/// to turn it into a [`KTable`] keyed by [`Windowed<K>`].
pub struct SessionWindowedKGroupedStream<K, V> {
/// Shared handle to the builder whose graph the aggregation node is added to.
builder: Rc<RefCell<InternalStreamsBuilder>>,
/// Graph node this grouped stream was derived from.
parent: NodeId,
/// Forwarded to `KGroupedStream::record_repartition` when an aggregation is lowered.
key_changing_upstream: bool,
/// Stored by `new` but not read anywhere in this file (hence the `dead_code` allow).
#[allow(dead_code)]
grouped_name: Option<String>,
/// Optional repartition lowering callback; consumed by the first aggregation
/// and handed to `KGroupedStream::record_repartition`.
repartition_lower: Option<RepartitionLowerFn>,
/// Session window definition; its `gap_ms` and `grace_ms` are copied into the
/// processor and the session store.
windows: SessionWindows,
/// Emit strategy; starts as `EmitStrategy::default()` and can be replaced via
/// `emit_strategy`.
emit: EmitStrategy,
/// Ties `K` and `V` to the type without storing values of either.
_pd: PhantomData<fn() -> (K, V)>,
}
impl<K, V> SessionWindowedKGroupedStream<K, V>
where
K: Any + Send + Sync + Clone,
V: Any + Send + Sync + Clone,
{
/// Builds a session-windowed grouped stream on top of `parent`.
///
/// All arguments are stored as-is; the emit strategy starts out as
/// `EmitStrategy::default()` and can be changed with `emit_strategy`.
pub(crate) fn new(
    builder: Rc<RefCell<InternalStreamsBuilder>>,
    parent: NodeId,
    key_changing_upstream: bool,
    grouped_name: Option<String>,
    repartition_lower: Option<RepartitionLowerFn>,
    windows: SessionWindows,
) -> Self {
    let emit = EmitStrategy::default();
    let _pd = PhantomData;
    Self {
        builder,
        parent,
        key_changing_upstream,
        grouped_name,
        repartition_lower,
        windows,
        emit,
        _pd,
    }
}
#[must_use]
pub fn emit_strategy(mut self, emit: EmitStrategy) -> Self {
self.emit = emit;
self
}
/// Counts records per key and session using explicitly supplied serdes.
///
/// The aggregation starts at `0`, adds `1` for every record, and the merger
/// adds the two counts together. The store name comes from
/// `mint_store_name` with the `names::AGGREGATE_STORE` prefix.
pub fn count_explicit<KS, VS>(
    self,
    materialized: impl Into<Materialized<KS, VS>>,
) -> KTable<Windowed<K>, i64, SessionWindowedSerde<KS>, VS>
where
    KS: Serde<K> + Clone + 'static,
    VS: Serde<i64> + Clone + 'static,
{
    let mat: Materialized<KS, VS> = materialized.into();
    let store = mint_store_name(&self.builder, &mat, names::AGGREGATE_STORE);

    if mat.store_name.is_none() {
        // The minted name is intentionally thrown away; only the builder's
        // name counter is advanced.
        // NOTE(review): presumably this mirrors Kafka Streams'
        // `count(Materialized)`, which burns one name index for topology
        // compatibility when no store name is given. Kafka burns *before*
        // the store name is generated, here it happens after - confirm the
        // ordering yields the intended store-name index.
        let _ = self
            .builder
            .borrow_mut()
            .new_processor_name(names::AGGREGATE_STORE);
    }

    let zero = || 0i64;
    let bump = |_k: &K, _v: &V, acc: i64| acc + 1;
    let combine = |_k: &K, a: i64, b: i64| a + b;
    self.lower_aggregate::<KS, VS, i64, _, _, _>(mat, store, zero, bump, combine)
}
/// Aggregates records per key and session using explicitly supplied serdes.
///
/// `init`, `agg` and `merger` are passed through unchanged to the lowering
/// step; the store name comes from `mint_store_name` with the
/// `names::AGGREGATE_STORE` prefix.
pub fn aggregate_explicit<KS, VS, VA, I, A, M>(
    self,
    init: I,
    agg: A,
    merger: M,
    materialized: impl Into<Materialized<KS, VS>>,
) -> KTable<Windowed<K>, VA, SessionWindowedSerde<KS>, VS>
where
    VA: Any + Send + Sync + Clone,
    KS: Serde<K> + Clone + 'static,
    VS: Serde<VA> + Clone + 'static,
    I: Fn() -> VA + Clone + Send + Sync + 'static,
    A: Fn(&K, &V, VA) -> VA + Clone + Send + Sync + 'static,
    M: Fn(&K, VA, VA) -> VA + Clone + Send + Sync + 'static,
{
    let mat: Materialized<KS, VS> = materialized.into();
    let store = mint_store_name(&self.builder, &mat, names::AGGREGATE_STORE);
    self.lower_aggregate::<KS, VS, VA, I, A, M>(mat, store, init, agg, merger)
}
/// Reduces records per key and session using explicitly supplied serdes.
///
/// `reducer` is passed through unchanged to the lowering step; the store
/// name comes from `mint_store_name` with the `names::REDUCE_STORE` prefix.
pub fn reduce_explicit<KS, VS, R>(
    self,
    reducer: R,
    materialized: impl Into<Materialized<KS, VS>>,
) -> KTable<Windowed<K>, V, SessionWindowedSerde<KS>, VS>
where
    KS: Serde<K> + Clone + 'static,
    VS: Serde<V> + Clone + 'static,
    R: Fn(&V, &V) -> V + Clone + Send + Sync + 'static,
{
    let mat: Materialized<KS, VS> = materialized.into();
    let store = mint_store_name(&self.builder, &mat, names::REDUCE_STORE);
    self.lower_reduce::<KS, VS, R>(mat, store, reducer)
}
/// Counts records per key and session into the store named `store_name`.
///
/// Convenience wrapper around `count_explicit` that uses the key type's
/// default serde and `I64Serde` for the counts.
pub fn count(
    self,
    store_name: impl Into<String>,
) -> KTable<
    Windowed<K>,
    i64,
    SessionWindowedSerde<<K as DefaultSerde>::Serde>,
    crate::processor::serde::I64Serde,
>
where
    K: DefaultSerde,
    <K as DefaultSerde>::Serde: Serde<K> + Clone,
{
    let key_serde = <K as DefaultSerde>::Serde::default();
    let materialized =
        Materialized::with(key_serde, crate::processor::serde::I64Serde).as_store(store_name);
    self.count_explicit(materialized)
}
/// Reduces records per key and session into the store named `store_name`.
///
/// Convenience wrapper around `reduce_explicit` that uses the default serdes
/// of the key and value types.
pub fn reduce<R>(
    self,
    reducer: R,
    store_name: impl Into<String>,
) -> KTable<
    Windowed<K>,
    V,
    SessionWindowedSerde<<K as DefaultSerde>::Serde>,
    <V as DefaultSerde>::Serde,
>
where
    K: DefaultSerde,
    V: DefaultSerde,
    <K as DefaultSerde>::Serde: Serde<K> + Clone,
    <V as DefaultSerde>::Serde: Serde<V> + Clone,
    R: Fn(&V, &V) -> V + Clone + Send + Sync + 'static,
{
    let key_serde = <K as DefaultSerde>::Serde::default();
    let value_serde = <V as DefaultSerde>::Serde::default();
    let materialized = Materialized::with(key_serde, value_serde).as_store(store_name);
    self.reduce_explicit(reducer, materialized)
}
/// Aggregates records per key and session into the store named `store_name`.
///
/// Convenience wrapper around `aggregate_explicit` that uses the default
/// serdes of the key type and of the aggregate type `VA`.
pub fn aggregate<VA, I, A, M>(
    self,
    init: I,
    agg: A,
    merger: M,
    store_name: impl Into<String>,
) -> KTable<
    Windowed<K>,
    VA,
    SessionWindowedSerde<<K as DefaultSerde>::Serde>,
    <VA as DefaultSerde>::Serde,
>
where
    VA: DefaultSerde + Any + Send + Sync + Clone,
    K: DefaultSerde,
    <K as DefaultSerde>::Serde: Serde<K> + Clone,
    <VA as DefaultSerde>::Serde: Serde<VA> + Clone,
    I: Fn() -> VA + Clone + Send + Sync + 'static,
    A: Fn(&K, &V, VA) -> VA + Clone + Send + Sync + 'static,
    M: Fn(&K, VA, VA) -> VA + Clone + Send + Sync + 'static,
{
    let key_serde = <K as DefaultSerde>::Serde::default();
    let value_serde = <VA as DefaultSerde>::Serde::default();
    let materialized = Materialized::with(key_serde, value_serde).as_store(store_name);
    self.aggregate_explicit(init, agg, merger, materialized)
}
#[allow(clippy::too_many_lines)]
#[allow(clippy::too_many_arguments)]
fn lower_aggregate<KS, VS, VA, I, A, M>(
mut self,
materialized: Materialized<KS, VS>,
store_name: String,
init: I,
agg: A,
merger: M,
) -> KTable<Windowed<K>, VA, SessionWindowedSerde<KS>, VS>
where
VA: Any + Send + Sync + Clone,
KS: Serde<K> + Clone + 'static,
VS: Serde<VA> + Clone + 'static,
I: Fn() -> VA + Clone + Send + Sync + 'static,
A: Fn(&K, &V, VA) -> VA + Clone + Send + Sync + 'static,
M: Fn(&K, VA, VA) -> VA + Clone + Send + Sync + 'static,
{
let Materialized {
key_serde,
value_serde,
caching,
..
} = materialized;
let suppress_factory =
session_suppress_factory::<K, VA, KS, VS>(key_serde.clone(), value_serde.clone());
let parent = self.parent;
let key_changing = self.key_changing_upstream;
let rp_lower = self.repartition_lower.take();
let windows = self.windows;
let emit = self.emit;
let mut g = self.builder.borrow_mut();
let agg_parent = KGroupedStream::<K, V>::record_repartition(
&mut g,
&store_name,
parent,
key_changing,
rp_lower,
);
let agg_name = g.new_processor_name(names::AGGREGATE);
let agg_id = g.graph.add(
agg_name.clone(),
GraphNodeKind::Aggregate {
store_name: store_name.clone(),
changelog: true,
},
vec![agg_parent],
);
let store_for_thunk = store_name.clone();
let key_serde_for_lower = key_serde.clone();
let value_serde_for_lower = value_serde.clone();
g.graph.nodes[agg_id].lower = Some(Box::new(move |state: &mut LowerState| {
let parent = NodeHandle::<K, V>::from_name(state.handle_name[&agg_parent].clone());
let store_for_proc = store_for_thunk.clone();
let init = init.clone();
let agg = agg.clone();
let merger = merger.clone();
let h = state
.topology
.add_processor::<K, V, Windowed<K>, crate::dsl::processors::change::Change<VA>, _, _, _>(
agg_name.clone(),
move || KStreamSessionAggregateProcessor {
store_name: store_for_proc.clone(),
gap_ms: windows.gap_ms,
init: init.clone(),
agg: agg.clone(),
merger: merger.clone(),
emit,
grace_ms: windows.grace_ms,
stream_time: i64::MIN,
last_emitted_close: i64::MIN,
forwarder: crate::dsl::processors::tuple_forwarder::TupleForwarder::default(),
_pd: PhantomData,
},
[parent],
);
state.topology.add_session_store::<K, VA, KS, VS>(
store_for_thunk.clone(),
key_serde_for_lower.clone(),
value_serde_for_lower.clone(),
windows.gap_ms,
windows.grace_ms,
[h.name().to_string()],
);
state
.topology
.mark_store_caching(&store_for_thunk, caching && emit.is_on_update());
state.handle_name.insert(agg_id, h.name().to_string());
}));
drop(g);
KTable::new(
Rc::clone(&self.builder),
agg_id,
Some(store_name),
None,
SessionWindowedSerde::new(key_serde.clone()),
value_serde.clone(),
)
.with_window_grace(Some(windows.grace_ms))
.with_suppress_factory(Some(suppress_factory))
}
#[allow(clippy::too_many_lines)]
fn lower_reduce<KS, VS, R>(
mut self,
materialized: Materialized<KS, VS>,
store_name: String,
reducer: R,
) -> KTable<Windowed<K>, V, SessionWindowedSerde<KS>, VS>
where
KS: Serde<K> + Clone + 'static,
VS: Serde<V> + Clone + 'static,
R: Fn(&V, &V) -> V + Clone + Send + Sync + 'static,
{
let Materialized {
key_serde,
value_serde,
caching,
..
} = materialized;
let suppress_factory =
session_suppress_factory::<K, V, KS, VS>(key_serde.clone(), value_serde.clone());
let parent = self.parent;
let key_changing = self.key_changing_upstream;
let rp_lower = self.repartition_lower.take();
let windows = self.windows;
let emit = self.emit;
let mut g = self.builder.borrow_mut();
let agg_parent = KGroupedStream::<K, V>::record_repartition(
&mut g,
&store_name,
parent,
key_changing,
rp_lower,
);
let red_name = g.new_processor_name(names::REDUCE);
let red_id = g.graph.add(
red_name.clone(),
GraphNodeKind::Aggregate {
store_name: store_name.clone(),
changelog: true,
},
vec![agg_parent],
);
let store_for_thunk = store_name.clone();
let key_serde_for_lower = key_serde.clone();
let value_serde_for_lower = value_serde.clone();
g.graph.nodes[red_id].lower = Some(Box::new(move |state: &mut LowerState| {
let parent = NodeHandle::<K, V>::from_name(state.handle_name[&agg_parent].clone());
let store_for_proc = store_for_thunk.clone();
let reducer = reducer.clone();
let h = state
.topology
.add_processor::<K, V, Windowed<K>, crate::dsl::processors::change::Change<V>, _, _, _>(
red_name.clone(),
move || KStreamSessionReduceProcessor {
store_name: store_for_proc.clone(),
gap_ms: windows.gap_ms,
reducer: reducer.clone(),
emit,
grace_ms: windows.grace_ms,
stream_time: i64::MIN,
last_emitted_close: i64::MIN,
forwarder: crate::dsl::processors::tuple_forwarder::TupleForwarder::default(),
_pd: PhantomData,
},
[parent],
);
state.topology.add_session_store::<K, V, KS, VS>(
store_for_thunk.clone(),
key_serde_for_lower.clone(),
value_serde_for_lower.clone(),
windows.gap_ms,
windows.grace_ms,
[h.name().to_string()],
);
state
.topology
.mark_store_caching(&store_for_thunk, caching && emit.is_on_update());
state.handle_name.insert(red_id, h.name().to_string());
}));
drop(g);
KTable::new(
Rc::clone(&self.builder),
red_id,
Some(store_name),
None,
SessionWindowedSerde::new(key_serde.clone()),
value_serde.clone(),
)
.with_window_grace(Some(windows.grace_ms))
.with_suppress_factory(Some(suppress_factory))
}
}
/// Builds the factory a downstream suppress step uses to register its store.
///
/// The returned callback owns `key_serde` and `value_serde`; each time it is
/// invoked it calls `add_suppress_store` on the topology with a fresh
/// `SessionWindowedSerde` wrapping a clone of the key serde, a clone of the
/// value serde, the given `logging` flag, and `proc_name` as the only
/// connected processor.
fn session_suppress_factory<K, VA, KS, VS>(key_serde: KS, value_serde: VS) -> SuppressStoreFactory
where
    K: Any + Send + Sync + Clone,
    VA: Any + Send + Clone,
    KS: Serde<K> + Clone + 'static,
    VS: Serde<VA> + Clone + 'static,
{
    use std::sync::Arc;

    Arc::new(
        move |state: &mut LowerState, store_name: &str, proc_name: &str, logging: bool| {
            let owned_store_name = store_name.to_string();
            let windowed_key_serde = SessionWindowedSerde::new(key_serde.clone());
            let cloned_value_serde = value_serde.clone();
            let connected = [proc_name.to_string()];
            state
                .topology
                .add_suppress_store::<Windowed<K>, VA, SessionWindowedSerde<KS>, VS>(
                    owned_store_name,
                    windowed_key_serde,
                    cloned_value_serde,
                    logging,
                    connected,
                );
        },
    )
}
// Unit tests for the session-windowed DSL: store caching per emit strategy
// and end-to-end emit-on-close output.
#[cfg(test)]
mod tests {
use assert2::check;
use crate::dsl::StreamsBuilder;
use crate::dsl::emit::EmitStrategy;
use crate::dsl::windows::{SessionWindowedSerde, SessionWindows, Window, Windowed};
use crate::processor::serde::{Consumed, I64Serde, Produced, StringSerde};
use crate::test_driver::TopologyTestDriver;
/// A session count built without calling `emit_strategy` must end up with
/// its store `"s"` registered in `cache_owner`.
/// (Presumably the default emit strategy is emit-on-update, as the test
/// name says - confirm against `EmitStrategy::default()`.)
#[test]
fn emit_on_update_session_store_is_cached() {
let b = StreamsBuilder::new();
b.stream::<String, String>(["in"])
.group_by_key()
.windowed_by_session(SessionWindows::of_inactivity_gap(60))
.count("s");
let built = b.build("app").unwrap();
// Instantiate against the in-memory backend so `cache_owner` can be inspected.
let g = pollster::block_on(built.instantiate(
&crate::store::backend::StoreBackend::InMemory,
"app",
10_485_760,
))
.unwrap();
check!(
g.cache_owner.contains_key("s"),
"emit-on-update session store must be cached, cache_owner = {:?}",
g.cache_owner
);
}
/// With `EmitStrategy::on_window_close()` the store `"s"` must NOT appear in
/// `cache_owner`.
#[test]
fn emit_final_session_store_is_not_cached() {
let b = StreamsBuilder::new();
b.stream::<String, String>(["in"])
.group_by_key()
.windowed_by_session(SessionWindows::of_inactivity_gap(60).grace(10))
.emit_strategy(EmitStrategy::on_window_close())
.count("s");
let built = b.build("app").unwrap();
let g = pollster::block_on(built.instantiate(
&crate::store::backend::StoreBackend::InMemory,
"app",
10_485_760,
))
.unwrap();
check!(
!g.cache_owner.contains_key("s"),
"emit-final session store must NOT be cached, cache_owner = {:?}",
g.cache_owner
);
}
/// End-to-end: with emit-on-window-close, three records for key `"a"` at
/// timestamps 1, 4 and 1000 must produce exactly one output record - the
/// session `[1, 4]` with count 2.
#[test]
fn dsl_session_count_emit_final_emits_once_on_close() {
let b = StreamsBuilder::new();
b.stream::<String, String>(["in"])
.group_by_key()
.windowed_by_session(SessionWindows::of_inactivity_gap(60).grace(10))
.emit_strategy(EmitStrategy::on_window_close())
.count("s")
.to_stream()
.to_explicit(
"out",
Produced::with(SessionWindowedSerde::new(StringSerde), I64Serde),
);
let built = b.build("app").unwrap();
let mut d = TopologyTestDriver::new(&built).unwrap();
// Timestamps 1 and 4 fall within the 60 ms gap of each other; the record
// at 1000 presumably advances stream time far enough to close that session.
for ts in [1, 4, 1000] {
d.pipe_input(
"in",
Consumed::with(StringSerde, StringSerde),
Some("a".to_string()),
"x".to_string(),
ts,
);
}
// Fresh `Produced` per read, since each `read_output` call takes one by value.
let p = || Produced::with(SessionWindowedSerde::new(StringSerde), I64Serde);
assert_eq!(
d.read_output("out", p()),
Some((
Some(Windowed {
key: "a".into(),
window: Window { start: 1, end: 4 }
}),
2
)),
"emit-final forwards session [1,4] with final count 2 on close"
);
// Nothing else may have been emitted.
assert_eq!(
d.read_output("out", p()),
None,
"exactly one emit-final record"
);
}
}