oxigdal_streaming/windowing/
tumbling.rs1use super::window::{Window, WindowAssigner};
4use crate::core::stream::StreamElement;
5use crate::error::Result;
6use chrono::{DateTime, Duration, Utc};
7use serde::{Deserialize, Serialize};
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct TumblingWindowConfig {
12 pub size: Duration,
14
15 pub offset: Duration,
17}
18
19impl TumblingWindowConfig {
20 pub fn new(size: Duration) -> Self {
22 Self {
23 size,
24 offset: Duration::zero(),
25 }
26 }
27
28 pub fn with_offset(mut self, offset: Duration) -> Self {
30 self.offset = offset;
31 self
32 }
33}
34
35#[derive(Debug, Clone)]
37pub struct TumblingWindow {
38 config: TumblingWindowConfig,
39}
40
41impl TumblingWindow {
42 pub fn new(size: Duration) -> Self {
44 Self {
45 config: TumblingWindowConfig::new(size),
46 }
47 }
48
49 pub fn with_offset(size: Duration, offset: Duration) -> Self {
51 Self {
52 config: TumblingWindowConfig::new(size).with_offset(offset),
53 }
54 }
55
56 pub fn get_window(&self, timestamp: DateTime<Utc>) -> Result<Window> {
58 let size_ms = self.config.size.num_milliseconds();
59 let offset_ms = self.config.offset.num_milliseconds();
60
61 let timestamp_ms = timestamp.timestamp_millis();
62 let adjusted_timestamp = timestamp_ms - offset_ms;
63
64 let window_start_ms = (adjusted_timestamp / size_ms) * size_ms + offset_ms;
65 let window_end_ms = window_start_ms + size_ms;
66
67 let start = DateTime::from_timestamp_millis(window_start_ms).ok_or_else(|| {
68 crate::error::StreamingError::InvalidWindow(
69 "Invalid window start timestamp".to_string(),
70 )
71 })?;
72
73 let end = DateTime::from_timestamp_millis(window_end_ms).ok_or_else(|| {
74 crate::error::StreamingError::InvalidWindow("Invalid window end timestamp".to_string())
75 })?;
76
77 Window::new(start, end)
78 }
79}
80
81pub struct TumblingAssigner {
83 window: TumblingWindow,
84}
85
86impl TumblingAssigner {
87 pub fn new(size: Duration) -> Self {
89 Self {
90 window: TumblingWindow::new(size),
91 }
92 }
93
94 pub fn with_offset(size: Duration, offset: Duration) -> Self {
96 Self {
97 window: TumblingWindow::with_offset(size, offset),
98 }
99 }
100}
101
102impl WindowAssigner for TumblingAssigner {
103 fn assign_windows(&self, element: &StreamElement) -> Result<Vec<Window>> {
104 let window = self.window.get_window(element.event_time)?;
105 Ok(vec![window])
106 }
107
108 fn assigner_type(&self) -> &str {
109 "TumblingAssigner"
110 }
111}
112
113#[cfg(test)]
114mod tests {
115 use super::*;
116
117 #[test]
118 fn test_tumbling_window() {
119 let window = TumblingWindow::new(Duration::seconds(60));
120 let timestamp =
121 DateTime::from_timestamp(1000, 0).expect("Test timestamp creation should succeed");
122
123 let w = window
124 .get_window(timestamp)
125 .expect("Tumbling window calculation should succeed in test");
126 assert_eq!(w.duration(), Duration::seconds(60));
127 assert!(w.contains(×tamp));
128 }
129
130 #[test]
131 fn test_tumbling_window_with_offset() {
132 let window = TumblingWindow::with_offset(Duration::seconds(60), Duration::seconds(15));
133 let timestamp =
134 DateTime::from_timestamp(1000, 0).expect("Test timestamp creation should succeed");
135
136 let w = window
137 .get_window(timestamp)
138 .expect("Tumbling window calculation should succeed in test");
139 assert_eq!(w.duration(), Duration::seconds(60));
140 }
141
142 #[test]
143 fn test_tumbling_assigner() {
144 let assigner = TumblingAssigner::new(Duration::seconds(60));
145 let elem = StreamElement::new(
146 vec![1, 2, 3],
147 DateTime::from_timestamp(1000, 0).expect("Test timestamp creation should succeed"),
148 );
149
150 let windows = assigner
151 .assign_windows(&elem)
152 .expect("Window assignment should succeed in test");
153 assert_eq!(windows.len(), 1);
154 assert!(windows[0].contains(&elem.event_time));
155 }
156
157 #[test]
158 fn test_non_overlapping_windows() {
159 let window = TumblingWindow::new(Duration::seconds(60));
160
161 let ts1 =
162 DateTime::from_timestamp(1000, 0).expect("Test timestamp creation should succeed");
163 let ts2 = ts1 + Duration::seconds(70);
164
165 let w1 = window
166 .get_window(ts1)
167 .expect("Tumbling window calculation should succeed in test");
168 let w2 = window
169 .get_window(ts2)
170 .expect("Tumbling window calculation should succeed in test");
171
172 assert!(!w1.overlaps(&w2));
173 assert!(!w2.overlaps(&w1));
174 }
175}