1use std::fmt;
4
5#[derive(Debug, Clone)]
7pub enum WindowType {
8 Tumbling { duration_ms: u64 },
10 Sliding { window_ms: u64, slide_ms: u64 },
12 Session { gap_ms: u64 },
14}
15
16impl fmt::Display for WindowType {
17 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
18 match self {
19 WindowType::Tumbling { duration_ms } => write!(f, "tumbling({duration_ms}ms)"),
20 WindowType::Sliding {
21 window_ms,
22 slide_ms,
23 } => {
24 write!(f, "sliding({window_ms}ms, {slide_ms}ms)")
25 }
26 WindowType::Session { gap_ms } => write!(f, "session({gap_ms}ms)"),
27 }
28 }
29}
30
31#[derive(Debug)]
33pub struct WindowState {
34 pub window_type: WindowType,
35 pub events: Vec<WindowEvent>,
36 pub window_start: u64,
37 pub last_event_time: u64,
38}
39
40#[derive(Debug, Clone)]
42pub struct WindowEvent {
43 pub value: String,
44 pub timestamp: u64,
45}
46
47impl WindowState {
48 pub fn new(window_type: WindowType) -> Self {
49 WindowState {
50 window_type,
51 events: Vec::new(),
52 window_start: 0,
53 last_event_time: 0,
54 }
55 }
56
57 pub fn add_event(&mut self, value: String, timestamp: u64) {
59 if self.events.is_empty() && self.window_start == 0 && self.last_event_time == 0 {
60 self.window_start = timestamp;
62 }
63 self.last_event_time = timestamp;
64 self.events.push(WindowEvent { value, timestamp });
65 }
66
67 pub fn should_fire(&self, current_time: u64) -> bool {
69 if self.events.is_empty() {
70 return false;
71 }
72 match &self.window_type {
73 WindowType::Tumbling { duration_ms } => current_time >= self.window_start + duration_ms,
74 WindowType::Sliding { slide_ms, .. } => current_time >= self.window_start + slide_ms,
75 WindowType::Session { gap_ms } => current_time >= self.last_event_time + gap_ms,
76 }
77 }
78
79 pub fn fire(&mut self) -> Vec<WindowEvent> {
82 let events = std::mem::take(&mut self.events);
83 match &self.window_type {
84 WindowType::Tumbling { duration_ms } => {
85 self.window_start += duration_ms;
86 }
87 WindowType::Sliding { slide_ms, .. } => {
88 self.window_start += slide_ms;
90 }
91 WindowType::Session { .. } => {
92 self.window_start = 0;
93 self.last_event_time = 0;
94 }
95 }
96 events
97 }
98
99 pub fn len(&self) -> usize {
101 self.events.len()
102 }
103
104 pub fn is_empty(&self) -> bool {
106 self.events.is_empty()
107 }
108}
109
110#[cfg(test)]
111mod tests {
112 use super::*;
113
114 #[test]
115 fn test_tumbling_window_fires_after_duration() {
116 let mut state = WindowState::new(WindowType::Tumbling { duration_ms: 1000 });
117 state.add_event("a".to_string(), 0);
118 state.add_event("b".to_string(), 500);
119
120 assert!(!state.should_fire(500));
121 assert!(!state.should_fire(999));
122 assert!(state.should_fire(1000));
123 assert!(state.should_fire(1500));
124
125 let events = state.fire();
126 assert_eq!(events.len(), 2);
127 assert_eq!(events[0].value, "a");
128 assert_eq!(events[1].value, "b");
129 assert!(state.is_empty());
130 }
131
132 #[test]
133 fn test_tumbling_window_resets_after_fire() {
134 let mut state = WindowState::new(WindowType::Tumbling { duration_ms: 1000 });
135 state.add_event("a".to_string(), 0);
136 assert!(state.should_fire(1000));
137 state.fire();
138
139 state.add_event("b".to_string(), 1200);
141 assert!(!state.should_fire(1500));
142 assert!(state.should_fire(2000));
143 }
144
145 #[test]
146 fn test_sliding_window_fires_on_slide() {
147 let mut state = WindowState::new(WindowType::Sliding {
148 window_ms: 1000,
149 slide_ms: 500,
150 });
151 state.add_event("a".to_string(), 0);
152 state.add_event("b".to_string(), 300);
153
154 assert!(!state.should_fire(400));
155 assert!(state.should_fire(500));
156 }
157
158 #[test]
159 fn test_session_window_fires_on_gap() {
160 let mut state = WindowState::new(WindowType::Session { gap_ms: 500 });
161 state.add_event("a".to_string(), 0);
162 state.add_event("b".to_string(), 200);
163
164 assert!(!state.should_fire(600));
165 assert!(state.should_fire(700));
166 assert!(state.should_fire(800));
167
168 let events = state.fire();
169 assert_eq!(events.len(), 2);
170 }
171
172 #[test]
173 fn test_empty_window_never_fires() {
174 let state = WindowState::new(WindowType::Tumbling { duration_ms: 1000 });
175 assert!(!state.should_fire(5000));
176 }
177
178 #[test]
179 fn test_window_type_display() {
180 assert_eq!(
181 format!("{}", WindowType::Tumbling { duration_ms: 5000 }),
182 "tumbling(5000ms)"
183 );
184 assert_eq!(
185 format!(
186 "{}",
187 WindowType::Sliding {
188 window_ms: 10000,
189 slide_ms: 1000
190 }
191 ),
192 "sliding(10000ms, 1000ms)"
193 );
194 assert_eq!(
195 format!("{}", WindowType::Session { gap_ms: 30000 }),
196 "session(30000ms)"
197 );
198 }
199}