use std::collections::VecDeque;
/// Declarative description of how a stream should be split into windows.
///
/// The variants mirror the constructor parameters of `TumblingWindow`,
/// `SlidingWindow` and `SessionWindow`. All lengths are presumably in the same
/// units as record timestamps (TODO confirm with the code that consumes this).
#[derive(Debug, Clone, PartialEq)]
pub enum WindowSpec {
    /// Non-overlapping windows of the given length.
    Tumbling(f64),
    /// Windows of length `size` whose ends advance by `step`.
    Sliding {
        /// Length of each window.
        size: f64,
        /// Distance between the ends of consecutive windows.
        step: f64,
    },
    /// Activity-based windows closed by a period of inactivity.
    Session {
        /// Largest allowed gap between records of the same session.
        gap: f64,
    },
}
/// A single timestamped stream element, optionally tagged with a string key.
///
/// The payload type is unconstrained here; the `Clone` requirement lives only
/// on the derived `Clone` impl and on the window operators that need it.
#[derive(Debug, Clone)]
pub struct Record<T> {
    /// Time used by the window operators to place the record into windows.
    pub timestamp: f64,
    /// Optional grouping key; `None` for records built with [`Record::new`].
    pub key: Option<String>,
    /// The payload carried by the record.
    pub value: T,
}

impl<T> Record<T> {
    /// Creates an un-keyed record with the given timestamp and payload.
    pub fn new(timestamp: f64, value: T) -> Self {
        Self {
            timestamp,
            key: None,
            value,
        }
    }

    /// Returns this record with its key set to `key`, replacing any previous key.
    pub fn with_key(mut self, key: String) -> Self {
        self.key = Some(key);
        self
    }
}
/// Groups records into consecutive, non-overlapping windows of `window_size`.
///
/// A window is anchored at the timestamp of the first record it receives and is
/// closed by the first record whose timestamp is at or beyond
/// `start + window_size`. That closing record anchors the next window, so window
/// boundaries follow the data rather than a fixed grid.
pub struct TumblingWindow<T: Clone + Send> {
    /// Length of each window, in timestamp units.
    pub window_size: f64,
    /// Records collected for the currently open window.
    buffer: VecDeque<Record<T>>,
    /// Start of the open window; `None` before the first record and after `flush`.
    window_start: Option<f64>,
    /// Number of non-empty windows returned by `add` or `flush`.
    pub windows_emitted: usize,
}

impl<T: Clone + Send> TumblingWindow<T> {
    /// Creates an empty tumbling window operator.
    ///
    /// # Panics
    /// Panics if `window_size` is not strictly positive.
    pub fn new(window_size: f64) -> Self {
        assert!(window_size > 0.0, "window_size must be positive");
        Self {
            window_size,
            buffer: VecDeque::new(),
            window_start: None,
            windows_emitted: 0,
        }
    }

    /// Adds `record`, returning the open window's records if this record closed it.
    ///
    /// Records earlier than the end of the open window (including late,
    /// out-of-order ones) are simply appended to it.
    pub fn add(&mut self, record: Record<T>) -> Option<Vec<Record<T>>> {
        let start = *self.window_start.get_or_insert(record.timestamp);
        if record.timestamp >= start + self.window_size {
            // The closing record becomes the anchor of the next window.
            self.window_start = Some(record.timestamp);
            let closed = self.take_window();
            self.buffer.push_back(record);
            closed
        } else {
            self.buffer.push_back(record);
            None
        }
    }

    /// Emits whatever is buffered as a final window.
    ///
    /// The anchor is cleared as well, so the next record starts a fresh window
    /// instead of joining the time range of the window just flushed.
    pub fn flush(&mut self) -> Option<Vec<Record<T>>> {
        self.window_start = None;
        self.take_window()
    }

    /// Number of records currently buffered in the open window.
    pub fn buffer_size(&self) -> usize {
        self.buffer.len()
    }

