Skip to main content

laminar_sql/translator/
order_translator.rs

1//! ORDER BY operator configuration builder
2//!
3//! Translates parsed ORDER BY analysis into Ring 0 operator configurations
4//! that can be instantiated for streaming ORDER BY execution.
5
6use crate::parser::order_analyzer::{OrderAnalysis, OrderColumn, OrderPattern, RankType};
7
8/// Configuration for a streaming ORDER BY operator.
9///
10/// Each variant corresponds to a different bounded streaming sort strategy.
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub enum OrderOperatorConfig {
13    /// Source already satisfies the ordering — no operator needed.
14    SourceSatisfied,
15
16    /// Streaming top-K: ORDER BY ... LIMIT N.
17    ///
18    /// Maintains a bounded heap of K entries with retraction-based emission.
19    TopK(TopKConfig),
20
21    /// Window-local sort: ORDER BY inside a windowed aggregation.
22    ///
23    /// Buffers per-window output, sorts on window close.
24    WindowLocalSort(WindowLocalSortConfig),
25
26    /// Watermark-bounded sort: ORDER BY event_time on out-of-order input.
27    ///
28    /// Buffers events between watermarks, emits sorted on watermark advance.
29    WatermarkBoundedSort(WatermarkSortConfig),
30
31    /// Per-group top-K: ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...) WHERE rn <= N.
32    ///
33    /// Independent top-K heaps per partition key.
34    PerGroupTopK(PerGroupTopKConfig),
35}
36
37/// Configuration for streaming top-K operator.
38#[derive(Debug, Clone, PartialEq, Eq)]
39pub struct TopKConfig {
40    /// Number of top entries to maintain
41    pub k: usize,
42    /// Sort columns and directions
43    pub sort_columns: Vec<OrderColumn>,
44}
45
46/// Configuration for window-local sort operator.
47#[derive(Debug, Clone, PartialEq, Eq)]
48pub struct WindowLocalSortConfig {
49    /// Sort columns and directions
50    pub sort_columns: Vec<OrderColumn>,
51    /// Optional LIMIT for top-N within window
52    pub limit: Option<usize>,
53}
54
55/// Configuration for watermark-bounded sort operator.
56#[derive(Debug, Clone, PartialEq, Eq)]
57pub struct WatermarkSortConfig {
58    /// Sort columns and directions
59    pub sort_columns: Vec<OrderColumn>,
60    /// Maximum buffer size (events)
61    pub max_buffer_size: usize,
62}
63
64/// Configuration for per-group (partitioned) top-K operator.
65#[derive(Debug, Clone, PartialEq, Eq)]
66pub struct PerGroupTopKConfig {
67    /// Number of top entries per partition
68    pub k: usize,
69    /// Partition key columns
70    pub partition_columns: Vec<String>,
71    /// Sort columns and directions
72    pub sort_columns: Vec<OrderColumn>,
73    /// Maximum number of partitions (memory safety)
74    pub max_partitions: usize,
75    /// Which ranking function was used (ROW_NUMBER, RANK, or DENSE_RANK)
76    pub rank_type: RankType,
77}
78
/// Buffer-size cap applied to a watermark-bounded sort by default.
const DEFAULT_MAX_BUFFER_SIZE: usize = 100_000;
81
/// Partition-count cap applied to a per-group top-K by default.
const DEFAULT_MAX_PARTITIONS: usize = 10_000;
84
85impl OrderOperatorConfig {
86    /// Creates an operator configuration from an ORDER BY analysis.
87    ///
88    /// Maps the classified pattern to the appropriate operator config.
89    /// Returns `None` for `OrderPattern::None` (no ORDER BY).
90    ///
91    /// # Errors
92    ///
93    /// Returns error string for `OrderPattern::Unbounded`.
94    pub fn from_analysis(analysis: &OrderAnalysis) -> Result<Option<Self>, String> {
95        match &analysis.pattern {
96            OrderPattern::None => Ok(None),
97            OrderPattern::SourceSatisfied => Ok(Some(Self::SourceSatisfied)),
98            OrderPattern::TopK { k } => Ok(Some(Self::TopK(TopKConfig {
99                k: *k,
100                sort_columns: analysis.order_columns.clone(),
101            }))),
102            OrderPattern::WindowLocal => Ok(Some(Self::WindowLocalSort(WindowLocalSortConfig {
103                sort_columns: analysis.order_columns.clone(),
104                limit: analysis.limit,
105            }))),
106            OrderPattern::PerGroupTopK {
107                k,
108                partition_columns,
109                rank_type,
110            } => Ok(Some(Self::PerGroupTopK(PerGroupTopKConfig {
111                k: *k,
112                partition_columns: partition_columns.clone(),
113                sort_columns: analysis.order_columns.clone(),
114                max_partitions: DEFAULT_MAX_PARTITIONS,
115                rank_type: *rank_type,
116            }))),
117            OrderPattern::Unbounded => Err(
118                "ORDER BY without LIMIT is not supported on unbounded streams. \
119                 Add LIMIT N or use ORDER BY within a windowed aggregation."
120                    .to_string(),
121            ),
122        }
123    }
124
125    /// Creates a watermark-bounded sort config for event time ordering.
126    #[must_use]
127    pub fn watermark_bounded(sort_columns: Vec<OrderColumn>) -> Self {
128        Self::WatermarkBoundedSort(WatermarkSortConfig {
129            sort_columns,
130            max_buffer_size: DEFAULT_MAX_BUFFER_SIZE,
131        })
132    }
133}
134
135impl TopKConfig {
136    /// Creates a new top-K configuration.
137    #[must_use]
138    pub fn new(k: usize, sort_columns: Vec<OrderColumn>) -> Self {
139        Self { k, sort_columns }
140    }
141}
142
143impl PerGroupTopKConfig {
144    /// Sets the maximum number of partitions.
145    #[must_use]
146    pub fn with_max_partitions(mut self, max_partitions: usize) -> Self {
147        self.max_partitions = max_partitions;
148        self
149    }
150}
151
152impl WatermarkSortConfig {
153    /// Sets the maximum buffer size.
154    #[must_use]
155    pub fn with_max_buffer_size(mut self, max_buffer_size: usize) -> Self {
156        self.max_buffer_size = max_buffer_size;
157        self
158    }
159}
160
#[cfg(test)]
mod tests {
    use super::*;

