oxigdal_streaming/windowing/
sliding.rs1use super::window::{Window, WindowAssigner};
4use crate::core::stream::StreamElement;
5use crate::error::Result;
6use chrono::{DateTime, Duration, Utc};
7use serde::{Deserialize, Serialize};
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct SlidingWindowConfig {
12 pub size: Duration,
14
15 pub slide: Duration,
17
18 pub offset: Duration,
20}
21
22impl SlidingWindowConfig {
23 pub fn new(size: Duration, slide: Duration) -> Self {
25 Self {
26 size,
27 slide,
28 offset: Duration::zero(),
29 }
30 }
31
32 pub fn with_offset(mut self, offset: Duration) -> Self {
34 self.offset = offset;
35 self
36 }
37}
38
39#[derive(Debug, Clone)]
41pub struct SlidingWindow {
42 config: SlidingWindowConfig,
43}
44
45impl SlidingWindow {
46 pub fn new(size: Duration, slide: Duration) -> Self {
48 Self {
49 config: SlidingWindowConfig::new(size, slide),
50 }
51 }
52
53 pub fn with_offset(size: Duration, slide: Duration, offset: Duration) -> Self {
55 Self {
56 config: SlidingWindowConfig::new(size, slide).with_offset(offset),
57 }
58 }
59
60 pub fn get_windows(&self, timestamp: DateTime<Utc>) -> Result<Vec<Window>> {
62 let size_ms = self.config.size.num_milliseconds();
63 let slide_ms = self.config.slide.num_milliseconds();
64 let offset_ms = self.config.offset.num_milliseconds();
65
66 let timestamp_ms = timestamp.timestamp_millis();
67 let adjusted_timestamp = timestamp_ms - offset_ms;
68
69 let mut windows = Vec::new();
70
71 let last_start = (adjusted_timestamp / slide_ms) * slide_ms + offset_ms;
72
73 let num_windows = (size_ms + slide_ms - 1) / slide_ms;
74
75 for i in (0..num_windows).rev() {
76 let window_start_ms = last_start - i * slide_ms;
77 let window_end_ms = window_start_ms + size_ms;
78
79 if window_end_ms > timestamp_ms {
80 let start = DateTime::from_timestamp_millis(window_start_ms).ok_or_else(|| {
81 crate::error::StreamingError::InvalidWindow(
82 "Invalid window start timestamp".to_string(),
83 )
84 })?;
85
86 let end = DateTime::from_timestamp_millis(window_end_ms).ok_or_else(|| {
87 crate::error::StreamingError::InvalidWindow(
88 "Invalid window end timestamp".to_string(),
89 )
90 })?;
91
92 if timestamp >= start && timestamp < end {
93 windows.push(Window::new(start, end)?);
94 }
95 }
96 }
97
98 Ok(windows)
99 }
100}
101
102pub struct SlidingAssigner {
104 window: SlidingWindow,
105}
106
107impl SlidingAssigner {
108 pub fn new(size: Duration, slide: Duration) -> Self {
110 Self {
111 window: SlidingWindow::new(size, slide),
112 }
113 }
114
115 pub fn with_offset(size: Duration, slide: Duration, offset: Duration) -> Self {
117 Self {
118 window: SlidingWindow::with_offset(size, slide, offset),
119 }
120 }
121}
122
123impl WindowAssigner for SlidingAssigner {
124 fn assign_windows(&self, element: &StreamElement) -> Result<Vec<Window>> {
125 self.window.get_windows(element.event_time)
126 }
127
128 fn assigner_type(&self) -> &str {
129 "SlidingAssigner"
130 }
131}
132
#[cfg(test)]
mod tests {
    use super::*;

    /// Every returned window has the configured size and contains the
    /// queried timestamp.
    #[test]
    fn test_sliding_window() {
        let window = SlidingWindow::new(Duration::seconds(60), Duration::seconds(30));
        let timestamp =
            DateTime::from_timestamp(1000, 0).expect("Test timestamp creation should succeed");

        let windows = window
            .get_windows(timestamp)
            .expect("Sliding window calculation should succeed in test");
        assert!(!windows.is_empty());

        for w in &windows {
            assert_eq!(w.duration(), Duration::seconds(60));
            assert!(w.contains(&timestamp));
        }
    }

    /// With a slide smaller than the size, consecutive windows overlap.
    #[test]
    fn test_sliding_window_overlap() {
        let window = SlidingWindow::new(Duration::seconds(60), Duration::seconds(20));
        let timestamp =
            DateTime::from_timestamp(1000, 0).expect("Test timestamp creation should succeed");

        let windows = window
            .get_windows(timestamp)
            .expect("Sliding window calculation should succeed in test");
        assert!(windows.len() > 1);

        for i in 0..windows.len() - 1 {
            assert!(windows[i].overlaps(&windows[i + 1]));
        }
    }

    /// The assigner returns windows that contain the element's event time.
    #[test]
    fn test_sliding_assigner() {
        let assigner = SlidingAssigner::new(Duration::seconds(60), Duration::seconds(30));
        let elem = StreamElement::new(
            vec![1, 2, 3],
            DateTime::from_timestamp(1000, 0).expect("Test timestamp creation should succeed"),
        );

        let windows = assigner
            .assign_windows(&elem)
            .expect("Window assignment should succeed in test");
        assert!(!windows.is_empty());

        for w in &windows {
            assert!(w.contains(&elem.event_time));
        }
    }

    /// An offset shifts the window grid but the timestamp is still covered by
    /// size / slide windows.
    #[test]
    fn test_sliding_window_with_offset() {
        let window = SlidingWindow::with_offset(
            Duration::seconds(60),
            Duration::seconds(30),
            Duration::seconds(15),
        );
        let timestamp =
            DateTime::from_timestamp(1000, 0).expect("Test timestamp creation should succeed");

        let windows = window
            .get_windows(timestamp)
            .expect("Sliding window calculation should succeed in test");
        assert!(!windows.is_empty());

        // Windows [945 s, 1005 s) and [975 s, 1035 s) both cover t = 1000 s.
        assert_eq!(windows.len(), 2);
        for w in &windows {
            assert_eq!(w.duration(), Duration::seconds(60));
            assert!(w.contains(&timestamp));
        }
    }
}