1use std::time::Duration;
7
8use crate::parser::{
9 EmitClause, EmitStrategy, LateDataClause, ParseError, WindowFunction, WindowRewriter,
10};
11
/// The kind of window a [`WindowOperatorConfig`] describes.
///
/// Each variant is rendered by `Display` as the keyword noted on it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WindowType {
    /// Window configured by a size only; displayed as `TUMBLE`.
    Tumbling,
    /// Window configured by a size and a slide; displayed as `HOP`.
    Sliding,
    /// Window configured by a gap; displayed as `SESSION`.
    Session,
    /// Window configured by a step and a maximum size; displayed as `CUMULATE`.
    Cumulate,
}
24
25#[derive(Debug, Clone)]
37pub struct WindowOperatorConfig {
38 pub window_type: WindowType,
40 pub time_column: String,
42 pub size: Duration,
44 pub slide: Option<Duration>,
46 pub gap: Option<Duration>,
48 pub offset_ms: i64,
50 pub allowed_lateness: Duration,
52 pub emit_strategy: EmitStrategy,
54 pub late_data_side_output: Option<String>,
56}
57
/// Renders a duration compactly, using the largest unit that represents it
/// without losing whole milliseconds.
///
/// * Durations under one second, and durations with a fractional-second
///   remainder, are printed in milliseconds (`500ms`, `1500ms`).
/// * Whole-second durations are printed in hours, minutes or seconds,
///   whichever is the largest unit that divides them evenly (`1h`, `90m`,
///   `45s`).
///
/// Precision below one millisecond is truncated.
fn format_duration(d: Duration) -> String {
    let secs = d.as_secs();

    // Falling back to milliseconds whenever there is a sub-second remainder
    // keeps e.g. 1500ms from being displayed as a misleading "1s".
    if secs == 0 || d.subsec_millis() != 0 {
        return format!("{}ms", d.as_millis());
    }

    // `secs` is a non-zero whole number of seconds from here on.
    match (secs % 3_600, secs % 60) {
        (0, _) => format!("{}h", secs / 3_600),
        (_, 0) => format!("{}m", secs / 60),
        _ => format!("{secs}s"),
    }
}
72
73impl std::fmt::Display for WindowType {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 match self {
76 WindowType::Tumbling => write!(f, "TUMBLE"),
77 WindowType::Sliding => write!(f, "HOP"),
78 WindowType::Session => write!(f, "SESSION"),
79 WindowType::Cumulate => write!(f, "CUMULATE"),
80 }
81 }
82}
83
84impl std::fmt::Display for WindowOperatorConfig {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 match self.window_type {
87 WindowType::Tumbling => {
88 write!(
89 f,
90 "TUMBLE({}, {})",
91 self.time_column,
92 format_duration(self.size)
93 )
94 }
95 WindowType::Sliding => {
96 let slide = self.slide.unwrap_or(self.size);
97 write!(
98 f,
99 "HOP({}, {} SLIDE {})",
100 self.time_column,
101 format_duration(self.size),
102 format_duration(slide)
103 )
104 }
105 WindowType::Session => {
106 let gap = self.gap.unwrap_or(Duration::ZERO);
107 write!(
108 f,
109 "SESSION({}, GAP {})",
110 self.time_column,
111 format_duration(gap)
112 )
113 }
114 WindowType::Cumulate => {
115 let step = self.slide.unwrap_or(self.size);
116 write!(
117 f,
118 "CUMULATE({}, STEP {} SIZE {})",
119 self.time_column,
120 format_duration(step),
121 format_duration(self.size)
122 )
123 }
124 }
125 }
126}
127
128impl WindowOperatorConfig {
129 #[must_use]
131 pub fn tumbling(time_column: String, size: Duration) -> Self {
132 Self {
133 window_type: WindowType::Tumbling,
134 time_column,
135 size,
136 slide: None,
137 gap: None,
138 offset_ms: 0,
139 allowed_lateness: Duration::ZERO,
140 emit_strategy: EmitStrategy::OnWatermark,
141 late_data_side_output: None,
142 }
143 }
144
145 #[must_use]
147 pub fn sliding(time_column: String, size: Duration, slide: Duration) -> Self {
148 Self {
149 window_type: WindowType::Sliding,
150 time_column,
151 size,
152 slide: Some(slide),
153 gap: None,
154 offset_ms: 0,
155 allowed_lateness: Duration::ZERO,
156 emit_strategy: EmitStrategy::OnWatermark,
157 late_data_side_output: None,
158 }
159 }
160
161 #[must_use]
163 pub fn session(time_column: String, gap: Duration) -> Self {
164 Self {
165 window_type: WindowType::Session,
166 time_column,
167 size: Duration::ZERO, slide: None,
169 gap: Some(gap),
170 offset_ms: 0,
171 allowed_lateness: Duration::ZERO,
172 emit_strategy: EmitStrategy::OnWatermark,
173 late_data_side_output: None,
174 }
175 }
176
177 #[must_use]
182 pub fn cumulate(time_column: String, step: Duration, max_size: Duration) -> Self {
183 Self {
184 window_type: WindowType::Cumulate,
185 time_column,
186 size: max_size,
187 slide: Some(step),
188 gap: None,
189 offset_ms: 0,
190 allowed_lateness: Duration::ZERO,
191 emit_strategy: EmitStrategy::OnWatermark,
192 late_data_side_output: None,
193 }
194 }
195
196 #[must_use]
198 pub fn with_offset_ms(mut self, offset_ms: i64) -> Self {
199 self.offset_ms = offset_ms;
200 self
201 }
202
203 pub fn from_window_function(window: &WindowFunction) -> Result<Self, ParseError> {
211 let time_column = WindowRewriter::get_time_column_name(window).ok_or_else(|| {
212 ParseError::WindowError("Cannot extract time column name".to_string())
213 })?;
214
215 match window {
216 WindowFunction::Tumble {
217 interval, offset, ..
218 } => {
219 let size = WindowRewriter::parse_interval_to_duration(interval)?;
220 let mut config = Self::tumbling(time_column, size);
221 if let Some(ref off) = offset {
222 let off_dur = WindowRewriter::parse_interval_to_duration(off)?;
223 config.offset_ms = i64::try_from(off_dur.as_millis()).unwrap_or(0);
224 }
225 Ok(config)
226 }
227 WindowFunction::Hop {
228 slide_interval,
229 window_interval,
230 offset,
231 ..
232 } => {
233 let size = WindowRewriter::parse_interval_to_duration(window_interval)?;
234 let slide = WindowRewriter::parse_interval_to_duration(slide_interval)?;
235 let mut config = Self::sliding(time_column, size, slide);
236 if let Some(ref off) = offset {
237 let off_dur = WindowRewriter::parse_interval_to_duration(off)?;
238 config.offset_ms = i64::try_from(off_dur.as_millis()).unwrap_or(0);
239 }
240 Ok(config)
241 }
242 WindowFunction::Session { gap_interval, .. } => {
243 let gap = WindowRewriter::parse_interval_to_duration(gap_interval)?;
244 Ok(Self::session(time_column, gap))
245 }
246 WindowFunction::Cumulate {
247 step_interval,
248 max_size_interval,
249 ..
250 } => {
251 let step = WindowRewriter::parse_interval_to_duration(step_interval)?;
252 let max_size = WindowRewriter::parse_interval_to_duration(max_size_interval)?;
253 Ok(Self::cumulate(time_column, step, max_size))
254 }
255 }
256 }
257
258 pub fn with_emit_clause(mut self, emit_clause: &EmitClause) -> Result<Self, ParseError> {
264 self.emit_strategy = emit_clause.to_emit_strategy()?;
265 Ok(self)
266 }
267
268 pub fn with_late_data_clause(
274 mut self,
275 late_data_clause: &LateDataClause,
276 ) -> Result<Self, ParseError> {
277 self.allowed_lateness = late_data_clause.to_allowed_lateness()?;
278 self.late_data_side_output
279 .clone_from(&late_data_clause.side_output);
280 Ok(self)
281 }
282
283 #[must_use]
285 pub fn with_allowed_lateness(mut self, lateness: Duration) -> Self {
286 self.allowed_lateness = lateness;
287 self
288 }
289
290 #[must_use]
292 pub fn with_emit_strategy(mut self, strategy: EmitStrategy) -> Self {
293 self.emit_strategy = strategy;
294 self
295 }
296
297 #[must_use]
299 pub fn with_late_data_side_output(mut self, name: String) -> Self {
300 self.late_data_side_output = Some(name);
301 self
302 }
303
304 pub fn validate(&self, has_watermark: bool, has_window: bool) -> Result<(), ParseError> {
319 if matches!(
320 self.emit_strategy,
321 EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
322 ) {
323 if !has_watermark {
324 return Err(ParseError::WindowError(
325 "EMIT ON WINDOW CLOSE requires a watermark definition \
326 on the source. Add WATERMARK FOR <column> AS <expr> \
327 to the CREATE SOURCE statement."
328 .to_string(),
329 ));
330 }
331 if !has_window {
332 return Err(ParseError::WindowError(
333 "EMIT ON WINDOW CLOSE is only valid with windowed \
334 aggregation queries. Use EMIT ON UPDATE for \
335 non-windowed queries."
336 .to_string(),
337 ));
338 }
339 }
340 Ok(())
341 }
342
343 #[must_use]
348 pub fn is_append_only_compatible(&self) -> bool {
349 matches!(
350 self.emit_strategy,
351 EmitStrategy::OnWatermark | EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
352 )
353 }
354
355 #[must_use]
357 pub fn has_late_data_handling(&self) -> bool {
358 self.allowed_lateness > Duration::ZERO || self.late_data_side_output.is_some()
359 }
360}
361
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{Expr, Ident};

    /// Window size shared by most tests.
    const FIVE_MINUTES: Duration = Duration::from_secs(300);
    /// Slide / step shared by the sliding and cumulate tests.
    const ONE_MINUTE: Duration = Duration::from_secs(60);

    /// Wraps `text` in a boxed identifier expression, the shape the window
    /// function fields are populated with in these tests.
    fn ident_expr(text: &str) -> Box<Expr> {
        Box::new(Expr::Identifier(Ident::new(text)))
    }

    /// A five-minute tumbling window over `event_time` with no offset.
    fn make_tumble_window() -> WindowFunction {
        WindowFunction::Tumble {
            time_column: ident_expr("event_time"),
            interval: ident_expr("5 MINUTE"),
            offset: None,
        }
    }

    /// A five-minute tumbling configuration over the `ts` column.
    fn tumbling_ts() -> WindowOperatorConfig {
        WindowOperatorConfig::tumbling(String::from("ts"), FIVE_MINUTES)
    }

    /// A tumbling configuration over `ts` with the given emit strategy.
    fn tumbling_ts_emitting(strategy: EmitStrategy) -> WindowOperatorConfig {
        tumbling_ts().with_emit_strategy(strategy)
    }

    #[test]
    fn test_tumbling_config() {
        let cfg = WindowOperatorConfig::tumbling(String::from("event_time"), FIVE_MINUTES);

        assert_eq!(cfg.window_type, WindowType::Tumbling);
        assert_eq!(cfg.time_column, "event_time");
        assert_eq!(cfg.size, Duration::from_secs(300));
        assert!(cfg.slide.is_none());
        assert!(cfg.gap.is_none());
    }

    #[test]
    fn test_sliding_config() {
        let cfg = WindowOperatorConfig::sliding(String::from("ts"), FIVE_MINUTES, ONE_MINUTE);

        assert_eq!(cfg.window_type, WindowType::Sliding);
        assert_eq!(cfg.size, Duration::from_secs(300));
        assert_eq!(cfg.slide, Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_session_config() {
        let half_hour = Duration::from_secs(1800);
        let cfg = WindowOperatorConfig::session(String::from("click_time"), half_hour);

        assert_eq!(cfg.window_type, WindowType::Session);
        assert_eq!(cfg.gap, Some(Duration::from_secs(1800)));
    }

    #[test]
    fn test_from_window_function() {
        let cfg = WindowOperatorConfig::from_window_function(&make_tumble_window()).unwrap();

        assert_eq!(cfg.window_type, WindowType::Tumbling);
        assert_eq!(cfg.time_column, "event_time");
        assert_eq!(cfg.size, Duration::from_secs(300));
    }

    #[test]
    fn test_with_emit_clause() {
        let on_close = tumbling_ts()
            .with_emit_clause(&EmitClause::OnWindowClose)
            .unwrap();
        assert_eq!(on_close.emit_strategy, EmitStrategy::OnWindowClose);

        let changes = tumbling_ts()
            .with_emit_clause(&EmitClause::Changes)
            .unwrap();
        assert_eq!(changes.emit_strategy, EmitStrategy::Changelog);
    }

    #[test]
    fn test_with_late_data_clause() {
        let clause = LateDataClause::side_output_only(String::from("late_events"));
        let cfg = tumbling_ts().with_late_data_clause(&clause).unwrap();

        assert_eq!(
            cfg.late_data_side_output,
            Some(String::from("late_events"))
        );
    }

    #[test]
    fn test_append_only_compatible() {
        // The default emit strategy is append-only compatible.
        assert!(tumbling_ts().is_append_only_compatible());

        let on_update = tumbling_ts_emitting(EmitStrategy::OnUpdate);
        assert!(!on_update.is_append_only_compatible());

        let changelog = tumbling_ts_emitting(EmitStrategy::Changelog);
        assert!(!changelog.is_append_only_compatible());
    }

    #[test]
    fn test_has_late_data_handling() {
        // A freshly built configuration has no late-data handling.
        assert!(!tumbling_ts().has_late_data_handling());

        let with_lateness = tumbling_ts().with_allowed_lateness(ONE_MINUTE);
        assert!(with_lateness.has_late_data_handling());

        let with_side_output = tumbling_ts().with_late_data_side_output(String::from("late"));
        assert!(with_side_output.has_late_data_handling());
    }

    #[test]
    fn test_eowc_without_watermark_errors() {
        let outcome = tumbling_ts_emitting(EmitStrategy::OnWindowClose).validate(false, true);

        assert!(outcome.is_err());
        let err = outcome.unwrap_err().to_string();
        assert!(
            err.contains("watermark"),
            "Expected watermark error, got: {err}"
        );
    }

    #[test]
    fn test_eowc_without_window_errors() {
        let outcome = tumbling_ts_emitting(EmitStrategy::OnWindowClose).validate(true, false);

        assert!(outcome.is_err());
        let err = outcome.unwrap_err().to_string();
        assert!(
            err.contains("windowed"),
            "Expected windowed query error, got: {err}"
        );
    }

    #[test]
    fn test_eowc_with_watermark_and_window_passes() {
        let cfg = tumbling_ts_emitting(EmitStrategy::OnWindowClose);

        assert!(cfg.validate(true, true).is_ok());
    }

    #[test]
    fn test_final_without_watermark_errors() {
        let cfg = tumbling_ts_emitting(EmitStrategy::FinalOnly);

        assert!(cfg.validate(false, true).is_err());
    }

    #[test]
    fn test_non_eowc_without_watermark_ok() {
        let on_update = tumbling_ts_emitting(EmitStrategy::OnUpdate);
        assert!(on_update.validate(false, false).is_ok());

        let periodic = tumbling_ts_emitting(EmitStrategy::Periodic(Duration::from_secs(5)));
        assert!(periodic.validate(false, false).is_ok());

        let changelog = tumbling_ts_emitting(EmitStrategy::Changelog);
        assert!(changelog.validate(false, false).is_ok());
    }

    #[test]
    fn test_display_tumbling_window() {
        let cfg = WindowOperatorConfig::tumbling(String::from("event_time"), ONE_MINUTE);

        assert_eq!(cfg.to_string(), "TUMBLE(event_time, 1m)");
    }

    #[test]
    fn test_display_sliding_window() {
        let cfg = WindowOperatorConfig::sliding(String::from("ts"), FIVE_MINUTES, ONE_MINUTE);

        assert_eq!(cfg.to_string(), "HOP(ts, 5m SLIDE 1m)");
    }

    #[test]
    fn test_display_session_window() {
        let cfg =
            WindowOperatorConfig::session(String::from("click_time"), Duration::from_secs(1800));

        assert_eq!(cfg.to_string(), "SESSION(click_time, GAP 30m)");
    }

    #[test]
    fn test_cumulate_config() {
        let cfg = WindowOperatorConfig::cumulate(String::from("ts"), ONE_MINUTE, FIVE_MINUTES);

        assert_eq!(cfg.window_type, WindowType::Cumulate);
        assert_eq!(cfg.time_column, "ts");
        assert_eq!(cfg.size, Duration::from_secs(300));
        assert_eq!(cfg.slide, Some(Duration::from_secs(60)));
        assert!(cfg.gap.is_none());
    }

    #[test]
    fn test_cumulate_from_window_function() {
        let window = WindowFunction::Cumulate {
            time_column: ident_expr("event_time"),
            step_interval: ident_expr("1 MINUTE"),
            max_size_interval: ident_expr("5 MINUTE"),
        };

        let cfg = WindowOperatorConfig::from_window_function(&window).unwrap();

        assert_eq!(cfg.window_type, WindowType::Cumulate);
        assert_eq!(cfg.time_column, "event_time");
        assert_eq!(cfg.size, Duration::from_secs(300));
        assert_eq!(cfg.slide, Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_display_cumulate_window() {
        let cfg = WindowOperatorConfig::cumulate(String::from("ts"), ONE_MINUTE, FIVE_MINUTES);

        assert_eq!(cfg.to_string(), "CUMULATE(ts, STEP 1m SIZE 5m)");
    }

    #[test]
    fn test_display_window_type() {
        assert_eq!(WindowType::Tumbling.to_string(), "TUMBLE");
        assert_eq!(WindowType::Sliding.to_string(), "HOP");
        assert_eq!(WindowType::Session.to_string(), "SESSION");
        assert_eq!(WindowType::Cumulate.to_string(), "CUMULATE");
    }

    #[test]
    fn test_display_duration_formatting() {
        let hour = WindowOperatorConfig::tumbling(String::from("ts"), Duration::from_secs(3600));
        assert_eq!(hour.to_string(), "TUMBLE(ts, 1h)");

        let seconds = WindowOperatorConfig::tumbling(String::from("ts"), Duration::from_secs(45));
        assert_eq!(seconds.to_string(), "TUMBLE(ts, 45s)");

        let millis =
            WindowOperatorConfig::tumbling(String::from("ts"), Duration::from_millis(500));
        assert_eq!(millis.to_string(), "TUMBLE(ts, 500ms)");
    }
}