use std::marker::PhantomData;
use async_trait::async_trait;
use crate::dsl::processors::change::Change;
use crate::dsl::processors::tuple_forwarder::TupleForwarder;
use crate::dsl::windows::{TimeWindows, Window, Windowed};
use crate::processor::api::{Processor, ProcessorContext};
use crate::processor::record::Record;
/// Zero-sized marker used to mention otherwise-unused type parameters in a struct.
///
/// Wrapping `T` in a `fn() -> T` pointer type means the marker never owns a `T`, and because
/// function pointers are always `Send + Sync`, the marker does not constrain the auto traits of
/// the struct that embeds it.
type Marker<T> = PhantomData<fn() -> T>;
/// State of a windowed `aggregate` processor over a keyed stream.
///
/// `process` keeps one aggregate per (key, window start) in the window store named
/// `store_name` and emits either on every update or once a window has closed, depending on
/// `emit`.
// NOTE(review): `dead_code` is allowed without a stated reason — presumably the type is only
// constructed from wiring code that is not always compiled; confirm and document.
#[allow(dead_code)]
pub(crate) struct KStreamWindowAggregateProcessor<K, V, VA, I, A> {
/// Name under which the backing window store is looked up on the processor context.
pub store_name: String,
/// Window definition; supplies the window size, the grace period and the window starts that
/// match a record timestamp.
pub windows: TimeWindows,
/// Produces the starting aggregate for a (key, window) pair that has no stored value yet.
pub init: I,
/// Folds one record `(key, value)` into the current aggregate and returns the new aggregate.
pub agg: A,
/// Selects between forwarding on every update and forwarding only when a window closes.
pub emit: crate::dsl::emit::EmitStrategy,
/// Highest record timestamp observed so far; `process` only ever raises it.
pub stream_time: i64,
/// Window close time recorded by the most recent on-close emission pass. In on-close mode,
/// `process` skips windows that end before this value.
pub last_emitted_close: i64,
/// Forwards per-update changes; replaced in `init` based on whether the store is cached.
pub forwarder: TupleForwarder,
/// Ties the `K`, `V` and `VA` type parameters to the struct without storing any of them.
pub _pd: Marker<(K, V, VA)>,
}
/// Windowed aggregation step: folds each record into the stored aggregate of every window that
/// `windows_for` returns for the record timestamp, and forwards results according to `emit`.
#[async_trait]
impl<K, V, VA, I, A> Processor<K, V, Windowed<K>, Change<VA>>
    for KStreamWindowAggregateProcessor<K, V, VA, I, A>
