Skip to main content

laminar_sql/translator/
analytic_translator.rs

1//! Analytic window function operator configuration builder
2//!
3//! Translates parsed analytic function analysis into Ring 0 operator
4//! configurations for LAG/LEAD/FIRST_VALUE/LAST_VALUE/NTH_VALUE execution.
5
6use crate::parser::analytic_parser::{AnalyticFunctionType, AnalyticWindowAnalysis};
7
8/// Configuration for a streaming analytic window operator.
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct AnalyticWindowConfig {
11    /// Individual function configurations
12    pub functions: Vec<AnalyticFunctionConfig>,
13    /// PARTITION BY columns
14    pub partition_columns: Vec<String>,
15    /// ORDER BY columns
16    pub order_columns: Vec<String>,
17    /// Maximum number of partitions (memory safety)
18    pub max_partitions: usize,
19}
20
21/// Configuration for a single analytic function within the operator.
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct AnalyticFunctionConfig {
24    /// Type of analytic function
25    pub function_type: AnalyticFunctionType,
26    /// Source column name
27    pub source_column: String,
28    /// Offset (for LAG/LEAD) or N (for NTH_VALUE)
29    pub offset: usize,
30    /// Default value as string (for LAG/LEAD)
31    pub default_value: Option<String>,
32    /// Output column alias
33    pub output_alias: Option<String>,
34}
35
/// Partition limit assigned by `from_analysis` until a caller overrides it
/// through `with_max_partitions`.
const DEFAULT_MAX_PARTITIONS: usize = 10_000;
38
39impl AnalyticWindowConfig {
40    /// Creates an operator configuration from an analytic window analysis.
41    #[must_use]
42    pub fn from_analysis(analysis: &AnalyticWindowAnalysis) -> Self {
43        let functions = analysis
44            .functions
45            .iter()
46            .map(|f| AnalyticFunctionConfig {
47                function_type: f.function_type,
48                source_column: f.column.clone(),
49                offset: f.offset,
50                default_value: f.default_value.clone(),
51                output_alias: f.alias.clone(),
52            })
53            .collect();
54
55        Self {
56            functions,
57            partition_columns: analysis.partition_columns.clone(),
58            order_columns: analysis.order_columns.clone(),
59            max_partitions: DEFAULT_MAX_PARTITIONS,
60        }
61    }
62
63    /// Sets the maximum number of partitions.
64    #[must_use]
65    pub fn with_max_partitions(mut self, max_partitions: usize) -> Self {
66        self.max_partitions = max_partitions;
67        self
68    }
69
70    /// Returns true if any function requires lookahead buffering.
71    #[must_use]
72    pub fn has_lookahead(&self) -> bool {
73        self.functions
74            .iter()
75            .any(|f| f.function_type.requires_lookahead())
76    }
77}
78
79// --- Window Frame operator configuration (F-SQL-006) ---
80
81use crate::parser::analytic_parser::{
82    FrameBound, FrameUnits, WindowFrameAnalysis, WindowFrameFunction,
83};
84
85/// Configuration for a single window frame aggregate function.
86#[derive(Debug, Clone, PartialEq, Eq)]
87pub struct WindowFrameFunctionConfig {
88    /// Type of aggregate function
89    pub function_type: WindowFrameFunction,
90    /// Source column name
91    pub source_column: String,
92    /// Frame unit type (ROWS or RANGE)
93    pub units: FrameUnits,
94    /// Start bound of the frame
95    pub start_bound: FrameBound,
96    /// End bound of the frame
97    pub end_bound: FrameBound,
98    /// Output column alias
99    pub output_alias: Option<String>,
100}
101
102/// Configuration for a streaming window frame operator.
103#[derive(Debug, Clone, PartialEq, Eq)]
104pub struct WindowFrameConfig {
105    /// Individual function configurations
106    pub functions: Vec<WindowFrameFunctionConfig>,
107    /// PARTITION BY columns
108    pub partition_columns: Vec<String>,
109    /// ORDER BY columns
110    pub order_columns: Vec<String>,
111    /// Maximum number of partitions (memory safety)
112    pub max_partitions: usize,
113}
114
115impl WindowFrameConfig {
116    /// Creates an operator configuration from a window frame analysis.
117    #[must_use]
118    pub fn from_analysis(analysis: &WindowFrameAnalysis) -> Self {
119        let functions = analysis
120            .functions
121            .iter()
122            .map(|f| WindowFrameFunctionConfig {
123                function_type: f.function_type,
124                source_column: f.column.clone(),
125                units: f.units,
126                start_bound: f.start_bound.clone(),
127                end_bound: f.end_bound.clone(),
128                output_alias: f.alias.clone(),
129            })
130            .collect();
131
132        Self {
133            functions,
134            partition_columns: analysis.partition_columns.clone(),
135            order_columns: analysis.order_columns.clone(),
136            max_partitions: DEFAULT_MAX_PARTITIONS,
137        }
138    }
139
140    /// Sets the maximum number of partitions.
141    #[must_use]
142    pub fn with_max_partitions(mut self, max_partitions: usize) -> Self {
143        self.max_partitions = max_partitions;
144        self
145    }
146
147    /// Returns true if any frame uses FOLLOWING bounds.
148    #[must_use]
149    pub fn has_following(&self) -> bool {
150        self.functions.iter().any(|f| {
151            matches!(
152                f.end_bound,
153                FrameBound::Following(_) | FrameBound::UnboundedFollowing
154            ) || matches!(
155                f.start_bound,
156                FrameBound::Following(_) | FrameBound::UnboundedFollowing
157            )
158        })
159    }
160}
161
#[cfg(test)]
mod tests {
    //! Unit tests for the analytic window and window frame configuration
    //! builders.

