oxigdal_streaming/windowing/
window.rs1use crate::core::stream::StreamElement;
4use crate::error::{Result, StreamingError};
5use chrono::{DateTime, Duration, Utc};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
11pub struct Window {
12 pub start: DateTime<Utc>,
14
15 pub end: DateTime<Utc>,
17}
18
19impl Window {
20 pub fn new(start: DateTime<Utc>, end: DateTime<Utc>) -> Result<Self> {
22 if start >= end {
23 return Err(StreamingError::InvalidWindow(
24 "Window start must be before end".to_string(),
25 ));
26 }
27 Ok(Self { start, end })
28 }
29
30 pub fn duration(&self) -> Duration {
32 self.end - self.start
33 }
34
35 pub fn contains(&self, timestamp: &DateTime<Utc>) -> bool {
37 timestamp >= &self.start && timestamp < &self.end
38 }
39
40 pub fn overlaps(&self, other: &Window) -> bool {
42 self.start < other.end && other.start < self.end
43 }
44
45 pub fn merge(&self, other: &Window) -> Option<Window> {
47 if self.overlaps(other) {
48 let start = self.start.min(other.start);
49 let end = self.end.max(other.end);
50 Window::new(start, end).ok()
51 } else {
52 None
53 }
54 }
55
56 pub fn max_timestamp(&self) -> DateTime<Utc> {
58 self.end - Duration::milliseconds(1)
59 }
60}
61
62pub trait WindowAssigner: Send + Sync {
64 fn assign_windows(&self, element: &StreamElement) -> Result<Vec<Window>>;
66
67 fn assigner_type(&self) -> &str;
69}
70
/// Decision returned by the callbacks of a [`WindowTrigger`].
///
/// NOTE(review): how each variant is acted upon is decided by the code that
/// consumes this enum, which is not part of this file; the per-variant notes
/// below describe the presumed meaning and should be confirmed there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TriggerResult {
    /// Presumably: take no action for the window.
    Continue,

    /// Presumably: emit the window's result and keep its contents.
    Fire,

    /// Presumably: emit the window's result, then discard its contents.
    FireAndPurge,

    /// Presumably: discard the window's contents without emitting.
    Purge,
}
86
87pub trait WindowTrigger: Send + Sync {
89 fn on_element(
91 &mut self,
92 element: &StreamElement,
93 window: &Window,
94 state: &WindowState,
95 ) -> TriggerResult;
96
97 fn on_processing_time(&mut self, time: DateTime<Utc>, window: &Window) -> TriggerResult;
99
100 fn on_event_time(&mut self, time: DateTime<Utc>, window: &Window) -> TriggerResult;
102
103 fn on_merge(&mut self, window: &Window, merged_windows: &[Window]) -> TriggerResult;
105
106 fn clear(&mut self);
108}
109
110#[derive(Debug, Clone)]
112pub struct WindowState {
113 pub element_count: usize,
115
116 pub size_bytes: usize,
118
119 pub earliest_timestamp: Option<DateTime<Utc>>,
121
122 pub latest_timestamp: Option<DateTime<Utc>>,
124
125 pub custom: HashMap<String, Vec<u8>>,
127}
128
129impl WindowState {
130 pub fn new() -> Self {
132 Self {
133 element_count: 0,
134 size_bytes: 0,
135 earliest_timestamp: None,
136 latest_timestamp: None,
137 custom: HashMap::new(),
138 }
139 }
140
141 pub fn add_element(&mut self, element: &StreamElement) {
143 self.element_count += 1;
144 self.size_bytes += element.size_bytes();
145
146 if let Some(earliest) = self.earliest_timestamp {
147 if element.event_time < earliest {
148 self.earliest_timestamp = Some(element.event_time);
149 }
150 } else {
151 self.earliest_timestamp = Some(element.event_time);
152 }
153
154 if let Some(latest) = self.latest_timestamp {
155 if element.event_time > latest {
156 self.latest_timestamp = Some(element.event_time);
157 }
158 } else {
159 self.latest_timestamp = Some(element.event_time);
160 }
161 }
162
163 pub fn clear(&mut self) {
165 self.element_count = 0;
166 self.size_bytes = 0;
167 self.earliest_timestamp = None;
168 self.latest_timestamp = None;
169 self.custom.clear();
170 }
171}
172
173impl Default for WindowState {
174 fn default() -> Self {
175 Self::new()
176 }
177}
178
179pub struct EventTimeSessionWindows {
181 gap: Duration,
182}
183
184impl EventTimeSessionWindows {
185 pub fn with_gap(gap: Duration) -> Self {
187 Self { gap }
188 }
189}
190
191impl WindowAssigner for EventTimeSessionWindows {
192 fn assign_windows(&self, element: &StreamElement) -> Result<Vec<Window>> {
193 let start = element.event_time;
194 let end = start + self.gap;
195 Ok(vec![Window::new(start, end)?])
196 }
197
198 fn assigner_type(&self) -> &str {
199 "EventTimeSessionWindows"
200 }
201}
202
203pub struct ProcessingTimeSessionWindows {
205 gap: Duration,
206}
207
208impl ProcessingTimeSessionWindows {
209 pub fn with_gap(gap: Duration) -> Self {
211 Self { gap }
212 }
213}
214
215impl WindowAssigner for ProcessingTimeSessionWindows {
216 fn assign_windows(&self, element: &StreamElement) -> Result<Vec<Window>> {
217 let start = element.processing_time;
218 let end = start + self.gap;
219 Ok(vec![Window::new(start, end)?])
220 }
221
222 fn assigner_type(&self) -> &str {
223 "ProcessingTimeSessionWindows"
224 }
225}
226
227pub struct EventTimeTrigger {
229 fired_windows: Vec<Window>,
230}
231
232impl EventTimeTrigger {
233 pub fn new() -> Self {
235 Self {
236 fired_windows: Vec::new(),
237 }
238 }
239}
240
241impl Default for EventTimeTrigger {
242 fn default() -> Self {
243 Self::new()
244 }
245}
246
247impl WindowTrigger for EventTimeTrigger {
248 fn on_element(
249 &mut self,
250 _element: &StreamElement,
251 _window: &Window,
252 _state: &WindowState,
253 ) -> TriggerResult {
254 TriggerResult::Continue
255 }
256
257 fn on_processing_time(&mut self, _time: DateTime<Utc>, _window: &Window) -> TriggerResult {
258 TriggerResult::Continue
259 }
260
261 fn on_event_time(&mut self, time: DateTime<Utc>, window: &Window) -> TriggerResult {
262 if time >= window.end {
263 self.fired_windows.push(window.clone());
264 TriggerResult::FireAndPurge
265 } else {
266 TriggerResult::Continue
267 }
268 }
269
270 fn on_merge(&mut self, _window: &Window, _merged_windows: &[Window]) -> TriggerResult {
271 TriggerResult::Continue
272 }
273
274 fn clear(&mut self) {
275 self.fired_windows.clear();
276 }
277}
278
/// Trigger that fires once a window's element count reaches a threshold.
pub struct CountTrigger {
    /// Element count at or above which `on_element` fires.
    count: usize,
}
283
284impl CountTrigger {
285 pub fn of(count: usize) -> Self {
287 Self { count }
288 }
289}
290
291impl WindowTrigger for CountTrigger {
292 fn on_element(
293 &mut self,
294 _element: &StreamElement,
295 _window: &Window,
296 state: &WindowState,
297 ) -> TriggerResult {
298 if state.element_count >= self.count {
299 TriggerResult::FireAndPurge
300 } else {
301 TriggerResult::Continue
302 }
303 }
304
305 fn on_processing_time(&mut self, _time: DateTime<Utc>, _window: &Window) -> TriggerResult {
306 TriggerResult::Continue
307 }
308
309 fn on_event_time(&mut self, _time: DateTime<Utc>, _window: &Window) -> TriggerResult {
310 TriggerResult::Continue
311 }
312
313 fn on_merge(&mut self, _window: &Window, _merged_windows: &[Window]) -> TriggerResult {
314 TriggerResult::Continue
315 }
316
317 fn clear(&mut self) {}
318}
319
#[cfg(test)]
mod tests {
    use super::*;

