1use crate::parser::analytic_parser::{AnalyticFunctionType, AnalyticWindowAnalysis};
7
8#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct AnalyticWindowConfig {
11 pub functions: Vec<AnalyticFunctionConfig>,
13 pub partition_columns: Vec<String>,
15 pub order_columns: Vec<String>,
17 pub max_partitions: usize,
19}
20
21#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct AnalyticFunctionConfig {
24 pub function_type: AnalyticFunctionType,
26 pub source_column: String,
28 pub offset: usize,
30 pub default_value: Option<String>,
32 pub output_alias: Option<String>,
34}
35
/// Value assigned to `max_partitions` by the `from_analysis` constructors in
/// this file; callers can override it with `with_max_partitions`.
const DEFAULT_MAX_PARTITIONS: usize = 10_000;
38
39impl AnalyticWindowConfig {
40 #[must_use]
42 pub fn from_analysis(analysis: &AnalyticWindowAnalysis) -> Self {
43 let functions = analysis
44 .functions
45 .iter()
46 .map(|f| AnalyticFunctionConfig {
47 function_type: f.function_type,
48 source_column: f.column.clone(),
49 offset: f.offset,
50 default_value: f.default_value.clone(),
51 output_alias: f.alias.clone(),
52 })
53 .collect();
54
55 Self {
56 functions,
57 partition_columns: analysis.partition_columns.clone(),
58 order_columns: analysis.order_columns.clone(),
59 max_partitions: DEFAULT_MAX_PARTITIONS,
60 }
61 }
62
63 #[must_use]
65 pub fn with_max_partitions(mut self, max_partitions: usize) -> Self {
66 self.max_partitions = max_partitions;
67 self
68 }
69
70 #[must_use]
72 pub fn has_lookahead(&self) -> bool {
73 self.functions
74 .iter()
75 .any(|f| f.function_type.requires_lookahead())
76 }
77}
78
79use crate::parser::analytic_parser::{
82 FrameBound, FrameUnits, WindowFrameAnalysis, WindowFrameFunction,
83};
84
85#[derive(Debug, Clone, PartialEq, Eq)]
87pub struct WindowFrameFunctionConfig {
88 pub function_type: WindowFrameFunction,
90 pub source_column: String,
92 pub units: FrameUnits,
94 pub start_bound: FrameBound,
96 pub end_bound: FrameBound,
98 pub output_alias: Option<String>,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq)]
104pub struct WindowFrameConfig {
105 pub functions: Vec<WindowFrameFunctionConfig>,
107 pub partition_columns: Vec<String>,
109 pub order_columns: Vec<String>,
111 pub max_partitions: usize,
113}
114
115impl WindowFrameConfig {
116 #[must_use]
118 pub fn from_analysis(analysis: &WindowFrameAnalysis) -> Self {
119 let functions = analysis
120 .functions
121 .iter()
122 .map(|f| WindowFrameFunctionConfig {
123 function_type: f.function_type,
124 source_column: f.column.clone(),
125 units: f.units,
126 start_bound: f.start_bound.clone(),
127 end_bound: f.end_bound.clone(),
128 output_alias: f.alias.clone(),
129 })
130 .collect();
131
132 Self {
133 functions,
134 partition_columns: analysis.partition_columns.clone(),
135 order_columns: analysis.order_columns.clone(),
136 max_partitions: DEFAULT_MAX_PARTITIONS,
137 }
138 }
139
140 #[must_use]
142 pub fn with_max_partitions(mut self, max_partitions: usize) -> Self {
143 self.max_partitions = max_partitions;
144 self
145 }
146
147 #[must_use]
149 pub fn has_following(&self) -> bool {
150 self.functions.iter().any(|f| {
151 matches!(
152 f.end_bound,
153 FrameBound::Following(_) | FrameBound::UnboundedFollowing
154 ) || matches!(
155 f.start_bound,
156 FrameBound::Following(_) | FrameBound::UnboundedFollowing
157 )
158 })
159 }
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::analytic_parser::{
        AnalyticFunctionInfo, AnalyticWindowAnalysis, WindowFrameInfo,
    };

    // ---- AnalyticWindowConfig -------------------------------------------

    /// Analysis with a single `LAG(price, 1)` partitioned by `symbol` and
    /// ordered by `ts`.
    fn make_lag_analysis() -> AnalyticWindowAnalysis {
        AnalyticWindowAnalysis {
            functions: vec![AnalyticFunctionInfo {
                function_type: AnalyticFunctionType::Lag,
                column: "price".to_string(),
                offset: 1,
                default_value: None,
                alias: Some("prev_price".to_string()),
            }],
            partition_columns: vec!["symbol".to_string()],
            order_columns: vec!["ts".to_string()],
        }
    }

    #[test]
    fn test_from_analysis_lag() {
        let analysis = make_lag_analysis();
        let config = AnalyticWindowConfig::from_analysis(&analysis);
        assert_eq!(config.functions.len(), 1);
        assert_eq!(config.functions[0].function_type, AnalyticFunctionType::Lag);
        assert_eq!(config.functions[0].source_column, "price");
        assert_eq!(config.functions[0].offset, 1);
        assert_eq!(
            config.functions[0].output_alias.as_deref(),
            Some("prev_price")
        );
        assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
        assert_eq!(config.order_columns, vec!["ts".to_string()]);
        assert!(!config.has_lookahead());
    }

    #[test]
    fn test_from_analysis_lead() {
        let analysis = AnalyticWindowAnalysis {
            functions: vec![AnalyticFunctionInfo {
                function_type: AnalyticFunctionType::Lead,
                column: "price".to_string(),
                offset: 2,
                default_value: Some("0".to_string()),
                alias: Some("next_price".to_string()),
            }],
            partition_columns: vec![],
            order_columns: vec!["ts".to_string()],
        };
        let config = AnalyticWindowConfig::from_analysis(&analysis);
        assert!(config.has_lookahead());
        assert_eq!(config.functions[0].offset, 2);
        assert_eq!(config.functions[0].default_value.as_deref(), Some("0"));
    }

    #[test]
    fn test_max_partitions() {
        let analysis = make_lag_analysis();
        let config = AnalyticWindowConfig::from_analysis(&analysis).with_max_partitions(500);
        assert_eq!(config.max_partitions, 500);
    }

    #[test]
    fn test_multiple_functions() {
        let analysis = AnalyticWindowAnalysis {
            functions: vec![
                AnalyticFunctionInfo {
                    function_type: AnalyticFunctionType::Lag,
                    column: "price".to_string(),
                    offset: 1,
                    default_value: None,
                    alias: Some("prev".to_string()),
                },
                AnalyticFunctionInfo {
                    function_type: AnalyticFunctionType::Lead,
                    column: "price".to_string(),
                    offset: 1,
                    default_value: None,
                    alias: Some("next".to_string()),
                },
            ],
            partition_columns: vec!["sym".to_string()],
            order_columns: vec!["ts".to_string()],
        };
        let config = AnalyticWindowConfig::from_analysis(&analysis);
        assert_eq!(config.functions.len(), 2);
        assert!(config.has_lookahead());
    }

    #[test]
    fn test_default_max_partitions() {
        let analysis = make_lag_analysis();
        let config = AnalyticWindowConfig::from_analysis(&analysis);
        assert_eq!(config.max_partitions, DEFAULT_MAX_PARTITIONS);
    }

    #[test]
    fn test_empty_functions_has_no_lookahead() {
        // `any` over an empty list must be false.
        let analysis = AnalyticWindowAnalysis {
            functions: vec![],
            partition_columns: vec![],
            order_columns: vec![],
        };
        let config = AnalyticWindowConfig::from_analysis(&analysis);
        assert!(config.functions.is_empty());
        assert!(!config.has_lookahead());
    }

    // ---- WindowFrameConfig ----------------------------------------------

    /// Analysis with a single `AVG(price)` over `ROWS 9 PRECEDING .. CURRENT ROW`.
    fn make_frame_analysis() -> WindowFrameAnalysis {
        WindowFrameAnalysis {
            functions: vec![WindowFrameInfo {
                function_type: WindowFrameFunction::Avg,
                column: "price".to_string(),
                units: FrameUnits::Rows,
                start_bound: FrameBound::Preceding(9),
                end_bound: FrameBound::CurrentRow,
                alias: Some("ma".to_string()),
            }],
            partition_columns: vec!["symbol".to_string()],
            order_columns: vec!["ts".to_string()],
        }
    }

    #[test]
    fn test_frame_from_analysis_basic() {
        let analysis = make_frame_analysis();
        let config = WindowFrameConfig::from_analysis(&analysis);
        assert_eq!(config.functions.len(), 1);
        assert_eq!(config.functions[0].function_type, WindowFrameFunction::Avg);
        assert_eq!(config.functions[0].source_column, "price");
        assert_eq!(config.functions[0].units, FrameUnits::Rows);
        assert_eq!(config.functions[0].start_bound, FrameBound::Preceding(9));
        assert_eq!(config.functions[0].end_bound, FrameBound::CurrentRow);
        assert_eq!(config.functions[0].output_alias.as_deref(), Some("ma"));
        assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
        assert_eq!(config.order_columns, vec!["ts".to_string()]);
        assert!(!config.has_following());
    }

    #[test]
    fn test_frame_max_partitions_builder() {
        let analysis = make_frame_analysis();
        let config = WindowFrameConfig::from_analysis(&analysis).with_max_partitions(500);
        assert_eq!(config.max_partitions, 500);
    }

    #[test]
    fn test_frame_default_max_partitions() {
        let analysis = make_frame_analysis();
        let config = WindowFrameConfig::from_analysis(&analysis);
        assert_eq!(config.max_partitions, DEFAULT_MAX_PARTITIONS);
    }

    #[test]
    fn test_frame_has_following() {
        let analysis = WindowFrameAnalysis {
            functions: vec![WindowFrameInfo {
                function_type: WindowFrameFunction::Sum,
                column: "amount".to_string(),
                units: FrameUnits::Rows,
                start_bound: FrameBound::Preceding(5),
                end_bound: FrameBound::Following(3),
                alias: None,
            }],
            partition_columns: vec![],
            order_columns: vec!["id".to_string()],
        };
        let config = WindowFrameConfig::from_analysis(&analysis);
        assert!(config.has_following());
    }

    #[test]
    fn test_frame_has_following_unbounded() {
        let analysis = WindowFrameAnalysis {
            functions: vec![WindowFrameInfo {
                function_type: WindowFrameFunction::Sum,
                column: "amount".to_string(),
                units: FrameUnits::Rows,
                start_bound: FrameBound::CurrentRow,
                end_bound: FrameBound::UnboundedFollowing,
                alias: None,
            }],
            partition_columns: vec![],
            order_columns: vec!["id".to_string()],
        };
        let config = WindowFrameConfig::from_analysis(&analysis);
        assert!(config.has_following());
    }

    #[test]
    fn test_frame_has_following_start_bound_only() {
        // The config performs no frame validation, so this exercises the
        // start-bound branch of `has_following` in isolation.
        let analysis = WindowFrameAnalysis {
            functions: vec![WindowFrameInfo {
                function_type: WindowFrameFunction::Sum,
                column: "amount".to_string(),
                units: FrameUnits::Rows,
                start_bound: FrameBound::Following(1),
                end_bound: FrameBound::CurrentRow,
                alias: None,
            }],
            partition_columns: vec![],
            order_columns: vec!["id".to_string()],
        };
        let config = WindowFrameConfig::from_analysis(&analysis);
        assert!(config.has_following());
    }

    #[test]
    fn test_frame_multiple_functions_config() {
        let analysis = WindowFrameAnalysis {
            functions: vec![
                WindowFrameInfo {
                    function_type: WindowFrameFunction::Avg,
                    column: "price".to_string(),
                    units: FrameUnits::Rows,
                    start_bound: FrameBound::Preceding(9),
                    end_bound: FrameBound::CurrentRow,
                    alias: Some("ma".to_string()),
                },
                WindowFrameInfo {
                    function_type: WindowFrameFunction::Max,
                    column: "price".to_string(),
                    units: FrameUnits::Rows,
                    start_bound: FrameBound::Preceding(4),
                    end_bound: FrameBound::CurrentRow,
                    alias: Some("hi".to_string()),
                },
            ],
            partition_columns: vec!["symbol".to_string()],
            order_columns: vec!["ts".to_string()],
        };
        let config = WindowFrameConfig::from_analysis(&analysis);
        assert_eq!(config.functions.len(), 2);
        assert_eq!(config.functions[0].function_type, WindowFrameFunction::Avg);
        assert_eq!(config.functions[1].function_type, WindowFrameFunction::Max);
    }
}