use std::collections::HashMap;
/// Describes which windowing strategy to use and its parameters.
///
/// The variants carry the same parameters as the constructors of
/// `TumblingWindowAssigner`, `SlidingWindowAssigner` and `SessionWindowAssigner`.
/// Every field has an `_ms` suffix, i.e. it is a duration in milliseconds.
///
/// The enum is `#[non_exhaustive]`, so `match` expressions in other crates
/// need a wildcard arm.
// NOTE(review): nothing in this section of the file reads `WindowType`;
// presumably it is consumed elsewhere — confirm before changing variants.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum WindowType {
/// Fixed-size window described only by its length.
Tumbling {
/// Length of each window.
size_ms: u64,
},
/// Window of a given length that is opened at a regular step.
Sliding {
/// Length of each window.
size_ms: u64,
/// Distance between the starts of consecutive windows.
step_ms: u64,
},
/// Activity-based window that is delimited by a gap between events.
Session {
/// Gap used to separate one session from the next.
gap_ms: u64,
},
}
/// Half-open time interval `[start_ms, end_ms)` identifying a single window.
///
/// Equality and hashing cover both edges, so a bound can be used as a map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WindowBound {
    /// Inclusive lower edge of the window.
    pub start_ms: i64,
    /// Exclusive upper edge of the window.
    pub end_ms: i64,
}

impl WindowBound {
    /// Builds a bound from its two edges.
    ///
    /// The edges are stored as given; their relative order is not checked.
    pub fn new(start_ms: i64, end_ms: i64) -> Self {
        WindowBound { start_ms, end_ms }
    }

    /// Reports whether `t_ms` falls inside `[start_ms, end_ms)`.
    ///
    /// A bound whose start is not below its end contains no timestamp.
    pub fn contains(&self, t_ms: i64) -> bool {
        let span = self.start_ms..self.end_ms;
        span.contains(&t_ms)
    }
}
/// Maps an event timestamp to the window(s) the event belongs to.
pub trait WindowAssigner {
/// Returns the bounds of every window assigned to an event at `event_time_ms`.
///
/// The number of returned bounds depends on the implementation: the
/// implementations in this file return exactly one (tumbling), zero or more
/// (sliding), or always none (session, which needs a key and mutable state
/// that this `&self` signature cannot provide).
fn assign_windows(&self, event_time_ms: i64) -> Vec<WindowBound>;
}
/// Assigns each event to exactly one fixed-size, non-overlapping window.
///
/// Window starts are multiples of `size_ms`, so an event at time `t` lands in
/// `[k * size_ms, (k + 1) * size_ms)` where `k = floor(t / size_ms)`. Negative
/// timestamps are handled by flooring rather than truncating.
#[derive(Debug, Clone)]
pub struct TumblingWindowAssigner {
    /// Length of every window, in milliseconds.
    size_ms: u64,
}

impl TumblingWindowAssigner {
    /// Creates an assigner producing windows that are `size_ms` long.
    ///
    /// The size is not validated here; an unusable size is reported by
    /// `assign_windows` the first time it is called.
    pub fn new(size_ms: u64) -> Self {
        Self { size_ms }
    }
}

impl WindowAssigner for TumblingWindowAssigner {
    /// Returns the single window containing `event_time_ms`.
    ///
    /// # Panics
    ///
    /// Panics if the configured size is zero or larger than `i64::MAX`.
    /// A zero size previously surfaced as an anonymous divide-by-zero, and an
    /// oversized one silently wrapped to a negative size and produced
    /// meaningless bounds.
    fn assign_windows(&self, event_time_ms: i64) -> Vec<WindowBound> {
        assert!(
            self.size_ms > 0 && self.size_ms <= i64::MAX as u64,
            "tumbling window size_ms must be in 1..=i64::MAX, got {}",
            self.size_ms
        );
        // Lossless: the range was checked just above.
        let size = self.size_ms as i64;
        let start = floor_div(event_time_ms, size) * size;
        vec![WindowBound::new(start, start + size)]
    }
}
/// Assigns each event to every overlapping window of a sliding scheme.
///
/// Windows are `size_ms` long and one window starts at every multiple of
/// `step_ms`, so an event may belong to several windows, or to none when
/// `size_ms` is smaller than `step_ms` and the event falls between windows.
#[derive(Debug, Clone)]
pub struct SlidingWindowAssigner {
    /// Length of every window, in milliseconds.
    size_ms: u64,
    /// Distance between consecutive window starts, in milliseconds.
    step_ms: u64,
}

