1use crate::parser::order_analyzer::{OrderAnalysis, OrderColumn, OrderPattern, RankType};
7
/// Ordering operator selected to enforce a query's ORDER BY clause.
///
/// Every variant except `WatermarkBoundedSort` is produced by
/// [`OrderOperatorConfig::from_analysis`]; `WatermarkBoundedSort` is built
/// with [`OrderOperatorConfig::watermark_bounded`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OrderOperatorConfig {
    /// The requested order is already satisfied upstream (mapped from
    /// `OrderPattern::SourceSatisfied`); carries no configuration.
    SourceSatisfied,

    /// Top-K selection over the whole stream (mapped from `OrderPattern::TopK`).
    TopK(TopKConfig),

    /// Sort performed within a window, with an optional LIMIT (mapped from
    /// `OrderPattern::WindowLocal`).
    WindowLocalSort(WindowLocalSortConfig),

    /// Sort with a bounded buffer, presumably released on watermark progress
    /// (NOTE(review): confirm against the operator implementation).
    WatermarkBoundedSort(WatermarkSortConfig),

    /// Top-K selection within each partition (mapped from
    /// `OrderPattern::PerGroupTopK`).
    PerGroupTopK(PerGroupTopKConfig),
}
36
/// Configuration for a stream-wide Top-K operator.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopKConfig {
    /// Number of rows to keep (taken from `OrderPattern::TopK { k }`).
    pub k: usize,
    /// ORDER BY columns, in priority order, copied from the analysis.
    pub sort_columns: Vec<OrderColumn>,
}
45
/// Configuration for a sort performed locally within a window.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WindowLocalSortConfig {
    /// ORDER BY columns, in priority order, copied from the analysis.
    pub sort_columns: Vec<OrderColumn>,
    /// Optional LIMIT copied from `OrderAnalysis::limit`; `None` keeps all rows.
    pub limit: Option<usize>,
}
54
/// Configuration for a sort with a bounded buffer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WatermarkSortConfig {
    /// ORDER BY columns, in priority order.
    pub sort_columns: Vec<OrderColumn>,
    /// Buffer capacity bound. `OrderOperatorConfig::watermark_bounded` sets it to
    /// `DEFAULT_MAX_BUFFER_SIZE`; override with `with_max_buffer_size`.
    /// NOTE(review): unit (rows vs. bytes) and overflow behavior are defined by
    /// the operator, not here — confirm.
    pub max_buffer_size: usize,
}
63
/// Configuration for a Top-K operator evaluated independently per partition.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PerGroupTopKConfig {
    /// Number of rows (by rank) to keep in each partition.
    pub k: usize,
    /// Names of the columns that define a partition.
    pub partition_columns: Vec<String>,
    /// ORDER BY columns used to rank rows within a partition.
    pub sort_columns: Vec<OrderColumn>,
    /// Cap on the number of partitions; `from_analysis` sets it to
    /// `DEFAULT_MAX_PARTITIONS`, override with `with_max_partitions`.
    pub max_partitions: usize,
    /// Ranking function used to number rows (e.g. `RowNumber`, `Rank`, `DenseRank`).
    pub rank_type: RankType,
}
78
/// Default `WatermarkSortConfig::max_buffer_size` used by
/// `OrderOperatorConfig::watermark_bounded`.
const DEFAULT_MAX_BUFFER_SIZE: usize = 100_000;

