Skip to main content

laminar_sql/translator/
window_translator.rs

1//! Window operator configuration builder
2//!
3//! Translates parsed window functions and EMIT/late data clauses
4//! into complete operator configurations.
5
6use std::time::Duration;
7
8use crate::parser::{
9    EmitClause, EmitStrategy, LateDataClause, ParseError, WindowFunction, WindowRewriter,
10};
11
/// Kind of windowing applied by a window operator.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WindowType {
    /// Non-overlapping windows of a fixed size (`TUMBLE`).
    Tumbling,
    /// Overlapping fixed-size windows that advance by a slide interval (`HOP`).
    Sliding,
    /// Windows whose boundaries are defined by gaps in activity (`SESSION`).
    Session,
    /// Windows that grow step by step inside a fixed-size epoch (`CUMULATE`).
    Cumulate,
}
24
25/// Complete configuration for instantiating a window operator.
26///
27/// This structure holds all the information needed to create and configure
28/// a window operator in Ring 0.
29///
30/// # EMIT ON WINDOW CLOSE
31///
32/// When `emit_strategy` is `OnWindowClose` or `FinalOnly`, use [`validate()`](Self::validate)
33/// to ensure the configuration is valid. These strategies require:
34/// - A watermark definition on the source (timers are driven by watermark)
35/// - A windowed aggregation context (non-windowed queries cannot use EOWC)
36#[derive(Debug, Clone)]
37pub struct WindowOperatorConfig {
38    /// The type of window (tumbling, sliding, session)
39    pub window_type: WindowType,
40    /// The time column name used for windowing
41    pub time_column: String,
42    /// Window size (for tumbling and sliding)
43    pub size: Duration,
44    /// Slide interval for sliding windows
45    pub slide: Option<Duration>,
46    /// Gap interval for session windows
47    pub gap: Option<Duration>,
48    /// Window offset in milliseconds for timezone-aligned windows
49    pub offset_ms: i64,
50    /// Maximum allowed lateness for late events
51    pub allowed_lateness: Duration,
52    /// Emit strategy (when to output results)
53    pub emit_strategy: EmitStrategy,
54    /// Side output name for late data (if configured)
55    pub late_data_side_output: Option<String>,
56}
57
/// Format a Duration as a human-readable string (e.g., "60s" -> "1m", "1h", "500ms").
///
/// Whole-second durations use the largest unit that divides them exactly
/// (hours, then minutes, then seconds). Durations that are zero or carry a
/// fractional-second component are rendered in milliseconds instead, so a
/// 1.5 s window is shown as "1500ms" rather than being truncated to "1s".
/// Precision below one millisecond is not represented.
fn format_duration(d: Duration) -> String {
    let secs = d.as_secs();
    // Sub-second remainder would be lost by the h/m/s branches below.
    if secs == 0 || d.subsec_millis() != 0 {
        return format!("{}ms", d.as_millis());
    }
    if secs.is_multiple_of(3600) {
        format!("{}h", secs / 3600)
    } else if secs.is_multiple_of(60) {
        format!("{}m", secs / 60)
    } else {
        format!("{secs}s")
    }
}
72
73impl std::fmt::Display for WindowType {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        match self {
76            WindowType::Tumbling => write!(f, "TUMBLE"),
77            WindowType::Sliding => write!(f, "HOP"),
78            WindowType::Session => write!(f, "SESSION"),
79            WindowType::Cumulate => write!(f, "CUMULATE"),
80        }
81    }
82}
83
84impl std::fmt::Display for WindowOperatorConfig {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        match self.window_type {
87            WindowType::Tumbling => {
88                write!(
89                    f,
90                    "TUMBLE({}, {})",
91                    self.time_column,
92                    format_duration(self.size)
93                )
94            }
95            WindowType::Sliding => {
96                let slide = self.slide.unwrap_or(self.size);
97                write!(
98                    f,
99                    "HOP({}, {} SLIDE {})",
100                    self.time_column,
101                    format_duration(self.size),
102                    format_duration(slide)
103                )
104            }
105            WindowType::Session => {
106                let gap = self.gap.unwrap_or(Duration::ZERO);
107                write!(
108                    f,
109                    "SESSION({}, GAP {})",
110                    self.time_column,
111                    format_duration(gap)
112                )
113            }
114            WindowType::Cumulate => {
115                let step = self.slide.unwrap_or(self.size);
116                write!(
117                    f,
118                    "CUMULATE({}, STEP {} SIZE {})",
119                    self.time_column,
120                    format_duration(step),
121                    format_duration(self.size)
122                )
123            }
124        }
125    }
126}
127
128impl WindowOperatorConfig {
129    /// Create a new tumbling window configuration.
130    #[must_use]
131    pub fn tumbling(time_column: String, size: Duration) -> Self {
132        Self {
133            window_type: WindowType::Tumbling,
134            time_column,
135            size,
136            slide: None,
137            gap: None,
138            offset_ms: 0,
139            allowed_lateness: Duration::ZERO,
140            emit_strategy: EmitStrategy::OnWatermark,
141            late_data_side_output: None,
142        }
143    }
144
145    /// Create a new sliding window configuration.
146    #[must_use]
147    pub fn sliding(time_column: String, size: Duration, slide: Duration) -> Self {
148        Self {
149            window_type: WindowType::Sliding,
150            time_column,
151            size,
152            slide: Some(slide),
153            gap: None,
154            offset_ms: 0,
155            allowed_lateness: Duration::ZERO,
156            emit_strategy: EmitStrategy::OnWatermark,
157            late_data_side_output: None,
158        }
159    }
160
161    /// Create a new session window configuration.
162    #[must_use]
163    pub fn session(time_column: String, gap: Duration) -> Self {
164        Self {
165            window_type: WindowType::Session,
166            time_column,
167            size: Duration::ZERO, // Not used for session windows
168            slide: None,
169            gap: Some(gap),
170            offset_ms: 0,
171            allowed_lateness: Duration::ZERO,
172            emit_strategy: EmitStrategy::OnWatermark,
173            late_data_side_output: None,
174        }
175    }
176
177    /// Create a new cumulate window configuration.
178    ///
179    /// `step` is the window growth increment and `max_size` is the epoch
180    /// size. The `slide` field is reused to store the step interval.
181    #[must_use]
182    pub fn cumulate(time_column: String, step: Duration, max_size: Duration) -> Self {
183        Self {
184            window_type: WindowType::Cumulate,
185            time_column,
186            size: max_size,
187            slide: Some(step),
188            gap: None,
189            offset_ms: 0,
190            allowed_lateness: Duration::ZERO,
191            emit_strategy: EmitStrategy::OnWatermark,
192            late_data_side_output: None,
193        }
194    }
195
196    /// Set window offset in milliseconds.
197    #[must_use]
198    pub fn with_offset_ms(mut self, offset_ms: i64) -> Self {
199        self.offset_ms = offset_ms;
200        self
201    }
202
203    /// Build configuration from a parsed `WindowFunction`.
204    ///
205    /// # Errors
206    ///
207    /// Returns `ParseError::WindowError` if:
208    /// - Time column cannot be extracted
209    /// - Interval cannot be parsed
210    pub fn from_window_function(window: &WindowFunction) -> Result<Self, ParseError> {
211        let time_column = WindowRewriter::get_time_column_name(window).ok_or_else(|| {
212            ParseError::WindowError("Cannot extract time column name".to_string())
213        })?;
214
215        match window {
216            WindowFunction::Tumble {
217                interval, offset, ..
218            } => {
219                let size = WindowRewriter::parse_interval_to_duration(interval)?;
220                let mut config = Self::tumbling(time_column, size);
221                if let Some(ref off) = offset {
222                    let off_dur = WindowRewriter::parse_interval_to_duration(off)?;
223                    config.offset_ms = i64::try_from(off_dur.as_millis()).unwrap_or(0);
224                }
225                Ok(config)
226            }
227            WindowFunction::Hop {
228                slide_interval,
229                window_interval,
230                offset,
231                ..
232            } => {
233                let size = WindowRewriter::parse_interval_to_duration(window_interval)?;
234                let slide = WindowRewriter::parse_interval_to_duration(slide_interval)?;
235                let mut config = Self::sliding(time_column, size, slide);
236                if let Some(ref off) = offset {
237                    let off_dur = WindowRewriter::parse_interval_to_duration(off)?;
238                    config.offset_ms = i64::try_from(off_dur.as_millis()).unwrap_or(0);
239                }
240                Ok(config)
241            }
242            WindowFunction::Session { gap_interval, .. } => {
243                let gap = WindowRewriter::parse_interval_to_duration(gap_interval)?;
244                Ok(Self::session(time_column, gap))
245            }
246            WindowFunction::Cumulate {
247                step_interval,
248                max_size_interval,
249                ..
250            } => {
251                let step = WindowRewriter::parse_interval_to_duration(step_interval)?;
252                let max_size = WindowRewriter::parse_interval_to_duration(max_size_interval)?;
253                Ok(Self::cumulate(time_column, step, max_size))
254            }
255        }
256    }
257
258    /// Apply EMIT clause configuration.
259    ///
260    /// # Errors
261    ///
262    /// Returns `ParseError::WindowError` if the emit clause cannot be converted.
263    pub fn with_emit_clause(mut self, emit_clause: &EmitClause) -> Result<Self, ParseError> {
264        self.emit_strategy = emit_clause.to_emit_strategy()?;
265        Ok(self)
266    }
267
268    /// Apply late data clause configuration.
269    ///
270    /// # Errors
271    ///
272    /// Returns `ParseError::WindowError` if the allowed lateness cannot be parsed.
273    pub fn with_late_data_clause(
274        mut self,
275        late_data_clause: &LateDataClause,
276    ) -> Result<Self, ParseError> {
277        if late_data_clause.side_output.is_some() {
278            return Err(ParseError::WindowError(
279                "LATE DATA SIDE OUTPUT is not yet supported in pipeline mode; \
280                 use ALLOWED LATENESS without a side output, or omit the clause"
281                    .to_string(),
282            ));
283        }
284        self.allowed_lateness = late_data_clause.to_allowed_lateness()?;
285        self.late_data_side_output
286            .clone_from(&late_data_clause.side_output);
287        Ok(self)
288    }
289
290    /// Set allowed lateness duration.
291    #[must_use]
292    pub fn with_allowed_lateness(mut self, lateness: Duration) -> Self {
293        self.allowed_lateness = lateness;
294        self
295    }
296
297    /// Set emit strategy.
298    #[must_use]
299    pub fn with_emit_strategy(mut self, strategy: EmitStrategy) -> Self {
300        self.emit_strategy = strategy;
301        self
302    }
303
304    /// Set late data side output.
305    #[must_use]
306    pub fn with_late_data_side_output(mut self, name: String) -> Self {
307        self.late_data_side_output = Some(name);
308        self
309    }
310
311    /// Validates that the window operator configuration is used in a valid context.
312    ///
313    /// Checks:
314    /// - `EMIT ON WINDOW CLOSE` and `EMIT FINAL` require a watermark on the source
315    /// - `EMIT ON WINDOW CLOSE` and `EMIT FINAL` require a windowed aggregation
316    ///
317    /// # Arguments
318    ///
319    /// * `has_watermark` - Whether the source has a watermark definition
320    /// * `has_window` - Whether the query contains a windowed aggregation
321    ///
322    /// # Errors
323    ///
324    /// Returns `ParseError::WindowError` if validation fails.
325    pub fn validate(&self, has_watermark: bool, has_window: bool) -> Result<(), ParseError> {
326        if matches!(
327            self.emit_strategy,
328            EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
329        ) {
330            if !has_watermark {
331                return Err(ParseError::WindowError(
332                    "EMIT ON WINDOW CLOSE requires a watermark definition \
333                     on the source. Add WATERMARK FOR <column> AS <expr> \
334                     to the CREATE SOURCE statement."
335                        .to_string(),
336                ));
337            }
338            if !has_window {
339                return Err(ParseError::WindowError(
340                    "EMIT ON WINDOW CLOSE is only valid with windowed \
341                     aggregation queries. Use EMIT ON UPDATE for \
342                     non-windowed queries."
343                        .to_string(),
344                ));
345            }
346        }
347        Ok(())
348    }
349
350    /// Check if this configuration supports append-only output.
351    ///
352    /// Append-only sinks (Kafka, S3, Delta Lake) require emit strategies
353    /// that don't produce retractions.
354    #[must_use]
355    pub fn is_append_only_compatible(&self) -> bool {
356        matches!(
357            self.emit_strategy,
358            EmitStrategy::OnWatermark | EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
359        )
360    }
361
362    /// Check if late data handling is configured.
363    #[must_use]
364    pub fn has_late_data_handling(&self) -> bool {
365        self.allowed_lateness > Duration::ZERO || self.late_data_side_output.is_some()
366    }
367}
368
#[cfg(test)]
mod tests {
    //! Unit tests for window operator configuration building, validation and display.