    /// Single sort key on `price` (descending, `nulls_first: false`) shared
    /// by the tests below.
    fn make_sort_columns() -> Vec<OrderColumn> {
        vec![OrderColumn {
            column: "price".to_string(),
            descending: true,
            nulls_first: false,
        }]
    }

    /// Builds an analysis over the shared sort columns for `pattern`.
    fn make_analysis(
        pattern: OrderPattern,
        limit: Option<usize>,
        is_windowed: bool,
    ) -> OrderAnalysis {
        OrderAnalysis {
            order_columns: make_sort_columns(),
            limit,
            is_windowed,
            pattern,
        }
    }

    /// Runs the translation and unwraps both layers, panicking when the
    /// analysis is rejected or yields no operator.
    fn translate(analysis: &OrderAnalysis) -> OrderOperatorConfig {
        OrderOperatorConfig::from_analysis(analysis)
            .unwrap()
            .unwrap()
    }

    /// Unwraps the `PerGroupTopK` variant, panicking on any other variant.
    fn expect_per_group_topk(config: OrderOperatorConfig) -> PerGroupTopKConfig {
        match config {
            OrderOperatorConfig::PerGroupTopK(cfg) => cfg,
            other => panic!("Expected PerGroupTopK config, got {:?}", other),
        }
    }

    /// Unwraps the `WindowLocalSort` variant, panicking on any other variant.
    fn expect_window_local(config: OrderOperatorConfig) -> WindowLocalSortConfig {
        match config {
            OrderOperatorConfig::WindowLocalSort(cfg) => cfg,
            other => panic!("Expected WindowLocalSort config, got {:?}", other),
        }
    }

    #[test]
    fn test_topk_config_from_analysis() {
        let analysis = make_analysis(OrderPattern::TopK { k: 10 }, Some(10), false);
        match translate(&analysis) {
            OrderOperatorConfig::TopK(cfg) => {
                assert_eq!(cfg.k, 10);
                assert_eq!(cfg.sort_columns.len(), 1);
                assert_eq!(cfg.sort_columns[0].column, "price");
                // The whole column spec must be carried over, not just the name.
                assert_eq!(cfg.sort_columns, make_sort_columns());
            }
            other => panic!("Expected TopK config, got {:?}", other),
        }
    }

    #[test]
    fn test_topk_config_new() {
        let cfg = TopKConfig::new(3, make_sort_columns());
        assert_eq!(
            cfg,
            TopKConfig {
                k: 3,
                sort_columns: make_sort_columns(),
            }
        );
    }

