Skip to main content

laminar_sql/translator/
analytic_translator.rs

//! Analytic window function operator configuration builder.
//!
//! Translates parsed analytic function analysis into Ring 0 operator
//! configurations for LAG/LEAD/FIRST_VALUE/LAST_VALUE/NTH_VALUE execution.
5
6use crate::parser::analytic_parser::{AnalyticFunctionType, AnalyticWindowAnalysis};
7
8/// Configuration for a streaming analytic window operator.
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct AnalyticWindowConfig {
11    /// Individual function configurations
12    pub functions: Vec<AnalyticFunctionConfig>,
13    /// PARTITION BY columns
14    pub partition_columns: Vec<String>,
15    /// ORDER BY columns
16    pub order_columns: Vec<String>,
17    /// Maximum number of partitions (memory safety)
18    pub max_partitions: usize,
19}
20
21/// Configuration for a single analytic function within the operator.
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct AnalyticFunctionConfig {
24    /// Type of analytic function
25    pub function_type: AnalyticFunctionType,
26    /// Source column name
27    pub source_column: String,
28    /// Offset (for LAG/LEAD) or N (for NTH_VALUE)
29    pub offset: usize,
30    /// Default value as string (for LAG/LEAD)
31    pub default_value: Option<String>,
32    /// Output column alias
33    pub output_alias: Option<String>,
34}
35
/// Partition cap that `AnalyticWindowConfig::from_analysis` starts with;
/// callers can override it via `AnalyticWindowConfig::with_max_partitions`.
const DEFAULT_MAX_PARTITIONS: usize = 10000;
38
39impl AnalyticWindowConfig {
40    /// Creates an operator configuration from an analytic window analysis.
41    #[must_use]
42    pub fn from_analysis(analysis: &AnalyticWindowAnalysis) -> Self {
43        let functions = analysis
44            .functions
45            .iter()
46            .map(|f| AnalyticFunctionConfig {
47                function_type: f.function_type,
48                source_column: f.column.clone(),
49                offset: f.offset,
50                default_value: f.default_value.clone(),
51                output_alias: f.alias.clone(),
52            })
53            .collect();
54
55        Self {
56            functions,
57            partition_columns: analysis.partition_columns.clone(),
58            order_columns: analysis.order_columns.clone(),
59            max_partitions: DEFAULT_MAX_PARTITIONS,
60        }
61    }
62
63    /// Sets the maximum number of partitions.
64    #[must_use]
65    pub fn with_max_partitions(mut self, max_partitions: usize) -> Self {
66        self.max_partitions = max_partitions;
67        self
68    }
69
70    /// Returns true if any function requires lookahead buffering.
71    #[must_use]
72    pub fn has_lookahead(&self) -> bool {
73        self.functions
74            .iter()
75            .any(|f| f.function_type.requires_lookahead())
76    }
77}
78
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::analytic_parser::{AnalyticFunctionInfo, AnalyticWindowAnalysis};

    /// Builds one analytic function description reading the `price` column.
    fn function_info(
        function_type: AnalyticFunctionType,
        offset: usize,
        default_value: Option<&str>,
        alias: &str,
    ) -> AnalyticFunctionInfo {
        AnalyticFunctionInfo {
            function_type,
            column: "price".to_string(),
            offset,
            default_value: default_value.map(String::from),
            alias: Some(alias.to_string()),
        }
    }

    /// Wraps `functions` in an analysis with the given partition/order columns.
    fn analysis_of(
        functions: Vec<AnalyticFunctionInfo>,
        partition_columns: &[&str],
        order_columns: &[&str],
    ) -> AnalyticWindowAnalysis {
        AnalyticWindowAnalysis {
            functions,
            partition_columns: partition_columns.iter().copied().map(String::from).collect(),
            order_columns: order_columns.iter().copied().map(String::from).collect(),
        }
    }

    /// `LAG(price, 1) AS prev_price`, partitioned by `symbol`, ordered by `ts`.
    fn make_lag_analysis() -> AnalyticWindowAnalysis {
        analysis_of(
            vec![function_info(AnalyticFunctionType::Lag, 1, None, "prev_price")],
            &["symbol"],
            &["ts"],
        )
    }

    #[test]
    fn test_from_analysis_lag() {
        let config = AnalyticWindowConfig::from_analysis(&make_lag_analysis());
        assert_eq!(config.functions.len(), 1);

        let lag = &config.functions[0];
        assert_eq!(lag.function_type, AnalyticFunctionType::Lag);
        assert_eq!(lag.source_column, "price");
        assert_eq!(lag.offset, 1);
        assert_eq!(lag.output_alias.as_deref(), Some("prev_price"));

        assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
        assert_eq!(config.order_columns, vec!["ts".to_string()]);
        assert!(!config.has_lookahead());
    }

    #[test]
    fn test_from_analysis_lead() {
        let analysis = analysis_of(
            vec![function_info(
                AnalyticFunctionType::Lead,
                2,
                Some("0"),
                "next_price",
            )],
            &[],
            &["ts"],
        );
        let config = AnalyticWindowConfig::from_analysis(&analysis);
        let lead = &config.functions[0];

        assert!(config.has_lookahead());
        assert_eq!(lead.offset, 2);
        assert_eq!(lead.default_value.as_deref(), Some("0"));
    }

    #[test]
    fn test_max_partitions() {
        let capped = AnalyticWindowConfig::from_analysis(&make_lag_analysis())
            .with_max_partitions(500);
        assert_eq!(capped.max_partitions, 500);
    }

    #[test]
    fn test_multiple_functions() {
        let analysis = analysis_of(
            vec![
                function_info(AnalyticFunctionType::Lag, 1, None, "prev"),
                function_info(AnalyticFunctionType::Lead, 1, None, "next"),
            ],
            &["sym"],
            &["ts"],
        );
        let config = AnalyticWindowConfig::from_analysis(&analysis);

        assert_eq!(config.functions.len(), 2);
        assert!(config.has_lookahead());
    }

    #[test]
    fn test_default_max_partitions() {
        let config = AnalyticWindowConfig::from_analysis(&make_lag_analysis());
        assert_eq!(config.max_partitions, DEFAULT_MAX_PARTITIONS);
    }
}