Skip to main content

laminar_sql/translator/
window_translator.rs

1//! Window operator configuration builder
2//!
3//! Translates parsed window functions and EMIT/late data clauses
4//! into complete operator configurations.
5
6use std::time::Duration;
7
8use crate::parser::{
9    EmitClause, EmitStrategy, LateDataClause, ParseError, WindowFunction, WindowRewriter,
10};
11
/// The kind of windowing a window operator applies.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WindowType {
    /// Non-overlapping windows of a fixed size.
    Tumbling,
    /// Fixed-size windows that overlap, advancing by a slide interval.
    Sliding,
    /// Dynamically sized windows delimited by gaps in activity.
    Session,
}
22
23/// Complete configuration for instantiating a window operator.
24///
25/// This structure holds all the information needed to create and configure
26/// a window operator in Ring 0.
27///
28/// # EMIT ON WINDOW CLOSE
29///
30/// When `emit_strategy` is `OnWindowClose` or `FinalOnly`, use [`validate()`](Self::validate)
31/// to ensure the configuration is valid. These strategies require:
32/// - A watermark definition on the source (timers are driven by watermark)
33/// - A windowed aggregation context (non-windowed queries cannot use EOWC)
34#[derive(Debug, Clone)]
35pub struct WindowOperatorConfig {
36    /// The type of window (tumbling, sliding, session)
37    pub window_type: WindowType,
38    /// The time column name used for windowing
39    pub time_column: String,
40    /// Window size (for tumbling and sliding)
41    pub size: Duration,
42    /// Slide interval for sliding windows
43    pub slide: Option<Duration>,
44    /// Gap interval for session windows
45    pub gap: Option<Duration>,
46    /// Maximum allowed lateness for late events
47    pub allowed_lateness: Duration,
48    /// Emit strategy (when to output results)
49    pub emit_strategy: EmitStrategy,
50    /// Side output name for late data (if configured)
51    pub late_data_side_output: Option<String>,
52}
53
/// Format a `Duration` as a compact human-readable string
/// (e.g., "500ms", "45s", "5m", "1h").
///
/// The largest unit that represents the value exactly is chosen:
/// - whole multiples of an hour render as `"<n>h"`
/// - otherwise, whole multiples of a minute render as `"<n>m"`
/// - otherwise, whole seconds render as `"<n>s"`
/// - durations shorter than one second (including zero), and durations with
///   a non-zero millisecond remainder such as 1.5s, render as `"<n>ms"`
///
/// Precision below one millisecond is truncated.
fn format_duration(d: Duration) -> String {
    let secs = d.as_secs();
    // The h/m/s suffixes can only express whole seconds. Falling back to
    // milliseconds when a remainder is present keeps e.g. 1500ms from being
    // displayed as a misleading "1s".
    if secs == 0 || d.subsec_millis() != 0 {
        return format!("{}ms", d.as_millis());
    }
    if secs.is_multiple_of(3600) {
        format!("{}h", secs / 3600)
    } else if secs.is_multiple_of(60) {
        format!("{}m", secs / 60)
    } else {
        format!("{secs}s")
    }
}
68
69impl std::fmt::Display for WindowType {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        match self {
72            WindowType::Tumbling => write!(f, "TUMBLE"),
73            WindowType::Sliding => write!(f, "HOP"),
74            WindowType::Session => write!(f, "SESSION"),
75        }
76    }
77}
78
79impl std::fmt::Display for WindowOperatorConfig {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        match self.window_type {
82            WindowType::Tumbling => {
83                write!(
84                    f,
85                    "TUMBLE({}, {})",
86                    self.time_column,
87                    format_duration(self.size)
88                )
89            }
90            WindowType::Sliding => {
91                let slide = self.slide.unwrap_or(self.size);
92                write!(
93                    f,
94                    "HOP({}, {} SLIDE {})",
95                    self.time_column,
96                    format_duration(self.size),
97                    format_duration(slide)
98                )
99            }
100            WindowType::Session => {
101                let gap = self.gap.unwrap_or(Duration::ZERO);
102                write!(
103                    f,
104                    "SESSION({}, GAP {})",
105                    self.time_column,
106                    format_duration(gap)
107                )
108            }
109        }
110    }
111}
112
113impl WindowOperatorConfig {
114    /// Create a new tumbling window configuration.
115    #[must_use]
116    pub fn tumbling(time_column: String, size: Duration) -> Self {
117        Self {
118            window_type: WindowType::Tumbling,
119            time_column,
120            size,
121            slide: None,
122            gap: None,
123            allowed_lateness: Duration::ZERO,
124            emit_strategy: EmitStrategy::OnWatermark,
125            late_data_side_output: None,
126        }
127    }
128
129    /// Create a new sliding window configuration.
130    #[must_use]
131    pub fn sliding(time_column: String, size: Duration, slide: Duration) -> Self {
132        Self {
133            window_type: WindowType::Sliding,
134            time_column,
135            size,
136            slide: Some(slide),
137            gap: None,
138            allowed_lateness: Duration::ZERO,
139            emit_strategy: EmitStrategy::OnWatermark,
140            late_data_side_output: None,
141        }
142    }
143
144    /// Create a new session window configuration.
145    #[must_use]
146    pub fn session(time_column: String, gap: Duration) -> Self {
147        Self {
148            window_type: WindowType::Session,
149            time_column,
150            size: Duration::ZERO, // Not used for session windows
151            slide: None,
152            gap: Some(gap),
153            allowed_lateness: Duration::ZERO,
154            emit_strategy: EmitStrategy::OnWatermark,
155            late_data_side_output: None,
156        }
157    }
158
159    /// Build configuration from a parsed `WindowFunction`.
160    ///
161    /// # Errors
162    ///
163    /// Returns `ParseError::WindowError` if:
164    /// - Time column cannot be extracted
165    /// - Interval cannot be parsed
166    pub fn from_window_function(window: &WindowFunction) -> Result<Self, ParseError> {
167        let time_column = WindowRewriter::get_time_column_name(window).ok_or_else(|| {
168            ParseError::WindowError("Cannot extract time column name".to_string())
169        })?;
170
171        match window {
172            WindowFunction::Tumble { interval, .. } => {
173                let size = WindowRewriter::parse_interval_to_duration(interval)?;
174                Ok(Self::tumbling(time_column, size))
175            }
176            WindowFunction::Hop {
177                slide_interval,
178                window_interval,
179                ..
180            } => {
181                let size = WindowRewriter::parse_interval_to_duration(window_interval)?;
182                let slide = WindowRewriter::parse_interval_to_duration(slide_interval)?;
183                Ok(Self::sliding(time_column, size, slide))
184            }
185            WindowFunction::Session { gap_interval, .. } => {
186                let gap = WindowRewriter::parse_interval_to_duration(gap_interval)?;
187                Ok(Self::session(time_column, gap))
188            }
189        }
190    }
191
192    /// Apply EMIT clause configuration.
193    ///
194    /// # Errors
195    ///
196    /// Returns `ParseError::WindowError` if the emit clause cannot be converted.
197    pub fn with_emit_clause(mut self, emit_clause: &EmitClause) -> Result<Self, ParseError> {
198        self.emit_strategy = emit_clause.to_emit_strategy()?;
199        Ok(self)
200    }
201
202    /// Apply late data clause configuration.
203    ///
204    /// # Errors
205    ///
206    /// Returns `ParseError::WindowError` if the allowed lateness cannot be parsed.
207    pub fn with_late_data_clause(
208        mut self,
209        late_data_clause: &LateDataClause,
210    ) -> Result<Self, ParseError> {
211        self.allowed_lateness = late_data_clause.to_allowed_lateness()?;
212        self.late_data_side_output
213            .clone_from(&late_data_clause.side_output);
214        Ok(self)
215    }
216
217    /// Set allowed lateness duration.
218    #[must_use]
219    pub fn with_allowed_lateness(mut self, lateness: Duration) -> Self {
220        self.allowed_lateness = lateness;
221        self
222    }
223
224    /// Set emit strategy.
225    #[must_use]
226    pub fn with_emit_strategy(mut self, strategy: EmitStrategy) -> Self {
227        self.emit_strategy = strategy;
228        self
229    }
230
231    /// Set late data side output.
232    #[must_use]
233    pub fn with_late_data_side_output(mut self, name: String) -> Self {
234        self.late_data_side_output = Some(name);
235        self
236    }
237
238    /// Validates that the window operator configuration is used in a valid context.
239    ///
240    /// Checks:
241    /// - `EMIT ON WINDOW CLOSE` and `EMIT FINAL` require a watermark on the source
242    /// - `EMIT ON WINDOW CLOSE` and `EMIT FINAL` require a windowed aggregation
243    ///
244    /// # Arguments
245    ///
246    /// * `has_watermark` - Whether the source has a watermark definition
247    /// * `has_window` - Whether the query contains a windowed aggregation
248    ///
249    /// # Errors
250    ///
251    /// Returns `ParseError::WindowError` if validation fails.
252    pub fn validate(&self, has_watermark: bool, has_window: bool) -> Result<(), ParseError> {
253        if matches!(
254            self.emit_strategy,
255            EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
256        ) {
257            if !has_watermark {
258                return Err(ParseError::WindowError(
259                    "EMIT ON WINDOW CLOSE requires a watermark definition \
260                     on the source. Add WATERMARK FOR <column> AS <expr> \
261                     to the CREATE SOURCE statement."
262                        .to_string(),
263                ));
264            }
265            if !has_window {
266                return Err(ParseError::WindowError(
267                    "EMIT ON WINDOW CLOSE is only valid with windowed \
268                     aggregation queries. Use EMIT ON UPDATE for \
269                     non-windowed queries."
270                        .to_string(),
271                ));
272            }
273        }
274        Ok(())
275    }
276
277    /// Check if this configuration supports append-only output.
278    ///
279    /// Append-only sinks (Kafka, S3, Delta Lake) require emit strategies
280    /// that don't produce retractions.
281    #[must_use]
282    pub fn is_append_only_compatible(&self) -> bool {
283        matches!(
284            self.emit_strategy,
285            EmitStrategy::OnWatermark | EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
286        )
287    }
288
289    /// Check if late data handling is configured.
290    #[must_use]
291    pub fn has_late_data_handling(&self) -> bool {
292        self.allowed_lateness > Duration::ZERO || self.late_data_side_output.is_some()
293    }
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{Expr, Ident};

    /// Window size shared by most tests: five minutes.
    const FIVE_MINUTES: Duration = Duration::from_secs(300);

    /// Five-minute tumbling window over the `ts` column with default settings.
    fn tumbling_ts() -> WindowOperatorConfig {
        WindowOperatorConfig::tumbling("ts".to_string(), FIVE_MINUTES)
    }

    /// A simple parsed tumble window over `event_time` for testing.
    fn make_tumble_window() -> WindowFunction {
        let time_column = Expr::Identifier(Ident::new("event_time"));
        let interval = Expr::Identifier(Ident::new("5 MINUTE"));
        WindowFunction::Tumble {
            time_column: Box::new(time_column),
            interval: Box::new(interval),
        }
    }

    #[test]
    fn test_tumbling_config() {
        let cfg = WindowOperatorConfig::tumbling("event_time".to_string(), FIVE_MINUTES);

        assert_eq!(cfg.window_type, WindowType::Tumbling);
        assert_eq!(cfg.time_column, "event_time");
        assert_eq!(cfg.size, FIVE_MINUTES);
        assert!(cfg.slide.is_none());
        assert!(cfg.gap.is_none());
    }

    #[test]
    fn test_sliding_config() {
        let slide = Duration::from_secs(60);
        let cfg = WindowOperatorConfig::sliding("ts".to_string(), FIVE_MINUTES, slide);

        assert_eq!(cfg.window_type, WindowType::Sliding);
        assert_eq!(cfg.size, FIVE_MINUTES);
        assert_eq!(cfg.slide, Some(slide));
    }

    #[test]
    fn test_session_config() {
        let gap = Duration::from_secs(1800);
        let cfg = WindowOperatorConfig::session("click_time".to_string(), gap);

        assert_eq!(cfg.window_type, WindowType::Session);
        assert_eq!(cfg.gap, Some(gap));
    }

    #[test]
    fn test_from_window_function() {
        let cfg = WindowOperatorConfig::from_window_function(&make_tumble_window()).unwrap();

        assert_eq!(cfg.window_type, WindowType::Tumbling);
        assert_eq!(cfg.time_column, "event_time");
        assert_eq!(cfg.size, FIVE_MINUTES);
    }

    #[test]
    fn test_with_emit_clause() {
        let on_close = tumbling_ts()
            .with_emit_clause(&EmitClause::OnWindowClose)
            .unwrap();
        assert_eq!(on_close.emit_strategy, EmitStrategy::OnWindowClose);

        let changes = tumbling_ts()
            .with_emit_clause(&EmitClause::Changes)
            .unwrap();
        assert_eq!(changes.emit_strategy, EmitStrategy::Changelog);
    }

    #[test]
    fn test_with_late_data_clause() {
        let clause = LateDataClause::side_output_only("late_events".to_string());
        let cfg = tumbling_ts().with_late_data_clause(&clause).unwrap();

        assert_eq!(cfg.late_data_side_output, Some("late_events".to_string()));
    }

    #[test]
    fn test_append_only_compatible() {
        // The default emit strategy (OnWatermark) is append-only compatible.
        let default_cfg = tumbling_ts();
        assert!(default_cfg.is_append_only_compatible());

        // OnUpdate produces retractions, so it is not compatible.
        let on_update = default_cfg.with_emit_strategy(EmitStrategy::OnUpdate);
        assert!(!on_update.is_append_only_compatible());

        // Changelog is not compatible either.
        let changelog = tumbling_ts().with_emit_strategy(EmitStrategy::Changelog);
        assert!(!changelog.is_append_only_compatible());
    }

    #[test]
    fn test_has_late_data_handling() {
        // Nothing is configured by default.
        let base = tumbling_ts();
        assert!(!base.has_late_data_handling());

        // Allowed lateness alone counts as late data handling.
        let with_lateness = base
            .clone()
            .with_allowed_lateness(Duration::from_secs(60));
        assert!(with_lateness.has_late_data_handling());

        // So does a side output on its own.
        let with_side_output = base.with_late_data_side_output("late".to_string());
        assert!(with_side_output.has_late_data_handling());
    }

    #[test]
    fn test_eowc_without_watermark_errors() {
        let cfg = tumbling_ts().with_emit_strategy(EmitStrategy::OnWindowClose);

        let outcome = cfg.validate(false, true);
        assert!(outcome.is_err());
        let err = outcome.unwrap_err().to_string();
        assert!(
            err.contains("watermark"),
            "Expected watermark error, got: {err}"
        );
    }

    #[test]
    fn test_eowc_without_window_errors() {
        let cfg = tumbling_ts().with_emit_strategy(EmitStrategy::OnWindowClose);

        let outcome = cfg.validate(true, false);
        assert!(outcome.is_err());
        let err = outcome.unwrap_err().to_string();
        assert!(
            err.contains("windowed"),
            "Expected windowed query error, got: {err}"
        );
    }

    #[test]
    fn test_eowc_with_watermark_and_window_passes() {
        let cfg = tumbling_ts().with_emit_strategy(EmitStrategy::OnWindowClose);

        assert!(cfg.validate(true, true).is_ok());
    }

    #[test]
    fn test_final_without_watermark_errors() {
        let cfg = tumbling_ts().with_emit_strategy(EmitStrategy::FinalOnly);

        assert!(cfg.validate(false, true).is_err());
    }

    #[test]
    fn test_non_eowc_without_watermark_ok() {
        // OnUpdate does not require a watermark.
        let on_update = tumbling_ts().with_emit_strategy(EmitStrategy::OnUpdate);
        assert!(on_update.validate(false, false).is_ok());

        // Neither does Periodic.
        let periodic =
            tumbling_ts().with_emit_strategy(EmitStrategy::Periodic(Duration::from_secs(5)));
        assert!(periodic.validate(false, false).is_ok());

        // Nor Changelog.
        let changelog = tumbling_ts().with_emit_strategy(EmitStrategy::Changelog);
        assert!(changelog.validate(false, false).is_ok());
    }

    #[test]
    fn test_display_tumbling_window() {
        let cfg =
            WindowOperatorConfig::tumbling("event_time".to_string(), Duration::from_secs(60));
        assert_eq!(cfg.to_string(), "TUMBLE(event_time, 1m)");
    }

    #[test]
    fn test_display_sliding_window() {
        let cfg = WindowOperatorConfig::sliding(
            "ts".to_string(),
            FIVE_MINUTES,
            Duration::from_secs(60),
        );
        assert_eq!(cfg.to_string(), "HOP(ts, 5m SLIDE 1m)");
    }

    #[test]
    fn test_display_session_window() {
        let cfg =
            WindowOperatorConfig::session("click_time".to_string(), Duration::from_secs(1800));
        assert_eq!(cfg.to_string(), "SESSION(click_time, GAP 30m)");
    }

    #[test]
    fn test_display_window_type() {
        let expectations = [
            (WindowType::Tumbling, "TUMBLE"),
            (WindowType::Sliding, "HOP"),
            (WindowType::Session, "SESSION"),
        ];
        for (window_type, rendered) in expectations {
            assert_eq!(window_type.to_string(), rendered);
        }
    }

    #[test]
    fn test_display_duration_formatting() {
        let expectations = [
            // Hours
            (Duration::from_secs(3600), "TUMBLE(ts, 1h)"),
            // Seconds (non-round minutes)
            (Duration::from_secs(45), "TUMBLE(ts, 45s)"),
            // Milliseconds (sub-second)
            (Duration::from_millis(500), "TUMBLE(ts, 500ms)"),
        ];
        for (size, rendered) in expectations {
            let cfg = WindowOperatorConfig::tumbling("ts".to_string(), size);
            assert_eq!(cfg.to_string(), rendered);
        }
    }
}