1use std::time::Duration;
7
8use crate::parser::{
9 EmitClause, EmitStrategy, LateDataClause, ParseError, WindowFunction, WindowRewriter,
10};
11
/// The kind of window a windowed operator is configured with.
///
/// Its `Display` form is the upper-case window keyword
/// (`TUMBLE`, `HOP`, or `SESSION`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WindowType {
    /// Tumbling window; displayed as `TUMBLE`.
    Tumbling,
    /// Sliding window; displayed as `HOP`.
    Sliding,
    /// Session window; displayed as `SESSION`.
    Session,
}
22
23#[derive(Debug, Clone)]
35pub struct WindowOperatorConfig {
36 pub window_type: WindowType,
38 pub time_column: String,
40 pub size: Duration,
42 pub slide: Option<Duration>,
44 pub gap: Option<Duration>,
46 pub allowed_lateness: Duration,
48 pub emit_strategy: EmitStrategy,
50 pub late_data_side_output: Option<String>,
52}
53
/// Renders a duration in the most compact unit that represents it exactly.
///
/// * Whole-second values use the largest of `h`, `m`, or `s` that divides
///   them evenly (`3600s` -> `1h`, `300s` -> `5m`, `90s` -> `90s`).
/// * Values with a fractional-second part use `ms`, `us`, or `ns`, whichever
///   is the coarsest unit that loses nothing (`1500ms` stays `1500ms` instead
///   of being truncated to `1s`).
/// * A zero duration renders as `0ms`.
fn format_duration(d: Duration) -> String {
    // Handle fractional seconds first: formatting only `as_secs()` would
    // silently drop the sub-second part.
    let subsec_nanos = d.subsec_nanos();
    if subsec_nanos != 0 {
        return if subsec_nanos.is_multiple_of(1_000_000) {
            format!("{}ms", d.as_millis())
        } else if subsec_nanos.is_multiple_of(1_000) {
            format!("{}us", d.as_micros())
        } else {
            format!("{}ns", d.as_nanos())
        };
    }

    let secs = d.as_secs();
    if secs == 0 {
        // Zero has always been shown in milliseconds; keep that spelling.
        return "0ms".to_string();
    }

    if secs.is_multiple_of(3600) {
        format!("{}h", secs / 3600)
    } else if secs.is_multiple_of(60) {
        format!("{}m", secs / 60)
    } else {
        format!("{secs}s")
    }
}
68
69impl std::fmt::Display for WindowType {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 match self {
72 WindowType::Tumbling => write!(f, "TUMBLE"),
73 WindowType::Sliding => write!(f, "HOP"),
74 WindowType::Session => write!(f, "SESSION"),
75 }
76 }
77}
78
79impl std::fmt::Display for WindowOperatorConfig {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 match self.window_type {
82 WindowType::Tumbling => {
83 write!(
84 f,
85 "TUMBLE({}, {})",
86 self.time_column,
87 format_duration(self.size)
88 )
89 }
90 WindowType::Sliding => {
91 let slide = self.slide.unwrap_or(self.size);
92 write!(
93 f,
94 "HOP({}, {} SLIDE {})",
95 self.time_column,
96 format_duration(self.size),
97 format_duration(slide)
98 )
99 }
100 WindowType::Session => {
101 let gap = self.gap.unwrap_or(Duration::ZERO);
102 write!(
103 f,
104 "SESSION({}, GAP {})",
105 self.time_column,
106 format_duration(gap)
107 )
108 }
109 }
110 }
111}
112
113impl WindowOperatorConfig {
114 #[must_use]
116 pub fn tumbling(time_column: String, size: Duration) -> Self {
117 Self {
118 window_type: WindowType::Tumbling,
119 time_column,
120 size,
121 slide: None,
122 gap: None,
123 allowed_lateness: Duration::ZERO,
124 emit_strategy: EmitStrategy::OnWatermark,
125 late_data_side_output: None,
126 }
127 }
128
129 #[must_use]
131 pub fn sliding(time_column: String, size: Duration, slide: Duration) -> Self {
132 Self {
133 window_type: WindowType::Sliding,
134 time_column,
135 size,
136 slide: Some(slide),
137 gap: None,
138 allowed_lateness: Duration::ZERO,
139 emit_strategy: EmitStrategy::OnWatermark,
140 late_data_side_output: None,
141 }
142 }
143
144 #[must_use]
146 pub fn session(time_column: String, gap: Duration) -> Self {
147 Self {
148 window_type: WindowType::Session,
149 time_column,
150 size: Duration::ZERO, slide: None,
152 gap: Some(gap),
153 allowed_lateness: Duration::ZERO,
154 emit_strategy: EmitStrategy::OnWatermark,
155 late_data_side_output: None,
156 }
157 }
158
159 pub fn from_window_function(window: &WindowFunction) -> Result<Self, ParseError> {
167 let time_column = WindowRewriter::get_time_column_name(window).ok_or_else(|| {
168 ParseError::WindowError("Cannot extract time column name".to_string())
169 })?;
170
171 match window {
172 WindowFunction::Tumble { interval, .. } => {
173 let size = WindowRewriter::parse_interval_to_duration(interval)?;
174 Ok(Self::tumbling(time_column, size))
175 }
176 WindowFunction::Hop {
177 slide_interval,
178 window_interval,
179 ..
180 } => {
181 let size = WindowRewriter::parse_interval_to_duration(window_interval)?;
182 let slide = WindowRewriter::parse_interval_to_duration(slide_interval)?;
183 Ok(Self::sliding(time_column, size, slide))
184 }
185 WindowFunction::Session { gap_interval, .. } => {
186 let gap = WindowRewriter::parse_interval_to_duration(gap_interval)?;
187 Ok(Self::session(time_column, gap))
188 }
189 }
190 }
191
192 pub fn with_emit_clause(mut self, emit_clause: &EmitClause) -> Result<Self, ParseError> {
198 self.emit_strategy = emit_clause.to_emit_strategy()?;
199 Ok(self)
200 }
201
202 pub fn with_late_data_clause(
208 mut self,
209 late_data_clause: &LateDataClause,
210 ) -> Result<Self, ParseError> {
211 self.allowed_lateness = late_data_clause.to_allowed_lateness()?;
212 self.late_data_side_output
213 .clone_from(&late_data_clause.side_output);
214 Ok(self)
215 }
216
217 #[must_use]
219 pub fn with_allowed_lateness(mut self, lateness: Duration) -> Self {
220 self.allowed_lateness = lateness;
221 self
222 }
223
224 #[must_use]
226 pub fn with_emit_strategy(mut self, strategy: EmitStrategy) -> Self {
227 self.emit_strategy = strategy;
228 self
229 }
230
231 #[must_use]
233 pub fn with_late_data_side_output(mut self, name: String) -> Self {
234 self.late_data_side_output = Some(name);
235 self
236 }
237
238 pub fn validate(&self, has_watermark: bool, has_window: bool) -> Result<(), ParseError> {
253 if matches!(
254 self.emit_strategy,
255 EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
256 ) {
257 if !has_watermark {
258 return Err(ParseError::WindowError(
259 "EMIT ON WINDOW CLOSE requires a watermark definition \
260 on the source. Add WATERMARK FOR <column> AS <expr> \
261 to the CREATE SOURCE statement."
262 .to_string(),
263 ));
264 }
265 if !has_window {
266 return Err(ParseError::WindowError(
267 "EMIT ON WINDOW CLOSE is only valid with windowed \
268 aggregation queries. Use EMIT ON UPDATE for \
269 non-windowed queries."
270 .to_string(),
271 ));
272 }
273 }
274 Ok(())
275 }
276
277 #[must_use]
282 pub fn is_append_only_compatible(&self) -> bool {
283 matches!(
284 self.emit_strategy,
285 EmitStrategy::OnWatermark | EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
286 )
287 }
288
289 #[must_use]
291 pub fn has_late_data_handling(&self) -> bool {
292 self.allowed_lateness > Duration::ZERO || self.late_data_side_output.is_some()
293 }
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{Expr, Ident};

    /// Five minutes, the window size most tests use.
    const FIVE_MINUTES: Duration = Duration::from_secs(300);

    /// Builds the `ts` / five-minute tumbling configuration most tests start from.
    fn five_minute_tumble() -> WindowOperatorConfig {
        WindowOperatorConfig::tumbling("ts".to_string(), FIVE_MINUTES)
    }

    /// Builds a `Tumble` window function over `event_time` with a 5 MINUTE interval.
    fn make_tumble_window() -> WindowFunction {
        let time_column = Expr::Identifier(Ident::new("event_time"));
        let interval = Expr::Identifier(Ident::new("5 MINUTE"));
        WindowFunction::Tumble {
            time_column: Box::new(time_column),
            interval: Box::new(interval),
        }
    }

    #[test]
    fn test_tumbling_config() {
        let config = WindowOperatorConfig::tumbling("event_time".to_string(), FIVE_MINUTES);

        assert_eq!(config.window_type, WindowType::Tumbling);
        assert_eq!(config.time_column, "event_time");
        assert_eq!(config.size, Duration::from_secs(300));
        assert!(config.slide.is_none());
        assert!(config.gap.is_none());
    }

    #[test]
    fn test_sliding_config() {
        let slide = Duration::from_secs(60);
        let config = WindowOperatorConfig::sliding("ts".to_string(), FIVE_MINUTES, slide);

        assert_eq!(config.window_type, WindowType::Sliding);
        assert_eq!(config.size, Duration::from_secs(300));
        assert_eq!(config.slide, Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_session_config() {
        let gap = Duration::from_secs(1800);
        let config = WindowOperatorConfig::session("click_time".to_string(), gap);

        assert_eq!(config.window_type, WindowType::Session);
        assert_eq!(config.gap, Some(Duration::from_secs(1800)));
    }

    #[test]
    fn test_from_window_function() {
        let config = WindowOperatorConfig::from_window_function(&make_tumble_window()).unwrap();

        assert_eq!(config.window_type, WindowType::Tumbling);
        assert_eq!(config.time_column, "event_time");
        assert_eq!(config.size, Duration::from_secs(300));
    }

    #[test]
    fn test_with_emit_clause() {
        let on_close = five_minute_tumble()
            .with_emit_clause(&EmitClause::OnWindowClose)
            .unwrap();
        assert_eq!(on_close.emit_strategy, EmitStrategy::OnWindowClose);

        let changes = five_minute_tumble()
            .with_emit_clause(&EmitClause::Changes)
            .unwrap();
        assert_eq!(changes.emit_strategy, EmitStrategy::Changelog);
    }

    #[test]
    fn test_with_late_data_clause() {
        let clause = LateDataClause::side_output_only("late_events".to_string());
        let config = five_minute_tumble().with_late_data_clause(&clause).unwrap();

        assert_eq!(config.late_data_side_output.as_deref(), Some("late_events"));
    }

    #[test]
    fn test_append_only_compatible() {
        // The default strategy (OnWatermark) is append-only compatible.
        assert!(five_minute_tumble().is_append_only_compatible());

        // OnUpdate and Changelog are not.
        for strategy in [EmitStrategy::OnUpdate, EmitStrategy::Changelog] {
            let config = five_minute_tumble().with_emit_strategy(strategy);
            assert!(!config.is_append_only_compatible());
        }
    }

    #[test]
    fn test_has_late_data_handling() {
        // Neither lateness nor a side output by default.
        assert!(!five_minute_tumble().has_late_data_handling());

        let with_lateness = five_minute_tumble().with_allowed_lateness(Duration::from_secs(60));
        assert!(with_lateness.has_late_data_handling());

        let with_side_output = five_minute_tumble().with_late_data_side_output("late".to_string());
        assert!(with_side_output.has_late_data_handling());
    }

    #[test]
    fn test_eowc_without_watermark_errors() {
        let config = five_minute_tumble().with_emit_strategy(EmitStrategy::OnWindowClose);

        let err = config
            .validate(false, true)
            .expect_err("validation must fail without a watermark")
            .to_string();
        assert!(
            err.contains("watermark"),
            "Expected watermark error, got: {err}"
        );
    }

    #[test]
    fn test_eowc_without_window_errors() {
        let config = five_minute_tumble().with_emit_strategy(EmitStrategy::OnWindowClose);

        let err = config
            .validate(true, false)
            .expect_err("validation must fail without a window")
            .to_string();
        assert!(
            err.contains("windowed"),
            "Expected windowed query error, got: {err}"
        );
    }

    #[test]
    fn test_eowc_with_watermark_and_window_passes() {
        let config = five_minute_tumble().with_emit_strategy(EmitStrategy::OnWindowClose);

        assert!(config.validate(true, true).is_ok());
    }

    #[test]
    fn test_final_without_watermark_errors() {
        let config = five_minute_tumble().with_emit_strategy(EmitStrategy::FinalOnly);

        assert!(config.validate(false, true).is_err());
    }

    #[test]
    fn test_non_eowc_without_watermark_ok() {
        let unconstrained = [
            EmitStrategy::OnUpdate,
            EmitStrategy::Periodic(Duration::from_secs(5)),
            EmitStrategy::Changelog,
        ];

        for strategy in unconstrained {
            let config = five_minute_tumble().with_emit_strategy(strategy);
            assert!(config.validate(false, false).is_ok());
        }
    }

    #[test]
    fn test_display_tumbling_window() {
        let config =
            WindowOperatorConfig::tumbling("event_time".to_string(), Duration::from_secs(60));
        assert_eq!(config.to_string(), "TUMBLE(event_time, 1m)");
    }

    #[test]
    fn test_display_sliding_window() {
        let config =
            WindowOperatorConfig::sliding("ts".to_string(), FIVE_MINUTES, Duration::from_secs(60));
        assert_eq!(config.to_string(), "HOP(ts, 5m SLIDE 1m)");
    }

    #[test]
    fn test_display_session_window() {
        let config =
            WindowOperatorConfig::session("click_time".to_string(), Duration::from_secs(1800));
        assert_eq!(config.to_string(), "SESSION(click_time, GAP 30m)");
    }

    #[test]
    fn test_display_window_type() {
        let expected = [
            (WindowType::Tumbling, "TUMBLE"),
            (WindowType::Sliding, "HOP"),
            (WindowType::Session, "SESSION"),
        ];

        for (window_type, keyword) in expected {
            assert_eq!(window_type.to_string(), keyword);
        }
    }

    #[test]
    fn test_display_duration_formatting() {
        let cases = [
            (Duration::from_secs(3600), "TUMBLE(ts, 1h)"),
            (Duration::from_secs(45), "TUMBLE(ts, 45s)"),
            (Duration::from_millis(500), "TUMBLE(ts, 500ms)"),
        ];

        for (size, rendered) in cases {
            let config = WindowOperatorConfig::tumbling("ts".to_string(), size);
            assert_eq!(config.to_string(), rendered);
        }
    }
}