/// Default `PerGroupTopKConfig::max_partitions` used by
/// `OrderOperatorConfig::from_analysis`.
const DEFAULT_MAX_PARTITIONS: usize = 10_000;
84
85impl OrderOperatorConfig {
86 pub fn from_analysis(analysis: &OrderAnalysis) -> Result<Option<Self>, String> {
95 match &analysis.pattern {
96 OrderPattern::None => Ok(None),
97 OrderPattern::SourceSatisfied => Ok(Some(Self::SourceSatisfied)),
98 OrderPattern::TopK { k } => Ok(Some(Self::TopK(TopKConfig {
99 k: *k,
100 sort_columns: analysis.order_columns.clone(),
101 }))),
102 OrderPattern::WindowLocal => Ok(Some(Self::WindowLocalSort(WindowLocalSortConfig {
103 sort_columns: analysis.order_columns.clone(),
104 limit: analysis.limit,
105 }))),
106 OrderPattern::PerGroupTopK {
107 k,
108 partition_columns,
109 rank_type,
110 } => Ok(Some(Self::PerGroupTopK(PerGroupTopKConfig {
111 k: *k,
112 partition_columns: partition_columns.clone(),
113 sort_columns: analysis.order_columns.clone(),
114 max_partitions: DEFAULT_MAX_PARTITIONS,
115 rank_type: *rank_type,
116 }))),
117 OrderPattern::Unbounded => Err(
118 "ORDER BY without LIMIT is not supported on unbounded streams. \
119 Add LIMIT N or use ORDER BY within a windowed aggregation."
120 .to_string(),
121 ),
122 }
123 }
124
125 #[must_use]
127 pub fn watermark_bounded(sort_columns: Vec<OrderColumn>) -> Self {
128 Self::WatermarkBoundedSort(WatermarkSortConfig {
129 sort_columns,
130 max_buffer_size: DEFAULT_MAX_BUFFER_SIZE,
131 })
132 }
133}
134
135impl TopKConfig {
136 #[must_use]
138 pub fn new(k: usize, sort_columns: Vec<OrderColumn>) -> Self {
139 Self { k, sort_columns }
140 }
141}
142
143impl PerGroupTopKConfig {
144 #[must_use]
146 pub fn with_max_partitions(mut self, max_partitions: usize) -> Self {
147 self.max_partitions = max_partitions;
148 self
149 }
150}
151
152impl WatermarkSortConfig {
153 #[must_use]
155 pub fn with_max_buffer_size(mut self, max_buffer_size: usize) -> Self {
156 self.max_buffer_size = max_buffer_size;
157 self
158 }
159}
160
#[cfg(test)]
mod tests {
    use super::*;

    /// Single descending `price` column shared by most tests.
    fn make_sort_columns() -> Vec<OrderColumn> {
        vec![OrderColumn {
            column: "price".to_string(),
            descending: true,
            nulls_first: false,
        }]
    }

    /// Builds an analysis over `make_sort_columns()` with the given pattern,
    /// LIMIT and windowing flag.
    fn make_analysis(
        pattern: OrderPattern,
        limit: Option<usize>,
        is_windowed: bool,
    ) -> OrderAnalysis {
        OrderAnalysis {
            order_columns: make_sort_columns(),
            limit,
            is_windowed,
            pattern,
        }
    }

    #[test]
    fn test_topk_config_from_analysis() {
        let analysis = make_analysis(OrderPattern::TopK { k: 10 }, Some(10), false);
        let config = OrderOperatorConfig::from_analysis(&analysis)
            .unwrap()
            .unwrap();
        match config {
            OrderOperatorConfig::TopK(cfg) => {
                assert_eq!(cfg.k, 10);
                assert_eq!(cfg.sort_columns.len(), 1);
                assert_eq!(cfg.sort_columns[0].column, "price");
                assert_eq!(cfg.sort_columns, make_sort_columns());
            }
            _ => panic!("Expected TopK config"),
        }
    }

    #[test]
    fn test_topk_config_new() {
        let cfg = TopKConfig::new(7, make_sort_columns());
        assert_eq!(cfg.k, 7);
        assert_eq!(cfg.sort_columns, make_sort_columns());
    }

    #[test]
    fn test_per_group_topk_config() {
        let analysis = make_analysis(
            OrderPattern::PerGroupTopK {
                k: 5,
                partition_columns: vec!["category".to_string()],
                rank_type: RankType::RowNumber,
            },
            Some(5),
            false,
        );
        let config = OrderOperatorConfig::from_analysis(&analysis)
            .unwrap()
            .unwrap();
        match config {
            OrderOperatorConfig::PerGroupTopK(cfg) => {
                assert_eq!(cfg.k, 5);
                assert_eq!(cfg.partition_columns, vec!["category".to_string()]);
                assert_eq!(cfg.sort_columns, make_sort_columns());
                assert_eq!(cfg.max_partitions, DEFAULT_MAX_PARTITIONS);
                assert_eq!(cfg.rank_type, RankType::RowNumber);
            }
            _ => panic!("Expected PerGroupTopK config"),
        }
    }

    #[test]
    fn test_window_local_sort_config() {
        let analysis = make_analysis(OrderPattern::WindowLocal, None, true);
        let config = OrderOperatorConfig::from_analysis(&analysis)
            .unwrap()
            .unwrap();
        match config {
            OrderOperatorConfig::WindowLocalSort(cfg) => {
                assert_eq!(cfg.sort_columns.len(), 1);
                assert!(cfg.limit.is_none());
            }
            _ => panic!("Expected WindowLocalSort config"),
        }
    }