    /// Drains the buffer into a window; empty windows are neither returned nor counted.
    fn take_window(&mut self) -> Option<Vec<Record<T>>> {
        if self.buffer.is_empty() {
            return None;
        }
        self.windows_emitted += 1;
        Some(self.buffer.drain(..).collect())
    }
}
/// Emits overlapping windows of length `size` whose ends advance by `step`.
///
/// The first window ends `step` after the first record's timestamp, and each
/// later window ends `step` after the previous one. A window ending at `end`
/// holds the buffered records with `end - size <= timestamp < end`. This is the
/// same half-open convention `TumblingWindow` uses, so a record on a boundary
/// belongs to exactly one side of it.
pub struct SlidingWindow<T: Clone + Send> {
    /// Length of each window, in timestamp units.
    pub size: f64,
    /// Distance between the ends of consecutive windows.
    pub step: f64,
    /// Records that can still fall into the pending window or a later one.
    buffer: VecDeque<Record<T>>,
    /// End of the next window to emit; `None` until the first record arrives.
    next_emit: Option<f64>,
    /// Number of non-empty windows returned by `add`.
    pub windows_emitted: usize,
}

impl<T: Clone + Send> SlidingWindow<T> {
    /// Creates an empty sliding window operator.
    ///
    /// # Panics
    /// Panics if `size` or `step` is not strictly positive.
    pub fn new(size: f64, step: f64) -> Self {
        assert!(size > 0.0, "size must be positive");
        assert!(step > 0.0, "step must be positive");
        Self {
            size,
            step,
            buffer: VecDeque::new(),
            next_emit: None,
            windows_emitted: 0,
        }
    }

    /// Buffers `record` and, once its timestamp reaches the end of the pending
    /// window, returns that window's records.
    ///
    /// The returned records are clones, because a record may also belong to
    /// later overlapping windows. At most one window is produced per call, so
    /// after a jump in time the operator catches up one window per subsequent
    /// record. A window with no records yields `None` and is not counted.
    pub fn add(&mut self, record: Record<T>) -> Option<Vec<Record<T>>> {
        let ts = record.timestamp;
        let window_end = *self.next_emit.get_or_insert(ts + self.step);
        // Move the record in; only records that land in a window get cloned.
        self.buffer.push_back(record);
        if ts >= window_end {
            let window_start = window_end - self.size;
            let window: Vec<Record<T>> = self
                .buffer
                .iter()
                .filter(|r| r.timestamp >= window_start && r.timestamp < window_end)
                .cloned()
                .collect();
            // Compute the next window's start exactly as the next call will, then
            // drop everything older. `retain` also removes late stragglers that
            // sit behind newer records, which a front-only scan would keep forever.
            let next_end = window_end + self.step;
            let evict_before = next_end - self.size;
            self.buffer.retain(|r| r.timestamp >= evict_before);
            self.next_emit = Some(next_end);
            if window.is_empty() {
                None
            } else {
                self.windows_emitted += 1;
                Some(window)
            }
        } else {
            None
        }
    }

    /// Number of records currently retained for present or future windows.
    pub fn buffer_size(&self) -> usize {
        self.buffer.len()
    }
}
/// Groups records into sessions separated by periods of inactivity.
///
/// A record whose timestamp is more than `gap` after the latest timestamp of
/// the open session closes that session, which is returned. The record then
/// starts a new session.
pub struct SessionWindow<T: Clone + Send> {
    /// Largest allowed gap between records of the same session.
    pub gap: f64,
    /// Records of the currently open session.
    current_session: Vec<Record<T>>,
    /// Latest timestamp seen in the open session; `None` when no session is open.
    last_timestamp: Option<f64>,
    /// Number of non-empty sessions returned by `add` or `flush`.
    pub sessions_emitted: usize,
}

impl<T: Clone + Send> SessionWindow<T> {
    /// Creates an empty session window operator.
    ///
    /// # Panics
    /// Panics if `gap` is not strictly positive.
    pub fn new(gap: f64) -> Self {
        assert!(gap > 0.0, "gap must be positive");
        Self {
            gap,
            current_session: Vec::new(),
            last_timestamp: None,
            sessions_emitted: 0,
        }
    }

