Skip to main content

oxigdal_streaming/windowing/
sliding.rs

//! Sliding window implementation.
2
3use super::window::{Window, WindowAssigner};
4use crate::core::stream::StreamElement;
5use crate::error::Result;
6use chrono::{DateTime, Duration, Utc};
7use serde::{Deserialize, Serialize};
8
9/// Configuration for sliding windows.
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct SlidingWindowConfig {
12    /// Window size
13    pub size: Duration,
14
15    /// Slide interval
16    pub slide: Duration,
17
18    /// Window offset (for alignment)
19    pub offset: Duration,
20}
21
22impl SlidingWindowConfig {
23    /// Create a new sliding window configuration.
24    pub fn new(size: Duration, slide: Duration) -> Self {
25        Self {
26            size,
27            slide,
28            offset: Duration::zero(),
29        }
30    }
31
32    /// Set the window offset.
33    pub fn with_offset(mut self, offset: Duration) -> Self {
34        self.offset = offset;
35        self
36    }
37}
38
39/// Sliding window (fixed-size, overlapping windows).
40#[derive(Debug, Clone)]
41pub struct SlidingWindow {
42    config: SlidingWindowConfig,
43}
44
45impl SlidingWindow {
46    /// Create a new sliding window.
47    pub fn new(size: Duration, slide: Duration) -> Self {
48        Self {
49            config: SlidingWindowConfig::new(size, slide),
50        }
51    }
52
53    /// Create a new sliding window with offset.
54    pub fn with_offset(size: Duration, slide: Duration, offset: Duration) -> Self {
55        Self {
56            config: SlidingWindowConfig::new(size, slide).with_offset(offset),
57        }
58    }
59
60    /// Get all windows for a given timestamp.
61    pub fn get_windows(&self, timestamp: DateTime<Utc>) -> Result<Vec<Window>> {
62        let size_ms = self.config.size.num_milliseconds();
63        let slide_ms = self.config.slide.num_milliseconds();
64        let offset_ms = self.config.offset.num_milliseconds();
65
66        let timestamp_ms = timestamp.timestamp_millis();
67        let adjusted_timestamp = timestamp_ms - offset_ms;
68
69        let mut windows = Vec::new();
70
71        let last_start = (adjusted_timestamp / slide_ms) * slide_ms + offset_ms;
72
73        let num_windows = (size_ms + slide_ms - 1) / slide_ms;
74
75        for i in (0..num_windows).rev() {
76            let window_start_ms = last_start - i * slide_ms;
77            let window_end_ms = window_start_ms + size_ms;
78
79            if window_end_ms > timestamp_ms {
80                let start = DateTime::from_timestamp_millis(window_start_ms).ok_or_else(|| {
81                    crate::error::StreamingError::InvalidWindow(
82                        "Invalid window start timestamp".to_string(),
83                    )
84                })?;
85
86                let end = DateTime::from_timestamp_millis(window_end_ms).ok_or_else(|| {
87                    crate::error::StreamingError::InvalidWindow(
88                        "Invalid window end timestamp".to_string(),
89                    )
90                })?;
91
92                if timestamp >= start && timestamp < end {
93                    windows.push(Window::new(start, end)?);
94                }
95            }
96        }
97
98        Ok(windows)
99    }
100}
101
102/// Assigner for sliding windows.
103pub struct SlidingAssigner {
104    window: SlidingWindow,
105}
106
107impl SlidingAssigner {
108    /// Create a new sliding window assigner.
109    pub fn new(size: Duration, slide: Duration) -> Self {
110        Self {
111            window: SlidingWindow::new(size, slide),
112        }
113    }
114
115    /// Create a new sliding window assigner with offset.
116    pub fn with_offset(size: Duration, slide: Duration, offset: Duration) -> Self {
117        Self {
118            window: SlidingWindow::with_offset(size, slide, offset),
119        }
120    }
121}
122
123impl WindowAssigner for SlidingAssigner {
124    fn assign_windows(&self, element: &StreamElement) -> Result<Vec<Window>> {
125        self.window.get_windows(element.event_time)
126    }
127
128    fn assigner_type(&self) -> &str {
129        "SlidingAssigner"
130    }
131}
132
#[cfg(test)]
mod tests {
    use super::*;

    /// Event time shared by the tests: 1000 seconds after the Unix epoch.
    fn sample_timestamp() -> DateTime<Utc> {
        DateTime::from_timestamp(1000, 0).expect("Test timestamp creation should succeed")
    }

    #[test]
    fn test_sliding_window() {
        let window = SlidingWindow::new(Duration::seconds(60), Duration::seconds(30));
        let timestamp = sample_timestamp();

        let windows = window
            .get_windows(timestamp)
            .expect("Sliding window calculation should succeed in test");

        // size / slide = 2, so exactly two windows cover any instant:
        // [960 s, 1020 s) and [990 s, 1050 s).
        assert_eq!(windows.len(), 2);

        for w in &windows {
            assert_eq!(w.duration(), Duration::seconds(60));
            assert!(w.contains(&timestamp));
        }
    }

    #[test]
    fn test_sliding_window_overlap() {
        let window = SlidingWindow::new(Duration::seconds(60), Duration::seconds(20));
        let timestamp = sample_timestamp();

        let windows = window
            .get_windows(timestamp)
            .expect("Sliding window calculation should succeed in test");

        // 1000 s sits exactly on a 20 s boundary, so three windows cover it.
        assert_eq!(windows.len(), 3);

        // Neighbouring windows must share part of their range.
        for pair in windows.windows(2) {
            assert!(pair[0].overlaps(&pair[1]));
        }
    }

    #[test]
    fn test_sliding_assigner() {
        let assigner = SlidingAssigner::new(Duration::seconds(60), Duration::seconds(30));
        let elem = StreamElement::new(vec![1, 2, 3], sample_timestamp());

        let windows = assigner
            .assign_windows(&elem)
            .expect("Window assignment should succeed in test");
        assert!(!windows.is_empty());

        for w in &windows {
            assert!(w.contains(&elem.event_time));
        }

        assert_eq!(assigner.assigner_type(), "SlidingAssigner");
    }

    #[test]
    fn test_sliding_window_with_offset() {
        let window = SlidingWindow::with_offset(
            Duration::seconds(60),
            Duration::seconds(30),
            Duration::seconds(15),
        );
        let timestamp = sample_timestamp();

        let windows = window
            .get_windows(timestamp)
            .expect("Sliding window calculation should succeed in test");

        // With a 15 s offset the covering windows are [945 s, 1005 s) and [975 s, 1035 s).
        assert_eq!(windows.len(), 2);

        for w in &windows {
            assert_eq!(w.duration(), Duration::seconds(60));
            assert!(w.contains(&timestamp));
        }
    }
}