where
    K: std::any::Any + Send + Sync + Clone,
    V: Send + 'static,
    VA: std::any::Any + Send + Clone,
    I: Fn() -> VA + Send + 'static,
    A: Fn(&K, &V, VA) -> VA + Send + 'static,
{
    /// Picks the forwarder variant from whether the backing window store is cached.
    async fn init(&mut self, ctx: &mut ProcessorContext<'_, '_, Windowed<K>, Change<VA>>) {
        let cached = ctx.store_is_cached(&self.store_name);
        self.forwarder = TupleForwarder::resolve(cached);
    }

    /// Applies one record to every matching window.
    ///
    /// For each window the stored aggregate (or a fresh `init()` value) is passed through `agg`
    /// and written back with the larger of the record timestamp and the stored timestamp. In
    /// on-update mode the `old -> new` change is handed to the forwarder; in on-close mode the
    /// closed windows are emitted after all matching windows have been updated.
    ///
    /// # Panics
    ///
    /// Panics if the record has no key or if the window store is not registered.
    async fn process(
        &mut self,
        ctx: &mut ProcessorContext<'_, '_, Windowed<K>, Change<VA>>,
        r: Record<K, V>,
    ) {
        let key = r.key.expect("windowed aggregate requires a non-null key");

        // Stream time never moves backwards; the close time trails it by the grace period.
        self.stream_time = self.stream_time.max(r.timestamp);
        let window_close_time = self.stream_time - self.windows.grace_ms;

        let size = self.windows.size_ms;
        let rc = ctx.record_context().clone();

        for ws in self.windows.windows_for(r.timestamp) {
            // In on-close mode, windows ending before the last emission pass are not touched.
            if self.emit.is_on_close() && ws + size < self.last_emitted_close {
                continue;
            }

            // The store borrow is confined to this block so `ctx` is free for forwarding below.
            let (old, new, new_ts) = {
                let store = ctx
                    .get_window_store::<K, VA>(&self.store_name)
                    .expect("window store not found");
                store.set_record_context(rc.clone());

                let prior = store.fetch_single(&key, ws).await;
                let (stored_ts, old) = match prior {
                    Some((prev_ts, prev_agg)) => (prev_ts, Some(prev_agg)),
                    None => (i64::MIN, None),
                };

                // `old` is kept intact for the change record, so the seed is a copy of it.
                let seed = match &old {
                    Some(current) => current.clone(),
                    None => (self.init)(),
                };
                let new = (self.agg)(&key, &r.value, seed);
                let new_ts = r.timestamp.max(stored_ts);

                store.put(key.clone(), ws, new.clone(), new_ts).await;
                (old, new, new_ts)
            };

            if !self.emit.is_on_update() {
                continue;
            }
            let windowed_key = Windowed {
                key: key.clone(),
                window: Window {
                    start: ws,
                    end: ws + size,
                },
            };
            self.forwarder
                .maybe_forward_change(ctx, windowed_key, Change::update(old, new), new_ts);
        }

        if self.emit.is_on_close() {
            self.emit_closed_windows(ctx, window_close_time).await;
        }
    }
}
impl<K, V, VA, I, A> KStreamWindowAggregateProcessor<K, V, VA, I, A>
where
    K: std::any::Any + Send + Sync + Clone,
    V: Send + 'static,
    VA: std::any::Any + Send + Clone,
    I: Fn() -> VA + Send + 'static,
    A: Fn(&K, &V, VA) -> VA + Send + 'static,
{
    /// Forwards the final aggregate of every window that closed since the previous pass.
    ///
    /// Queries the window store for window starts in
    /// `[last_emitted_close - size, window_close_time - size - 1]`, i.e. windows whose end lies
    /// in `[last_emitted_close, window_close_time)` (assuming `fetch_all_in_range` treats both
    /// bounds as inclusive — TODO confirm). Each hit is forwarded as `Change::update(None, v)`
    /// with its stored timestamp, ordered by window start, and `last_emitted_close` is then
    /// advanced to `window_close_time`.
    ///
    /// # Panics
    ///
    /// Panics if the window store is not registered.
    async fn emit_closed_windows(
        &mut self,
        ctx: &mut ProcessorContext<'_, '_, Windowed<K>, Change<VA>>,
        window_close_time: i64,
    ) {
        let previous_close = self.last_emitted_close;

        // The emitted interval `[previous_close, window_close_time)` is empty unless the close
        // time advanced. Returning here avoids a store scan with an inverted range on every
        // record that does not move stream time, and keeps the watermark monotonic.
        if window_close_time <= previous_close {
            return;
        }

        let size = self.windows.size_ms;
        // Both bounds saturate: `last_emitted_close` starts out at `i64::MIN`, and a very large
        // grace period can push `window_close_time` close to it as well. An unchecked
        // subtraction would panic in debug builds and wrap to a huge upper bound in release.
        let start_from = previous_close.saturating_sub(size);
        let start_to = window_close_time.saturating_sub(size).saturating_sub(1);

        let mut due = {
            let store = ctx
                .get_window_store::<K, VA>(&self.store_name)
                .expect("window store not found");
            store.fetch_all_in_range(start_from, start_to).await
        };

        // Defensive re-check of the lower bound, then a stable sort so that entries sharing a
        // window start keep the order in which the store returned them.
        due.retain(|(_, ws, _, _)| ws + size >= previous_close);
        due.sort_by_key(|(_, ws, _, _)| *ws);

        for (k, ws, ts, v) in due {
            ctx.forward(Record::new(
                Some(Windowed {
                    key: k,
                    window: Window {
                        start: ws,
                        end: ws + size,
                    },
                }),
                Change::update(None, v),
                ts,
            ));
        }

        self.last_emitted_close = window_close_time;
    }
}
/// State of a windowed `reduce` processor over a keyed stream.
///
/// `process` keeps one reduced value per (key, window start) in the window store named
/// `store_name` and emits either on every update or once a window has closed, depending on
/// `emit`.
// NOTE(review): `dead_code` is allowed without a stated reason — presumably the type is only
// constructed from wiring code that is not always compiled; confirm and document.
#[allow(dead_code)]
pub(crate) struct KStreamWindowReduceProcessor<K, V, R> {
/// Name under which the backing window store is looked up on the processor context.
pub store_name: String,
/// Window definition; supplies the window size, the grace period and the window starts that
/// match a record timestamp.
pub windows: TimeWindows,
/// Combines the stored value (first argument) with the incoming value (second argument). The
/// first value seen for a window is stored as-is without calling the reducer.
pub reducer: R,
/// Selects between forwarding on every update and forwarding only when a window closes.
pub emit: crate::dsl::emit::EmitStrategy,
/// Highest record timestamp observed so far; `process` only ever raises it.
pub stream_time: i64,
/// Window close time recorded by the most recent on-close emission pass. In on-close mode,
/// `process` skips windows that end before this value.
pub last_emitted_close: i64,
/// Forwards per-update changes; replaced in `init` based on whether the store is cached.
pub forwarder: TupleForwarder,
/// Ties the `K` and `V` type parameters to the struct without storing either of them.
pub _pd: Marker<(K, V)>,
}
/// Windowed reduction step: combines each record with the stored value of every window that
/// `windows_for` returns for the record timestamp, and forwards results according to `emit`.
#[async_trait]
impl<K, V, R> Processor<K, V, Windowed<K>, Change<V>> for KStreamWindowReduceProcessor<K, V, R>
where
    K: std::any::Any + Send + Sync + Clone,
    V: std::any::Any + Send + Clone,
    R: Fn(&V, &V) -> V + Send + 'static,
{
    /// Picks the forwarder variant from whether the backing window store is cached.
    async fn init(&mut self, ctx: &mut ProcessorContext<'_, '_, Windowed<K>, Change<V>>) {
        let cached = ctx.store_is_cached(&self.store_name);
        self.forwarder = TupleForwarder::resolve(cached);
    }

    /// Applies one record to every matching window.
    ///
    /// The first value of a window is stored unchanged; later values are combined with the
    /// stored one through `reducer`. The result is written back with the larger of the record
    /// timestamp and the stored timestamp. In on-update mode the `old -> new` change is handed
    /// to the forwarder; in on-close mode the closed windows are emitted after all matching
    /// windows have been updated.
    ///
    /// # Panics
    ///
    /// Panics if the record has no key or if the window store is not registered.
    async fn process(
        &mut self,
        ctx: &mut ProcessorContext<'_, '_, Windowed<K>, Change<V>>,
        r: Record<K, V>,
    ) {
        let key = r.key.expect("windowed reduce requires a non-null key");

        // Stream time never moves backwards; the close time trails it by the grace period.
        self.stream_time = self.stream_time.max(r.timestamp);
        let window_close_time = self.stream_time - self.windows.grace_ms;

        let size = self.windows.size_ms;
        let rc = ctx.record_context().clone();

        for ws in self.windows.windows_for(r.timestamp) {
            // In on-close mode, windows ending before the last emission pass are not touched.
            if self.emit.is_on_close() && ws + size < self.last_emitted_close {
                continue;
            }

            // The store borrow is confined to this block so `ctx` is free for forwarding below.
            let (old, new, new_ts) = {
                let store = ctx
                    .get_window_store::<K, V>(&self.store_name)
                    .expect("window store not found");
                store.set_record_context(rc.clone());

                let prior = store.fetch_single(&key, ws).await;
                let (stored_ts, old) = match prior {
                    Some((prev_ts, prev_value)) => (prev_ts, Some(prev_value)),
                    None => (i64::MIN, None),
                };

                let new = if let Some(acc) = &old {
                    (self.reducer)(acc, &r.value)
                } else {
                    r.value.clone()
                };
                let new_ts = r.timestamp.max(stored_ts);

                store.put(key.clone(), ws, new.clone(), new_ts).await;
                (old, new, new_ts)
            };

            if !self.emit.is_on_update() {
                continue;
            }
            let windowed_key = Windowed {
                key: key.clone(),
                window: Window {
                    start: ws,
                    end: ws + size,
                },
            };
            self.forwarder
                .maybe_forward_change(ctx, windowed_key, Change::update(old, new), new_ts);
        }

        if self.emit.is_on_close() {
            self.emit_closed_windows(ctx, window_close_time).await;
        }
    }
}
impl<K, V, R> KStreamWindowReduceProcessor<K, V, R>
where
    K: std::any::Any + Send + Sync + Clone,
    V: std::any::Any + Send + Clone,
    R: Fn(&V, &V) -> V + Send + 'static,
{
    /// Forwards the final reduced value of every window that closed since the previous pass.
    ///
    /// Queries the window store for window starts in
    /// `[last_emitted_close - size, window_close_time - size - 1]`, i.e. windows whose end lies
    /// in `[last_emitted_close, window_close_time)` (assuming `fetch_all_in_range` treats both
    /// bounds as inclusive — TODO confirm). Each hit is forwarded as `Change::update(None, v)`
    /// with its stored timestamp, ordered by window start, and `last_emitted_close` is then
    /// advanced to `window_close_time`.
    ///
    /// # Panics
    ///
    /// Panics if the window store is not registered.
    async fn emit_closed_windows(
        &mut self,
        ctx: &mut ProcessorContext<'_, '_, Windowed<K>, Change<V>>,
        window_close_time: i64,
    ) {
        let previous_close = self.last_emitted_close;

        // The emitted interval `[previous_close, window_close_time)` is empty unless the close
        // time advanced. Returning here avoids a store scan with an inverted range on every
        // record that does not move stream time, and keeps the watermark monotonic.
        if window_close_time <= previous_close {
            return;
        }

        let size = self.windows.size_ms;
        // Both bounds saturate: `last_emitted_close` starts out at `i64::MIN`, and a very large
        // grace period can push `window_close_time` close to it as well. An unchecked
        // subtraction would panic in debug builds and wrap to a huge upper bound in release.
        let start_from = previous_close.saturating_sub(size);
        let start_to = window_close_time.saturating_sub(size).saturating_sub(1);

        let mut due = {
            let store = ctx
                .get_window_store::<K, V>(&self.store_name)
                .expect("window store not found");
            store.fetch_all_in_range(start_from, start_to).await
        };

        // Defensive re-check of the lower bound, then a stable sort so that entries sharing a
        // window start keep the order in which the store returned them.
        due.retain(|(_, ws, _, _)| ws + size >= previous_close);
        due.sort_by_key(|(_, ws, _, _)| *ws);

        for (k, ws, ts, v) in due {
            ctx.forward(Record::new(
                Some(Windowed {
                    key: k,
                    window: Window {
                        start: ws,
                        end: ws + size,
                    },
                }),
                Change::update(None, v),
                ts,
            ));
        }

        self.last_emitted_close = window_close_time;
    }
}
#[cfg(test)]
mod tests {
use std::collections::VecDeque;
use std::marker::PhantomData;
use super::*;
use crate::dsl::windows::{TimeWindows, Window, Windowed};
use crate::processor::api::ProcessorContext;
use crate::processor::erased::{Dispatch, ErasedRecord};
use crate::processor::record::{Record, RecordContext};
use crate::processor::serde::{I64Serde, StringSerde};
use crate::store::registry::StoreRegistry;
use crate::store::window::WindowBytesStore;
#[tokio::test]
async fn windowed_count_tumbling_emits_per_window() {
let mut stores = StoreRegistry::default();
stores.insert(Box::new(WindowBytesStore::<String, i64>::in_memory(
"w".into(),
Box::new(StringSerde),
Box::new(I64Serde),
"app-w-changelog".into(),
10,
)));
let children = [0usize];
let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
let mut output = Vec::new();
let rc = RecordContext {
topic: "in".into(),
partition: 0,
offset: 0,
timestamp: 0,
};
let mut proc = KStreamWindowAggregateProcessor {
store_name: "w".into(),
windows: TimeWindows::of_size(10),
init: || 0i64,
agg: |_k: &String, _v: &String, a: i64| a + 1,
emit: crate::dsl::emit::EmitStrategy::on_window_update(),
stream_time: i64::MIN,
last_emitted_close: i64::MIN,
forwarder: TupleForwarder::default(),
_pd: PhantomData::<fn() -> (String, String, i64)>,
};
macro_rules! drive {
($ts:expr) => {{
let globals = crate::runtime::global::GlobalStateManager::default();
let mut scheds = Vec::new();
let mut d = Dispatch {
buffer: &mut buffer,
children: &children,
output: &mut output,
record_ctx: &rc,
stores: &mut stores,
globals: &globals,
node_idx: 0,
schedules: &mut scheds,
sched_stream_time: i64::MIN,
sched_wall_clock: 0,
};
let mut ctx =
ProcessorContext::<'_, '_, Windowed<String>, Change<i64>>::new(&mut d);
proc.process(&mut ctx, Record::new(Some("a".into()), "x".into(), $ts))
.await;
}};
}
drive!(3);
let (_, rec) = buffer.pop_front().unwrap();
let change = rec.value.downcast::<Change<i64>>().unwrap();
let key = rec.key.unwrap().downcast::<Windowed<String>>().unwrap();
assert_eq!(key.window, Window { start: 0, end: 10 });
assert_eq!(change.old, None);
assert_eq!(change.new, Some(1));
drive!(7);
let (_, rec2) = buffer.pop_front().unwrap();
let change2 = rec2.value.downcast::<Change<i64>>().unwrap();
assert_eq!(change2.old, Some(1));
assert_eq!(change2.new, Some(2));
drive!(12);
let (_, rec3) = buffer.pop_front().unwrap();
assert_eq!(
rec3.key
.unwrap()
.downcast::<Windowed<String>>()
.unwrap()
.window,
Window { start: 10, end: 20 }
);
assert_eq!(rec3.value.downcast::<Change<i64>>().unwrap().new, Some(1));
}
#[tokio::test]
async fn windowed_count_emit_final_emits_only_on_close() {
let mut stores = StoreRegistry::default();
stores.insert(Box::new(WindowBytesStore::<String, i64>::in_memory(
"w".into(),
Box::new(StringSerde),
Box::new(I64Serde),
"app-w-changelog".into(),
10,
)));
let children = [0usize];
let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
let mut output = Vec::new();
let rc = RecordContext {
topic: "in".into(),
partition: 0,
offset: 0,
timestamp: 0,
};
let mut proc = KStreamWindowAggregateProcessor {
store_name: "w".into(),
windows: TimeWindows::of_size(10),
init: || 0i64,
agg: |_k: &String, _v: &String, a: i64| a + 1,
emit: crate::dsl::emit::EmitStrategy::on_window_close(),
stream_time: i64::MIN,
last_emitted_close: i64::MIN,
forwarder: TupleForwarder::default(),
_pd: PhantomData::<fn() -> (String, String, i64)>,
};
macro_rules! drive {
($ts:expr) => {{
let globals = crate::runtime::global::GlobalStateManager::default();
let mut scheds = Vec::new();
let mut d = Dispatch {
buffer: &mut buffer,
children: &children,
output: &mut output,
record_ctx: &rc,
stores: &mut stores,
globals: &globals,
node_idx: 0,
schedules: &mut scheds,
sched_stream_time: i64::MIN,
sched_wall_clock: 0,
};
let mut ctx =
ProcessorContext::<'_, '_, Windowed<String>, Change<i64>>::new(&mut d);
proc.process(&mut ctx, Record::new(Some("a".into()), "x".into(), $ts))
.await;
}};
}
drive!(3);
assert!(buffer.is_empty(), "no emit while window [0,10) is open");
drive!(7);
assert!(
buffer.is_empty(),
"still no emit while window [0,10) is open"
);
drive!(15);
assert_eq!(buffer.len(), 1, "exactly one final emit on close");
let (_, rec) = buffer.pop_front().unwrap();
let key = rec.key.unwrap().downcast::<Windowed<String>>().unwrap();
assert_eq!(key.key, "a");
assert_eq!(key.window, Window { start: 0, end: 10 });
assert_eq!(rec.value.downcast::<Change<i64>>().unwrap().new, Some(2));
}
#[tokio::test]
async fn windowed_reduce_emit_final_emits_only_on_close() {
let mut stores = StoreRegistry::default();
stores.insert(Box::new(WindowBytesStore::<String, i64>::in_memory(
"w".into(),
Box::new(StringSerde),
Box::new(I64Serde),
"app-w-changelog".into(),
10,
)));
let children = [0usize];
let mut buffer: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
let mut output = Vec::new();
let rc = RecordContext {
topic: "in".into(),
partition: 0,
offset: 0,
timestamp: 0,
};
let mut proc = KStreamWindowReduceProcessor {
store_name: "w".into(),
windows: TimeWindows::of_size(10),
reducer: |a: &i64, b: &i64| a + b,
emit: crate::dsl::emit::EmitStrategy::on_window_close(),
stream_time: i64::MIN,
last_emitted_close: i64::MIN,
forwarder: TupleForwarder::default(),
_pd: PhantomData::<fn() -> (String, i64)>,
};
macro_rules! drive {
($v:expr, $ts:expr) => {{
let globals = crate::runtime::global::GlobalStateManager::default();
let mut scheds = Vec::new();
let mut d = Dispatch {
buffer: &mut buffer,
children: &children,
output: &mut output,
record_ctx: &rc,
stores: &mut stores,
globals: &globals,
node_idx: 0,
schedules: &mut scheds,
sched_stream_time: i64::MIN,
sched_wall_clock: 0,
};
let mut ctx =
ProcessorContext::<'_, '_, Windowed<String>, Change<i64>>::new(&mut d);
proc.process(&mut ctx, Record::new(Some("a".into()), $v, $ts))
.await;
}};
}
drive!(4i64, 3);
assert!(buffer.is_empty(), "no emit while window [0,10) is open");
drive!(6i64, 7);
assert!(
buffer.is_empty(),
"still no emit while window [0,10) is open"
);
drive!(99i64, 15);
assert_eq!(buffer.len(), 1, "exactly one final emit on close");
let (_, rec) = buffer.pop_front().unwrap();
let key = rec.key.unwrap().downcast::<Windowed<String>>().unwrap();
assert_eq!(key.key, "a");
assert_eq!(key.window, Window { start: 0, end: 10 });
let ch = rec.value.downcast::<Change<i64>>().unwrap();
assert_eq!((ch.old, ch.new), (None, Some(10)));
}
/// Builds a registry holding one in-memory `String -> i64` window store named "w", optionally
/// with a named cache enabled for it.
fn window_registry(cached: bool) -> StoreRegistry {
    let mut registry = StoreRegistry::default();
    let window_store = WindowBytesStore::<String, i64>::in_memory(
        "w".into(),
        Box::new(StringSerde),
        Box::new(I64Serde),
        "app-w-changelog".into(),
        10,
    );
    registry.insert(Box::new(window_store));

    if !cached {
        return registry;
    }

    let cache = crate::store::cache::named::NamedCache::new("w".into());
    registry.enable_cache("w", std::sync::Arc::new(std::sync::Mutex::new(cache)));
    registry
}
/// Runs an on-update counting processor over two records (timestamps 3 and 7, same window)
/// against `stores` and returns how many records were forwarded immediately.
///
/// `init` is called once, before the first record, so the forwarder reflects whether the store
/// is cached.
async fn run_two_same_window(stores: &mut StoreRegistry) -> usize {
    let child_nodes = [0usize];
    let mut forwarded: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
    let mut output = Vec::new();
    let record_ctx = RecordContext {
        topic: "in".into(),
        partition: 0,
        offset: 0,
        timestamp: 0,
    };
    let mut processor = KStreamWindowAggregateProcessor {
        store_name: "w".into(),
        windows: TimeWindows::of_size(10),
        init: || 0i64,
        agg: |_k: &String, _v: &String, a: i64| a + 1,
        emit: crate::dsl::emit::EmitStrategy::on_window_update(),
        stream_time: i64::MIN,
        last_emitted_close: i64::MIN,
        forwarder: TupleForwarder::default(),
        _pd: PhantomData::<fn() -> (String, String, i64)>,
    };

    for (idx, ts) in [3i64, 7].iter().copied().enumerate() {
        let globals = crate::runtime::global::GlobalStateManager::default();
        let mut schedules = Vec::new();
        let mut dispatch = Dispatch {
            buffer: &mut forwarded,
            children: &child_nodes,
            output: &mut output,
            record_ctx: &record_ctx,
            stores,
            globals: &globals,
            node_idx: 0,
            schedules: &mut schedules,
            sched_stream_time: i64::MIN,
            sched_wall_clock: 0,
        };
        let mut ctx = ProcessorContext::<'_, '_, Windowed<String>, Change<i64>>::new(&mut dispatch);

        // Only the first iteration initialises the processor.
        if idx == 0 {
            processor.init(&mut ctx).await;
        }
        processor
            .process(&mut ctx, Record::new(Some("a".into()), "x".into(), ts))
            .await;
    }

    forwarded.len()
}
/// Without a cache, each of the two records is forwarded as soon as it is processed.
#[tokio::test]
async fn uncached_windowed_aggregate_forwards_each_record() {
    let mut stores = window_registry(false);
    let forwarded_now = run_two_same_window(&mut stores).await;
    assert_eq!(forwarded_now, 2);
}
/// With a cache enabled, nothing is forwarded while processing; flushing the cache afterwards
/// yields a single change carrying the latest count for the window.
#[tokio::test]
async fn cached_windowed_aggregate_suppresses_then_flushes_one() {
    let mut stores = window_registry(true);

    let forwarded_now = run_two_same_window(&mut stores).await;
    assert_eq!(
        forwarded_now, 0,
        "cached window store must suppress both immediate forwards"
    );

    // Flushing the cache of store "w" pushes the pending change to child node 0.
    let mut flushed: VecDeque<(usize, ErasedRecord)> = VecDeque::new();
    let cached_store = stores.get_mut("w").unwrap();
    cached_store.flush_cache_into(&mut flushed, &[0]).await;
    assert_eq!(flushed.len(), 1, "flush emits exactly one deduped change");

    let (_, rec) = flushed.pop_front().unwrap();
    let windowed = rec.key.unwrap().downcast::<Windowed<String>>().unwrap();
    assert_eq!(windowed.key, "a");
    assert_eq!(windowed.window, Window { start: 0, end: 10 });
    let change = rec.value.downcast::<Change<i64>>().unwrap();
    assert_eq!(change.new, Some(2), "deduped to the latest window count");
}
}