Skip to main content

oxigdal_streaming/windowing/
window.rs

1//! Core window types and traits.
2
3use crate::core::stream::StreamElement;
4use crate::error::{Result, StreamingError};
5use chrono::{DateTime, Duration, Utc};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9/// A window of time for grouping events.
10#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
11pub struct Window {
12    /// Window start time (inclusive)
13    pub start: DateTime<Utc>,
14
15    /// Window end time (exclusive)
16    pub end: DateTime<Utc>,
17}
18
19impl Window {
20    /// Create a new window.
21    pub fn new(start: DateTime<Utc>, end: DateTime<Utc>) -> Result<Self> {
22        if start >= end {
23            return Err(StreamingError::InvalidWindow(
24                "Window start must be before end".to_string(),
25            ));
26        }
27        Ok(Self { start, end })
28    }
29
30    /// Get the window duration.
31    pub fn duration(&self) -> Duration {
32        self.end - self.start
33    }
34
35    /// Check if a timestamp falls within this window.
36    pub fn contains(&self, timestamp: &DateTime<Utc>) -> bool {
37        timestamp >= &self.start && timestamp < &self.end
38    }
39
40    /// Check if this window overlaps with another.
41    pub fn overlaps(&self, other: &Window) -> bool {
42        self.start < other.end && other.start < self.end
43    }
44
45    /// Merge this window with another (if they overlap).
46    pub fn merge(&self, other: &Window) -> Option<Window> {
47        if self.overlaps(other) {
48            let start = self.start.min(other.start);
49            let end = self.end.max(other.end);
50            Window::new(start, end).ok()
51        } else {
52            None
53        }
54    }
55
56    /// Get the maximum timestamp in this window.
57    pub fn max_timestamp(&self) -> DateTime<Utc> {
58        self.end - Duration::milliseconds(1)
59    }
60}
61
62/// Assigns elements to windows.
63pub trait WindowAssigner: Send + Sync {
64    /// Assign an element to one or more windows.
65    fn assign_windows(&self, element: &StreamElement) -> Result<Vec<Window>>;
66
67    /// Get the window assigner type name.
68    fn assigner_type(&self) -> &str;
69}
70
/// Decision returned by a [`WindowTrigger`] callback.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TriggerResult {
    /// Keep collecting elements; emit nothing yet.
    Continue,

    /// Emit the window's results without purging it.
    Fire,

    /// Emit the window's results, then purge it.
    FireAndPurge,

    /// Purge the window without emitting anything.
    Purge,
}
86
87/// Determines when a window should emit results.
88pub trait WindowTrigger: Send + Sync {
89    /// Called when an element is added to a window.
90    fn on_element(
91        &mut self,
92        element: &StreamElement,
93        window: &Window,
94        state: &WindowState,
95    ) -> TriggerResult;
96
97    /// Called when processing time advances.
98    fn on_processing_time(&mut self, time: DateTime<Utc>, window: &Window) -> TriggerResult;
99
100    /// Called when event time (watermark) advances.
101    fn on_event_time(&mut self, time: DateTime<Utc>, window: &Window) -> TriggerResult;
102
103    /// Called when windows are merged.
104    fn on_merge(&mut self, window: &Window, merged_windows: &[Window]) -> TriggerResult;
105
106    /// Clear the trigger state.
107    fn clear(&mut self);
108}
109
110/// State associated with a window.
111#[derive(Debug, Clone)]
112pub struct WindowState {
113    /// Number of elements in the window
114    pub element_count: usize,
115
116    /// Total size in bytes
117    pub size_bytes: usize,
118
119    /// Earliest element timestamp
120    pub earliest_timestamp: Option<DateTime<Utc>>,
121
122    /// Latest element timestamp
123    pub latest_timestamp: Option<DateTime<Utc>>,
124
125    /// Custom state
126    pub custom: HashMap<String, Vec<u8>>,
127}
128
129impl WindowState {
130    /// Create a new empty window state.
131    pub fn new() -> Self {
132        Self {
133            element_count: 0,
134            size_bytes: 0,
135            earliest_timestamp: None,
136            latest_timestamp: None,
137            custom: HashMap::new(),
138        }
139    }
140
141    /// Update state with a new element.
142    pub fn add_element(&mut self, element: &StreamElement) {
143        self.element_count += 1;
144        self.size_bytes += element.size_bytes();
145
146        if let Some(earliest) = self.earliest_timestamp {
147            if element.event_time < earliest {
148                self.earliest_timestamp = Some(element.event_time);
149            }
150        } else {
151            self.earliest_timestamp = Some(element.event_time);
152        }
153
154        if let Some(latest) = self.latest_timestamp {
155            if element.event_time > latest {
156                self.latest_timestamp = Some(element.event_time);
157            }
158        } else {
159            self.latest_timestamp = Some(element.event_time);
160        }
161    }
162
163    /// Clear the state.
164    pub fn clear(&mut self) {
165        self.element_count = 0;
166        self.size_bytes = 0;
167        self.earliest_timestamp = None;
168        self.latest_timestamp = None;
169        self.custom.clear();
170    }
171}
172
173impl Default for WindowState {
174    fn default() -> Self {
175        Self::new()
176    }
177}
178
179/// Event-time session windows.
180pub struct EventTimeSessionWindows {
181    gap: Duration,
182}
183
184impl EventTimeSessionWindows {
185    /// Create a new event-time session windows assigner.
186    pub fn with_gap(gap: Duration) -> Self {
187        Self { gap }
188    }
189}
190
191impl WindowAssigner for EventTimeSessionWindows {
192    fn assign_windows(&self, element: &StreamElement) -> Result<Vec<Window>> {
193        let start = element.event_time;
194        let end = start + self.gap;
195        Ok(vec![Window::new(start, end)?])
196    }
197
198    fn assigner_type(&self) -> &str {
199        "EventTimeSessionWindows"
200    }
201}
202
203/// Processing-time session windows.
204pub struct ProcessingTimeSessionWindows {
205    gap: Duration,
206}
207
208impl ProcessingTimeSessionWindows {
209    /// Create a new processing-time session windows assigner.
210    pub fn with_gap(gap: Duration) -> Self {
211        Self { gap }
212    }
213}
214
215impl WindowAssigner for ProcessingTimeSessionWindows {
216    fn assign_windows(&self, element: &StreamElement) -> Result<Vec<Window>> {
217        let start = element.processing_time;
218        let end = start + self.gap;
219        Ok(vec![Window::new(start, end)?])
220    }
221
222    fn assigner_type(&self) -> &str {
223        "ProcessingTimeSessionWindows"
224    }
225}
226
227/// Event-time trigger that fires when watermark passes window end.
228pub struct EventTimeTrigger {
229    fired_windows: Vec<Window>,
230}
231
232impl EventTimeTrigger {
233    /// Create a new event-time trigger.
234    pub fn new() -> Self {
235        Self {
236            fired_windows: Vec::new(),
237        }
238    }
239}
240
241impl Default for EventTimeTrigger {
242    fn default() -> Self {
243        Self::new()
244    }
245}
246
247impl WindowTrigger for EventTimeTrigger {
248    fn on_element(
249        &mut self,
250        _element: &StreamElement,
251        _window: &Window,
252        _state: &WindowState,
253    ) -> TriggerResult {
254        TriggerResult::Continue
255    }
256
257    fn on_processing_time(&mut self, _time: DateTime<Utc>, _window: &Window) -> TriggerResult {
258        TriggerResult::Continue
259    }
260
261    fn on_event_time(&mut self, time: DateTime<Utc>, window: &Window) -> TriggerResult {
262        if time >= window.end {
263            self.fired_windows.push(window.clone());
264            TriggerResult::FireAndPurge
265        } else {
266            TriggerResult::Continue
267        }
268    }
269
270    fn on_merge(&mut self, _window: &Window, _merged_windows: &[Window]) -> TriggerResult {
271        TriggerResult::Continue
272    }
273
274    fn clear(&mut self) {
275        self.fired_windows.clear();
276    }
277}
278
279/// Count-based trigger that fires after a certain number of elements.
280pub struct CountTrigger {
281    count: usize,
282}
283
284impl CountTrigger {
285    /// Create a new count trigger.
286    pub fn of(count: usize) -> Self {
287        Self { count }
288    }
289}
290
291impl WindowTrigger for CountTrigger {
292    fn on_element(
293        &mut self,
294        _element: &StreamElement,
295        _window: &Window,
296        state: &WindowState,
297    ) -> TriggerResult {
298        if state.element_count >= self.count {
299            TriggerResult::FireAndPurge
300        } else {
301            TriggerResult::Continue
302        }
303    }
304
305    fn on_processing_time(&mut self, _time: DateTime<Utc>, _window: &Window) -> TriggerResult {
306        TriggerResult::Continue
307    }
308
309    fn on_event_time(&mut self, _time: DateTime<Utc>, _window: &Window) -> TriggerResult {
310        TriggerResult::Continue
311    }
312
313    fn on_merge(&mut self, _window: &Window, _merged_windows: &[Window]) -> TriggerResult {
314        TriggerResult::Continue
315    }
316
317    fn clear(&mut self) {}
318}
319
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a one-minute window starting at the current time.
    fn minute_window() -> Window {
        let start = Utc::now();
        Window::new(start, start + Duration::seconds(60)).expect("one-minute window is valid")
    }

    #[test]
    fn test_window_creation() {
        let start = Utc::now();
        let end = start + Duration::seconds(60);

        let window = Window::new(start, end).expect("Window creation for test should succeed");
        assert_eq!(window.start, start);
        assert_eq!(window.end, end);
        assert_eq!(window.duration(), Duration::seconds(60));
    }

    #[test]
    fn test_window_rejects_empty_and_inverted() {
        let t = Utc::now();
        assert!(Window::new(t, t).is_err());
        assert!(Window::new(t + Duration::seconds(1), t).is_err());
    }

    #[test]
    fn test_window_contains() {
        let start = Utc::now();
        let end = start + Duration::seconds(60);
        let window =
            Window::new(start, end).expect("Window creation for contains test should succeed");

        let inside = start + Duration::seconds(30);
        let outside = end + Duration::seconds(1);

        assert!(window.contains(&inside));
        assert!(!window.contains(&outside));
    }

    #[test]
    fn test_window_contains_is_half_open() {
        let window = minute_window();
        assert!(window.contains(&window.start));
        assert!(!window.contains(&window.end));
    }

    #[test]
    fn test_window_max_timestamp_is_inside() {
        let window = minute_window();
        assert_eq!(window.max_timestamp(), window.end - Duration::milliseconds(1));
        assert!(window.contains(&window.max_timestamp()));
    }

    #[test]
    fn test_window_overlaps() {
        let start1 = Utc::now();
        let end1 = start1 + Duration::seconds(60);
        let window1 =
            Window::new(start1, end1).expect("Window creation for overlap test should succeed");

        let start2 = start1 + Duration::seconds(30);
        let end2 = start2 + Duration::seconds(60);
        let window2 =
            Window::new(start2, end2).expect("Window creation for overlap test should succeed");

        assert!(window1.overlaps(&window2));
        assert!(window2.overlaps(&window1));
    }

    #[test]
    fn test_adjacent_windows_do_not_overlap_or_merge() {
        let start = Utc::now();
        let mid = start + Duration::seconds(60);
        let end = mid + Duration::seconds(60);
        let first = Window::new(start, mid).expect("first window is valid");
        let second = Window::new(mid, end).expect("second window is valid");

        assert!(!first.overlaps(&second));
        assert!(first.merge(&second).is_none());
    }

    #[test]
    fn test_window_merge() {
        let start1 = Utc::now();
        let end1 = start1 + Duration::seconds(60);
        let window1 =
            Window::new(start1, end1).expect("Window creation for merge test should succeed");

        let start2 = start1 + Duration::seconds(30);
        let end2 = start2 + Duration::seconds(60);
        let window2 =
            Window::new(start2, end2).expect("Window creation for merge test should succeed");

        let merged = window1
            .merge(&window2)
            .expect("Window merge should succeed in test");
        assert_eq!(merged.start, start1);
        assert_eq!(merged.end, end2);
    }

    #[test]
    fn test_window_state() {
        let mut state = WindowState::new();
        assert_eq!(state.element_count, 0);

        let elem = StreamElement::new(vec![1, 2, 3], Utc::now());
        state.add_element(&elem);

        assert_eq!(state.element_count, 1);
        assert!(state.earliest_timestamp.is_some());
        assert!(state.latest_timestamp.is_some());
    }

    #[test]
    fn test_window_state_tracks_extremes_and_clears() {
        let base = Utc::now();
        let late = StreamElement::new(vec![1], base + Duration::seconds(5));
        let early = StreamElement::new(vec![2], base);

        let mut state = WindowState::new();
        state.add_element(&late);
        state.add_element(&early);

        assert_eq!(state.element_count, 2);
        assert_eq!(state.earliest_timestamp, Some(early.event_time));
        assert_eq!(state.latest_timestamp, Some(late.event_time));

        state.clear();
        assert_eq!(state.element_count, 0);
        assert_eq!(state.size_bytes, 0);
        assert!(state.earliest_timestamp.is_none());
        assert!(state.latest_timestamp.is_none());
    }

    #[test]
    fn test_count_trigger_fires_at_threshold() {
        let window = minute_window();
        let elem = StreamElement::new(vec![1], window.start);
        let mut trigger = CountTrigger::of(2);
        let mut state = WindowState::new();

        state.add_element(&elem);
        assert_eq!(trigger.on_element(&elem, &window, &state), TriggerResult::Continue);

        state.add_element(&elem);
        assert_eq!(
            trigger.on_element(&elem, &window, &state),
            TriggerResult::FireAndPurge
        );
    }

    #[test]
    fn test_event_time_trigger_fires_at_window_end() {
        let window = minute_window();
        let mut trigger = EventTimeTrigger::new();

        assert_eq!(
            trigger.on_event_time(window.end - Duration::seconds(1), &window),
            TriggerResult::Continue
        );
        assert_eq!(
            trigger.on_event_time(window.end, &window),
            TriggerResult::FireAndPurge
        );
    }

    #[test]
    fn test_event_time_session_window_assignment() {
        let elem = StreamElement::new(vec![1], Utc::now());
        let windows = EventTimeSessionWindows::with_gap(Duration::seconds(10))
            .assign_windows(&elem)
            .expect("positive gap yields a window");

        assert_eq!(windows.len(), 1);
        assert_eq!(windows[0].start, elem.event_time);
        assert_eq!(windows[0].duration(), Duration::seconds(10));
    }

    #[test]
    fn test_session_window_rejects_non_positive_gap() {
        let elem = StreamElement::new(vec![1], Utc::now());
        assert!(EventTimeSessionWindows::with_gap(Duration::seconds(0))
            .assign_windows(&elem)
            .is_err());
    }
}