    use super::*;
    use crate::parser::analytic_parser::{
        AnalyticFunctionInfo, AnalyticWindowAnalysis, WindowFrameInfo,
    };

    /// Analysis with a single `LAG(price, 1)` partitioned by `symbol` and
    /// ordered by `ts`.
    fn make_lag_analysis() -> AnalyticWindowAnalysis {
        AnalyticWindowAnalysis {
            functions: vec![AnalyticFunctionInfo {
                function_type: AnalyticFunctionType::Lag,
                column: "price".to_string(),
                offset: 1,
                default_value: None,
                alias: Some("prev_price".to_string()),
            }],
            partition_columns: vec!["symbol".to_string()],
            order_columns: vec!["ts".to_string()],
        }
    }

    #[test]
    fn test_from_analysis_lag() {
        let analysis = make_lag_analysis();
        let config = AnalyticWindowConfig::from_analysis(&analysis);
        assert_eq!(config.functions.len(), 1);
        assert_eq!(config.functions[0].function_type, AnalyticFunctionType::Lag);
        assert_eq!(config.functions[0].source_column, "price");
        assert_eq!(config.functions[0].offset, 1);
        assert_eq!(
            config.functions[0].output_alias.as_deref(),
            Some("prev_price")
        );
        assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
        assert_eq!(config.order_columns, vec!["ts".to_string()]);
        assert!(!config.has_lookahead());
    }

    #[test]
    fn test_from_analysis_lead() {
        let analysis = AnalyticWindowAnalysis {
            functions: vec![AnalyticFunctionInfo {
                function_type: AnalyticFunctionType::Lead,
                column: "price".to_string(),
                offset: 2,
                default_value: Some("0".to_string()),
                alias: Some("next_price".to_string()),
            }],
            partition_columns: vec![],
            order_columns: vec!["ts".to_string()],
        };
        let config = AnalyticWindowConfig::from_analysis(&analysis);
        assert!(config.has_lookahead());
        assert_eq!(config.functions[0].offset, 2);
        assert_eq!(config.functions[0].default_value.as_deref(), Some("0"));
    }

    #[test]
    fn test_max_partitions() {
        let analysis = make_lag_analysis();
        let config = AnalyticWindowConfig::from_analysis(&analysis).with_max_partitions(500);
        assert_eq!(config.max_partitions, 500);
    }

