1use std::time::Duration;
7
8use crate::parser::{
9 EmitClause, EmitStrategy, LateDataClause, ParseError, WindowFunction, WindowRewriter,
10};
11
/// The kind of time window used by a windowed operator.
///
/// Its `Display` impl renders each variant as the corresponding SQL keyword
/// (`TUMBLE`, `HOP`, `SESSION`, `CUMULATE`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WindowType {
    /// Tumbling window (`TUMBLE`), configured by a size.
    Tumbling,
    /// Sliding window (`HOP`), configured by a size and a slide interval.
    Sliding,
    /// Session window (`SESSION`), configured by a gap.
    Session,
    /// Cumulate window (`CUMULATE`), configured by a step and a maximum size.
    Cumulate,
}
24
25#[derive(Debug, Clone)]
37pub struct WindowOperatorConfig {
38 pub window_type: WindowType,
40 pub time_column: String,
42 pub size: Duration,
44 pub slide: Option<Duration>,
46 pub gap: Option<Duration>,
48 pub offset_ms: i64,
50 pub allowed_lateness: Duration,
52 pub emit_strategy: EmitStrategy,
54 pub late_data_side_output: Option<String>,
56}
57
/// Formats a [`Duration`] compactly for window descriptions.
///
/// Output forms:
/// - Whole hours render as `"<n>h"`.
/// - Whole minutes render as `"<n>m"`.
/// - Other whole seconds render as `"<n>s"`.
/// - Anything shorter than one second, or carrying a millisecond fraction,
///   renders in milliseconds (`"<n>ms"`), so the fraction is not dropped.
///   For example, 1.5 s renders as `"1500ms"`, not `"1s"`.
///
/// Sub-millisecond precision is truncated.
fn format_duration(d: Duration) -> String {
    let secs = d.as_secs();
    // Whole-second units would lose information here, so use milliseconds.
    if secs == 0 || d.subsec_millis() != 0 {
        return format!("{}ms", d.as_millis());
    }
    if secs.is_multiple_of(3600) {
        format!("{}h", secs / 3600)
    } else if secs.is_multiple_of(60) {
        format!("{}m", secs / 60)
    } else {
        format!("{secs}s")
    }
}
72
73impl std::fmt::Display for WindowType {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 match self {
76 WindowType::Tumbling => write!(f, "TUMBLE"),
77 WindowType::Sliding => write!(f, "HOP"),
78 WindowType::Session => write!(f, "SESSION"),
79 WindowType::Cumulate => write!(f, "CUMULATE"),
80 }
81 }
82}
83
84impl std::fmt::Display for WindowOperatorConfig {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 match self.window_type {
87 WindowType::Tumbling => {
88 write!(
89 f,
90 "TUMBLE({}, {})",
91 self.time_column,
92 format_duration(self.size)
93 )
94 }
95 WindowType::Sliding => {
96 let slide = self.slide.unwrap_or(self.size);
97 write!(
98 f,
99 "HOP({}, {} SLIDE {})",
100 self.time_column,
101 format_duration(self.size),
102 format_duration(slide)
103 )
104 }
105 WindowType::Session => {
106 let gap = self.gap.unwrap_or(Duration::ZERO);
107 write!(
108 f,
109 "SESSION({}, GAP {})",
110 self.time_column,
111 format_duration(gap)
112 )
113 }
114 WindowType::Cumulate => {
115 let step = self.slide.unwrap_or(self.size);
116 write!(
117 f,
118 "CUMULATE({}, STEP {} SIZE {})",
119 self.time_column,
120 format_duration(step),
121 format_duration(self.size)
122 )
123 }
124 }
125 }
126}
127
128impl WindowOperatorConfig {
129 #[must_use]
131 pub fn tumbling(time_column: String, size: Duration) -> Self {
132 Self {
133 window_type: WindowType::Tumbling,
134 time_column,
135 size,
136 slide: None,
137 gap: None,
138 offset_ms: 0,
139 allowed_lateness: Duration::ZERO,
140 emit_strategy: EmitStrategy::OnWatermark,
141 late_data_side_output: None,
142 }
143 }
144
145 #[must_use]
147 pub fn sliding(time_column: String, size: Duration, slide: Duration) -> Self {
148 Self {
149 window_type: WindowType::Sliding,
150 time_column,
151 size,
152 slide: Some(slide),
153 gap: None,
154 offset_ms: 0,
155 allowed_lateness: Duration::ZERO,
156 emit_strategy: EmitStrategy::OnWatermark,
157 late_data_side_output: None,
158 }
159 }
160
161 #[must_use]
163 pub fn session(time_column: String, gap: Duration) -> Self {
164 Self {
165 window_type: WindowType::Session,
166 time_column,
167 size: Duration::ZERO, slide: None,
169 gap: Some(gap),
170 offset_ms: 0,
171 allowed_lateness: Duration::ZERO,
172 emit_strategy: EmitStrategy::OnWatermark,
173 late_data_side_output: None,
174 }
175 }
176
177 #[must_use]
182 pub fn cumulate(time_column: String, step: Duration, max_size: Duration) -> Self {
183 Self {
184 window_type: WindowType::Cumulate,
185 time_column,
186 size: max_size,
187 slide: Some(step),
188 gap: None,
189 offset_ms: 0,
190 allowed_lateness: Duration::ZERO,
191 emit_strategy: EmitStrategy::OnWatermark,
192 late_data_side_output: None,
193 }
194 }
195
196 #[must_use]
198 pub fn with_offset_ms(mut self, offset_ms: i64) -> Self {
199 self.offset_ms = offset_ms;
200 self
201 }
202
203 pub fn from_window_function(window: &WindowFunction) -> Result<Self, ParseError> {
211 let time_column = WindowRewriter::get_time_column_name(window).ok_or_else(|| {
212 ParseError::WindowError("Cannot extract time column name".to_string())
213 })?;
214
215 match window {
216 WindowFunction::Tumble {
217 interval, offset, ..
218 } => {
219 let size = WindowRewriter::parse_interval_to_duration(interval)?;
220 let mut config = Self::tumbling(time_column, size);
221 if let Some(ref off) = offset {
222 let off_dur = WindowRewriter::parse_interval_to_duration(off)?;
223 config.offset_ms = i64::try_from(off_dur.as_millis()).unwrap_or(0);
224 }
225 Ok(config)
226 }
227 WindowFunction::Hop {
228 slide_interval,
229 window_interval,
230 offset,
231 ..
232 } => {
233 let size = WindowRewriter::parse_interval_to_duration(window_interval)?;
234 let slide = WindowRewriter::parse_interval_to_duration(slide_interval)?;
235 let mut config = Self::sliding(time_column, size, slide);
236 if let Some(ref off) = offset {
237 let off_dur = WindowRewriter::parse_interval_to_duration(off)?;
238 config.offset_ms = i64::try_from(off_dur.as_millis()).unwrap_or(0);
239 }
240 Ok(config)
241 }
242 WindowFunction::Session { gap_interval, .. } => {
243 let gap = WindowRewriter::parse_interval_to_duration(gap_interval)?;
244 Ok(Self::session(time_column, gap))
245 }
246 WindowFunction::Cumulate {
247 step_interval,
248 max_size_interval,
249 ..
250 } => {
251 let step = WindowRewriter::parse_interval_to_duration(step_interval)?;
252 let max_size = WindowRewriter::parse_interval_to_duration(max_size_interval)?;
253 Ok(Self::cumulate(time_column, step, max_size))
254 }
255 }
256 }
257
258 pub fn with_emit_clause(mut self, emit_clause: &EmitClause) -> Result<Self, ParseError> {
264 self.emit_strategy = emit_clause.to_emit_strategy()?;
265 Ok(self)
266 }
267
268 pub fn with_late_data_clause(
274 mut self,
275 late_data_clause: &LateDataClause,
276 ) -> Result<Self, ParseError> {
277 if late_data_clause.side_output.is_some() {
278 return Err(ParseError::WindowError(
279 "LATE DATA SIDE OUTPUT is not yet supported in pipeline mode; \
280 use ALLOWED LATENESS without a side output, or omit the clause"
281 .to_string(),
282 ));
283 }
284 self.allowed_lateness = late_data_clause.to_allowed_lateness()?;
285 self.late_data_side_output
286 .clone_from(&late_data_clause.side_output);
287 Ok(self)
288 }
289
290 #[must_use]
292 pub fn with_allowed_lateness(mut self, lateness: Duration) -> Self {
293 self.allowed_lateness = lateness;
294 self
295 }
296
297 #[must_use]
299 pub fn with_emit_strategy(mut self, strategy: EmitStrategy) -> Self {
300 self.emit_strategy = strategy;
301 self
302 }
303
304 #[must_use]
306 pub fn with_late_data_side_output(mut self, name: String) -> Self {
307 self.late_data_side_output = Some(name);
308 self
309 }
310
311 pub fn validate(&self, has_watermark: bool, has_window: bool) -> Result<(), ParseError> {
326 if matches!(
327 self.emit_strategy,
328 EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
329 ) {
330 if !has_watermark {
331 return Err(ParseError::WindowError(
332 "EMIT ON WINDOW CLOSE requires a watermark definition \
333 on the source. Add WATERMARK FOR <column> AS <expr> \
334 to the CREATE SOURCE statement."
335 .to_string(),
336 ));
337 }
338 if !has_window {
339 return Err(ParseError::WindowError(
340 "EMIT ON WINDOW CLOSE is only valid with windowed \
341 aggregation queries. Use EMIT ON UPDATE for \
342 non-windowed queries."
343 .to_string(),
344 ));
345 }
346 }
347 Ok(())
348 }
349
350 #[must_use]
355 pub fn is_append_only_compatible(&self) -> bool {
356 matches!(
357 self.emit_strategy,
358 EmitStrategy::OnWatermark | EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
359 )
360 }
361
362 #[must_use]
364 pub fn has_late_data_handling(&self) -> bool {
365 self.allowed_lateness > Duration::ZERO || self.late_data_side_output.is_some()
366 }
367}
368
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{Expr, Ident};

    /// Builds `TUMBLE(event_time, 5 MINUTE)` with no offset.
    fn make_tumble_window() -> WindowFunction {
        WindowFunction::Tumble {
            time_column: Box::new(Expr::Identifier(Ident::new("event_time"))),
            interval: Box::new(Expr::Identifier(Ident::new("5 MINUTE"))),
            offset: None,
        }
    }

    #[test]
    fn test_tumbling_config() {
        let config =
            WindowOperatorConfig::tumbling("event_time".to_string(), Duration::from_secs(300));

        assert_eq!(config.window_type, WindowType::Tumbling);
        assert_eq!(config.time_column, "event_time");
        assert_eq!(config.size, Duration::from_secs(300));
        assert!(config.slide.is_none());
        assert!(config.gap.is_none());
    }

    #[test]
    fn test_constructor_defaults() {
        let config =
            WindowOperatorConfig::tumbling("event_time".to_string(), Duration::from_secs(300));

        assert_eq!(config.offset_ms, 0);
        assert_eq!(config.allowed_lateness, Duration::ZERO);
        assert_eq!(config.emit_strategy, EmitStrategy::OnWatermark);
        assert!(config.late_data_side_output.is_none());
    }

    #[test]
    fn test_with_offset_ms() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_offset_ms(1_000);

        assert_eq!(config.offset_ms, 1_000);
    }

    #[test]
    fn test_sliding_config() {
        let config = WindowOperatorConfig::sliding(
            "ts".to_string(),
            Duration::from_secs(300),
            Duration::from_secs(60),
        );

        assert_eq!(config.window_type, WindowType::Sliding);
        assert_eq!(config.size, Duration::from_secs(300));
        assert_eq!(config.slide, Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_session_config() {
        let config =
            WindowOperatorConfig::session("click_time".to_string(), Duration::from_secs(1800));

        assert_eq!(config.window_type, WindowType::Session);
        assert_eq!(config.gap, Some(Duration::from_secs(1800)));
        assert_eq!(config.size, Duration::ZERO);
    }

    #[test]
    fn test_from_window_function() {
        let window = make_tumble_window();
        let config = WindowOperatorConfig::from_window_function(&window).unwrap();

        assert_eq!(config.window_type, WindowType::Tumbling);
        assert_eq!(config.time_column, "event_time");
        assert_eq!(config.size, Duration::from_secs(300));
        assert_eq!(config.offset_ms, 0);
    }

    #[test]
    fn test_from_window_function_tumble_with_offset() {
        let window = WindowFunction::Tumble {
            time_column: Box::new(Expr::Identifier(Ident::new("event_time"))),
            interval: Box::new(Expr::Identifier(Ident::new("5 MINUTE"))),
            offset: Some(Expr::Identifier(Ident::new("1 MINUTE")).into()),
        };
        let config = WindowOperatorConfig::from_window_function(&window).unwrap();

        assert_eq!(config.size, Duration::from_secs(300));
        assert_eq!(config.offset_ms, 60_000);
    }

    #[test]
    fn test_with_emit_clause() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));

        let config = config.with_emit_clause(&EmitClause::OnWindowClose).unwrap();
        assert_eq!(config.emit_strategy, EmitStrategy::OnWindowClose);

        let config2 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));
        let config2 = config2.with_emit_clause(&EmitClause::Changes).unwrap();
        assert_eq!(config2.emit_strategy, EmitStrategy::Changelog);
    }

    #[test]
    fn test_with_late_data_clause_side_output_rejected() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));

        let late_clause = LateDataClause::side_output_only("late_events".to_string());
        let result = config.with_late_data_clause(&late_clause);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("not yet supported"));
    }

    #[test]
    fn test_with_late_data_clause_lateness_only_accepted() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));

        let late_clause = LateDataClause::with_allowed_lateness(Expr::Value(
            sqlparser::ast::Value::SingleQuotedString("5 SECONDS".to_string()).into(),
        ));
        let config = config.with_late_data_clause(&late_clause).unwrap();
        assert_eq!(config.allowed_lateness, Duration::from_secs(5));
        assert!(config.late_data_side_output.is_none());
    }

    #[test]
    fn test_append_only_compatible() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));

        assert!(config.is_append_only_compatible());

        let config2 = config.with_emit_strategy(EmitStrategy::OnUpdate);
        assert!(!config2.is_append_only_compatible());

        let config3 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::Changelog);
        assert!(!config3.is_append_only_compatible());
    }

    #[test]
    fn test_has_late_data_handling() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));

        assert!(!config.has_late_data_handling());

        let config2 = config
            .clone()
            .with_allowed_lateness(Duration::from_secs(60));
        assert!(config2.has_late_data_handling());

        let config3 = config.with_late_data_side_output("late".to_string());
        assert!(config3.has_late_data_handling());
    }

    #[test]
    fn test_eowc_without_watermark_errors() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::OnWindowClose);

        let result = config.validate(false, true);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("watermark"),
            "Expected watermark error, got: {err}"
        );
    }

    #[test]
    fn test_eowc_without_window_errors() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::OnWindowClose);

        let result = config.validate(true, false);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("windowed"),
            "Expected windowed query error, got: {err}"
        );
    }

    #[test]
    fn test_eowc_with_watermark_and_window_passes() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::OnWindowClose);

        assert!(config.validate(true, true).is_ok());
    }

    #[test]
    fn test_final_without_watermark_errors() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::FinalOnly);

        let result = config.validate(false, true);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("watermark"),
            "Expected watermark error, got: {err}"
        );
    }

    #[test]
    fn test_non_eowc_without_watermark_ok() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::OnUpdate);
        assert!(config.validate(false, false).is_ok());

        let config2 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::Periodic(Duration::from_secs(5)));
        assert!(config2.validate(false, false).is_ok());

        let config3 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::Changelog);
        assert!(config3.validate(false, false).is_ok());
    }

    #[test]
    fn test_display_tumbling_window() {
        let config =
            WindowOperatorConfig::tumbling("event_time".to_string(), Duration::from_secs(60));
        assert_eq!(format!("{config}"), "TUMBLE(event_time, 1m)");
    }

    #[test]
    fn test_display_sliding_window() {
        let config = WindowOperatorConfig::sliding(
            "ts".to_string(),
            Duration::from_secs(300),
            Duration::from_secs(60),
        );
        assert_eq!(format!("{config}"), "HOP(ts, 5m SLIDE 1m)");
    }

    #[test]
    fn test_display_session_window() {
        let config =
            WindowOperatorConfig::session("click_time".to_string(), Duration::from_secs(1800));
        assert_eq!(format!("{config}"), "SESSION(click_time, GAP 30m)");
    }

    #[test]
    fn test_cumulate_config() {
        let config = WindowOperatorConfig::cumulate(
            "ts".to_string(),
            Duration::from_secs(60),
            Duration::from_secs(300),
        );

        assert_eq!(config.window_type, WindowType::Cumulate);
        assert_eq!(config.time_column, "ts");
        assert_eq!(config.size, Duration::from_secs(300));
        assert_eq!(config.slide, Some(Duration::from_secs(60)));
        assert!(config.gap.is_none());
    }

    #[test]
    fn test_cumulate_from_window_function() {
        let window = WindowFunction::Cumulate {
            time_column: Box::new(Expr::Identifier(Ident::new("event_time"))),
            step_interval: Box::new(Expr::Identifier(Ident::new("1 MINUTE"))),
            max_size_interval: Box::new(Expr::Identifier(Ident::new("5 MINUTE"))),
        };
        let config = WindowOperatorConfig::from_window_function(&window).unwrap();

        assert_eq!(config.window_type, WindowType::Cumulate);
        assert_eq!(config.time_column, "event_time");
        assert_eq!(config.size, Duration::from_secs(300));
        assert_eq!(config.slide, Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_display_cumulate_window() {
        let config = WindowOperatorConfig::cumulate(
            "ts".to_string(),
            Duration::from_secs(60),
            Duration::from_secs(300),
        );
        assert_eq!(format!("{config}"), "CUMULATE(ts, STEP 1m SIZE 5m)");
    }

    #[test]
    fn test_display_window_type() {
        assert_eq!(format!("{}", WindowType::Tumbling), "TUMBLE");
        assert_eq!(format!("{}", WindowType::Sliding), "HOP");
        assert_eq!(format!("{}", WindowType::Session), "SESSION");
        assert_eq!(format!("{}", WindowType::Cumulate), "CUMULATE");
    }

    #[test]
    fn test_display_duration_formatting() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(3600));
        assert_eq!(format!("{config}"), "TUMBLE(ts, 1h)");

        let config2 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(45));
        assert_eq!(format!("{config2}"), "TUMBLE(ts, 45s)");

        let config3 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_millis(500));
        assert_eq!(format!("{config3}"), "TUMBLE(ts, 500ms)");
    }
}