    use super::*;
    use sqlparser::ast::{Expr, Ident};

    /// Builds a simple `TUMBLE(event_time, 5 MINUTE)` window function for tests.
    fn make_tumble_window() -> WindowFunction {
        WindowFunction::Tumble {
            time_column: Box::new(Expr::Identifier(Ident::new("event_time"))),
            interval: Box::new(Expr::Identifier(Ident::new("5 MINUTE"))),
            offset: None,
        }
    }

    #[test]
    fn test_tumbling_config() {
        let config =
            WindowOperatorConfig::tumbling("event_time".to_string(), Duration::from_secs(300));

        assert_eq!(config.window_type, WindowType::Tumbling);
        assert_eq!(config.time_column, "event_time");
        assert_eq!(config.size, Duration::from_secs(300));
        assert!(config.slide.is_none());
        assert!(config.gap.is_none());
    }

    #[test]
    fn test_sliding_config() {
        let config = WindowOperatorConfig::sliding(
            "ts".to_string(),
            Duration::from_secs(300),
            Duration::from_secs(60),
        );

        assert_eq!(config.window_type, WindowType::Sliding);
        assert_eq!(config.size, Duration::from_secs(300));
        assert_eq!(config.slide, Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_session_config() {
        let config =
            WindowOperatorConfig::session("click_time".to_string(), Duration::from_secs(1800));

        assert_eq!(config.window_type, WindowType::Session);
        assert_eq!(config.gap, Some(Duration::from_secs(1800)));
    }

    #[test]
    fn test_from_window_function() {
        let window = make_tumble_window();
        let config = WindowOperatorConfig::from_window_function(&window).unwrap();

        assert_eq!(config.window_type, WindowType::Tumbling);
        assert_eq!(config.time_column, "event_time");
        assert_eq!(config.size, Duration::from_secs(300));
    }

    #[test]
    fn test_with_emit_clause() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));