    #[test]
    fn test_per_group_topk_config() {
        let analysis = make_analysis(
            OrderPattern::PerGroupTopK {
                k: 5,
                partition_columns: vec!["category".to_string()],
                rank_type: RankType::RowNumber,
            },
            Some(5),
            false,
        );
        let cfg = expect_per_group_topk(translate(&analysis));
        assert_eq!(cfg.k, 5);
        assert_eq!(cfg.partition_columns, vec!["category".to_string()]);
        assert_eq!(cfg.sort_columns, make_sort_columns());
        assert_eq!(cfg.max_partitions, DEFAULT_MAX_PARTITIONS);
        assert_eq!(cfg.rank_type, RankType::RowNumber);
    }

    #[test]
    fn test_window_local_sort_config() {
        let analysis = make_analysis(OrderPattern::WindowLocal, None, true);
        let cfg = expect_window_local(translate(&analysis));
        assert_eq!(cfg.sort_columns.len(), 1);
        assert_eq!(cfg.sort_columns, make_sort_columns());
        assert!(cfg.limit.is_none());
    }

    #[test]
    fn test_window_local_sort_config_with_limit() {
        // A LIMIT inside the window must be forwarded unchanged.
        let analysis = make_analysis(OrderPattern::WindowLocal, Some(5), true);
        let cfg = expect_window_local(translate(&analysis));
        assert_eq!(cfg.limit, Some(5));
        assert_eq!(cfg.sort_columns, make_sort_columns());
    }

    #[test]
    fn test_source_satisfied_config() {
        let analysis = make_analysis(OrderPattern::SourceSatisfied, None, false);
        assert_eq!(translate(&analysis), OrderOperatorConfig::SourceSatisfied);
    }

    #[test]
    fn test_unbounded_rejected() {
        let analysis = make_analysis(OrderPattern::Unbounded, None, false);
        let result = OrderOperatorConfig::from_analysis(&analysis);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("ORDER BY without LIMIT"));
    }

    #[test]
    fn test_no_order_by_returns_none() {
        let analysis = OrderAnalysis {
            order_columns: vec![],
            limit: None,
            is_windowed: false,
            pattern: OrderPattern::None,
        };
        let config = OrderOperatorConfig::from_analysis(&analysis).unwrap();
        assert!(config.is_none());
    }

    #[test]
    fn test_watermark_bounded_config() {
        let config = OrderOperatorConfig::watermark_bounded(make_sort_columns());
        match config {
            OrderOperatorConfig::WatermarkBoundedSort(cfg) => {
                assert_eq!(cfg.max_buffer_size, DEFAULT_MAX_BUFFER_SIZE);
                assert_eq!(cfg.sort_columns, make_sort_columns());
            }
            other => panic!("Expected WatermarkBoundedSort config, got {:?}", other),
        }
    }

    #[test]
    fn test_per_group_topk_with_max_partitions() {
        let cfg = PerGroupTopKConfig {
            k: 5,
            partition_columns: vec!["cat".to_string()],
            sort_columns: make_sort_columns(),
            max_partitions: DEFAULT_MAX_PARTITIONS,
            rank_type: RankType::RowNumber,
        }
        .with_max_partitions(500);
        assert_eq!(cfg.max_partitions, 500);
        // The setter must leave every other field alone.
        assert_eq!(cfg.k, 5);
        assert_eq!(cfg.partition_columns, vec!["cat".to_string()]);
        assert_eq!(cfg.sort_columns, make_sort_columns());
        assert_eq!(cfg.rank_type, RankType::RowNumber);
    }

    #[test]
    fn test_watermark_sort_with_max_buffer() {
        let cfg = WatermarkSortConfig {
            sort_columns: make_sort_columns(),
            max_buffer_size: DEFAULT_MAX_BUFFER_SIZE,
        }
        .with_max_buffer_size(50_000);
        assert_eq!(cfg.max_buffer_size, 50_000);
        assert_eq!(cfg.sort_columns, make_sort_columns());
    }

    #[test]
    fn test_per_group_topk_rank_type() {
        let analysis = make_analysis(
            OrderPattern::PerGroupTopK {
                k: 3,
                partition_columns: vec!["region".to_string()],
                rank_type: RankType::Rank,
            },
            Some(3),
            false,
        );
        let cfg = expect_per_group_topk(translate(&analysis));
        assert_eq!(cfg.rank_type, RankType::Rank);
    }

    #[test]
    fn test_per_group_topk_dense_rank() {
        let analysis = make_analysis(
            OrderPattern::PerGroupTopK {
                k: 10,
                partition_columns: vec!["cat".to_string()],
                rank_type: RankType::DenseRank,
            },
            Some(10),
            false,
        );
        let cfg = expect_per_group_topk(translate(&analysis));
        assert_eq!(cfg.rank_type, RankType::DenseRank);
        assert_eq!(cfg.k, 10);
    }
}