    #[test]
    fn test_max_partitions_preserves_other_fields() {
        // The builder must only touch `max_partitions`.
        let analysis = make_lag_analysis();
        let base = AnalyticWindowConfig::from_analysis(&analysis);
        let updated = base.clone().with_max_partitions(42);
        assert_eq!(updated.max_partitions, 42);
        assert_eq!(updated.functions, base.functions);
        assert_eq!(updated.partition_columns, base.partition_columns);
        assert_eq!(updated.order_columns, base.order_columns);
    }

    #[test]
    fn test_multiple_functions() {
        let analysis = AnalyticWindowAnalysis {
            functions: vec![
                AnalyticFunctionInfo {
                    function_type: AnalyticFunctionType::Lag,
                    column: "price".to_string(),
                    offset: 1,
                    default_value: None,
                    alias: Some("prev".to_string()),
                },
                AnalyticFunctionInfo {
                    function_type: AnalyticFunctionType::Lead,
                    column: "price".to_string(),
                    offset: 1,
                    default_value: None,
                    alias: Some("next".to_string()),
                },
            ],
            partition_columns: vec!["sym".to_string()],
            order_columns: vec!["ts".to_string()],
        };
        let config = AnalyticWindowConfig::from_analysis(&analysis);
        assert_eq!(config.functions.len(), 2);
        assert!(config.has_lookahead());
    }

    #[test]
    fn test_empty_functions_has_no_lookahead() {
        // With no functions configured, nothing can require lookahead.
        let analysis = AnalyticWindowAnalysis {
            functions: vec![],
            partition_columns: vec![],
            order_columns: vec![],
        };
        let config = AnalyticWindowConfig::from_analysis(&analysis);
        assert!(config.functions.is_empty());
        assert!(!config.has_lookahead());
    }

    #[test]
    fn test_default_max_partitions() {
        let analysis = make_lag_analysis();
        let config = AnalyticWindowConfig::from_analysis(&analysis);
        assert_eq!(config.max_partitions, DEFAULT_MAX_PARTITIONS);
    }

    // --- Window Frame translator tests (F-SQL-006) ---

    /// Analysis with a single `AVG(price)` over
    /// `ROWS BETWEEN 9 PRECEDING AND CURRENT ROW`.
    fn make_frame_analysis() -> WindowFrameAnalysis {
        WindowFrameAnalysis {
            functions: vec![WindowFrameInfo {
                function_type: WindowFrameFunction::Avg,
                column: "price".to_string(),
                units: FrameUnits::Rows,
                start_bound: FrameBound::Preceding(9),
                end_bound: FrameBound::CurrentRow,
                alias: Some("ma".to_string()),
            }],
            partition_columns: vec!["symbol".to_string()],
            order_columns: vec!["ts".to_string()],
        }
    }

    /// Analysis with a single `SUM(amount)` over a ROWS frame with the given
    /// bounds.
    fn make_bounded_frame_analysis(
        start_bound: FrameBound,
        end_bound: FrameBound,
    ) -> WindowFrameAnalysis {
        WindowFrameAnalysis {
            functions: vec![WindowFrameInfo {
                function_type: WindowFrameFunction::Sum,
                column: "amount".to_string(),
                units: FrameUnits::Rows,
                start_bound,
                end_bound,
                alias: None,
            }],
            partition_columns: vec![],
            order_columns: vec!["id".to_string()],
        }
    }

    #[test]
    fn test_frame_from_analysis_basic() {
        let analysis = make_frame_analysis();
        let config = WindowFrameConfig::from_analysis(&analysis);
        assert_eq!(config.functions.len(), 1);
        assert_eq!(config.functions[0].function_type, WindowFrameFunction::Avg);
        assert_eq!(config.functions[0].source_column, "price");
        assert_eq!(config.functions[0].units, FrameUnits::Rows);
        assert_eq!(config.functions[0].start_bound, FrameBound::Preceding(9));
        assert_eq!(config.functions[0].end_bound, FrameBound::CurrentRow);
        assert_eq!(config.functions[0].output_alias.as_deref(), Some("ma"));
        assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
        assert_eq!(config.order_columns, vec!["ts".to_string()]);
        assert!(!config.has_following());
    }