    /// Adds `record`, returning the previous session if the gap before this
    /// record exceeded `gap`.
    pub fn add(&mut self, record: Record<T>) -> Option<Vec<Record<T>>> {
        let ts = record.timestamp;
        let gap_exceeded = self.last_timestamp.map_or(false, |last| ts - last > self.gap);
        let closed = if gap_exceeded { self.take_session() } else { None };
        // Track the maximum, not the latest arrival. Otherwise a late
        // (out-of-order) record would drag the activity marker backwards, and
        // the next in-gap record would look like a gap.
        self.last_timestamp = Some(self.last_timestamp.map_or(ts, |last| last.max(ts)));
        self.current_session.push(record);
        closed
    }

    /// Emits the open session, if any, and resets the activity marker.
    pub fn flush(&mut self) -> Option<Vec<Record<T>>> {
        self.take_session()
    }

    /// Number of records in the open session.
    pub fn current_session_size(&self) -> usize {
        self.current_session.len()
    }

    /// Closes the open session; empty sessions are neither returned nor counted.
    fn take_session(&mut self) -> Option<Vec<Record<T>>> {
        self.last_timestamp = None;
        if self.current_session.is_empty() {
            return None;
        }
        self.sessions_emitted += 1;
        Some(std::mem::take(&mut self.current_session))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a record whose payload equals its timestamp.
    fn rec(t: u64) -> Record<f64> {
        Record::new(t as f64, t as f64)
    }

    /// Ten records in [0, 10) fill one tumbling window. The record at t = 10
    /// closes it and stays buffered until `flush`.
    #[test]
    fn test_tumbling_window_emits_correct_windows() {
        let mut tw = TumblingWindow::new(10.0);
        let outputs: Vec<_> = (0..10u64).map(|i| tw.add(rec(i))).collect();
        for (i, out) in outputs.iter().enumerate().take(9) {
            assert!(out.is_none(), "No window yet at t={}", i);
        }

        let window = tw.add(Record::new(10.0, 10.0_f64));
        assert!(window.is_some());
        assert_eq!(window.expect("window should be present").len(), 10);

        let final_window = tw.flush();
        assert!(final_window.is_some());
        assert_eq!(final_window.expect("final window should be present").len(), 1);
    }

    /// Every emitted sliding window spans at most `size` time units, and a
    /// 20-record stream produces more than one window.
    #[test]
    fn test_sliding_window_overlap() {
        let mut sw = SlidingWindow::new(10.0, 5.0);
        let windows_seen = (0..20u64)
            .filter_map(|i| sw.add(rec(i)))
            .inspect(|w| {
                let stamps = || w.iter().map(|r| r.timestamp as u64);
                let max_ts = stamps().max().unwrap_or(0);
                let min_ts = stamps().min().unwrap_or(0);
                assert!(max_ts - min_ts <= 10, "Window too wide: {} to {}", min_ts, max_ts);
            })
            .count();
        assert!(windows_seen >= 2, "Expected multiple windows, got {}", windows_seen);
    }

    /// A jump from t = 4 to t = 10 (more than the gap of 5) closes the first
    /// session. `flush` then returns the second.
    #[test]
    fn test_session_window_gap_detection() {
        let mut sw = SessionWindow::new(5.0);
        for i in 0..5u64 {
            let out = sw.add(rec(i));
            assert!(out.is_none(), "No session closed yet at t={}", i);
        }

        let session = sw.add(Record::new(10.0, 10.0_f64));
        assert!(session.is_some(), "Session should have been emitted");
        assert_eq!(session.expect("session should be present").len(), 5);

        let final_session = sw.flush();
        assert!(final_session.is_some());
        assert_eq!(final_session.expect("final session should be present").len(), 1);
        assert_eq!(sw.sessions_emitted, 2);
    }
}