Skip to main content

laminar_sql/translator/
window_translator.rs

1//! Window operator configuration builder
2//!
3//! Translates parsed window functions and EMIT/late data clauses
4//! into complete operator configurations.
5
6use std::time::Duration;
7
8use crate::parser::{
9    EmitClause, EmitStrategy, LateDataClause, ParseError, WindowFunction, WindowRewriter,
10};
11
/// Type of window operation
///
/// Each variant's `Display` output is the SQL window function keyword:
/// `TUMBLE`, `HOP`, `SESSION` and `CUMULATE` respectively.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WindowType {
    /// Fixed-size non-overlapping windows (displayed as `TUMBLE`)
    Tumbling,
    /// Fixed-size overlapping windows with slide (displayed as `HOP`)
    Sliding,
    /// Dynamic windows based on activity gaps (displayed as `SESSION`)
    Session,
    /// Incrementally growing windows within fixed-size epochs (displayed as `CUMULATE`)
    Cumulate,
}
24
/// Complete configuration for instantiating a window operator.
///
/// This structure holds all the information needed to create and configure
/// a window operator in Ring 0.
///
/// The meaning of `size`, `slide` and `gap` depends on `window_type`; see the
/// individual field docs. Instances are normally created through the
/// `tumbling`, `sliding`, `session` or `cumulate` constructors (or
/// `from_window_function`) and then refined with the `with_*` builder methods.
///
/// # EMIT ON WINDOW CLOSE
///
/// When `emit_strategy` is `OnWindowClose` or `FinalOnly`, use [`validate()`](Self::validate)
/// to ensure the configuration is valid. These strategies require:
/// - A watermark definition on the source (timers are driven by watermark)
/// - A windowed aggregation context (non-windowed queries cannot use EOWC)
#[derive(Debug, Clone)]
pub struct WindowOperatorConfig {
    /// The type of window (tumbling, sliding, session or cumulate)
    pub window_type: WindowType,
    /// The time column name used for windowing
    pub time_column: String,
    /// Window size for tumbling and sliding windows, or the epoch (maximum)
    /// size for cumulate windows. The `session` constructor sets this to
    /// `Duration::ZERO` because session windows do not use it.
    pub size: Duration,
    /// Slide interval for sliding windows; for cumulate windows this field is
    /// reused to hold the step (growth) interval. `None` for tumbling and
    /// session windows.
    pub slide: Option<Duration>,
    /// Gap interval for session windows; `None` for every other window type
    pub gap: Option<Duration>,
    /// Maximum allowed lateness for late events (the constructors default
    /// this to `Duration::ZERO`)
    pub allowed_lateness: Duration,
    /// Emit strategy (when to output results); the constructors default this
    /// to `EmitStrategy::OnWatermark`
    pub emit_strategy: EmitStrategy,
    /// Side output name for late data (if configured)
    pub late_data_side_output: Option<String>,
}
55
/// Format a `Duration` as a compact human-readable string (e.g., "60s", "5m", "1h").
///
/// The unit is chosen so that the rendered value does not hide any whole
/// milliseconds:
///
/// - durations below one second, or with a non-zero millisecond remainder,
///   are rendered in milliseconds (`500ms`, `1500ms`);
/// - whole multiples of an hour are rendered in hours (`1h`);
/// - whole multiples of a minute are rendered in minutes (`5m`);
/// - everything else is rendered in seconds (`45s`).
///
/// Precision finer than one millisecond is truncated, so
/// `Duration::ZERO` and any sub-millisecond duration both render as `0ms`.
fn format_duration(d: Duration) -> String {
    let secs = d.as_secs();
    // Rendering only the whole seconds would silently drop the fractional
    // part (1500ms would read as "1s"), so fall back to milliseconds whenever
    // the sub-second component is visible at millisecond resolution.
    if secs == 0 || d.subsec_millis() != 0 {
        return format!("{}ms", d.as_millis());
    }
    if secs.is_multiple_of(3600) {
        format!("{}h", secs / 3600)
    } else if secs.is_multiple_of(60) {
        format!("{}m", secs / 60)
    } else {
        format!("{secs}s")
    }
}
70
71impl std::fmt::Display for WindowType {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        match self {
74            WindowType::Tumbling => write!(f, "TUMBLE"),
75            WindowType::Sliding => write!(f, "HOP"),
76            WindowType::Session => write!(f, "SESSION"),
77            WindowType::Cumulate => write!(f, "CUMULATE"),
78        }
79    }
80}
81
82impl std::fmt::Display for WindowOperatorConfig {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        match self.window_type {
85            WindowType::Tumbling => {
86                write!(
87                    f,
88                    "TUMBLE({}, {})",
89                    self.time_column,
90                    format_duration(self.size)
91                )
92            }
93            WindowType::Sliding => {
94                let slide = self.slide.unwrap_or(self.size);
95                write!(
96                    f,
97                    "HOP({}, {} SLIDE {})",
98                    self.time_column,
99                    format_duration(self.size),
100                    format_duration(slide)
101                )
102            }
103            WindowType::Session => {
104                let gap = self.gap.unwrap_or(Duration::ZERO);
105                write!(
106                    f,
107                    "SESSION({}, GAP {})",
108                    self.time_column,
109                    format_duration(gap)
110                )
111            }
112            WindowType::Cumulate => {
113                let step = self.slide.unwrap_or(self.size);
114                write!(
115                    f,
116                    "CUMULATE({}, STEP {} SIZE {})",
117                    self.time_column,
118                    format_duration(step),
119                    format_duration(self.size)
120                )
121            }
122        }
123    }
124}
125
126impl WindowOperatorConfig {
127    /// Create a new tumbling window configuration.
128    #[must_use]
129    pub fn tumbling(time_column: String, size: Duration) -> Self {
130        Self {
131            window_type: WindowType::Tumbling,
132            time_column,
133            size,
134            slide: None,
135            gap: None,
136            allowed_lateness: Duration::ZERO,
137            emit_strategy: EmitStrategy::OnWatermark,
138            late_data_side_output: None,
139        }
140    }
141
142    /// Create a new sliding window configuration.
143    #[must_use]
144    pub fn sliding(time_column: String, size: Duration, slide: Duration) -> Self {
145        Self {
146            window_type: WindowType::Sliding,
147            time_column,
148            size,
149            slide: Some(slide),
150            gap: None,
151            allowed_lateness: Duration::ZERO,
152            emit_strategy: EmitStrategy::OnWatermark,
153            late_data_side_output: None,
154        }
155    }
156
157    /// Create a new session window configuration.
158    #[must_use]
159    pub fn session(time_column: String, gap: Duration) -> Self {
160        Self {
161            window_type: WindowType::Session,
162            time_column,
163            size: Duration::ZERO, // Not used for session windows
164            slide: None,
165            gap: Some(gap),
166            allowed_lateness: Duration::ZERO,
167            emit_strategy: EmitStrategy::OnWatermark,
168            late_data_side_output: None,
169        }
170    }
171
172    /// Create a new cumulate window configuration.
173    ///
174    /// `step` is the window growth increment and `max_size` is the epoch
175    /// size. The `slide` field is reused to store the step interval.
176    #[must_use]
177    pub fn cumulate(time_column: String, step: Duration, max_size: Duration) -> Self {
178        Self {
179            window_type: WindowType::Cumulate,
180            time_column,
181            size: max_size,
182            slide: Some(step),
183            gap: None,
184            allowed_lateness: Duration::ZERO,
185            emit_strategy: EmitStrategy::OnWatermark,
186            late_data_side_output: None,
187        }
188    }
189
190    /// Build configuration from a parsed `WindowFunction`.
191    ///
192    /// # Errors
193    ///
194    /// Returns `ParseError::WindowError` if:
195    /// - Time column cannot be extracted
196    /// - Interval cannot be parsed
197    pub fn from_window_function(window: &WindowFunction) -> Result<Self, ParseError> {
198        let time_column = WindowRewriter::get_time_column_name(window).ok_or_else(|| {
199            ParseError::WindowError("Cannot extract time column name".to_string())
200        })?;
201
202        match window {
203            WindowFunction::Tumble { interval, .. } => {
204                let size = WindowRewriter::parse_interval_to_duration(interval)?;
205                Ok(Self::tumbling(time_column, size))
206            }
207            WindowFunction::Hop {
208                slide_interval,
209                window_interval,
210                ..
211            } => {
212                let size = WindowRewriter::parse_interval_to_duration(window_interval)?;
213                let slide = WindowRewriter::parse_interval_to_duration(slide_interval)?;
214                Ok(Self::sliding(time_column, size, slide))
215            }
216            WindowFunction::Session { gap_interval, .. } => {
217                let gap = WindowRewriter::parse_interval_to_duration(gap_interval)?;
218                Ok(Self::session(time_column, gap))
219            }
220            WindowFunction::Cumulate {
221                step_interval,
222                max_size_interval,
223                ..
224            } => {
225                let step = WindowRewriter::parse_interval_to_duration(step_interval)?;
226                let max_size = WindowRewriter::parse_interval_to_duration(max_size_interval)?;
227                Ok(Self::cumulate(time_column, step, max_size))
228            }
229        }
230    }
231
232    /// Apply EMIT clause configuration.
233    ///
234    /// # Errors
235    ///
236    /// Returns `ParseError::WindowError` if the emit clause cannot be converted.
237    pub fn with_emit_clause(mut self, emit_clause: &EmitClause) -> Result<Self, ParseError> {
238        self.emit_strategy = emit_clause.to_emit_strategy()?;
239        Ok(self)
240    }
241
242    /// Apply late data clause configuration.
243    ///
244    /// # Errors
245    ///
246    /// Returns `ParseError::WindowError` if the allowed lateness cannot be parsed.
247    pub fn with_late_data_clause(
248        mut self,
249        late_data_clause: &LateDataClause,
250    ) -> Result<Self, ParseError> {
251        self.allowed_lateness = late_data_clause.to_allowed_lateness()?;
252        self.late_data_side_output
253            .clone_from(&late_data_clause.side_output);
254        Ok(self)
255    }
256
257    /// Set allowed lateness duration.
258    #[must_use]
259    pub fn with_allowed_lateness(mut self, lateness: Duration) -> Self {
260        self.allowed_lateness = lateness;
261        self
262    }
263
264    /// Set emit strategy.
265    #[must_use]
266    pub fn with_emit_strategy(mut self, strategy: EmitStrategy) -> Self {
267        self.emit_strategy = strategy;
268        self
269    }
270
271    /// Set late data side output.
272    #[must_use]
273    pub fn with_late_data_side_output(mut self, name: String) -> Self {
274        self.late_data_side_output = Some(name);
275        self
276    }
277
278    /// Validates that the window operator configuration is used in a valid context.
279    ///
280    /// Checks:
281    /// - `EMIT ON WINDOW CLOSE` and `EMIT FINAL` require a watermark on the source
282    /// - `EMIT ON WINDOW CLOSE` and `EMIT FINAL` require a windowed aggregation
283    ///
284    /// # Arguments
285    ///
286    /// * `has_watermark` - Whether the source has a watermark definition
287    /// * `has_window` - Whether the query contains a windowed aggregation
288    ///
289    /// # Errors
290    ///
291    /// Returns `ParseError::WindowError` if validation fails.
292    pub fn validate(&self, has_watermark: bool, has_window: bool) -> Result<(), ParseError> {
293        if matches!(
294            self.emit_strategy,
295            EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
296        ) {
297            if !has_watermark {
298                return Err(ParseError::WindowError(
299                    "EMIT ON WINDOW CLOSE requires a watermark definition \
300                     on the source. Add WATERMARK FOR <column> AS <expr> \
301                     to the CREATE SOURCE statement."
302                        .to_string(),
303                ));
304            }
305            if !has_window {
306                return Err(ParseError::WindowError(
307                    "EMIT ON WINDOW CLOSE is only valid with windowed \
308                     aggregation queries. Use EMIT ON UPDATE for \
309                     non-windowed queries."
310                        .to_string(),
311                ));
312            }
313        }
314        Ok(())
315    }
316
317    /// Check if this configuration supports append-only output.
318    ///
319    /// Append-only sinks (Kafka, S3, Delta Lake) require emit strategies
320    /// that don't produce retractions.
321    #[must_use]
322    pub fn is_append_only_compatible(&self) -> bool {
323        matches!(
324            self.emit_strategy,
325            EmitStrategy::OnWatermark | EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
326        )
327    }
328
329    /// Check if late data handling is configured.
330    #[must_use]
331    pub fn has_late_data_handling(&self) -> bool {
332        self.allowed_lateness > Duration::ZERO || self.late_data_side_output.is_some()
333    }
334}
335
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{Expr, Ident};

    /// Boxes `text` as a plain identifier expression, the shape the window
    /// function variants are built from in these tests.
    fn ident(text: &str) -> Box<Expr> {
        Box::new(Expr::Identifier(Ident::new(text)))
    }

    /// A `TUMBLE(event_time, 5 MINUTE)` window function for parser-driven tests.
    fn make_tumble_window() -> WindowFunction {
        WindowFunction::Tumble {
            time_column: ident("event_time"),
            interval: ident("5 MINUTE"),
        }
    }

    /// The five-minute tumbling window on `ts` that most tests start from.
    fn tumbling_ts_5m() -> WindowOperatorConfig {
        WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
    }

    /// Same window as `tumbling_ts_5m`, with the emit strategy overridden.
    fn tumbling_with(strategy: EmitStrategy) -> WindowOperatorConfig {
        tumbling_ts_5m().with_emit_strategy(strategy)
    }

    #[test]
    fn test_tumbling_config() {
        let five_minutes = Duration::from_secs(300);
        let cfg = WindowOperatorConfig::tumbling(String::from("event_time"), five_minutes);

        assert_eq!(cfg.window_type, WindowType::Tumbling);
        assert_eq!(cfg.time_column, "event_time");
        assert_eq!(cfg.size, five_minutes);
        assert_eq!(cfg.slide, None);
        assert_eq!(cfg.gap, None);
    }

    #[test]
    fn test_sliding_config() {
        let (size, slide) = (Duration::from_secs(300), Duration::from_secs(60));
        let cfg = WindowOperatorConfig::sliding(String::from("ts"), size, slide);

        assert_eq!(cfg.window_type, WindowType::Sliding);
        assert_eq!(cfg.size, size);
        assert_eq!(cfg.slide, Some(slide));
    }

    #[test]
    fn test_session_config() {
        let gap = Duration::from_secs(1800);
        let cfg = WindowOperatorConfig::session(String::from("click_time"), gap);

        assert_eq!(cfg.window_type, WindowType::Session);
        assert_eq!(cfg.gap, Some(gap));
    }

    #[test]
    fn test_from_window_function() {
        let cfg = WindowOperatorConfig::from_window_function(&make_tumble_window()).unwrap();

        assert_eq!(cfg.window_type, WindowType::Tumbling);
        assert_eq!(cfg.time_column, "event_time");
        assert_eq!(cfg.size, Duration::from_secs(300));
    }

    #[test]
    fn test_with_emit_clause() {
        let on_close = tumbling_ts_5m()
            .with_emit_clause(&EmitClause::OnWindowClose)
            .unwrap();
        assert_eq!(on_close.emit_strategy, EmitStrategy::OnWindowClose);

        let changes = tumbling_ts_5m()
            .with_emit_clause(&EmitClause::Changes)
            .unwrap();
        assert_eq!(changes.emit_strategy, EmitStrategy::Changelog);
    }

    #[test]
    fn test_with_late_data_clause() {
        let clause = LateDataClause::side_output_only("late_events".to_string());
        let cfg = tumbling_ts_5m().with_late_data_clause(&clause).unwrap();

        assert_eq!(cfg.late_data_side_output.as_deref(), Some("late_events"));
    }

    #[test]
    fn test_append_only_compatible() {
        // The default strategy (OnWatermark) never retracts.
        assert!(tumbling_ts_5m().is_append_only_compatible());

        // OnUpdate produces retractions, so it is rejected.
        assert!(!tumbling_with(EmitStrategy::OnUpdate).is_append_only_compatible());

        // Changelog is rejected as well.
        assert!(!tumbling_with(EmitStrategy::Changelog).is_append_only_compatible());
    }

    #[test]
    fn test_has_late_data_handling() {
        let base = tumbling_ts_5m();

        // Nothing configured out of the box.
        assert!(!base.has_late_data_handling());

        // A non-zero allowed lateness counts.
        let lenient = base.clone().with_allowed_lateness(Duration::from_secs(60));
        assert!(lenient.has_late_data_handling());

        // So does a named side output.
        let routed = base.with_late_data_side_output("late".to_string());
        assert!(routed.has_late_data_handling());
    }

    #[test]
    fn test_eowc_without_watermark_errors() {
        let outcome = tumbling_with(EmitStrategy::OnWindowClose).validate(false, true);
        assert!(outcome.is_err());

        let err = outcome.unwrap_err().to_string();
        assert!(
            err.contains("watermark"),
            "Expected watermark error, got: {err}"
        );
    }

    #[test]
    fn test_eowc_without_window_errors() {
        let outcome = tumbling_with(EmitStrategy::OnWindowClose).validate(true, false);
        assert!(outcome.is_err());

        let err = outcome.unwrap_err().to_string();
        assert!(
            err.contains("windowed"),
            "Expected windowed query error, got: {err}"
        );
    }

    #[test]
    fn test_eowc_with_watermark_and_window_passes() {
        let cfg = tumbling_with(EmitStrategy::OnWindowClose);
        assert!(cfg.validate(true, true).is_ok());
    }

    #[test]
    fn test_final_without_watermark_errors() {
        let cfg = tumbling_with(EmitStrategy::FinalOnly);
        assert!(cfg.validate(false, true).is_err());
    }

    #[test]
    fn test_non_eowc_without_watermark_ok() {
        // None of OnUpdate, Periodic or Changelog needs a watermark or a window.
        let relaxed = [
            EmitStrategy::OnUpdate,
            EmitStrategy::Periodic(Duration::from_secs(5)),
            EmitStrategy::Changelog,
        ];
        for strategy in relaxed {
            assert!(tumbling_with(strategy).validate(false, false).is_ok());
        }
    }

    #[test]
    fn test_display_tumbling_window() {
        let cfg =
            WindowOperatorConfig::tumbling("event_time".to_string(), Duration::from_secs(60));
        assert_eq!(cfg.to_string(), "TUMBLE(event_time, 1m)");
    }

    #[test]
    fn test_display_sliding_window() {
        let cfg = WindowOperatorConfig::sliding(
            "ts".to_string(),
            Duration::from_secs(300),
            Duration::from_secs(60),
        );
        assert_eq!(cfg.to_string(), "HOP(ts, 5m SLIDE 1m)");
    }

    #[test]
    fn test_display_session_window() {
        let cfg =
            WindowOperatorConfig::session("click_time".to_string(), Duration::from_secs(1800));
        assert_eq!(cfg.to_string(), "SESSION(click_time, GAP 30m)");
    }

    #[test]
    fn test_cumulate_config() {
        let (step, epoch) = (Duration::from_secs(60), Duration::from_secs(300));
        let cfg = WindowOperatorConfig::cumulate(String::from("ts"), step, epoch);

        assert_eq!(cfg.window_type, WindowType::Cumulate);
        assert_eq!(cfg.time_column, "ts");
        assert_eq!(cfg.size, epoch);
        assert_eq!(cfg.slide, Some(step));
        assert_eq!(cfg.gap, None);
    }

    #[test]
    fn test_cumulate_from_window_function() {
        let window = WindowFunction::Cumulate {
            time_column: ident("event_time"),
            step_interval: ident("1 MINUTE"),
            max_size_interval: ident("5 MINUTE"),
        };
        let cfg = WindowOperatorConfig::from_window_function(&window).unwrap();

        assert_eq!(cfg.window_type, WindowType::Cumulate);
        assert_eq!(cfg.time_column, "event_time");
        assert_eq!(cfg.size, Duration::from_secs(300));
        assert_eq!(cfg.slide, Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_display_cumulate_window() {
        let cfg = WindowOperatorConfig::cumulate(
            "ts".to_string(),
            Duration::from_secs(60),
            Duration::from_secs(300),
        );
        assert_eq!(cfg.to_string(), "CUMULATE(ts, STEP 1m SIZE 5m)");
    }

    #[test]
    fn test_display_window_type() {
        let expected = [
            (WindowType::Tumbling, "TUMBLE"),
            (WindowType::Sliding, "HOP"),
            (WindowType::Session, "SESSION"),
            (WindowType::Cumulate, "CUMULATE"),
        ];
        for (window_type, keyword) in expected {
            assert_eq!(window_type.to_string(), keyword);
        }
    }

    #[test]
    fn test_display_duration_formatting() {
        let cases = [
            // Hours
            (Duration::from_secs(3600), "TUMBLE(ts, 1h)"),
            // Seconds (non-round minutes)
            (Duration::from_secs(45), "TUMBLE(ts, 45s)"),
            // Milliseconds (sub-second)
            (Duration::from_millis(500), "TUMBLE(ts, 500ms)"),
        ];
        for (size, rendered) in cases {
            let cfg = WindowOperatorConfig::tumbling("ts".to_string(), size);
            assert_eq!(cfg.to_string(), rendered);
        }
    }
}