    #[test]
    fn test_frame_default_max_partitions() {
        let analysis = make_frame_analysis();
        let config = WindowFrameConfig::from_analysis(&analysis);
        assert_eq!(config.max_partitions, DEFAULT_MAX_PARTITIONS);
    }

    #[test]
    fn test_frame_max_partitions_builder() {
        let analysis = make_frame_analysis();
        let config = WindowFrameConfig::from_analysis(&analysis).with_max_partitions(500);
        assert_eq!(config.max_partitions, 500);
    }

    #[test]
    fn test_frame_has_following() {
        let analysis = WindowFrameAnalysis {
            functions: vec![WindowFrameInfo {
                function_type: WindowFrameFunction::Sum,
                column: "amount".to_string(),
                units: FrameUnits::Rows,
                start_bound: FrameBound::Preceding(5),
                end_bound: FrameBound::Following(3),
                alias: None,
            }],
            partition_columns: vec![],
            order_columns: vec!["id".to_string()],
        };
        let config = WindowFrameConfig::from_analysis(&analysis);
        assert!(config.has_following());
    }

    #[test]
    fn test_frame_has_following_unbounded_end() {
        // Exercises the `UnboundedFollowing` arm on the end bound.
        let analysis =
            make_bounded_frame_analysis(FrameBound::CurrentRow, FrameBound::UnboundedFollowing);
        let config = WindowFrameConfig::from_analysis(&analysis);
        assert!(config.has_following());
    }

    #[test]
    fn test_frame_has_following_start_bound_only() {
        // The end bound is not FOLLOWING here, so only the start-bound check
        // can make the result true.
        let analysis =
            make_bounded_frame_analysis(FrameBound::Following(1), FrameBound::CurrentRow);
        let config = WindowFrameConfig::from_analysis(&analysis);
        assert!(config.has_following());
    }

    #[test]
    fn test_frame_has_following_in_later_function() {
        // Only the second function uses FOLLOWING; the check must scan past
        // the first one.
        let mut analysis = make_frame_analysis();
        analysis.functions.push(WindowFrameInfo {
            function_type: WindowFrameFunction::Max,
            column: "price".to_string(),
            units: FrameUnits::Rows,
            start_bound: FrameBound::CurrentRow,
            end_bound: FrameBound::Following(2),
            alias: None,
        });
        let config = WindowFrameConfig::from_analysis(&analysis);
        assert_eq!(config.functions.len(), 2);
        assert!(config.has_following());
    }

    #[test]
    fn test_frame_multiple_functions_config() {
        let analysis = WindowFrameAnalysis {
            functions: vec![
                WindowFrameInfo {
                    function_type: WindowFrameFunction::Avg,
                    column: "price".to_string(),
                    units: FrameUnits::Rows,
                    start_bound: FrameBound::Preceding(9),
                    end_bound: FrameBound::CurrentRow,
                    alias: Some("ma".to_string()),
                },
                WindowFrameInfo {
                    function_type: WindowFrameFunction::Max,
                    column: "price".to_string(),
                    units: FrameUnits::Rows,
                    start_bound: FrameBound::Preceding(4),
                    end_bound: FrameBound::CurrentRow,
                    alias: Some("hi".to_string()),
                },
            ],
            partition_columns: vec!["symbol".to_string()],
            order_columns: vec!["ts".to_string()],
        };
        let config = WindowFrameConfig::from_analysis(&analysis);
        assert_eq!(config.functions.len(), 2);
        assert_eq!(config.functions[0].function_type, WindowFrameFunction::Avg);
        assert_eq!(config.functions[1].function_type, WindowFrameFunction::Max);
        assert!(!config.has_following());
    }
}