use std::fmt;
/// Windowing strategy that selects how [`WindowState`] decides when to fire.
///
/// Every field is a `u64`; the `_ms` suffix and the `Display` output
/// (`"...ms"`) indicate the values are milliseconds.
#[derive(Debug, Clone)]
pub enum WindowType {
/// Fires once the current time reaches `window_start + duration_ms`;
/// firing then advances `window_start` by `duration_ms`.
Tumbling { duration_ms: u64 },
/// Fires once the current time reaches `window_start + slide_ms`;
/// firing then advances `window_start` by `slide_ms`.
///
/// NOTE(review): `window_ms` is stored and displayed, but the firing logic
/// in this file reads only `slide_ms` — confirm whether that is intended.
Sliding { window_ms: u64, slide_ms: u64 },
/// Fires once the current time reaches `last_event_time + gap_ms`;
/// firing resets both `window_start` and `last_event_time` to `0`.
Session { gap_ms: u64 },
}
impl fmt::Display for WindowType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
WindowType::Tumbling { duration_ms } => write!(f, "tumbling({duration_ms}ms)"),
WindowType::Sliding {
window_ms,
slide_ms,
} => {
write!(f, "sliding({window_ms}ms, {slide_ms}ms)")
}
WindowType::Session { gap_ms } => write!(f, "session({gap_ms}ms)"),
}
}
}
/// Mutable state of one window: its strategy, the events buffered since the
/// last fire, and the two timestamps the firing checks are based on.
#[derive(Debug)]
pub struct WindowState {
/// Strategy consulted by `should_fire` and `fire`.
pub window_type: WindowType,
/// Events buffered by `add_event`; `fire` takes them all and leaves this empty.
pub events: Vec<WindowEvent>,
/// Start of the current window. Initialised to `0`, set to the first event's
/// timestamp while the state is pristine, then moved by `fire`.
pub window_start: u64,
/// Timestamp passed to the most recent `add_event` call (arrival order);
/// `0` initially and again after a session window fires.
pub last_event_time: u64,
}
/// A single buffered event: an opaque payload plus the timestamp it was added with.
#[derive(Debug, Clone)]
pub struct WindowEvent {
/// Payload string supplied to `WindowState::add_event`.
pub value: String,
/// Timestamp supplied to `WindowState::add_event`.
/// NOTE(review): presumably milliseconds, like the `WindowType` fields — confirm.
pub timestamp: u64,
}
impl WindowState {
    /// Creates an empty window of the given type with both timestamps at `0`.
    pub fn new(window_type: WindowType) -> Self {
        Self {
            window_type,
            events: Vec::new(),
            window_start: 0,
            last_event_time: 0,
        }
    }

    /// Buffers an event and records `timestamp` as the latest event time.
    ///
    /// `window_start` is anchored to `timestamp` only while the state is
    /// pristine: no buffered events and both timestamps still `0`. That is the
    /// case for a fresh state and after a session window fires. Tumbling and
    /// sliding windows keep the start that `fire` advanced.
    ///
    /// NOTE(review): `last_event_time` follows arrival order, so an
    /// out-of-order (older) timestamp moves it backwards — confirm that
    /// callers deliver events in timestamp order.
    pub fn add_event(&mut self, value: String, timestamp: u64) {
        let pristine =
            self.events.is_empty() && self.window_start == 0 && self.last_event_time == 0;
        if pristine {
            self.window_start = timestamp;
        }
        self.last_event_time = timestamp;
        self.events.push(WindowEvent { value, timestamp });
    }

    /// Returns `true` when the window holds at least one event and
    /// `current_time` has reached the window's deadline.
    ///
    /// The deadline is `window_start + duration_ms` (tumbling),
    /// `window_start + slide_ms` (sliding) or `last_event_time + gap_ms`
    /// (session). The addition saturates at `u64::MAX`, so a very large
    /// duration or gap postpones firing instead of panicking (debug builds)
    /// or wrapping to a tiny deadline (release builds).
    pub fn should_fire(&self, current_time: u64) -> bool {
        if self.events.is_empty() {
            return false;
        }
        let deadline = match self.window_type {
            WindowType::Tumbling { duration_ms } => self.window_start.saturating_add(duration_ms),
            WindowType::Sliding { slide_ms, .. } => self.window_start.saturating_add(slide_ms),
            WindowType::Session { gap_ms } => self.last_event_time.saturating_add(gap_ms),
        };
        current_time >= deadline
    }

