Skip to main content

oxigdal_streaming/windowing/
tumbling.rs

1//! Tumbling window implementation.
2
3use super::window::{Window, WindowAssigner};
4use crate::core::stream::StreamElement;
5use crate::error::Result;
6use chrono::{DateTime, Duration, Utc};
7use serde::{Deserialize, Serialize};
8
9/// Configuration for tumbling windows.
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct TumblingWindowConfig {
12    /// Window size
13    pub size: Duration,
14
15    /// Window offset (for alignment)
16    pub offset: Duration,
17}
18
19impl TumblingWindowConfig {
20    /// Create a new tumbling window configuration.
21    pub fn new(size: Duration) -> Self {
22        Self {
23            size,
24            offset: Duration::zero(),
25        }
26    }
27
28    /// Set the window offset.
29    pub fn with_offset(mut self, offset: Duration) -> Self {
30        self.offset = offset;
31        self
32    }
33}
34
35/// Tumbling window (fixed, non-overlapping windows).
36#[derive(Debug, Clone)]
37pub struct TumblingWindow {
38    config: TumblingWindowConfig,
39}
40
41impl TumblingWindow {
42    /// Create a new tumbling window.
43    pub fn new(size: Duration) -> Self {
44        Self {
45            config: TumblingWindowConfig::new(size),
46        }
47    }
48
49    /// Create a new tumbling window with offset.
50    pub fn with_offset(size: Duration, offset: Duration) -> Self {
51        Self {
52            config: TumblingWindowConfig::new(size).with_offset(offset),
53        }
54    }
55
56    /// Get the window for a given timestamp.
57    pub fn get_window(&self, timestamp: DateTime<Utc>) -> Result<Window> {
58        let size_ms = self.config.size.num_milliseconds();
59        let offset_ms = self.config.offset.num_milliseconds();
60
61        let timestamp_ms = timestamp.timestamp_millis();
62        let adjusted_timestamp = timestamp_ms - offset_ms;
63
64        let window_start_ms = (adjusted_timestamp / size_ms) * size_ms + offset_ms;
65        let window_end_ms = window_start_ms + size_ms;
66
67        let start = DateTime::from_timestamp_millis(window_start_ms).ok_or_else(|| {
68            crate::error::StreamingError::InvalidWindow(
69                "Invalid window start timestamp".to_string(),
70            )
71        })?;
72
73        let end = DateTime::from_timestamp_millis(window_end_ms).ok_or_else(|| {
74            crate::error::StreamingError::InvalidWindow("Invalid window end timestamp".to_string())
75        })?;
76
77        Window::new(start, end)
78    }
79}
80
81/// Assigner for tumbling windows.
82pub struct TumblingAssigner {
83    window: TumblingWindow,
84}
85
86impl TumblingAssigner {
87    /// Create a new tumbling window assigner.
88    pub fn new(size: Duration) -> Self {
89        Self {
90            window: TumblingWindow::new(size),
91        }
92    }
93
94    /// Create a new tumbling window assigner with offset.
95    pub fn with_offset(size: Duration, offset: Duration) -> Self {
96        Self {
97            window: TumblingWindow::with_offset(size, offset),
98        }
99    }
100}
101
102impl WindowAssigner for TumblingAssigner {
103    fn assign_windows(&self, element: &StreamElement) -> Result<Vec<Window>> {
104        let window = self.window.get_window(element.event_time)?;
105        Ok(vec![window])
106    }
107
108    fn assigner_type(&self) -> &str {
109        "TumblingAssigner"
110    }
111}
112
113#[cfg(test)]
114mod tests {
115    use super::*;
116
117    #[test]
118    fn test_tumbling_window() {
119        let window = TumblingWindow::new(Duration::seconds(60));
120        let timestamp =
121            DateTime::from_timestamp(1000, 0).expect("Test timestamp creation should succeed");
122
123        let w = window
124            .get_window(timestamp)
125            .expect("Tumbling window calculation should succeed in test");
126        assert_eq!(w.duration(), Duration::seconds(60));
127        assert!(w.contains(&timestamp));
128    }
129
130    #[test]
131    fn test_tumbling_window_with_offset() {
132        let window = TumblingWindow::with_offset(Duration::seconds(60), Duration::seconds(15));
133        let timestamp =
134            DateTime::from_timestamp(1000, 0).expect("Test timestamp creation should succeed");
135
136        let w = window
137            .get_window(timestamp)
138            .expect("Tumbling window calculation should succeed in test");
139        assert_eq!(w.duration(), Duration::seconds(60));
140    }
141
142    #[test]
143    fn test_tumbling_assigner() {
144        let assigner = TumblingAssigner::new(Duration::seconds(60));
145        let elem = StreamElement::new(
146            vec![1, 2, 3],
147            DateTime::from_timestamp(1000, 0).expect("Test timestamp creation should succeed"),
148        );
149
150        let windows = assigner
151            .assign_windows(&elem)
152            .expect("Window assignment should succeed in test");
153        assert_eq!(windows.len(), 1);
154        assert!(windows[0].contains(&elem.event_time));
155    }
156
157    #[test]
158    fn test_non_overlapping_windows() {
159        let window = TumblingWindow::new(Duration::seconds(60));
160
161        let ts1 =
162            DateTime::from_timestamp(1000, 0).expect("Test timestamp creation should succeed");
163        let ts2 = ts1 + Duration::seconds(70);
164
165        let w1 = window
166            .get_window(ts1)
167            .expect("Tumbling window calculation should succeed in test");
168        let w2 = window
169            .get_window(ts2)
170            .expect("Tumbling window calculation should succeed in test");
171
172        assert!(!w1.overlaps(&w2));
173        assert!(!w2.overlaps(&w1));
174    }
175}