impl SlidingWindowAssigner {
    /// Creates an assigner with windows of `size_ms` opened every `step_ms`.
    ///
    /// The parameters are not validated here; unusable values are reported by
    /// `assign_windows` the first time it is called.
    pub fn new(size_ms: u64, step_ms: u64) -> Self {
        Self { size_ms, step_ms }
    }
}

impl WindowAssigner for SlidingWindowAssigner {
    /// Returns every window containing `event_time_ms`, newest start first.
    ///
    /// A `size_ms` of zero yields an empty result because no window can
    /// contain an event.
    ///
    /// # Panics
    ///
    /// Panics if `step_ms` is zero (it previously surfaced as an anonymous
    /// divide-by-zero) or if either parameter exceeds `i64::MAX` (it previously
    /// wrapped to a negative value and produced meaningless results).
    fn assign_windows(&self, event_time_ms: i64) -> Vec<WindowBound> {
        assert!(
            self.size_ms <= i64::MAX as u64,
            "sliding window size_ms must not exceed i64::MAX, got {}",
            self.size_ms
        );
        assert!(
            self.step_ms > 0 && self.step_ms <= i64::MAX as u64,
            "sliding window step_ms must be in 1..=i64::MAX, got {}",
            self.step_ms
        );
        // Lossless: both ranges were checked just above.
        let size = self.size_ms as i64;
        let step = self.step_ms as i64;

        let mut windows = Vec::new();
        // Begin at the latest window start that is not after the event, then
        // walk backwards one step at a time while the window still covers it.
        let mut start = floor_div(event_time_ms, step) * step;
        while start + size > event_time_ms {
            windows.push(WindowBound::new(start, start + size));
            start -= step;
        }
        windows
    }
}
/// Tracks one open session per key and assigns events to it.
///
/// A session is extended while consecutive events are less than `gap_ms`
/// apart; an event that arrives `gap_ms` or more after the latest one starts a
/// new session. The reported window is `[session_start, last_event + gap_ms)`.
///
/// Only the most recent session of each key is remembered, and no method of
/// this type removes entries from the internal map.
#[derive(Debug, Clone)]
pub struct SessionWindowAssigner {
    /// Inactivity gap that separates two sessions, in milliseconds.
    gap_ms: u64,
    /// Per-key state: `(session_start, latest_event_time)`.
    sessions: HashMap<String, (i64, i64)>,
}

impl SessionWindowAssigner {
    /// Creates an assigner with no open sessions and the given inactivity gap.
    pub fn new(gap_ms: u64) -> Self {
        Self {
            gap_ms,
            sessions: HashMap::new(),
        }
    }

    /// Records an event for `key` and returns the session window it falls in.
    ///
    /// * An event at or after the latest one extends the session, or starts a
    ///   new one when the distance is at least the gap.
    /// * An out-of-order event inside the current session leaves it unchanged.
    /// * An out-of-order event before the session start pulls the start back
    ///   when it is closer than the gap; otherwise it belongs to an older,
    ///   disjoint session and gets its own window `[t, t + gap)` without
    ///   touching the tracked session.
    ///
    /// The latest event time never moves backwards, so the end of a tracked
    /// session never shrinks. Gaps above `i64::MAX` are clamped and the
    /// arithmetic saturates instead of overflowing.
    pub fn assign_keyed(&mut self, key: &str, event_time_ms: i64) -> WindowBound {
        // Clamp instead of wrapping: an oversized gap means "never expires".
        let gap = self.gap_ms.min(i64::MAX as u64) as i64;
        let standalone = WindowBound::new(event_time_ms, event_time_ms.saturating_add(gap));

        // Look up by `&str` first so known keys do not allocate a `String`.
        match self.sessions.get_mut(key) {
            None => {
                self.sessions
                    .insert(key.to_owned(), (event_time_ms, event_time_ms));
                standalone
            }
            Some((session_start, last_seen)) => {
                if event_time_ms >= *last_seen {
                    if event_time_ms.saturating_sub(*last_seen) >= gap {
                        *session_start = event_time_ms;
                    }
                    *last_seen = event_time_ms;
                } else if event_time_ms < *session_start {
                    if (*session_start).saturating_sub(event_time_ms) >= gap {
                        // Too old to merge with the tracked session.
                        return standalone;
                    }
                    *session_start = event_time_ms;
                }
                WindowBound::new(*session_start, (*last_seen).saturating_add(gap))
            }
        }
    }
}