    /// A new window reports the bounds and the length it was built with.
    #[test]
    fn test_window_creation() {
        let opened_at = Utc::now();
        let closed_at = opened_at + Duration::seconds(60);

        let window =
            Window::new(opened_at, closed_at).expect("Window creation for test should succeed");

        assert_eq!(window.start, opened_at);
        assert_eq!(window.end, closed_at);
        assert_eq!(window.duration(), Duration::seconds(60));
    }

    /// A timestamp in the middle is contained; one past the end is not.
    #[test]
    fn test_window_contains() {
        let opened_at = Utc::now();
        let closed_at = opened_at + Duration::seconds(60);
        let window = Window::new(opened_at, closed_at)
            .expect("Window creation for contains test should succeed");

        let midpoint = opened_at + Duration::seconds(30);
        let after_end = closed_at + Duration::seconds(1);

        assert!(window.contains(&midpoint));
        assert!(!window.contains(&after_end));
    }

    /// Two windows offset by half their length overlap in both directions.
    #[test]
    fn test_window_overlaps() {
        let first_start = Utc::now();
        let first_end = first_start + Duration::seconds(60);
        let first = Window::new(first_start, first_end)
            .expect("Window creation for overlap test should succeed");

        let second_start = first_start + Duration::seconds(30);
        let second_end = second_start + Duration::seconds(60);
        let second = Window::new(second_start, second_end)
            .expect("Window creation for overlap test should succeed");

        assert!(first.overlaps(&second));
        assert!(second.overlaps(&first));
    }

    /// Merging overlapping windows spans the earliest start to the latest end.
    #[test]
    fn test_window_merge() {
        let first_start = Utc::now();
        let first_end = first_start + Duration::seconds(60);
        let first = Window::new(first_start, first_end)
            .expect("Window creation for merge test should succeed");

        let second_start = first_start + Duration::seconds(30);
        let second_end = second_start + Duration::seconds(60);
        let second = Window::new(second_start, second_end)
            .expect("Window creation for merge test should succeed");

        let combined = first
            .merge(&second)
            .expect("Window merge should succeed in test");

        assert_eq!(combined.start, first_start);
        assert_eq!(combined.end, second_end);
    }

    /// Adding one element bumps the count and sets both timestamp bounds.
    #[test]
    fn test_window_state() {
        let mut state = WindowState::new();
        assert_eq!(state.element_count, 0);

        let element = StreamElement::new(vec![1, 2, 3], Utc::now());
        state.add_element(&element);

        assert_eq!(state.element_count, 1);
        assert!(state.earliest_timestamp.is_some());
        assert!(state.latest_timestamp.is_some());
    }
}