// laminar_sql/translator/analytic_translator.rs
use crate::parser::analytic_parser::{AnalyticFunctionType, AnalyticWindowAnalysis};

8#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct AnalyticWindowConfig {
11 pub functions: Vec<AnalyticFunctionConfig>,
13 pub partition_columns: Vec<String>,
15 pub order_columns: Vec<String>,
17 pub max_partitions: usize,
19}
20
21#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct AnalyticFunctionConfig {
24 pub function_type: AnalyticFunctionType,
26 pub source_column: String,
28 pub offset: usize,
30 pub default_value: Option<String>,
32 pub output_alias: Option<String>,
34}
35
/// Value given to `AnalyticWindowConfig::max_partitions` by
/// `AnalyticWindowConfig::from_analysis` unless the caller overrides it with
/// `with_max_partitions`.
const DEFAULT_MAX_PARTITIONS: usize = 10_000;

39impl AnalyticWindowConfig {
40 #[must_use]
42 pub fn from_analysis(analysis: &AnalyticWindowAnalysis) -> Self {
43 let functions = analysis
44 .functions
45 .iter()
46 .map(|f| AnalyticFunctionConfig {
47 function_type: f.function_type,
48 source_column: f.column.clone(),
49 offset: f.offset,
50 default_value: f.default_value.clone(),
51 output_alias: f.alias.clone(),
52 })
53 .collect();
54
55 Self {
56 functions,
57 partition_columns: analysis.partition_columns.clone(),
58 order_columns: analysis.order_columns.clone(),
59 max_partitions: DEFAULT_MAX_PARTITIONS,
60 }
61 }
62
63 #[must_use]
65 pub fn with_max_partitions(mut self, max_partitions: usize) -> Self {
66 self.max_partitions = max_partitions;
67 self
68 }
69
70 #[must_use]
72 pub fn has_lookahead(&self) -> bool {
73 self.functions
74 .iter()
75 .any(|f| f.function_type.requires_lookahead())
76 }
77}
78
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::analytic_parser::{AnalyticFunctionInfo, AnalyticWindowAnalysis};

    /// Analysis fixture: a single `LAG(price, 1)` aliased `prev_price`,
    /// partitioned by `symbol` and ordered by `ts`.
    fn make_lag_analysis() -> AnalyticWindowAnalysis {
        AnalyticWindowAnalysis {
            functions: vec![AnalyticFunctionInfo {
                function_type: AnalyticFunctionType::Lag,
                column: "price".to_string(),
                offset: 1,
                default_value: None,
                alias: Some("prev_price".to_string()),
            }],
            partition_columns: vec!["symbol".to_string()],
            order_columns: vec!["ts".to_string()],
        }
    }

    #[test]
    fn test_from_analysis_lag() {
        let analysis = make_lag_analysis();
        let config = AnalyticWindowConfig::from_analysis(&analysis);
        assert_eq!(config.functions.len(), 1);
        assert_eq!(config.functions[0].function_type, AnalyticFunctionType::Lag);
        assert_eq!(config.functions[0].source_column, "price");
        assert_eq!(config.functions[0].offset, 1);
        assert_eq!(config.functions[0].default_value, None);
        assert_eq!(
            config.functions[0].output_alias.as_deref(),
            Some("prev_price")
        );
        assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
        assert_eq!(config.order_columns, vec!["ts".to_string()]);
        assert!(!config.has_lookahead());
    }

    #[test]
    fn test_from_analysis_lead() {
        let analysis = AnalyticWindowAnalysis {
            functions: vec![AnalyticFunctionInfo {
                function_type: AnalyticFunctionType::Lead,
                column: "price".to_string(),
                offset: 2,
                default_value: Some("0".to_string()),
                alias: Some("next_price".to_string()),
            }],
            partition_columns: vec![],
            order_columns: vec!["ts".to_string()],
        };
        let config = AnalyticWindowConfig::from_analysis(&analysis);
        assert!(config.has_lookahead());
        assert!(config.partition_columns.is_empty());
        assert_eq!(config.functions[0].offset, 2);
        assert_eq!(config.functions[0].default_value.as_deref(), Some("0"));
    }

    #[test]
    fn test_max_partitions() {
        let analysis = make_lag_analysis();
        let config = AnalyticWindowConfig::from_analysis(&analysis).with_max_partitions(500);
        assert_eq!(config.max_partitions, 500);
    }

    #[test]
    fn test_max_partitions_leaves_other_fields_untouched() {
        let analysis = make_lag_analysis();
        let base = AnalyticWindowConfig::from_analysis(&analysis);
        let tuned = base.clone().with_max_partitions(500);
        assert_eq!(tuned.functions, base.functions);
        assert_eq!(tuned.partition_columns, base.partition_columns);
        assert_eq!(tuned.order_columns, base.order_columns);
    }

    #[test]
    fn test_multiple_functions() {
        let analysis = AnalyticWindowAnalysis {
            functions: vec![
                AnalyticFunctionInfo {
                    function_type: AnalyticFunctionType::Lag,
                    column: "price".to_string(),
                    offset: 1,
                    default_value: None,
                    alias: Some("prev".to_string()),
                },
                AnalyticFunctionInfo {
                    function_type: AnalyticFunctionType::Lead,
                    column: "price".to_string(),
                    offset: 1,
                    default_value: None,
                    alias: Some("next".to_string()),
                },
            ],
            partition_columns: vec!["sym".to_string()],
            order_columns: vec!["ts".to_string()],
        };
        let config = AnalyticWindowConfig::from_analysis(&analysis);
        assert_eq!(config.functions.len(), 2);
        // Order of the analysis must be preserved in the config.
        assert_eq!(config.functions[0].function_type, AnalyticFunctionType::Lag);
        assert_eq!(config.functions[1].function_type, AnalyticFunctionType::Lead);
        // A single lookahead function is enough to flag the whole config.
        assert!(config.has_lookahead());
    }

    #[test]
    fn test_empty_analysis_has_no_lookahead() {
        let analysis = AnalyticWindowAnalysis {
            functions: vec![],
            partition_columns: vec![],
            order_columns: vec![],
        };
        let config = AnalyticWindowConfig::from_analysis(&analysis);
        assert!(config.functions.is_empty());
        assert!(!config.has_lookahead());
    }

    #[test]
    fn test_default_max_partitions() {
        let analysis = make_lag_analysis();
        let config = AnalyticWindowConfig::from_analysis(&analysis);
        assert_eq!(config.max_partitions, DEFAULT_MAX_PARTITIONS);
    }
}