Skip to main content

laminar_sql/translator/
window_translator.rs

//! Window operator configuration builder
//!
//! Translates parsed window functions and EMIT/late data clauses
//! into complete operator configurations.
5
6use std::time::Duration;
7
8use crate::parser::{
9    EmitClause, EmitStrategy, LateDataClause, ParseError, WindowFunction, WindowRewriter,
10};
11
/// Kind of window operation a [`WindowOperatorConfig`] describes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WindowType {
    /// Fixed-size, non-overlapping windows.
    Tumbling,
    /// Fixed-size windows that overlap, advancing by a slide interval.
    Sliding,
    /// Dynamically sized windows delimited by gaps in activity.
    Session,
}
22
23/// Complete configuration for instantiating a window operator.
24///
25/// This structure holds all the information needed to create and configure
26/// a window operator in Ring 0.
27#[derive(Debug, Clone)]
28pub struct WindowOperatorConfig {
29    /// The type of window (tumbling, sliding, session)
30    pub window_type: WindowType,
31    /// The time column name used for windowing
32    pub time_column: String,
33    /// Window size (for tumbling and sliding)
34    pub size: Duration,
35    /// Slide interval for sliding windows
36    pub slide: Option<Duration>,
37    /// Gap interval for session windows
38    pub gap: Option<Duration>,
39    /// Maximum allowed lateness for late events
40    pub allowed_lateness: Duration,
41    /// Emit strategy (when to output results)
42    pub emit_strategy: EmitStrategy,
43    /// Side output name for late data (if configured)
44    pub late_data_side_output: Option<String>,
45}
46
47impl WindowOperatorConfig {
48    /// Create a new tumbling window configuration.
49    #[must_use]
50    pub fn tumbling(time_column: String, size: Duration) -> Self {
51        Self {
52            window_type: WindowType::Tumbling,
53            time_column,
54            size,
55            slide: None,
56            gap: None,
57            allowed_lateness: Duration::ZERO,
58            emit_strategy: EmitStrategy::OnWatermark,
59            late_data_side_output: None,
60        }
61    }
62
63    /// Create a new sliding window configuration.
64    #[must_use]
65    pub fn sliding(time_column: String, size: Duration, slide: Duration) -> Self {
66        Self {
67            window_type: WindowType::Sliding,
68            time_column,
69            size,
70            slide: Some(slide),
71            gap: None,
72            allowed_lateness: Duration::ZERO,
73            emit_strategy: EmitStrategy::OnWatermark,
74            late_data_side_output: None,
75        }
76    }
77
78    /// Create a new session window configuration.
79    #[must_use]
80    pub fn session(time_column: String, gap: Duration) -> Self {
81        Self {
82            window_type: WindowType::Session,
83            time_column,
84            size: Duration::ZERO, // Not used for session windows
85            slide: None,
86            gap: Some(gap),
87            allowed_lateness: Duration::ZERO,
88            emit_strategy: EmitStrategy::OnWatermark,
89            late_data_side_output: None,
90        }
91    }
92
93    /// Build configuration from a parsed `WindowFunction`.
94    ///
95    /// # Errors
96    ///
97    /// Returns `ParseError::WindowError` if:
98    /// - Time column cannot be extracted
99    /// - Interval cannot be parsed
100    pub fn from_window_function(window: &WindowFunction) -> Result<Self, ParseError> {
101        let time_column = WindowRewriter::get_time_column_name(window).ok_or_else(|| {
102            ParseError::WindowError("Cannot extract time column name".to_string())
103        })?;
104
105        match window {
106            WindowFunction::Tumble { interval, .. } => {
107                let size = WindowRewriter::parse_interval_to_duration(interval)?;
108                Ok(Self::tumbling(time_column, size))
109            }
110            WindowFunction::Hop {
111                slide_interval,
112                window_interval,
113                ..
114            } => {
115                let size = WindowRewriter::parse_interval_to_duration(window_interval)?;
116                let slide = WindowRewriter::parse_interval_to_duration(slide_interval)?;
117                Ok(Self::sliding(time_column, size, slide))
118            }
119            WindowFunction::Session { gap_interval, .. } => {
120                let gap = WindowRewriter::parse_interval_to_duration(gap_interval)?;
121                Ok(Self::session(time_column, gap))
122            }
123        }
124    }
125
126    /// Apply EMIT clause configuration.
127    ///
128    /// # Errors
129    ///
130    /// Returns `ParseError::WindowError` if the emit clause cannot be converted.
131    pub fn with_emit_clause(mut self, emit_clause: &EmitClause) -> Result<Self, ParseError> {
132        self.emit_strategy = emit_clause.to_emit_strategy()?;
133        Ok(self)
134    }
135
136    /// Apply late data clause configuration.
137    ///
138    /// # Errors
139    ///
140    /// Returns `ParseError::WindowError` if the allowed lateness cannot be parsed.
141    pub fn with_late_data_clause(
142        mut self,
143        late_data_clause: &LateDataClause,
144    ) -> Result<Self, ParseError> {
145        self.allowed_lateness = late_data_clause.to_allowed_lateness()?;
146        self.late_data_side_output
147            .clone_from(&late_data_clause.side_output);
148        Ok(self)
149    }
150
151    /// Set allowed lateness duration.
152    #[must_use]
153    pub fn with_allowed_lateness(mut self, lateness: Duration) -> Self {
154        self.allowed_lateness = lateness;
155        self
156    }
157
158    /// Set emit strategy.
159    #[must_use]
160    pub fn with_emit_strategy(mut self, strategy: EmitStrategy) -> Self {
161        self.emit_strategy = strategy;
162        self
163    }
164
165    /// Set late data side output.
166    #[must_use]
167    pub fn with_late_data_side_output(mut self, name: String) -> Self {
168        self.late_data_side_output = Some(name);
169        self
170    }
171
172    /// Check if this configuration supports append-only output.
173    ///
174    /// Append-only sinks (Kafka, S3, Delta Lake) require emit strategies
175    /// that don't produce retractions.
176    #[must_use]
177    pub fn is_append_only_compatible(&self) -> bool {
178        matches!(
179            self.emit_strategy,
180            EmitStrategy::OnWatermark | EmitStrategy::OnWindowClose | EmitStrategy::FinalOnly
181        )
182    }
183
184    /// Check if late data handling is configured.
185    #[must_use]
186    pub fn has_late_data_handling(&self) -> bool {
187        self.allowed_lateness > Duration::ZERO || self.late_data_side_output.is_some()
188    }
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{Expr, Ident};

    /// Build a simple tumble window over `event_time` for testing.
    ///
    /// The interval is an identifier spelling `5 MINUTE`;
    /// `test_from_window_function` expects it to resolve to 300 seconds.
    fn make_tumble_window() -> WindowFunction {
        WindowFunction::Tumble {
            time_column: Box::new(Expr::Identifier(Ident::new("event_time"))),
            interval: Box::new(Expr::Identifier(Ident::new("5 MINUTE"))),
        }
    }

    #[test]
    fn test_tumbling_config() {
        let config =
            WindowOperatorConfig::tumbling("event_time".to_string(), Duration::from_secs(300));

        assert_eq!(config.window_type, WindowType::Tumbling);
        assert_eq!(config.time_column, "event_time");
        assert_eq!(config.size, Duration::from_secs(300));
        assert!(config.slide.is_none());
        assert!(config.gap.is_none());

        // Constructor defaults
        assert_eq!(config.allowed_lateness, Duration::ZERO);
        assert_eq!(config.emit_strategy, EmitStrategy::OnWatermark);
        assert!(config.late_data_side_output.is_none());
    }

    #[test]
    fn test_sliding_config() {
        let config = WindowOperatorConfig::sliding(
            "ts".to_string(),
            Duration::from_secs(300),
            Duration::from_secs(60),
        );

        assert_eq!(config.window_type, WindowType::Sliding);
        assert_eq!(config.time_column, "ts");
        assert_eq!(config.size, Duration::from_secs(300));
        assert_eq!(config.slide, Some(Duration::from_secs(60)));
        assert!(config.gap.is_none());
    }

    #[test]
    fn test_session_config() {
        let config =
            WindowOperatorConfig::session("click_time".to_string(), Duration::from_secs(1800));

        assert_eq!(config.window_type, WindowType::Session);
        assert_eq!(config.time_column, "click_time");
        assert_eq!(config.gap, Some(Duration::from_secs(1800)));
        // Size is unused for session windows and left at zero
        assert_eq!(config.size, Duration::ZERO);
        assert!(config.slide.is_none());
    }

    #[test]
    fn test_from_window_function() {
        let window = make_tumble_window();
        let config = WindowOperatorConfig::from_window_function(&window).unwrap();

        assert_eq!(config.window_type, WindowType::Tumbling);
        assert_eq!(config.time_column, "event_time");
        assert_eq!(config.size, Duration::from_secs(300));
    }

    #[test]
    fn test_with_emit_clause() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));

        let config = config.with_emit_clause(&EmitClause::OnWindowClose).unwrap();
        assert_eq!(config.emit_strategy, EmitStrategy::OnWindowClose);

        let config2 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));
        let config2 = config2.with_emit_clause(&EmitClause::Changes).unwrap();
        assert_eq!(config2.emit_strategy, EmitStrategy::Changelog);
    }

    #[test]
    fn test_with_late_data_clause() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));

        let late_clause = LateDataClause::side_output_only("late_events".to_string());
        let config = config.with_late_data_clause(&late_clause).unwrap();

        assert_eq!(
            config.late_data_side_output,
            Some("late_events".to_string())
        );
    }

    #[test]
    fn test_append_only_compatible() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));

        // Default emit strategy (OnWatermark) is append-only compatible
        assert!(config.is_append_only_compatible());

        // OnUpdate is NOT append-only compatible (produces retractions)
        let config2 = config.with_emit_strategy(EmitStrategy::OnUpdate);
        assert!(!config2.is_append_only_compatible());

        // Changelog is NOT append-only compatible
        let config3 = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300))
            .with_emit_strategy(EmitStrategy::Changelog);
        assert!(!config3.is_append_only_compatible());
    }

    #[test]
    fn test_has_late_data_handling() {
        let config = WindowOperatorConfig::tumbling("ts".to_string(), Duration::from_secs(300));

        // No late data handling by default
        assert!(!config.has_late_data_handling());

        // With allowed lateness
        let config2 = config
            .clone()
            .with_allowed_lateness(Duration::from_secs(60));
        assert!(config2.has_late_data_handling());

        // With side output
        let config3 = config.with_late_data_side_output("late".to_string());
        assert!(config3.has_late_data_handling());
    }
}