// laminar_sql/translator/window_translator.rs

//! Window operator configuration builder
//!
//! Translates parsed window functions and EMIT/late data clauses
//! into complete operator configurations.
5
6use std::time::Duration;
7
8use crate::parser::{
9    EmitClause, EmitStrategy, LateDataClause, ParseError, WindowFunction, WindowRewriter,
10};
11
/// The kind of windowing a window operator performs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WindowType {
    /// Non-overlapping windows of a fixed size.
    Tumbling,
    /// Fixed-size windows that overlap, advancing by a slide interval.
    Sliding,
    /// Windows whose extent is determined by gaps in activity.
    Session,
    /// Windows that grow step by step inside a fixed-size epoch.
    Cumulate,
}
24
25/// Complete configuration for instantiating a window operator.
26///
27/// This structure holds all the information needed to create and configure
28/// a window operator in Ring 0.
29///
30/// # EMIT ON WINDOW CLOSE
31///
32/// When `emit_strategy` is `OnWindowClose` or `FinalOnly`, use [`validate()`](Self::validate)
33/// to ensure the configuration is valid. These strategies require:
34/// - A watermark definition on the source (timers are driven by watermark)
35/// - A windowed aggregation context (non-windowed queries cannot use EOWC)
36#[derive(Debug, Clone)]
37pub struct WindowOperatorConfig {
38    /// The type of window (tumbling, sliding, session)
39    pub window_type: WindowType,
40    /// The time column name used for windowing
41    pub time_column: String,
42    /// Window size (for tumbling and sliding)
43    pub size: Duration,
44    /// Slide interval for sliding windows
45    pub slide: Option<Duration>,
46    /// Gap interval for session windows
47    pub gap: Option<Duration>,
48    /// Window offset in milliseconds for timezone-aligned windows
49    pub offset_ms: i64,
50    /// Maximum allowed lateness for late events
51    pub allowed_lateness: Duration,
52    /// Emit strategy (when to output results)
53    pub emit_strategy: EmitStrategy,
54    /// Side output name for late data (if configured)
55    pub late_data_side_output: Option<String>,
56}
57
/// Format a `Duration` as a compact human-readable string
/// (e.g., "500ms", "45s", "5m", "1h").
///
/// The coarsest unit that represents the value exactly is chosen:
/// whole hours, then whole minutes, then whole seconds. Durations that are
/// shorter than one second, or that carry a non-zero millisecond component
/// (such as 1500ms), are rendered in milliseconds so the fractional part is
/// not silently dropped. Precision below one millisecond is not shown.
fn format_duration(d: Duration) -> String {
    let secs = d.as_secs();
    // Fall back to milliseconds whenever whole seconds would lose information.
    if secs == 0 || d.subsec_millis() != 0 {
        return format!("{}ms", d.as_millis());
    }
    if secs.is_multiple_of(3600) {
        format!("{}h", secs / 3600)
    } else if secs.is_multiple_of(60) {
        format!("{}m", secs / 60)
    } else {
        format!("{secs}s")
    }
}
72
73impl std::fmt::Display for WindowType {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        match self {
76            WindowType::Tumbling => write!(f, "TUMBLE"),
77            WindowType::Sliding => write!(f, "HOP"),
78            WindowType::Session => write!(f, "SESSION"),
79            WindowType::Cumulate => write!(f, "CUMULATE"),
80        }
81    }
82}
83
84impl std::fmt::Display for WindowOperatorConfig {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        match self.window_type {
87            WindowType::Tumbling => {
88                write!(
89                    f,
90                    "TUMBLE({}, {})",
91                    self.time_column,
92                    format_duration(self.size)
93                )
94            }
95            WindowType::Sliding => {
96                let slide = self.slide.unwrap_or(self.size);
97                write!(
98                    f,
99                    "HOP({}, {} SLIDE {})",
100                    self.time_column,
101                    format_duration(self.size),
102                    format_duration(slide)
103                )
104            }
105            WindowType::Session => {
106                let gap = self.gap.unwrap_or(Duration::ZERO);
107                write!(
108                    f,
109                    "SESSION({}, GAP {})",
110                    self.time_column,
111                    format_duration(gap)
112                )
113            }
114            WindowType::Cumulate => {
115                let step = self.slide.unwrap_or(self.size);
116                write!(
117                    f,
118                    "CUMULATE({}, STEP {} SIZE {})",
119                    self.time_column,
120                    format_duration(step),
121                    format_duration(self.size)
122                )
123            }
124        }
125    }
126}
127
128impl WindowOperatorConfig {
129    /// Create a new tumbling window configuration.
130    #[must_use]
131    pub fn tumbling(time_column: String, size: Duration) -> Self {
132        Self {
133            window_type: WindowType::Tumbling,
134            time_column,
135            size,
136            slide: None,
137            gap: None,
138            offset_ms: 0,
139            allowed_lateness: Duration::ZERO,
140            emit_strategy: EmitStrategy::OnWatermark,
141            late_data_side_output: None,
142        }
143    }
144
145    /// Create a new sliding window configuration.
146    #[must_use]
147    pub fn sliding(time_column: String, size: Duration, slide: Duration) -> Self {
148        Self {
149            window_type: WindowType::Sliding,
150            time_column,
151            size,
152            slide: Some(slide),
153            gap: None,
154            offset_ms: 0,
155            allowed_lateness: Duration::ZERO,
156            emit_strategy: EmitStrategy::OnWatermark,
157            late_data_side_output: None,
158        }
159    }
160
161    /// Create a new session window configuration.
162    #[must_use]
163    pub fn session(time_column: String, gap: Duration) -> Self {
164        Self {
165            window_type: WindowType::Session,
166            time_column,
167            size: Duration::ZERO, // Not used for session windows
168            slide: None,
169            gap: Some(gap),
170            offset_ms: 0,
171            allowed_lateness: Duration::ZERO,
172            emit_strategy: EmitStrategy::OnWatermark,
173            late_data_side_output: None,
174        }
175    }
176
177    /// Create a new cumulate window configuration.
178    ///
179    /// `step` is the window growth increment and `max_size` is the epoch
180    /// size. The `slide` field is reused to store the step interval.
181    #[must_use]
182    pub fn cumulate(time_column: String, step: Duration, max_size: Duration) -> Self {
183        Self {
184            window_type: WindowType::Cumulate,
185            time_column,
186            size: max_size,
187            slide: Some(step),
188            gap: None,
189            offset_ms: 0,
190            allowed_lateness: Duration::ZERO,
191            emit_strategy: EmitStrategy::OnWatermark,
192            late_data_side_output: None,
193        }
194    }
195
196    /// Set window offset in milliseconds.
197    #[must_use]
198    pub fn with_offset_ms(mut self, offset_ms: i64) -> Self {
199        self.offset_ms = offset_ms;
200        self
201    }
202
203    /// Build configuration from a parsed `WindowFunction`.
204    ///
205    /// # Errors
206    ///
207    /// Returns `ParseError::WindowError` if:
208    /// - Time column cannot be extracted
209    /// - Interval cannot be parsed
210    pub fn from_window_function(window: &WindowFunction) -> Result<Self, ParseError> {
211        let time_column = WindowRewriter::get_time_column_name(window).ok_or_else(|| {
212            ParseError::WindowError("Cannot extract time column name".to_string())
213        })?;
214
215        match window {
216            WindowFunction::Tumble {
217                interval, offset, ..
218            } => {
219                let size = WindowRewriter::parse_interval_to_duration(interval)?;
220                let mut config = Self::tumbling(time_column, size);
221                if let Some(ref off) = offset {
222                    let off_dur = WindowRewriter::parse_interval_to_duration(off)?;
223                    config.offset_ms = i64::try_from(off_dur.as_millis()).unwrap_or(0);
224                }
225                Ok(config)
226            }
227            WindowFunction::Hop {
228                slide_interval,
229                window_interval,
230                offset,
231                ..
232            } => {
233                let size = WindowRewriter::parse_interval_to_duration(window_interval)?;
234                let slide = WindowRewriter::parse_interval_to_duration(slide_interval)?;
235                let mut config = Self::sliding(time_column, size, slide);
236                if let Some(ref off) = offset {
237                    let off_dur = WindowRewriter::parse_interval_to_duration(off)?;
238                    config.offset_ms = i64::try_from(off_dur.as_millis()).unwrap_or(0);
239                }
240                Ok(config)
241            }
242            WindowFunction::Session { gap_interval, .. } => {
243                let gap = WindowRewriter::parse_interval_to_duration(gap_interval)?;
244                Ok(Self::session(time_column, gap))
245            }
246            WindowFunction::Cumulate {
247                step_interval,
248                max_size_interval,
249                ..
250            } => {
251                let step = WindowRewriter::parse_interval_to_duration(step_interval)?;
252                let max_size = WindowRewriter::parse_interval_to_duration(max_size_interval)?;
253                Ok(Self::cumulate(time_column, step, max_size))
254            }
255        }
256    }
257
258    /// Apply EMIT clause configuration.
259    ///
260    /// # Errors
261    ///
262    /// Returns `ParseError::WindowError` if the emit clause cannot be converted.
263    pub fn with_emit_clause(mut self, emit_clause: &EmitClause) -> Result<Self, ParseError> {
264        self.emit_strategy = emit_clause.to_emit_strategy()?;
265        Ok(self)
266    }
267
268    /// Apply late data clause configuration.
269    ///
270    /// # Errors
271    ///
272    /// Returns `ParseError::WindowError` if the allowed lateness cannot be parsed.
273    pub fn with_late_data_clause(
274        mut self,
275        late_data_clause: &LateDataClause,
276    ) -> Result<Self, ParseError> {
277        if late_data_clause.side_output.is_some() {
278            return Err(ParseError::WindowError(
279                "LATE DATA SIDE OUTPUT is not yet supported in pipeline mode; \
280                 use ALLOWED LATENESS without a side output, or omit the clause"
281                    .to_string(),
282            ));
283        }
284        self.allowed_lateness = late_data_clause.to_allowed_lateness()?;
285        self.late_data_side_output
286            .clone_from(&late_data_clause.side_output);
287        Ok(self)
288    }
289
290    /// Set allowed lateness duration.
291    #[must_use]
292    pub fn with_allowed_lateness(mut self, lateness: Duration) -> Self {
293        self.allowed_lateness = lateness;
294        self
295    }
296
297    /// Set emit strategy.
298    #[must_use]
299    pub fn with_emit_strategy(mut self, strategy: EmitStrategy) -> Self {
300        self.emit_strategy = strategy;
301        self
302    }
303
304    /// Set late data side output.
305    #[must_use]
306    pub fn with_late_data_side_output(mut self, name: String) -> Self {
307        self.late_data_side_output = Some(name);
308        self
309    }
310
311    /// Validates that the window operator configuration is used in a valid context.
312    /// Specifically, `EMIT ON WINDOW CLOSE` and `EMIT FINAL` both require a
313    /// watermark on the source *and* a windowed aggregation in the query.
314    ///
315    /// # Errors
316    ///
317    /// Returns `ParseError::WindowError` if validation fails.
318    pub fn validate(&self, has_watermark: bool, has_window: bool) -> Result<(), ParseError> {
319        if matches!(
320            self.emit_strategy,
321            EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
322        ) {
323            if !has_watermark {
324                return Err(ParseError::WindowError(
325                    "EMIT ON WINDOW CLOSE requires a watermark definition \
326                     on the source. Add WATERMARK FOR <column> AS <expr> \
327                     to the CREATE SOURCE statement."
328                        .to_string(),
329                ));
330            }
331            if !has_window {
332                return Err(ParseError::WindowError(
333                    "EMIT ON WINDOW CLOSE is only valid with windowed \
334                     aggregation queries. Use EMIT ON UPDATE for \
335                     non-windowed queries."
336                        .to_string(),
337                ));
338            }
339        }
340        Ok(())
341    }
342
343    /// Check if this configuration supports append-only output.
344    ///
345    /// Append-only sinks (Kafka, S3, Delta Lake) require emit strategies
346    /// that don't produce retractions.
347    #[must_use]
348    pub fn is_append_only_compatible(&self) -> bool {
349        matches!(
350            self.emit_strategy,
351            EmitStrategy::OnWatermark | EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
352        )
353    }
354
355    /// Check if late data handling is configured.
356    #[must_use]
357    pub fn has_late_data_handling(&self) -> bool {
358        self.allowed_lateness > Duration::ZERO || self.late_data_side_output.is_some()
359    }
360}
361
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{Expr, Ident};

    /// Five minutes: the window size used by most tests.
    const FIVE_MIN: Duration = Duration::from_secs(300);
    /// One minute: the slide/step interval used by most tests.
    const ONE_MIN: Duration = Duration::from_secs(60);

    /// Boxed identifier expression, as the window function fields expect.
    fn ident_expr(text: &str) -> Box<Expr> {
        Box::new(Expr::Identifier(Ident::new(text)))
    }

    /// Five-minute tumbling window over the `ts` column with default settings.
    fn tumble_on_ts() -> WindowOperatorConfig {
        WindowOperatorConfig::tumbling("ts".to_string(), FIVE_MIN)
    }

    /// Simple parsed tumble window used by the conversion test.
    fn make_tumble_window() -> WindowFunction {
        WindowFunction::Tumble {
            time_column: ident_expr("event_time"),
            interval: ident_expr("5 MINUTE"),
            offset: None,
        }
    }

    #[test]
    fn test_tumbling_config() {
        let WindowOperatorConfig {
            window_type,
            time_column,
            size,
            slide,
            gap,
            ..
        } = WindowOperatorConfig::tumbling("event_time".to_string(), FIVE_MIN);

        assert_eq!(window_type, WindowType::Tumbling);
        assert_eq!(time_column, "event_time");
        assert_eq!(size, Duration::from_secs(300));
        assert!(slide.is_none());
        assert!(gap.is_none());
    }

    #[test]
    fn test_sliding_config() {
        let hop = WindowOperatorConfig::sliding("ts".to_string(), FIVE_MIN, ONE_MIN);

        assert_eq!(hop.window_type, WindowType::Sliding);
        assert_eq!(hop.size, Duration::from_secs(300));
        assert_eq!(hop.slide, Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_session_config() {
        let half_hour = Duration::from_secs(1800);
        let session = WindowOperatorConfig::session("click_time".to_string(), half_hour);

        assert_eq!(session.window_type, WindowType::Session);
        assert_eq!(session.gap, Some(Duration::from_secs(1800)));
    }

    #[test]
    fn test_from_window_function() {
        let built = WindowOperatorConfig::from_window_function(&make_tumble_window()).unwrap();

        assert_eq!(built.window_type, WindowType::Tumbling);
        assert_eq!(built.time_column, "event_time");
        assert_eq!(built.size, Duration::from_secs(300));
    }

    #[test]
    fn test_with_emit_clause() {
        let on_close = tumble_on_ts()
            .with_emit_clause(&EmitClause::OnWindowClose)
            .unwrap();
        assert_eq!(on_close.emit_strategy, EmitStrategy::OnWindowClose);

        let changes = tumble_on_ts()
            .with_emit_clause(&EmitClause::Changes)
            .unwrap();
        assert_eq!(changes.emit_strategy, EmitStrategy::Changelog);
    }

    #[test]
    fn test_with_late_data_clause_side_output_rejected() {
        // Side output is not yet wired in pipeline mode, so it must be rejected.
        let clause = LateDataClause::side_output_only("late_events".to_string());
        let outcome = tumble_on_ts().with_late_data_clause(&clause);

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("not yet supported"));
    }

    #[test]
    fn test_with_late_data_clause_lateness_only_accepted() {
        // Allowed lateness without a side output is fine.
        let lateness = Expr::Value(
            sqlparser::ast::Value::SingleQuotedString("5 SECONDS".to_string()).into(),
        );
        let clause = LateDataClause::with_allowed_lateness(lateness);

        let applied = tumble_on_ts().with_late_data_clause(&clause).unwrap();
        assert_eq!(applied.allowed_lateness, Duration::from_secs(5));
        assert!(applied.late_data_side_output.is_none());
    }

    #[test]
    fn test_append_only_compatible() {
        // The default emit strategy (OnWatermark) is append-only compatible.
        assert!(tumble_on_ts().is_append_only_compatible());

        // OnUpdate produces retractions, so it is not append-only compatible.
        let on_update = tumble_on_ts().with_emit_strategy(EmitStrategy::OnUpdate);
        assert!(!on_update.is_append_only_compatible());

        // Changelog is not append-only compatible either.
        let changelog = tumble_on_ts().with_emit_strategy(EmitStrategy::Changelog);
        assert!(!changelog.is_append_only_compatible());
    }

    #[test]
    fn test_has_late_data_handling() {
        // No late data handling by default.
        assert!(!tumble_on_ts().has_late_data_handling());

        // Enabled by a non-zero allowed lateness.
        let with_lateness = tumble_on_ts().with_allowed_lateness(ONE_MIN);
        assert!(with_lateness.has_late_data_handling());

        // Enabled by a side output.
        let with_side_output = tumble_on_ts().with_late_data_side_output("late".to_string());
        assert!(with_side_output.has_late_data_handling());
    }

    #[test]
    fn test_eowc_without_watermark_errors() {
        let eowc = tumble_on_ts().with_emit_strategy(EmitStrategy::OnWindowClose);

        let outcome = eowc.validate(false, true);
        assert!(outcome.is_err());
        let err = outcome.unwrap_err().to_string();
        assert!(
            err.contains("watermark"),
            "Expected watermark error, got: {err}"
        );
    }

    #[test]
    fn test_eowc_without_window_errors() {
        let eowc = tumble_on_ts().with_emit_strategy(EmitStrategy::OnWindowClose);

        let outcome = eowc.validate(true, false);
        assert!(outcome.is_err());
        let err = outcome.unwrap_err().to_string();
        assert!(
            err.contains("windowed"),
            "Expected windowed query error, got: {err}"
        );
    }

    #[test]
    fn test_eowc_with_watermark_and_window_passes() {
        let eowc = tumble_on_ts().with_emit_strategy(EmitStrategy::OnWindowClose);

        assert!(eowc.validate(true, true).is_ok());
    }

    #[test]
    fn test_final_without_watermark_errors() {
        let final_only = tumble_on_ts().with_emit_strategy(EmitStrategy::FinalOnly);

        assert!(final_only.validate(false, true).is_err());
    }

    #[test]
    fn test_non_eowc_without_watermark_ok() {
        // None of these strategies requires a watermark or a window.
        let strategies = [
            EmitStrategy::OnUpdate,
            EmitStrategy::Periodic(Duration::from_secs(5)),
            EmitStrategy::Changelog,
        ];

        for strategy in strategies {
            let config = tumble_on_ts().with_emit_strategy(strategy);
            assert!(
                config.validate(false, false).is_ok(),
                "{:?} should validate without watermark or window",
                config.emit_strategy
            );
        }
    }

    #[test]
    fn test_display_tumbling_window() {
        let tumble = WindowOperatorConfig::tumbling("event_time".to_string(), ONE_MIN);
        assert_eq!(tumble.to_string(), "TUMBLE(event_time, 1m)");
    }

    #[test]
    fn test_display_sliding_window() {
        let hop = WindowOperatorConfig::sliding("ts".to_string(), FIVE_MIN, ONE_MIN);
        assert_eq!(hop.to_string(), "HOP(ts, 5m SLIDE 1m)");
    }

    #[test]
    fn test_display_session_window() {
        let session =
            WindowOperatorConfig::session("click_time".to_string(), Duration::from_secs(1800));
        assert_eq!(session.to_string(), "SESSION(click_time, GAP 30m)");
    }

    #[test]
    fn test_cumulate_config() {
        let cumulate = WindowOperatorConfig::cumulate("ts".to_string(), ONE_MIN, FIVE_MIN);

        assert_eq!(cumulate.window_type, WindowType::Cumulate);
        assert_eq!(cumulate.time_column, "ts");
        assert_eq!(cumulate.size, Duration::from_secs(300));
        assert_eq!(cumulate.slide, Some(Duration::from_secs(60)));
        assert!(cumulate.gap.is_none());
    }

    #[test]
    fn test_cumulate_from_window_function() {
        let parsed = WindowFunction::Cumulate {
            time_column: ident_expr("event_time"),
            step_interval: ident_expr("1 MINUTE"),
            max_size_interval: ident_expr("5 MINUTE"),
        };
        let built = WindowOperatorConfig::from_window_function(&parsed).unwrap();

        assert_eq!(built.window_type, WindowType::Cumulate);
        assert_eq!(built.time_column, "event_time");
        assert_eq!(built.size, Duration::from_secs(300));
        assert_eq!(built.slide, Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_display_cumulate_window() {
        let cumulate = WindowOperatorConfig::cumulate("ts".to_string(), ONE_MIN, FIVE_MIN);
        assert_eq!(cumulate.to_string(), "CUMULATE(ts, STEP 1m SIZE 5m)");
    }

    #[test]
    fn test_display_window_type() {
        let expectations = [
            (WindowType::Tumbling, "TUMBLE"),
            (WindowType::Sliding, "HOP"),
            (WindowType::Session, "SESSION"),
            (WindowType::Cumulate, "CUMULATE"),
        ];

        for (window_type, keyword) in expectations {
            assert_eq!(window_type.to_string(), keyword);
        }
    }

    #[test]
    fn test_display_duration_formatting() {
        let expectations = [
            // Hours
            (Duration::from_secs(3600), "TUMBLE(ts, 1h)"),
            // Seconds (not a whole number of minutes)
            (Duration::from_secs(45), "TUMBLE(ts, 45s)"),
            // Milliseconds (sub-second)
            (Duration::from_millis(500), "TUMBLE(ts, 500ms)"),
        ];

        for (size, rendered) in expectations {
            let config = WindowOperatorConfig::tumbling("ts".to_string(), size);
            assert_eq!(config.to_string(), rendered);
        }
    }
}