Skip to main content

laminar_sql/translator/
window_translator.rs

1//! Window operator configuration builder
2//!
3//! Translates parsed window functions and EMIT/late data clauses
4//! into complete operator configurations.
5
6use std::time::Duration;
7
8use crate::parser::{
9    EmitClause, EmitStrategy, LateDataClause, ParseError, WindowFunction, WindowRewriter,
10};
11
/// Kind of windowing performed by a window operator.
///
/// Each variant corresponds to one of the parsed SQL window functions
/// (`TUMBLE`, `HOP`, `SESSION`, `CUMULATE`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WindowType {
    /// Fixed-size windows that never overlap.
    Tumbling,
    /// Fixed-size windows that overlap, advancing by a slide interval.
    Sliding,
    /// Windows whose extent is determined by gaps in activity.
    Session,
    /// Windows that grow step by step within fixed-size epochs.
    Cumulate,
}
24
25/// Complete configuration for instantiating a window operator.
26///
27/// This structure holds all the information needed to create and configure
28/// a window operator in Ring 0.
29///
30/// # EMIT ON WINDOW CLOSE
31///
32/// When `emit_strategy` is `OnWindowClose` or `FinalOnly`, use [`validate()`](Self::validate)
33/// to ensure the configuration is valid. These strategies require:
34/// - A watermark definition on the source (timers are driven by watermark)
35/// - A windowed aggregation context (non-windowed queries cannot use EOWC)
36#[derive(Debug, Clone)]
37pub struct WindowOperatorConfig {
38    /// The type of window (tumbling, sliding, session)
39    pub window_type: WindowType,
40    /// The time column name used for windowing
41    pub time_column: String,
42    /// Window size (for tumbling and sliding)
43    pub size: Duration,
44    /// Slide interval for sliding windows
45    pub slide: Option<Duration>,
46    /// Gap interval for session windows
47    pub gap: Option<Duration>,
48    /// Window offset in milliseconds for timezone-aligned windows
49    pub offset_ms: i64,
50    /// Maximum allowed lateness for late events
51    pub allowed_lateness: Duration,
52    /// Emit strategy (when to output results)
53    pub emit_strategy: EmitStrategy,
54    /// Side output name for late data (if configured)
55    pub late_data_side_output: Option<String>,
56}
57
/// Format a `Duration` as a compact human-readable string (e.g. "45s", "5m", "1h").
///
/// A duration that is a whole number of seconds uses the largest unit that
/// divides it exactly: `h`, then `m`, then `s`.
///
/// Any other duration (zero, or one with a fractional-second part) is
/// rendered losslessly in the largest sub-second unit that divides it
/// exactly: `ms`, then `us`, then `ns`. For example, 1500 ms prints as
/// "1500ms" instead of being truncated to "1s".
fn format_duration(d: Duration) -> String {
    let secs = d.as_secs();
    if secs == 0 || d.subsec_nanos() != 0 {
        // Not a whole number of seconds: use a sub-second unit so no precision is lost.
        let nanos = d.as_nanos();
        return if nanos.is_multiple_of(1_000_000) {
            format!("{}ms", d.as_millis())
        } else if nanos.is_multiple_of(1_000) {
            format!("{}us", d.as_micros())
        } else {
            format!("{nanos}ns")
        };
    }
    if secs.is_multiple_of(3600) {
        format!("{}h", secs / 3600)
    } else if secs.is_multiple_of(60) {
        format!("{}m", secs / 60)
    } else {
        format!("{secs}s")
    }
}
72
73impl std::fmt::Display for WindowType {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        match self {
76            WindowType::Tumbling => write!(f, "TUMBLE"),
77            WindowType::Sliding => write!(f, "HOP"),
78            WindowType::Session => write!(f, "SESSION"),
79            WindowType::Cumulate => write!(f, "CUMULATE"),
80        }
81    }
82}
83
84impl std::fmt::Display for WindowOperatorConfig {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        match self.window_type {
87            WindowType::Tumbling => {
88                write!(
89                    f,
90                    "TUMBLE({}, {})",
91                    self.time_column,
92                    format_duration(self.size)
93                )
94            }
95            WindowType::Sliding => {
96                let slide = self.slide.unwrap_or(self.size);
97                write!(
98                    f,
99                    "HOP({}, {} SLIDE {})",
100                    self.time_column,
101                    format_duration(self.size),
102                    format_duration(slide)
103                )
104            }
105            WindowType::Session => {
106                let gap = self.gap.unwrap_or(Duration::ZERO);
107                write!(
108                    f,
109                    "SESSION({}, GAP {})",
110                    self.time_column,
111                    format_duration(gap)
112                )
113            }
114            WindowType::Cumulate => {
115                let step = self.slide.unwrap_or(self.size);
116                write!(
117                    f,
118                    "CUMULATE({}, STEP {} SIZE {})",
119                    self.time_column,
120                    format_duration(step),
121                    format_duration(self.size)
122                )
123            }
124        }
125    }
126}
127
128impl WindowOperatorConfig {
129    /// Create a new tumbling window configuration.
130    #[must_use]
131    pub fn tumbling(time_column: String, size: Duration) -> Self {
132        Self {
133            window_type: WindowType::Tumbling,
134            time_column,
135            size,
136            slide: None,
137            gap: None,
138            offset_ms: 0,
139            allowed_lateness: Duration::ZERO,
140            emit_strategy: EmitStrategy::OnWatermark,
141            late_data_side_output: None,
142        }
143    }
144
145    /// Create a new sliding window configuration.
146    #[must_use]
147    pub fn sliding(time_column: String, size: Duration, slide: Duration) -> Self {
148        Self {
149            window_type: WindowType::Sliding,
150            time_column,
151            size,
152            slide: Some(slide),
153            gap: None,
154            offset_ms: 0,
155            allowed_lateness: Duration::ZERO,
156            emit_strategy: EmitStrategy::OnWatermark,
157            late_data_side_output: None,
158        }
159    }
160
161    /// Create a new session window configuration.
162    #[must_use]
163    pub fn session(time_column: String, gap: Duration) -> Self {
164        Self {
165            window_type: WindowType::Session,
166            time_column,
167            size: Duration::ZERO, // Not used for session windows
168            slide: None,
169            gap: Some(gap),
170            offset_ms: 0,
171            allowed_lateness: Duration::ZERO,
172            emit_strategy: EmitStrategy::OnWatermark,
173            late_data_side_output: None,
174        }
175    }
176
177    /// Create a new cumulate window configuration.
178    ///
179    /// `step` is the window growth increment and `max_size` is the epoch
180    /// size. The `slide` field is reused to store the step interval.
181    #[must_use]
182    pub fn cumulate(time_column: String, step: Duration, max_size: Duration) -> Self {
183        Self {
184            window_type: WindowType::Cumulate,
185            time_column,
186            size: max_size,
187            slide: Some(step),
188            gap: None,
189            offset_ms: 0,
190            allowed_lateness: Duration::ZERO,
191            emit_strategy: EmitStrategy::OnWatermark,
192            late_data_side_output: None,
193        }
194    }
195
196    /// Set window offset in milliseconds.
197    #[must_use]
198    pub fn with_offset_ms(mut self, offset_ms: i64) -> Self {
199        self.offset_ms = offset_ms;
200        self
201    }
202
203    /// Build configuration from a parsed `WindowFunction`.
204    ///
205    /// # Errors
206    ///
207    /// Returns `ParseError::WindowError` if:
208    /// - Time column cannot be extracted
209    /// - Interval cannot be parsed
210    pub fn from_window_function(window: &WindowFunction) -> Result<Self, ParseError> {
211        let time_column = WindowRewriter::get_time_column_name(window).ok_or_else(|| {
212            ParseError::WindowError("Cannot extract time column name".to_string())
213        })?;
214
215        match window {
216            WindowFunction::Tumble {
217                interval, offset, ..
218            } => {
219                let size = WindowRewriter::parse_interval_to_duration(interval)?;
220                let mut config = Self::tumbling(time_column, size);
221                if let Some(ref off) = offset {
222                    let off_dur = WindowRewriter::parse_interval_to_duration(off)?;
223                    config.offset_ms = i64::try_from(off_dur.as_millis()).unwrap_or(0);
224                }
225                Ok(config)
226            }
227            WindowFunction::Hop {
228                slide_interval,
229                window_interval,
230                offset,
231                ..
232            } => {
233                let size = WindowRewriter::parse_interval_to_duration(window_interval)?;
234                let slide = WindowRewriter::parse_interval_to_duration(slide_interval)?;
235                let mut config = Self::sliding(time_column, size, slide);
236                if let Some(ref off) = offset {
237                    let off_dur = WindowRewriter::parse_interval_to_duration(off)?;
238                    config.offset_ms = i64::try_from(off_dur.as_millis()).unwrap_or(0);
239                }
240                Ok(config)
241            }
242            WindowFunction::Session { gap_interval, .. } => {
243                let gap = WindowRewriter::parse_interval_to_duration(gap_interval)?;
244                Ok(Self::session(time_column, gap))
245            }
246            WindowFunction::Cumulate {
247                step_interval,
248                max_size_interval,
249                ..
250            } => {
251                let step = WindowRewriter::parse_interval_to_duration(step_interval)?;
252                let max_size = WindowRewriter::parse_interval_to_duration(max_size_interval)?;
253                Ok(Self::cumulate(time_column, step, max_size))
254            }
255        }
256    }
257
258    /// Apply EMIT clause configuration.
259    ///
260    /// # Errors
261    ///
262    /// Returns `ParseError::WindowError` if the emit clause cannot be converted.
263    pub fn with_emit_clause(mut self, emit_clause: &EmitClause) -> Result<Self, ParseError> {
264        self.emit_strategy = emit_clause.to_emit_strategy()?;
265        Ok(self)
266    }
267
268    /// Apply late data clause configuration.
269    ///
270    /// # Errors
271    ///
272    /// Returns `ParseError::WindowError` if the allowed lateness cannot be parsed.
273    pub fn with_late_data_clause(
274        mut self,
275        late_data_clause: &LateDataClause,
276    ) -> Result<Self, ParseError> {
277        self.allowed_lateness = late_data_clause.to_allowed_lateness()?;
278        self.late_data_side_output
279            .clone_from(&late_data_clause.side_output);
280        Ok(self)
281    }
282
283    /// Set allowed lateness duration.
284    #[must_use]
285    pub fn with_allowed_lateness(mut self, lateness: Duration) -> Self {
286        self.allowed_lateness = lateness;
287        self
288    }
289
290    /// Set emit strategy.
291    #[must_use]
292    pub fn with_emit_strategy(mut self, strategy: EmitStrategy) -> Self {
293        self.emit_strategy = strategy;
294        self
295    }
296
297    /// Set late data side output.
298    #[must_use]
299    pub fn with_late_data_side_output(mut self, name: String) -> Self {
300        self.late_data_side_output = Some(name);
301        self
302    }
303
304    /// Validates that the window operator configuration is used in a valid context.
305    ///
306    /// Checks:
307    /// - `EMIT ON WINDOW CLOSE` and `EMIT FINAL` require a watermark on the source
308    /// - `EMIT ON WINDOW CLOSE` and `EMIT FINAL` require a windowed aggregation
309    ///
310    /// # Arguments
311    ///
312    /// * `has_watermark` - Whether the source has a watermark definition
313    /// * `has_window` - Whether the query contains a windowed aggregation
314    ///
315    /// # Errors
316    ///
317    /// Returns `ParseError::WindowError` if validation fails.
318    pub fn validate(&self, has_watermark: bool, has_window: bool) -> Result<(), ParseError> {
319        if matches!(
320            self.emit_strategy,
321            EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
322        ) {
323            if !has_watermark {
324                return Err(ParseError::WindowError(
325                    "EMIT ON WINDOW CLOSE requires a watermark definition \
326                     on the source. Add WATERMARK FOR <column> AS <expr> \
327                     to the CREATE SOURCE statement."
328                        .to_string(),
329                ));
330            }
331            if !has_window {
332                return Err(ParseError::WindowError(
333                    "EMIT ON WINDOW CLOSE is only valid with windowed \
334                     aggregation queries. Use EMIT ON UPDATE for \
335                     non-windowed queries."
336                        .to_string(),
337                ));
338            }
339        }
340        Ok(())
341    }
342
343    /// Check if this configuration supports append-only output.
344    ///
345    /// Append-only sinks (Kafka, S3, Delta Lake) require emit strategies
346    /// that don't produce retractions.
347    #[must_use]
348    pub fn is_append_only_compatible(&self) -> bool {
349        matches!(
350            self.emit_strategy,
351            EmitStrategy::OnWatermark | EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
352        )
353    }
354
355    /// Check if late data handling is configured.
356    #[must_use]
357    pub fn has_late_data_handling(&self) -> bool {
358        self.allowed_lateness > Duration::ZERO || self.late_data_side_output.is_some()
359    }
360}
361
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{Expr, Ident};

    /// Wrap a name in a boxed identifier expression.
    ///
    /// Also used to pass interval text such as `"5 MINUTE"` as an identifier.
    fn boxed_ident(name: &str) -> Box<Expr> {
        Box::new(Expr::Identifier(Ident::new(name)))
    }

    /// Baseline fixture: a 5-minute tumbling window on column `ts`.
    fn tumble_5m_on_ts() -> WindowOperatorConfig {
        WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
    }

    /// Cumulate fixture on column `ts`: 1-minute step, 5-minute epoch.
    fn cumulate_1m_5m_on_ts() -> WindowOperatorConfig {
        WindowOperatorConfig::cumulate(
            "ts".to_string(),
            Duration::from_secs(60),
            Duration::from_secs(300),
        )
    }

    /// Baseline fixture configured for `EMIT ON WINDOW CLOSE`.
    fn eowc_config() -> WindowOperatorConfig {
        tumble_5m_on_ts().with_emit_strategy(EmitStrategy::OnWindowClose)
    }

    /// A parsed `TUMBLE(event_time, 5 MINUTE)` without an offset.
    fn make_tumble_window() -> WindowFunction {
        WindowFunction::Tumble {
            time_column: boxed_ident("event_time"),
            interval: boxed_ident("5 MINUTE"),
            offset: None,
        }
    }

    #[test]
    fn test_tumbling_config() {
        let cfg =
            WindowOperatorConfig::tumbling("event_time".to_string(), Duration::from_secs(300));

        assert_eq!(cfg.window_type, WindowType::Tumbling);
        assert_eq!(cfg.time_column, "event_time");
        assert_eq!(cfg.size, Duration::from_secs(300));
        assert_eq!(cfg.slide, None);
        assert_eq!(cfg.gap, None);
    }

    #[test]
    fn test_sliding_config() {
        let size = Duration::from_secs(300);
        let slide = Duration::from_secs(60);
        let cfg = WindowOperatorConfig::sliding("ts".to_string(), size, slide);

        assert_eq!(cfg.window_type, WindowType::Sliding);
        assert_eq!(cfg.size, size);
        assert_eq!(cfg.slide, Some(slide));
    }

    #[test]
    fn test_session_config() {
        let gap = Duration::from_secs(1800);
        let cfg = WindowOperatorConfig::session("click_time".to_string(), gap);

        assert_eq!(cfg.window_type, WindowType::Session);
        assert_eq!(cfg.gap, Some(gap));
    }

    #[test]
    fn test_from_window_function() {
        let cfg = WindowOperatorConfig::from_window_function(&make_tumble_window()).unwrap();

        assert_eq!(cfg.window_type, WindowType::Tumbling);
        assert_eq!(cfg.time_column, "event_time");
        assert_eq!(cfg.size, Duration::from_secs(300));
    }

    #[test]
    fn test_with_emit_clause() {
        let on_close = tumble_5m_on_ts()
            .with_emit_clause(&EmitClause::OnWindowClose)
            .unwrap();
        assert_eq!(on_close.emit_strategy, EmitStrategy::OnWindowClose);

        let changes = tumble_5m_on_ts()
            .with_emit_clause(&EmitClause::Changes)
            .unwrap();
        assert_eq!(changes.emit_strategy, EmitStrategy::Changelog);
    }

    #[test]
    fn test_with_late_data_clause() {
        let clause = LateDataClause::side_output_only("late_events".to_string());
        let cfg = tumble_5m_on_ts().with_late_data_clause(&clause).unwrap();

        assert_eq!(cfg.late_data_side_output.as_deref(), Some("late_events"));
    }

    #[test]
    fn test_append_only_compatible() {
        // The default strategy (OnWatermark) never retracts.
        assert!(tumble_5m_on_ts().is_append_only_compatible());

        // OnUpdate and Changelog both emit retractions.
        for strategy in [EmitStrategy::OnUpdate, EmitStrategy::Changelog] {
            let cfg = tumble_5m_on_ts().with_emit_strategy(strategy);
            assert!(!cfg.is_append_only_compatible());
        }
    }

    #[test]
    fn test_has_late_data_handling() {
        let base = tumble_5m_on_ts();
        assert!(!base.has_late_data_handling());

        let with_lateness = base
            .clone()
            .with_allowed_lateness(Duration::from_secs(60));
        assert!(with_lateness.has_late_data_handling());

        let with_side_output = base.with_late_data_side_output("late".to_string());
        assert!(with_side_output.has_late_data_handling());
    }

    #[test]
    fn test_eowc_without_watermark_errors() {
        let err = eowc_config()
            .validate(false, true)
            .expect_err("EOWC without a watermark must be rejected")
            .to_string();
        assert!(
            err.contains("watermark"),
            "Expected watermark error, got: {err}"
        );
    }

    #[test]
    fn test_eowc_without_window_errors() {
        let err = eowc_config()
            .validate(true, false)
            .expect_err("EOWC outside a windowed query must be rejected")
            .to_string();
        assert!(
            err.contains("windowed"),
            "Expected windowed query error, got: {err}"
        );
    }

    #[test]
    fn test_eowc_with_watermark_and_window_passes() {
        assert!(eowc_config().validate(true, true).is_ok());
    }

    #[test]
    fn test_final_without_watermark_errors() {
        let cfg = tumble_5m_on_ts().with_emit_strategy(EmitStrategy::FinalOnly);
        assert!(cfg.validate(false, true).is_err());
    }

    #[test]
    fn test_non_eowc_without_watermark_ok() {
        // None of these strategies depend on watermark-driven timers.
        let strategies = [
            EmitStrategy::OnUpdate,
            EmitStrategy::Periodic(Duration::from_secs(5)),
            EmitStrategy::Changelog,
        ];
        for strategy in strategies {
            let cfg = tumble_5m_on_ts().with_emit_strategy(strategy);
            assert!(cfg.validate(false, false).is_ok());
        }
    }

    #[test]
    fn test_display_tumbling_window() {
        let cfg =
            WindowOperatorConfig::tumbling("event_time".to_string(), Duration::from_secs(60));
        assert_eq!(cfg.to_string(), "TUMBLE(event_time, 1m)");
    }

    #[test]
    fn test_display_sliding_window() {
        let cfg = WindowOperatorConfig::sliding(
            "ts".to_string(),
            Duration::from_secs(300),
            Duration::from_secs(60),
        );
        assert_eq!(cfg.to_string(), "HOP(ts, 5m SLIDE 1m)");
    }

    #[test]
    fn test_display_session_window() {
        let cfg =
            WindowOperatorConfig::session("click_time".to_string(), Duration::from_secs(1800));
        assert_eq!(cfg.to_string(), "SESSION(click_time, GAP 30m)");
    }

    #[test]
    fn test_cumulate_config() {
        let cfg = cumulate_1m_5m_on_ts();

        assert_eq!(cfg.window_type, WindowType::Cumulate);
        assert_eq!(cfg.time_column, "ts");
        assert_eq!(cfg.size, Duration::from_secs(300));
        assert_eq!(cfg.slide, Some(Duration::from_secs(60)));
        assert_eq!(cfg.gap, None);
    }

    #[test]
    fn test_cumulate_from_window_function() {
        let window = WindowFunction::Cumulate {
            time_column: boxed_ident("event_time"),
            step_interval: boxed_ident("1 MINUTE"),
            max_size_interval: boxed_ident("5 MINUTE"),
        };
        let cfg = WindowOperatorConfig::from_window_function(&window).unwrap();

        assert_eq!(cfg.window_type, WindowType::Cumulate);
        assert_eq!(cfg.time_column, "event_time");
        assert_eq!(cfg.size, Duration::from_secs(300));
        assert_eq!(cfg.slide, Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_display_cumulate_window() {
        assert_eq!(
            cumulate_1m_5m_on_ts().to_string(),
            "CUMULATE(ts, STEP 1m SIZE 5m)"
        );
    }

    #[test]
    fn test_display_window_type() {
        let expected = [
            (WindowType::Tumbling, "TUMBLE"),
            (WindowType::Sliding, "HOP"),
            (WindowType::Session, "SESSION"),
            (WindowType::Cumulate, "CUMULATE"),
        ];
        for (window_type, keyword) in expected {
            assert_eq!(window_type.to_string(), keyword);
        }
    }

    #[test]
    fn test_display_duration_formatting() {
        let cases = [
            // Whole hours
            (Duration::from_secs(3600), "TUMBLE(ts, 1h)"),
            // Seconds that are not a round number of minutes
            (Duration::from_secs(45), "TUMBLE(ts, 45s)"),
            // Sub-second sizes fall back to milliseconds
            (Duration::from_millis(500), "TUMBLE(ts, 500ms)"),
        ];
        for (size, expected) in cases {
            let cfg = WindowOperatorConfig::tumbling("ts".to_string(), size);
            assert_eq!(cfg.to_string(), expected);
        }
    }
}
629}