impl WindowAssigner for SessionWindowAssigner {
    /// Always returns no windows.
    ///
    /// Session assignment needs a key and mutable per-key state, which this
    /// `&self` signature cannot provide; use `assign_keyed` instead.
    fn assign_windows(&self, _event_time_ms: i64) -> Vec<WindowBound> {
        Vec::new()
    }
}
/// Holds the values collected so far for every open window.
#[derive(Debug)]
pub struct WindowBuffer<V> {
    /// Buffered values, grouped by the window they were assigned to.
    data: HashMap<WindowBound, Vec<V>>,
}

// Implemented by hand: deriving `Default` would add a `V: Default` bound that
// an empty buffer does not need.
impl<V> Default for WindowBuffer<V> {
    fn default() -> Self {
        Self::new()
    }
}

impl<V> WindowBuffer<V> {
    /// Creates an empty buffer.
    pub fn new() -> Self {
        Self {
            data: HashMap::new(),
        }
    }

    /// Appends `value` to every window listed in `bounds`.
    ///
    /// Windows that are not yet present are created. Nothing happens when
    /// `bounds` is empty.
    pub fn insert(&mut self, bounds: Vec<WindowBound>, value: V)
    where
        V: Clone,
    {
        let mut bounds = bounds.into_iter();
        // Move the value into the final window so that the common
        // single-window case never clones.
        if let Some(last) = bounds.next_back() {
            for bound in bounds {
                self.data.entry(bound).or_default().push(value.clone());
            }
            self.data.entry(last).or_default().push(value);
        }
    }

    /// Removes and returns every window whose `end_ms` is at or before
    /// `watermark_ms`, together with its buffered values.
    ///
    /// The result is ordered by `(end_ms, start_ms)` so callers see a
    /// deterministic order instead of the map's arbitrary iteration order.
    pub fn collect_expired(&mut self, watermark_ms: i64) -> Vec<(WindowBound, Vec<V>)> {
        let mut expired_keys: Vec<WindowBound> = self
            .data
            .keys()
            .filter(|bound| bound.end_ms <= watermark_ms)
            .cloned()
            .collect();
        expired_keys.sort_unstable_by_key(|bound| (bound.end_ms, bound.start_ms));

        expired_keys
            .into_iter()
            .filter_map(|key| self.data.remove(&key).map(|values| (key, values)))
            .collect()
    }

    /// Number of windows currently buffered.
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Returns `true` when no window is buffered.
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }
}
/// Buffers values per window and aggregates a window once the watermark
/// passes its end.
pub struct WindowAggregator<V, A> {
    /// Values waiting for their window to close.
    buffer: WindowBuffer<V>,
    /// Reduces the values of a closed window to one aggregate.
    aggregate_fn: Box<dyn Fn(&[V]) -> A + Send + Sync>,
    /// Highest watermark seen so far; starts at `i64::MIN`.
    current_watermark: i64,
}

impl<V: Clone, A> WindowAggregator<V, A> {
    /// Creates an aggregator intended for tumbling windows.
    ///
    /// `size_ms` is accepted for API symmetry but is not stored: the window
    /// size is supplied again on every `process_tumbling` call.
    pub fn tumbling<F>(size_ms: u64, aggregate_fn: F) -> Self
    where
        F: Fn(&[V]) -> A + Send + Sync + 'static,
    {
        // Deliberately unused, as in `sliding`; this silences the warning.
        let _ = size_ms;
        Self {
            buffer: WindowBuffer::new(),
            aggregate_fn: Box::new(aggregate_fn),
            current_watermark: i64::MIN,
        }
    }

