1use std::time::Duration;
7
8use crate::parser::{
9 EmitClause, EmitStrategy, LateDataClause, ParseError, WindowFunction, WindowRewriter,
10};
11
/// Kind of window a [`WindowOperatorConfig`] describes.
///
/// `Hash` is derived alongside the equality traits so a window type can be
/// used as a key in hash-based collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WindowType {
    /// Window configured by a single size; produced for
    /// `WindowFunction::Tumble`.
    Tumbling,
    /// Window configured by a size and a slide interval; produced for
    /// `WindowFunction::Hop`.
    Sliding,
    /// Window configured by a gap; produced for `WindowFunction::Session`.
    Session,
}
22
23#[derive(Debug, Clone)]
28pub struct WindowOperatorConfig {
29 pub window_type: WindowType,
31 pub time_column: String,
33 pub size: Duration,
35 pub slide: Option<Duration>,
37 pub gap: Option<Duration>,
39 pub allowed_lateness: Duration,
41 pub emit_strategy: EmitStrategy,
43 pub late_data_side_output: Option<String>,
45}
46
47impl WindowOperatorConfig {
48 #[must_use]
50 pub fn tumbling(time_column: String, size: Duration) -> Self {
51 Self {
52 window_type: WindowType::Tumbling,
53 time_column,
54 size,
55 slide: None,
56 gap: None,
57 allowed_lateness: Duration::ZERO,
58 emit_strategy: EmitStrategy::OnWatermark,
59 late_data_side_output: None,
60 }
61 }
62
63 #[must_use]
65 pub fn sliding(time_column: String, size: Duration, slide: Duration) -> Self {
66 Self {
67 window_type: WindowType::Sliding,
68 time_column,
69 size,
70 slide: Some(slide),
71 gap: None,
72 allowed_lateness: Duration::ZERO,
73 emit_strategy: EmitStrategy::OnWatermark,
74 late_data_side_output: None,
75 }
76 }
77
78 #[must_use]
80 pub fn session(time_column: String, gap: Duration) -> Self {
81 Self {
82 window_type: WindowType::Session,
83 time_column,
84 size: Duration::ZERO, slide: None,
86 gap: Some(gap),
87 allowed_lateness: Duration::ZERO,
88 emit_strategy: EmitStrategy::OnWatermark,
89 late_data_side_output: None,
90 }
91 }
92
93 pub fn from_window_function(window: &WindowFunction) -> Result<Self, ParseError> {
101 let time_column = WindowRewriter::get_time_column_name(window).ok_or_else(|| {
102 ParseError::WindowError("Cannot extract time column name".to_string())
103 })?;
104
105 match window {
106 WindowFunction::Tumble { interval, .. } => {
107 let size = WindowRewriter::parse_interval_to_duration(interval)?;
108 Ok(Self::tumbling(time_column, size))
109 }
110 WindowFunction::Hop {
111 slide_interval,
112 window_interval,
113 ..
114 } => {
115 let size = WindowRewriter::parse_interval_to_duration(window_interval)?;
116 let slide = WindowRewriter::parse_interval_to_duration(slide_interval)?;
117 Ok(Self::sliding(time_column, size, slide))
118 }
119 WindowFunction::Session { gap_interval, .. } => {
120 let gap = WindowRewriter::parse_interval_to_duration(gap_interval)?;
121 Ok(Self::session(time_column, gap))
122 }
123 }
124 }
125
126 pub fn with_emit_clause(mut self, emit_clause: &EmitClause) -> Result<Self, ParseError> {
132 self.emit_strategy = emit_clause.to_emit_strategy()?;
133 Ok(self)
134 }
135
136 pub fn with_late_data_clause(
142 mut self,
143 late_data_clause: &LateDataClause,
144 ) -> Result<Self, ParseError> {
145 self.allowed_lateness = late_data_clause.to_allowed_lateness()?;
146 self.late_data_side_output
147 .clone_from(&late_data_clause.side_output);
148 Ok(self)
149 }
150
151 #[must_use]
153 pub fn with_allowed_lateness(mut self, lateness: Duration) -> Self {
154 self.allowed_lateness = lateness;
155 self
156 }
157
158 #[must_use]
160 pub fn with_emit_strategy(mut self, strategy: EmitStrategy) -> Self {
161 self.emit_strategy = strategy;
162 self
163 }
164
165 #[must_use]
167 pub fn with_late_data_side_output(mut self, name: String) -> Self {
168 self.late_data_side_output = Some(name);
169 self
170 }
171
172 #[must_use]
177 pub fn is_append_only_compatible(&self) -> bool {
178 matches!(
179 self.emit_strategy,
180 EmitStrategy::OnWatermark | EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
181 )
182 }
183
184 #[must_use]
186 pub fn has_late_data_handling(&self) -> bool {
187 self.allowed_lateness > Duration::ZERO || self.late_data_side_output.is_some()
188 }
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{Expr, Ident};

    /// Window size shared by most tests (300 seconds).
    const FIVE_MINUTES: Duration = Duration::from_secs(300);

    /// Wraps `name` in a boxed identifier expression.
    fn boxed_ident(name: &str) -> Box<Expr> {
        Box::new(Expr::Identifier(Ident::new(name)))
    }

    /// Tumble window over `event_time` with a `5 MINUTE` interval.
    fn make_tumble_window() -> WindowFunction {
        WindowFunction::Tumble {
            time_column: boxed_ident("event_time"),
            interval: boxed_ident("5 MINUTE"),
        }
    }

    /// Default five-minute tumbling configuration on column `ts`.
    fn tumbling_on_ts() -> WindowOperatorConfig {
        WindowOperatorConfig::tumbling("ts".to_string(), FIVE_MINUTES)
    }

    #[test]
    fn test_tumbling_config() {
        let cfg = WindowOperatorConfig::tumbling(String::from("event_time"), FIVE_MINUTES);

        assert_eq!(cfg.window_type, WindowType::Tumbling);
        assert_eq!(cfg.time_column, "event_time");
        assert_eq!(cfg.size, FIVE_MINUTES);
        assert_eq!(cfg.slide, None);
        assert_eq!(cfg.gap, None);
    }

    #[test]
    fn test_sliding_config() {
        let slide = Duration::from_secs(60);
        let cfg = WindowOperatorConfig::sliding(String::from("ts"), FIVE_MINUTES, slide);

        assert_eq!(cfg.window_type, WindowType::Sliding);
        assert_eq!(cfg.size, FIVE_MINUTES);
        assert_eq!(cfg.slide, Some(slide));
    }

    #[test]
    fn test_session_config() {
        let gap = Duration::from_secs(1800);
        let cfg = WindowOperatorConfig::session(String::from("click_time"), gap);

        assert_eq!(cfg.window_type, WindowType::Session);
        assert_eq!(cfg.gap, Some(gap));
    }

    #[test]
    fn test_from_window_function() {
        let cfg = WindowOperatorConfig::from_window_function(&make_tumble_window()).unwrap();

        assert_eq!(cfg.window_type, WindowType::Tumbling);
        assert_eq!(cfg.time_column, "event_time");
        assert_eq!(cfg.size, FIVE_MINUTES);
    }

    #[test]
    fn test_with_emit_clause() {
        // Each clause is applied to a fresh default configuration.
        let cases = [
            (EmitClause::OnWindowClose, EmitStrategy::OnWindowClose),
            (EmitClause::Changes, EmitStrategy::Changelog),
        ];

        for (clause, expected) in cases {
            let updated = tumbling_on_ts().with_emit_clause(&clause).unwrap();
            assert_eq!(updated.emit_strategy, expected);
        }
    }

    #[test]
    fn test_with_late_data_clause() {
        let clause = LateDataClause::side_output_only("late_events".to_string());
        let cfg = tumbling_on_ts().with_late_data_clause(&clause).unwrap();

        assert_eq!(cfg.late_data_side_output.as_deref(), Some("late_events"));
    }

    #[test]
    fn test_append_only_compatible() {
        // The default strategy is append-only compatible.
        let default_cfg = tumbling_on_ts();
        assert!(default_cfg.is_append_only_compatible());

        let on_update = default_cfg.with_emit_strategy(EmitStrategy::OnUpdate);
        assert!(!on_update.is_append_only_compatible());

        let changelog = tumbling_on_ts().with_emit_strategy(EmitStrategy::Changelog);
        assert!(!changelog.is_append_only_compatible());
    }

    #[test]
    fn test_has_late_data_handling() {
        // Nothing is configured by default.
        let plain = tumbling_on_ts();
        assert!(!plain.has_late_data_handling());

        // A non-zero allowed lateness counts as late-data handling.
        let with_lateness = plain
            .clone()
            .with_allowed_lateness(Duration::from_secs(60));
        assert!(with_lateness.has_late_data_handling());

        // So does a side output on its own.
        let with_side_output = plain.with_late_data_side_output("late".to_string());
        assert!(with_side_output.has_late_data_handling());
    }
}