        let config = config.with_emit_clause(&EmitClause::OnWindowClose).unwrap();
        assert_eq!(config.emit_strategy, EmitStrategy::OnWindowClose);

        let config2 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));
        let config2 = config2.with_emit_clause(&EmitClause::Changes).unwrap();
        assert_eq!(config2.emit_strategy, EmitStrategy::Changelog);
    }

    #[test]
    fn test_with_late_data_clause_side_output_rejected() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));

        // Side output is not yet wired in pipeline mode — must be rejected
        let late_clause = LateDataClause::side_output_only("late_events".to_string());
        let result = config.with_late_data_clause(&late_clause);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("not yet supported"));
    }

    #[test]
    fn test_with_late_data_clause_lateness_only_accepted() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));

        // Allowed lateness without side output is fine
        let late_clause = LateDataClause::with_allowed_lateness(Expr::Value(
            sqlparser::ast::Value::SingleQuotedString("5 SECONDS".to_string()).into(),
        ));
        let config = config.with_late_data_clause(&late_clause).unwrap();
        assert_eq!(config.allowed_lateness, Duration::from_secs(5));
        assert!(config.late_data_side_output.is_none());
    }

    #[test]
    fn test_append_only_compatible() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));

        // Default emit strategy (OnWatermark) is append-only compatible
        assert!(config.is_append_only_compatible());

        // OnUpdate is NOT append-only compatible (produces retractions)
        let config2 = config.with_emit_strategy(EmitStrategy::OnUpdate);
        assert!(!config2.is_append_only_compatible());

        // Changelog is NOT append-only compatible
        let config3 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::Changelog);
        assert!(!config3.is_append_only_compatible());
    }

    #[test]
    fn test_has_late_data_handling() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));

        // No late data handling by default
        assert!(!config.has_late_data_handling());

        // With allowed lateness
        let config2 = config
            .clone()
            .with_allowed_lateness(Duration::from_secs(60));
        assert!(config2.has_late_data_handling());

        // With side output
        let config3 = config.with_late_data_side_output("late".to_string());
        assert!(config3.has_late_data_handling());
    }

    #[test]
    fn test_eowc_without_watermark_errors() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::OnWindowClose);

        let result = config.validate(false, true);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("watermark"),
            "Expected watermark error, got: {err}"
        );
    }

    #[test]
    fn test_eowc_without_window_errors() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::OnWindowClose);

        let result = config.validate(true, false);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("windowed"),
            "Expected windowed query error, got: {err}"
        );
    }

    #[test]
    fn test_eowc_with_watermark_and_window_passes() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::OnWindowClose);

        assert!(config.validate(true, true).is_ok());
    }

    #[test]
    fn test_final_without_watermark_errors() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::FinalOnly);

        let result = config.validate(false, true);
        assert!(result.is_err());
    }

    #[test]
    fn test_non_eowc_without_watermark_ok() {
        // OnUpdate does not require watermark
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::OnUpdate);
        assert!(config.validate(false, false).is_ok());

        // Periodic does not require watermark
        let config2 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::Periodic(Duration::from_secs(5)));
        assert!(config2.validate(false, false).is_ok());

        // Changelog does not require watermark
        let config3 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::Changelog);
        assert!(config3.validate(false, false).is_ok());
    }

    #[test]
    fn test_display_tumbling_window() {
        let config =
            WindowOperatorConfig::tumbling("event_time".to_string(), Duration::from_secs(60));
        assert_eq!(format!("{config}"), "TUMBLE(event_time, 1m)");
    }

    #[test]
    fn test_display_sliding_window() {
        let config = WindowOperatorConfig::sliding(
            "ts".to_string(),
            Duration::from_secs(300),
            Duration::from_secs(60),
        );
        assert_eq!(format!("{config}"), "HOP(ts, 5m SLIDE 1m)");
    }

    #[test]
    fn test_display_session_window() {
        let config =
            WindowOperatorConfig::session("click_time".to_string(), Duration::from_secs(1800));
        assert_eq!(format!("{config}"), "SESSION(click_time, GAP 30m)");
    }

    #[test]
    fn test_cumulate_config() {
        let config = WindowOperatorConfig::cumulate(
            "ts".to_string(),
            Duration::from_secs(60),
            Duration::from_secs(300),
        );

        assert_eq!(config.window_type, WindowType::Cumulate);
        assert_eq!(config.time_column, "ts");
        assert_eq!(config.size, Duration::from_secs(300));
        assert_eq!(config.slide, Some(Duration::from_secs(60)));
        assert!(config.gap.is_none());
    }

    #[test]
    fn test_cumulate_from_window_function() {
        let window = WindowFunction::Cumulate {
            time_column: Box::new(Expr::Identifier(Ident::new("event_time"))),
            step_interval: Box::new(Expr::Identifier(Ident::new("1 MINUTE"))),
            max_size_interval: Box::new(Expr::Identifier(Ident::new("5 MINUTE"))),
        };
        let config = WindowOperatorConfig::from_window_function(&window).unwrap();

        assert_eq!(config.window_type, WindowType::Cumulate);
        assert_eq!(config.time_column, "event_time");
        assert_eq!(config.size, Duration::from_secs(300));
        assert_eq!(config.slide, Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_display_cumulate_window() {
        let config = WindowOperatorConfig::cumulate(
            "ts".to_string(),
            Duration::from_secs(60),
            Duration::from_secs(300),
        );
        assert_eq!(format!("{config}"), "CUMULATE(ts, STEP 1m SIZE 5m)");
    }

    #[test]
    fn test_display_window_type() {
        assert_eq!(format!("{}", WindowType::Tumbling), "TUMBLE");
        assert_eq!(format!("{}", WindowType::Sliding), "HOP");
        assert_eq!(format!("{}", WindowType::Session), "SESSION");
        assert_eq!(format!("{}", WindowType::Cumulate), "CUMULATE");
    }

    #[test]
    fn test_display_duration_formatting() {
        // Hours
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(3600));
        assert_eq!(format!("{config}"), "TUMBLE(ts, 1h)");

        // Seconds (non-round minutes)
        let config2 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(45));
        assert_eq!(format!("{config2}"), "TUMBLE(ts, 45s)");

        // Milliseconds (sub-second)
        let config3 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_millis(500));
        assert_eq!(format!("{config3}"), "TUMBLE(ts, 500ms)");
    }
}