1use std::time::Duration;
7
8use crate::parser::{
9 EmitClause, EmitStrategy, LateDataClause, ParseError, WindowFunction, WindowRewriter,
10};
11
/// The kind of time window a windowed operator assigns events to.
///
/// The `Display` form is the window-function keyword
/// (`TUMBLE`, `HOP`, `SESSION`, `CUMULATE`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WindowType {
    /// Fixed-size windows described only by a size (`TUMBLE`).
    Tumbling,
    /// Fixed-size windows that advance by a slide interval (`HOP`).
    Sliding,
    /// Windows delimited by an inactivity gap (`SESSION`).
    Session,
    /// Windows that grow by a step up to a maximum size (`CUMULATE`).
    Cumulate,
}
24
25#[derive(Debug, Clone)]
37pub struct WindowOperatorConfig {
38 pub window_type: WindowType,
40 pub time_column: String,
42 pub size: Duration,
44 pub slide: Option<Duration>,
46 pub gap: Option<Duration>,
48 pub allowed_lateness: Duration,
50 pub emit_strategy: EmitStrategy,
52 pub late_data_side_output: Option<String>,
54}
55
/// Formats a [`Duration`] compactly for display, e.g. `5m`, `1h`, `45s`, `500ms`.
///
/// Whole-second durations use the largest of `h`, `m`, `s` that divides them
/// exactly. Any duration with a fractional second is shown in the largest of
/// `ms`, `us`, `ns` that represents it exactly, and so is zero itself. This
/// means no precision is dropped: 1.5 s renders as `1500ms`, not `1s`.
fn format_duration(d: Duration) -> String {
    let secs = d.as_secs();
    let subsec_nanos = d.subsec_nanos();

    // A fractional-second part (or a zero duration) must be rendered in a
    // sub-second unit; the h/m/s branches below only see whole seconds.
    if secs == 0 || subsec_nanos != 0 {
        return if subsec_nanos.is_multiple_of(1_000_000) {
            format!("{}ms", d.as_millis())
        } else if subsec_nanos.is_multiple_of(1_000) {
            format!("{}us", d.as_micros())
        } else {
            format!("{}ns", d.as_nanos())
        };
    }

    if secs.is_multiple_of(3600) {
        format!("{}h", secs / 3600)
    } else if secs.is_multiple_of(60) {
        format!("{}m", secs / 60)
    } else {
        format!("{secs}s")
    }
}
70
71impl std::fmt::Display for WindowType {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 match self {
74 WindowType::Tumbling => write!(f, "TUMBLE"),
75 WindowType::Sliding => write!(f, "HOP"),
76 WindowType::Session => write!(f, "SESSION"),
77 WindowType::Cumulate => write!(f, "CUMULATE"),
78 }
79 }
80}
81
82impl std::fmt::Display for WindowOperatorConfig {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 match self.window_type {
85 WindowType::Tumbling => {
86 write!(
87 f,
88 "TUMBLE({}, {})",
89 self.time_column,
90 format_duration(self.size)
91 )
92 }
93 WindowType::Sliding => {
94 let slide = self.slide.unwrap_or(self.size);
95 write!(
96 f,
97 "HOP({}, {} SLIDE {})",
98 self.time_column,
99 format_duration(self.size),
100 format_duration(slide)
101 )
102 }
103 WindowType::Session => {
104 let gap = self.gap.unwrap_or(Duration::ZERO);
105 write!(
106 f,
107 "SESSION({}, GAP {})",
108 self.time_column,
109 format_duration(gap)
110 )
111 }
112 WindowType::Cumulate => {
113 let step = self.slide.unwrap_or(self.size);
114 write!(
115 f,
116 "CUMULATE({}, STEP {} SIZE {})",
117 self.time_column,
118 format_duration(step),
119 format_duration(self.size)
120 )
121 }
122 }
123 }
124}
125
126impl WindowOperatorConfig {
127 #[must_use]
129 pub fn tumbling(time_column: String, size: Duration) -> Self {
130 Self {
131 window_type: WindowType::Tumbling,
132 time_column,
133 size,
134 slide: None,
135 gap: None,
136 allowed_lateness: Duration::ZERO,
137 emit_strategy: EmitStrategy::OnWatermark,
138 late_data_side_output: None,
139 }
140 }
141
142 #[must_use]
144 pub fn sliding(time_column: String, size: Duration, slide: Duration) -> Self {
145 Self {
146 window_type: WindowType::Sliding,
147 time_column,
148 size,
149 slide: Some(slide),
150 gap: None,
151 allowed_lateness: Duration::ZERO,
152 emit_strategy: EmitStrategy::OnWatermark,
153 late_data_side_output: None,
154 }
155 }
156
157 #[must_use]
159 pub fn session(time_column: String, gap: Duration) -> Self {
160 Self {
161 window_type: WindowType::Session,
162 time_column,
163 size: Duration::ZERO, slide: None,
165 gap: Some(gap),
166 allowed_lateness: Duration::ZERO,
167 emit_strategy: EmitStrategy::OnWatermark,
168 late_data_side_output: None,
169 }
170 }
171
172 #[must_use]
177 pub fn cumulate(time_column: String, step: Duration, max_size: Duration) -> Self {
178 Self {
179 window_type: WindowType::Cumulate,
180 time_column,
181 size: max_size,
182 slide: Some(step),
183 gap: None,
184 allowed_lateness: Duration::ZERO,
185 emit_strategy: EmitStrategy::OnWatermark,
186 late_data_side_output: None,
187 }
188 }
189
190 pub fn from_window_function(window: &WindowFunction) -> Result<Self, ParseError> {
198 let time_column = WindowRewriter::get_time_column_name(window).ok_or_else(|| {
199 ParseError::WindowError("Cannot extract time column name".to_string())
200 })?;
201
202 match window {
203 WindowFunction::Tumble { interval, .. } => {
204 let size = WindowRewriter::parse_interval_to_duration(interval)?;
205 Ok(Self::tumbling(time_column, size))
206 }
207 WindowFunction::Hop {
208 slide_interval,
209 window_interval,
210 ..
211 } => {
212 let size = WindowRewriter::parse_interval_to_duration(window_interval)?;
213 let slide = WindowRewriter::parse_interval_to_duration(slide_interval)?;
214 Ok(Self::sliding(time_column, size, slide))
215 }
216 WindowFunction::Session { gap_interval, .. } => {
217 let gap = WindowRewriter::parse_interval_to_duration(gap_interval)?;
218 Ok(Self::session(time_column, gap))
219 }
220 WindowFunction::Cumulate {
221 step_interval,
222 max_size_interval,
223 ..
224 } => {
225 let step = WindowRewriter::parse_interval_to_duration(step_interval)?;
226 let max_size = WindowRewriter::parse_interval_to_duration(max_size_interval)?;
227 Ok(Self::cumulate(time_column, step, max_size))
228 }
229 }
230 }
231
232 pub fn with_emit_clause(mut self, emit_clause: &EmitClause) -> Result<Self, ParseError> {
238 self.emit_strategy = emit_clause.to_emit_strategy()?;
239 Ok(self)
240 }
241
242 pub fn with_late_data_clause(
248 mut self,
249 late_data_clause: &LateDataClause,
250 ) -> Result<Self, ParseError> {
251 self.allowed_lateness = late_data_clause.to_allowed_lateness()?;
252 self.late_data_side_output
253 .clone_from(&late_data_clause.side_output);
254 Ok(self)
255 }
256
257 #[must_use]
259 pub fn with_allowed_lateness(mut self, lateness: Duration) -> Self {
260 self.allowed_lateness = lateness;
261 self
262 }
263
264 #[must_use]
266 pub fn with_emit_strategy(mut self, strategy: EmitStrategy) -> Self {
267 self.emit_strategy = strategy;
268 self
269 }
270
271 #[must_use]
273 pub fn with_late_data_side_output(mut self, name: String) -> Self {
274 self.late_data_side_output = Some(name);
275 self
276 }
277
278 pub fn validate(&self, has_watermark: bool, has_window: bool) -> Result<(), ParseError> {
293 if matches!(
294 self.emit_strategy,
295 EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
296 ) {
297 if !has_watermark {
298 return Err(ParseError::WindowError(
299 "EMIT ON WINDOW CLOSE requires a watermark definition \
300 on the source. Add WATERMARK FOR <column> AS <expr> \
301 to the CREATE SOURCE statement."
302 .to_string(),
303 ));
304 }
305 if !has_window {
306 return Err(ParseError::WindowError(
307 "EMIT ON WINDOW CLOSE is only valid with windowed \
308 aggregation queries. Use EMIT ON UPDATE for \
309 non-windowed queries."
310 .to_string(),
311 ));
312 }
313 }
314 Ok(())
315 }
316
317 #[must_use]
322 pub fn is_append_only_compatible(&self) -> bool {
323 matches!(
324 self.emit_strategy,
325 EmitStrategy::OnWatermark | EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
326 )
327 }
328
329 #[must_use]
331 pub fn has_late_data_handling(&self) -> bool {
332 self.allowed_lateness > Duration::ZERO || self.late_data_side_output.is_some()
333 }
334}
335
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{Expr, Ident};

    /// Wraps `name` in a boxed identifier expression, the argument shape the
    /// window-function fixtures below use.
    fn ident_expr(name: &str) -> Box<Expr> {
        Box::new(Expr::Identifier(Ident::new(name)))
    }

    /// `TUMBLE(event_time, 5 MINUTE)` as a parsed window function.
    fn make_tumble_window() -> WindowFunction {
        WindowFunction::Tumble {
            time_column: ident_expr("event_time"),
            interval: ident_expr("5 MINUTE"),
        }
    }

    /// A 5-minute tumbling window over `ts` with default settings.
    fn ts_tumble_5m() -> WindowOperatorConfig {
        WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
    }

    /// The 5-minute fixture switched to emit-on-window-close.
    fn ts_tumble_5m_eowc() -> WindowOperatorConfig {
        ts_tumble_5m().with_emit_strategy(EmitStrategy::OnWindowClose)
    }

    #[test]
    fn test_tumbling_config() {
        let cfg =
            WindowOperatorConfig::tumbling("event_time".to_string(), Duration::from_secs(300));

        assert_eq!(cfg.window_type, WindowType::Tumbling);
        assert_eq!(cfg.time_column, "event_time");
        assert_eq!(cfg.size, Duration::from_secs(300));
        assert_eq!(cfg.slide, None);
        assert_eq!(cfg.gap, None);
    }

    #[test]
    fn test_sliding_config() {
        let five_min = Duration::from_secs(300);
        let one_min = Duration::from_secs(60);
        let cfg = WindowOperatorConfig::sliding("ts".to_string(), five_min, one_min);

        assert_eq!(cfg.window_type, WindowType::Sliding);
        assert_eq!(cfg.size, five_min);
        assert_eq!(cfg.slide, Some(one_min));
    }

    #[test]
    fn test_session_config() {
        let half_hour = Duration::from_secs(1800);
        let cfg = WindowOperatorConfig::session("click_time".to_string(), half_hour);

        assert_eq!(cfg.window_type, WindowType::Session);
        assert_eq!(cfg.gap, Some(half_hour));
    }

    #[test]
    fn test_from_window_function() {
        let cfg = WindowOperatorConfig::from_window_function(&make_tumble_window()).unwrap();

        assert_eq!(cfg.window_type, WindowType::Tumbling);
        assert_eq!(cfg.time_column, "event_time");
        assert_eq!(cfg.size, Duration::from_secs(300));
    }

    #[test]
    fn test_with_emit_clause() {
        let on_close = ts_tumble_5m()
            .with_emit_clause(&EmitClause::OnWindowClose)
            .unwrap();
        assert_eq!(on_close.emit_strategy, EmitStrategy::OnWindowClose);

        let changes = ts_tumble_5m()
            .with_emit_clause(&EmitClause::Changes)
            .unwrap();
        assert_eq!(changes.emit_strategy, EmitStrategy::Changelog);
    }

    #[test]
    fn test_with_late_data_clause() {
        let clause = LateDataClause::side_output_only("late_events".to_string());
        let cfg = ts_tumble_5m().with_late_data_clause(&clause).unwrap();

        assert_eq!(
            cfg.late_data_side_output.as_deref(),
            Some("late_events")
        );
    }

    #[test]
    fn test_append_only_compatible() {
        let base = ts_tumble_5m();
        assert!(base.is_append_only_compatible());

        let on_update = base.with_emit_strategy(EmitStrategy::OnUpdate);
        assert!(!on_update.is_append_only_compatible());

        let changelog = ts_tumble_5m().with_emit_strategy(EmitStrategy::Changelog);
        assert!(!changelog.is_append_only_compatible());
    }

    #[test]
    fn test_has_late_data_handling() {
        let base = ts_tumble_5m();
        assert!(!base.has_late_data_handling());

        let with_lateness = base.clone().with_allowed_lateness(Duration::from_secs(60));
        assert!(with_lateness.has_late_data_handling());

        let with_side_output = base.with_late_data_side_output("late".to_string());
        assert!(with_side_output.has_late_data_handling());
    }

    #[test]
    fn test_eowc_without_watermark_errors() {
        let outcome = ts_tumble_5m_eowc().validate(false, true);
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert!(
            message.contains("watermark"),
            "Expected watermark error, got: {message}"
        );
    }

    #[test]
    fn test_eowc_without_window_errors() {
        let outcome = ts_tumble_5m_eowc().validate(true, false);
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert!(
            message.contains("windowed"),
            "Expected windowed query error, got: {message}"
        );
    }

    #[test]
    fn test_eowc_with_watermark_and_window_passes() {
        assert!(ts_tumble_5m_eowc().validate(true, true).is_ok());
    }

    #[test]
    fn test_final_without_watermark_errors() {
        let cfg = ts_tumble_5m().with_emit_strategy(EmitStrategy::FinalOnly);
        assert!(cfg.validate(false, true).is_err());
    }

    #[test]
    fn test_non_eowc_without_watermark_ok() {
        let strategies = [
            EmitStrategy::OnUpdate,
            EmitStrategy::Periodic(Duration::from_secs(5)),
            EmitStrategy::Changelog,
        ];
        for strategy in strategies {
            let cfg = ts_tumble_5m().with_emit_strategy(strategy);
            assert!(cfg.validate(false, false).is_ok());
        }
    }

    #[test]
    fn test_display_tumbling_window() {
        let cfg =
            WindowOperatorConfig::tumbling("event_time".to_string(), Duration::from_secs(60));
        assert_eq!(cfg.to_string(), "TUMBLE(event_time, 1m)");
    }

    #[test]
    fn test_display_sliding_window() {
        let cfg = WindowOperatorConfig::sliding(
            "ts".to_string(),
            Duration::from_secs(300),
            Duration::from_secs(60),
        );
        assert_eq!(cfg.to_string(), "HOP(ts, 5m SLIDE 1m)");
    }

    #[test]
    fn test_display_session_window() {
        let cfg =
            WindowOperatorConfig::session("click_time".to_string(), Duration::from_secs(1800));
        assert_eq!(cfg.to_string(), "SESSION(click_time, GAP 30m)");
    }

    #[test]
    fn test_cumulate_config() {
        let step = Duration::from_secs(60);
        let max_size = Duration::from_secs(300);
        let cfg = WindowOperatorConfig::cumulate("ts".to_string(), step, max_size);

        assert_eq!(cfg.window_type, WindowType::Cumulate);
        assert_eq!(cfg.time_column, "ts");
        assert_eq!(cfg.size, max_size);
        assert_eq!(cfg.slide, Some(step));
        assert_eq!(cfg.gap, None);
    }

    #[test]
    fn test_cumulate_from_window_function() {
        let window = WindowFunction::Cumulate {
            time_column: ident_expr("event_time"),
            step_interval: ident_expr("1 MINUTE"),
            max_size_interval: ident_expr("5 MINUTE"),
        };
        let cfg = WindowOperatorConfig::from_window_function(&window).unwrap();

        assert_eq!(cfg.window_type, WindowType::Cumulate);
        assert_eq!(cfg.time_column, "event_time");
        assert_eq!(cfg.size, Duration::from_secs(300));
        assert_eq!(cfg.slide, Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_display_cumulate_window() {
        let cfg = WindowOperatorConfig::cumulate(
            "ts".to_string(),
            Duration::from_secs(60),
            Duration::from_secs(300),
        );
        assert_eq!(cfg.to_string(), "CUMULATE(ts, STEP 1m SIZE 5m)");
    }

    #[test]
    fn test_display_window_type() {
        let expected = [
            (WindowType::Tumbling, "TUMBLE"),
            (WindowType::Sliding, "HOP"),
            (WindowType::Session, "SESSION"),
            (WindowType::Cumulate, "CUMULATE"),
        ];
        for (kind, keyword) in expected {
            assert_eq!(kind.to_string(), keyword);
        }
    }

    #[test]
    fn test_display_duration_formatting() {
        let cases = [
            (Duration::from_secs(3600), "TUMBLE(ts, 1h)"),
            (Duration::from_secs(45), "TUMBLE(ts, 45s)"),
            (Duration::from_millis(500), "TUMBLE(ts, 500ms)"),
        ];
        for (size, rendered) in cases {
            let cfg = WindowOperatorConfig::tumbling("ts".to_string(), size);
            assert_eq!(cfg.to_string(), rendered);
        }
    }
}