    #[test]
    fn test_window_local_sort_propagates_limit() {
        // The analysis' LIMIT must be carried into the window-local sort.
        let analysis = make_analysis(OrderPattern::WindowLocal, Some(25), true);
        let config = OrderOperatorConfig::from_analysis(&analysis)
            .unwrap()
            .unwrap();
        match config {
            OrderOperatorConfig::WindowLocalSort(cfg) => {
                assert_eq!(cfg.limit, Some(25));
                assert_eq!(cfg.sort_columns, make_sort_columns());
            }
            _ => panic!("Expected WindowLocalSort config"),
        }
    }

    #[test]
    fn test_source_satisfied_config() {
        let analysis = make_analysis(OrderPattern::SourceSatisfied, None, false);
        let config = OrderOperatorConfig::from_analysis(&analysis)
            .unwrap()
            .unwrap();
        assert_eq!(config, OrderOperatorConfig::SourceSatisfied);
    }

    #[test]
    fn test_unbounded_rejected() {
        let analysis = make_analysis(OrderPattern::Unbounded, None, false);
        let result = OrderOperatorConfig::from_analysis(&analysis);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("ORDER BY without LIMIT"));
    }

    #[test]
    fn test_no_order_by_returns_none() {
        let analysis = OrderAnalysis {
            order_columns: vec![],
            limit: None,
            is_windowed: false,
            pattern: OrderPattern::None,
        };
        let config = OrderOperatorConfig::from_analysis(&analysis).unwrap();
        assert!(config.is_none());
    }

    #[test]
    fn test_watermark_bounded_config() {
        let sort_cols = make_sort_columns();
        let config = OrderOperatorConfig::watermark_bounded(sort_cols.clone());
        match config {
            OrderOperatorConfig::WatermarkBoundedSort(cfg) => {
                assert_eq!(cfg.max_buffer_size, DEFAULT_MAX_BUFFER_SIZE);
                assert_eq!(cfg.sort_columns, sort_cols);
            }
            _ => panic!("Expected WatermarkBoundedSort config"),
        }
    }

    #[test]
    fn test_per_group_topk_with_max_partitions() {
        let cfg = PerGroupTopKConfig {
            k: 5,
            partition_columns: vec!["cat".to_string()],
            sort_columns: make_sort_columns(),
            max_partitions: DEFAULT_MAX_PARTITIONS,
            rank_type: RankType::RowNumber,
        }
        .with_max_partitions(500);
        assert_eq!(cfg.max_partitions, 500);
        // The builder must not disturb the other fields.
        assert_eq!(cfg.k, 5);
        assert_eq!(cfg.partition_columns, vec!["cat".to_string()]);
        assert_eq!(cfg.sort_columns, make_sort_columns());
        assert_eq!(cfg.rank_type, RankType::RowNumber);
    }

    #[test]
    fn test_watermark_sort_with_max_buffer() {
        let cfg = WatermarkSortConfig {
            sort_columns: make_sort_columns(),
            max_buffer_size: DEFAULT_MAX_BUFFER_SIZE,
        }
        .with_max_buffer_size(50_000);
        assert_eq!(cfg.max_buffer_size, 50_000);
        assert_eq!(cfg.sort_columns, make_sort_columns());
    }

    #[test]
    fn test_per_group_topk_rank_type() {
        let analysis = make_analysis(
            OrderPattern::PerGroupTopK {
                k: 3,
                partition_columns: vec!["region".to_string()],
                rank_type: RankType::Rank,
            },
            Some(3),
            false,
        );
        let config = OrderOperatorConfig::from_analysis(&analysis)
            .unwrap()
            .unwrap();
        match config {
            OrderOperatorConfig::PerGroupTopK(cfg) => {
                assert_eq!(cfg.rank_type, RankType::Rank);
            }
            _ => panic!("Expected PerGroupTopK config"),
        }
    }

    #[test]
    fn test_per_group_topk_dense_rank() {
        let analysis = make_analysis(
            OrderPattern::PerGroupTopK {
                k: 10,
                partition_columns: vec!["cat".to_string()],
                rank_type: RankType::DenseRank,
            },
            Some(10),
            false,
        );
        let config = OrderOperatorConfig::from_analysis(&analysis)
            .unwrap()
            .unwrap();
        match config {
            OrderOperatorConfig::PerGroupTopK(cfg) => {
                assert_eq!(cfg.rank_type, RankType::DenseRank);
                assert_eq!(cfg.k, 10);
            }
            _ => panic!("Expected PerGroupTopK config"),
        }
    }
}