    /// Creates an aggregator intended for sliding windows.
    ///
    /// Neither `size_ms` nor `step_ms` is stored; callers compute the bounds
    /// themselves and pass them to `process_with_bounds`.
    pub fn sliding<F>(size_ms: u64, step_ms: u64, aggregate_fn: F) -> Self
    where
        F: Fn(&[V]) -> A + Send + Sync + 'static,
    {
        let _ = (size_ms, step_ms);
        Self {
            buffer: WindowBuffer::new(),
            aggregate_fn: Box::new(aggregate_fn),
            current_watermark: i64::MIN,
        }
    }

    /// Buffers `value` in every window listed in `bounds`.
    pub fn process_with_bounds(&mut self, bounds: Vec<WindowBound>, value: V) {
        self.buffer.insert(bounds, value);
    }

    /// Buffers `value` in the tumbling window of `size_ms` that contains
    /// `event_time_ms`.
    ///
    /// The event is buffered regardless of the current watermark.
    ///
    /// # Panics
    ///
    /// Panics if `size_ms` is zero.
    pub fn process_tumbling(&mut self, size_ms: u64, event_time_ms: i64, value: V) {
        let bounds = TumblingWindowAssigner::new(size_ms).assign_windows(event_time_ms);
        self.buffer.insert(bounds, value);
    }

    /// Raises the watermark and returns the aggregate of every window that
    /// has closed, i.e. whose `end_ms` is at or before the watermark.
    ///
    /// The watermark is monotonic: a `watermark_ms` below the current value
    /// does not lower it, and expiry is always evaluated against the current
    /// value, so a regressed argument cannot skip already-closed windows.
    pub fn advance_watermark(&mut self, watermark_ms: i64) -> Vec<(WindowBound, A)> {
        self.current_watermark = self.current_watermark.max(watermark_ms);
        let expired = self.buffer.collect_expired(self.current_watermark);
        expired
            .into_iter()
            .map(|(bound, values)| {
                let agg = (self.aggregate_fn)(&values);
                (bound, agg)
            })
            .collect()
    }

    /// Highest watermark passed to `advance_watermark` so far, or `i64::MIN`
    /// if it has not been called.
    pub fn current_watermark(&self) -> i64 {
        self.current_watermark
    }
}
/// Integer division of `a` by `b`, rounded toward negative infinity.
///
/// Rust's `/` truncates toward zero, so the quotient is lowered by one
/// whenever the division is inexact and the operands have opposite signs.
/// Panics on a zero divisor, exactly like the built-in operators.
fn floor_div(a: i64, b: i64) -> i64 {
    let quotient = a / b;
    let remainder = a % b;
    // A non-zero remainder takes the sign of `a`; if that sign differs from
    // the divisor's, truncation rounded up and must be corrected.
    let rounded_up = remainder != 0 && remainder.signum() != b.signum();
    quotient - i64::from(rounded_up)
}
#[cfg(test)]
mod tests {
    use super::*;

    // Events strictly inside a window map to the enclosing multiple of the size.
    #[test]
    fn test_tumbling_event_at_500_size_1000() {
        let assigner = TumblingWindowAssigner::new(1000);
        let bounds = assigner.assign_windows(500);
        assert_eq!(bounds, vec![WindowBound::new(0, 1000)]);
    }

    #[test]
    fn test_tumbling_event_at_1500_size_1000() {
        let assigner = TumblingWindowAssigner::new(1000);
        let bounds = assigner.assign_windows(1500);
        assert_eq!(bounds, vec![WindowBound::new(1000, 2000)]);
    }

    // The window end is exclusive, so a boundary event opens the next window.
    #[test]
    fn test_tumbling_event_exactly_at_boundary() {
        let assigner = TumblingWindowAssigner::new(1000);
        let bounds = assigner.assign_windows(1000);
        assert_eq!(bounds, vec![WindowBound::new(1000, 2000)]);
    }