    /// Drains and returns every buffered event, then prepares the next window.
    ///
    /// Tumbling and sliding windows advance `window_start` by their
    /// duration / slide (saturating at `u64::MAX`). Session windows reset both
    /// timestamps to `0`, so the next event re-anchors the window.
    ///
    /// NOTE(review): for `Sliding`, `window_ms` is not consulted and no events
    /// are retained for overlapping windows — confirm this is intended.
    pub fn fire(&mut self) -> Vec<WindowEvent> {
        let events = std::mem::take(&mut self.events);
        match self.window_type {
            WindowType::Tumbling { duration_ms } => {
                self.window_start = self.window_start.saturating_add(duration_ms);
            }
            WindowType::Sliding { slide_ms, .. } => {
                self.window_start = self.window_start.saturating_add(slide_ms);
            }
            WindowType::Session { .. } => {
                self.window_start = 0;
                self.last_event_time = 0;
            }
        }
        events
    }

    /// Number of events currently buffered.
    pub fn len(&self) -> usize {
        self.events.len()
    }

    /// Whether no events are currently buffered.
    pub fn is_empty(&self) -> bool {
        self.events.is_empty()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `should_fire` against a table of `(current_time, expected)` probes.
    fn expect_schedule(state: &WindowState, probes: &[(u64, bool)]) {
        for &(now, expected) in probes {
            assert_eq!(
                state.should_fire(now),
                expected,
                "unexpected should_fire result at t={}",
                now
            );
        }
    }

    /// A tumbling window stays closed until `duration_ms` has elapsed, then
    /// hands back its events in insertion order and ends up empty.
    #[test]
    fn test_tumbling_window_fires_after_duration() {
        let mut window = WindowState::new(WindowType::Tumbling { duration_ms: 1000 });
        for (payload, ts) in [("a", 0), ("b", 500)] {
            window.add_event(String::from(payload), ts);
        }
        expect_schedule(
            &window,
            &[(500, false), (999, false), (1000, true), (1500, true)],
        );

        let fired = window.fire();
        let payloads: Vec<&str> = fired.iter().map(|event| event.value.as_str()).collect();
        assert_eq!(payloads, ["a", "b"]);
        assert!(window.is_empty());
    }

    /// After firing, the next tumbling window runs from the advanced start.
    #[test]
    fn test_tumbling_window_resets_after_fire() {
        let mut window = WindowState::new(WindowType::Tumbling { duration_ms: 1000 });
        window.add_event(String::from("a"), 0);
        expect_schedule(&window, &[(1000, true)]);
        window.fire();

        window.add_event(String::from("b"), 1200);
        expect_schedule(&window, &[(1500, false), (2000, true)]);
    }

    /// A sliding window becomes ready once `slide_ms` has elapsed.
    #[test]
    fn test_sliding_window_fires_on_slide() {
        let window_type = WindowType::Sliding {
            window_ms: 1000,
            slide_ms: 500,
        };
        let mut window = WindowState::new(window_type);
        window.add_event(String::from("a"), 0);
        window.add_event(String::from("b"), 300);
        expect_schedule(&window, &[(400, false), (500, true)]);
    }

    /// A session window becomes ready `gap_ms` after the latest event.
    #[test]
    fn test_session_window_fires_on_gap() {
        let mut window = WindowState::new(WindowType::Session { gap_ms: 500 });
        window.add_event(String::from("a"), 0);
        window.add_event(String::from("b"), 200);
        expect_schedule(&window, &[(600, false), (700, true), (800, true)]);

        let fired = window.fire();
        assert_eq!(fired.len(), 2);
    }

    /// With nothing buffered, no amount of elapsed time triggers a fire.
    #[test]
    fn test_empty_window_never_fires() {
        let window = WindowState::new(WindowType::Tumbling { duration_ms: 1000 });
        expect_schedule(&window, &[(5000, false)]);
    }

    /// Each variant renders as `kind(<params>ms)`.
    #[test]
    fn test_window_type_display() {
        let cases = [
            (WindowType::Tumbling { duration_ms: 5000 }, "tumbling(5000ms)"),
            (
                WindowType::Sliding {
                    window_ms: 10000,
                    slide_ms: 1000,
                },
                "sliding(10000ms, 1000ms)",
            ),
            (WindowType::Session { gap_ms: 30000 }, "session(30000ms)"),
        ];
        for (window_type, rendered) in cases.iter() {
            assert_eq!(window_type.to_string(), *rendered);
        }
    }
}