use std::any::Any;
use std::cell::RefCell;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::Arc;
use crate::dsl::builder::InternalStreamsBuilder;
use crate::dsl::cogrouped::{
CogroupInput, CogroupKind, CogroupSpec, CogroupedKStream, StoreRegistrarFn, lower_cogroup,
};
use crate::dsl::config::Materialized;
use crate::dsl::kgrouped::mint_store_name;
use crate::dsl::ktable::KTable;
use crate::dsl::names;
use crate::dsl::windows::{SlidingWindows, TimeWindowedSerde, Windowed};
use crate::processor::serde::Serde;
impl<K, VOut> CogroupedKStream<K, VOut>
where
K: Any + Send + Sync + Clone,
VOut: Any + Send + Sync + Clone,
{
#[must_use]
pub fn windowed_by_sliding(
self,
windows: SlidingWindows,
) -> SlidingWindowedCogroupedStream<K, VOut> {
SlidingWindowedCogroupedStream {
builder: self.builder,
inputs: self.inputs,
windows,
_pd: PhantomData,
}
}
}
pub struct SlidingWindowedCogroupedStream<K, VOut> {
builder: Rc<RefCell<InternalStreamsBuilder>>,
inputs: Vec<CogroupInput<K, VOut>>,
windows: SlidingWindows,
_pd: PhantomData<fn() -> (K, VOut)>,
}
impl<K, VOut> SlidingWindowedCogroupedStream<K, VOut>
where
K: Any + Send + Sync + Clone,
VOut: Any + Send + Sync + Clone,
{
pub fn aggregate_explicit<KS, VS, I>(
self,
init: I,
materialized: impl Into<Materialized<KS, VS>>,
) -> KTable<Windowed<K>, VOut, TimeWindowedSerde<KS>, VS>
where
KS: Serde<K> + Clone + 'static,
VS: Serde<VOut> + Clone + 'static,
I: Fn() -> VOut + Send + Sync + 'static,
{
let materialized = materialized.into();
let store_name = mint_store_name(&self.builder, &materialized, names::AGGREGATE_STORE);
let Materialized {
key_serde,
value_serde,
logging,
caching,
..
} = materialized;
let spec = CogroupSpec::<K, VOut> {
kind: CogroupKind::Sliding(self.windows),
init: Arc::new(init),
merger: None,
};
let ks = key_serde.clone();
let vs = value_serde.clone();
let store_for_reg = store_name.clone();
let size = self.windows.time_difference_ms * 2;
let window_size = self.windows.time_difference_ms;
let grace = self.windows.grace_ms;
let registrar: StoreRegistrarFn = Box::new(move |state, procs| {
state.topology.add_window_store::<K, VOut, KS, VS>(
store_for_reg.clone(),
ks.clone(),
vs.clone(),
size,
window_size,
grace,
procs,
);
state.topology.mark_store_caching(&store_for_reg, caching);
});
let merge_id = lower_cogroup::<K, VOut, Windowed<K>>(
&self.builder,
self.inputs,
store_name.clone(),
spec,
logging,
registrar,
);
KTable::new(
Rc::clone(&self.builder),
merge_id,
Some(store_name),
None,
TimeWindowedSerde::new(key_serde, self.windows.time_difference_ms),
value_serde,
)
.with_window_grace(Some(self.windows.grace_ms))
}
}
#[cfg(test)]
mod caching_tests {
use assert2::check;
use crate::dsl::StreamsBuilder;
use crate::dsl::windows::TimeWindowedSerde;
use crate::store::backend::StoreBackend;
use crate::{I64Serde, Materialized, Produced, SlidingWindows, StringSerde};
#[test]
fn sliding_windowed_cogroup_marks_store_cached() {
let b = StreamsBuilder::new();
let g1 = b.stream::<String, String>(["in1"]).group_by_key();
let g2 = b.stream::<String, String>(["in2"]).group_by_key();
g1.cogroup::<i64, _>(|_k, v: &String, acc| {
acc + i64::try_from(v.len()).unwrap_or(i64::MAX)
})
.cogroup(g2, |_k, _v: &String, acc| acc + 1)
.windowed_by_sliding(SlidingWindows::of_time_difference_with_no_grace(100))
.aggregate_explicit(
|| 0i64,
Materialized::with(StringSerde, I64Serde).as_store("cg"),
)
.to_stream()
.to_explicit(
"out",
Produced::with(TimeWindowedSerde::new(StringSerde, 100), I64Serde),
);
let built = b.build("app").unwrap();
let g =
pollster::block_on(built.instantiate(&StoreBackend::InMemory, "app", 1024)).unwrap();
check!(g.cache_owner.contains_key("cg"));
}
#[test]
fn sliding_windowed_cogroup_uncached_when_off() {
let b = StreamsBuilder::new();
let g1 = b.stream::<String, String>(["in1"]).group_by_key();
let g2 = b.stream::<String, String>(["in2"]).group_by_key();
g1.cogroup::<i64, _>(|_k, v: &String, acc| {
acc + i64::try_from(v.len()).unwrap_or(i64::MAX)
})
.cogroup(g2, |_k, _v: &String, acc| acc + 1)
.windowed_by_sliding(SlidingWindows::of_time_difference_with_no_grace(100))
.aggregate_explicit(
|| 0i64,
Materialized::with(StringSerde, I64Serde)
.as_store("cg")
.with_caching(false),
)
.to_stream()
.to_explicit(
"out",
Produced::with(TimeWindowedSerde::new(StringSerde, 100), I64Serde),
);
let built = b.build("app").unwrap();
let g =
pollster::block_on(built.instantiate(&StoreBackend::InMemory, "app", 1024)).unwrap();
check!(!g.cache_owner.contains_key("cg"));
}
}