    // Negative timestamps must floor, not truncate, to stay inside the window.
    #[test]
    fn test_tumbling_negative_event_time() {
        let assigner = TumblingWindowAssigner::new(1000);
        let bounds = assigner.assign_windows(-1);
        assert_eq!(bounds, vec![WindowBound::new(-1000, 0)]);
        assert!(bounds[0].contains(-1));
    }

    // Pins the exact set of windows instead of only a lower bound on the count.
    #[test]
    fn test_sliding_window_overlapping_count() {
        let assigner = SlidingWindowAssigner::new(1000, 500);
        let bounds = assigner.assign_windows(800);
        assert_eq!(bounds.len(), 2, "unexpected windows: {:?}", bounds);
        assert!(bounds.contains(&WindowBound::new(0, 1000)));
        assert!(bounds.contains(&WindowBound::new(500, 1500)));
        assert!(bounds.iter().all(|b| b.contains(800)));
    }

    // With a size below the step, an event between windows belongs to none.
    #[test]
    fn test_sliding_window_gap_between_windows() {
        let assigner = SlidingWindowAssigner::new(100, 500);
        assert!(assigner.assign_windows(300).is_empty());
        assert_eq!(
            assigner.assign_windows(550),
            vec![WindowBound::new(500, 600)]
        );
    }

    // In-order events extend a session until the gap is reached.
    #[test]
    fn test_session_in_order_events() {
        let mut assigner = SessionWindowAssigner::new(1000);
        assert_eq!(assigner.assign_keyed("k", 0), WindowBound::new(0, 1000));
        assert_eq!(assigner.assign_keyed("k", 500), WindowBound::new(0, 1500));
        assert_eq!(
            assigner.assign_keyed("k", 1500),
            WindowBound::new(1500, 2500)
        );
        // Keys are tracked independently.
        assert_eq!(
            assigner.assign_keyed("other", 100),
            WindowBound::new(100, 1100)
        );
    }

    #[test]
    fn test_window_buffer_insert_and_collect_expired() {
        let mut buf: WindowBuffer<i32> = WindowBuffer::new();
        let b1 = WindowBound::new(0, 1000);
        let b2 = WindowBound::new(1000, 2000);
        buf.insert(vec![b1.clone()], 10);
        buf.insert(vec![b1.clone()], 20);
        buf.insert(vec![b2.clone()], 30);

        let expired = buf.collect_expired(1000);
        assert_eq!(expired.len(), 1);
        let (bound, values) = &expired[0];
        assert_eq!(*bound, b1);
        assert_eq!(*values, vec![10, 20]);
        assert_eq!(buf.len(), 1);
        assert!(!buf.is_empty());
    }

    #[test]
    fn test_window_aggregator_events_in_order() {
        let mut agg: WindowAggregator<f64, f64> =
            WindowAggregator::tumbling(1000, |vals| vals.iter().sum());
        agg.process_tumbling(1000, 100, 1.0);
        agg.process_tumbling(1000, 200, 2.0);
        agg.process_tumbling(1000, 900, 3.0);
        agg.process_tumbling(1000, 1500, 10.0);

        let results = agg.advance_watermark(1000);
        assert_eq!(results.len(), 1);
        let (bound, sum) = &results[0];
        assert_eq!(*bound, WindowBound::new(0, 1000));
        assert!((sum - 6.0).abs() < 1e-9, "Expected sum=6, got {sum}");
    }

    // A lower watermark must never move the recorded watermark backwards.
    #[test]
    fn test_window_aggregator_watermark_is_monotonic() {
        let mut agg: WindowAggregator<f64, f64> =
            WindowAggregator::tumbling(1000, |vals| vals.iter().sum());
        assert_eq!(agg.current_watermark(), i64::MIN);
        assert!(agg.advance_watermark(1000).is_empty());
        assert!(agg.advance_watermark(500).is_empty());
        assert_eq!(agg.current_watermark(), 1000);
    }
}