// cynos_query/optimizer/order_by_index.rs

//! Order by index pass - leverages indexes to avoid explicit sorting.
//!
//! This pass identifies Sort nodes where the sort columns match an available
//! index, and replaces the TableScan + Sort with an IndexScan that produces
//! results in the desired order.
//!
//! Example:
//! ```text
//! Sort(id ASC)                =>    IndexScan(idx_id, ASC)
//!      |                                  |
//! TableScan(users)                  (no Sort needed)
//! ```
//!
//! This optimization is beneficial when:
//! 1. The sort columns match an index's columns in order
//! 2. The sort direction matches (or is exactly reversed from) the index order
//! 3. There are no intervening operations that would disrupt ordering
18
19use crate::ast::{Expr, SortOrder};
20use crate::context::ExecutionContext;
21use crate::planner::PhysicalPlan;
22use alloc::boxed::Box;
23use alloc::string::String;
24use alloc::vec::Vec;
25
26/// Pass that leverages indexes to avoid explicit sorting.
27pub struct OrderByIndexPass<'a> {
28    ctx: &'a ExecutionContext,
29}
30
31impl<'a> OrderByIndexPass<'a> {
32    /// Creates a new OrderByIndexPass with the given execution context.
33    pub fn new(ctx: &'a ExecutionContext) -> Self {
34        Self { ctx }
35    }
36
37    /// Optimizes the physical plan by leveraging indexes for sorting.
38    pub fn optimize(&self, plan: PhysicalPlan) -> PhysicalPlan {
39        self.traverse(plan)
40    }
41
42    fn traverse(&self, plan: PhysicalPlan) -> PhysicalPlan {
43        match plan {
44            PhysicalPlan::Sort { input, order_by } => {
45                let optimized_input = self.traverse(*input);
46
47                // Try to optimize using TableScan
48                if let Some(optimized) =
49                    self.try_optimize_table_scan(&optimized_input, &order_by)
50                {
51                    return optimized;
52                }
53
54                // Try to optimize using existing IndexScan
55                if let Some(optimized) =
56                    self.try_optimize_index_scan(optimized_input.clone(), &order_by)
57                {
58                    return optimized;
59                }
60
61                // No optimization possible, keep the Sort
62                PhysicalPlan::Sort {
63                    input: Box::new(optimized_input),
64                    order_by,
65                }
66            }
67
68            // Recursively process other nodes
69            PhysicalPlan::Filter { input, predicate } => PhysicalPlan::Filter {
70                input: Box::new(self.traverse(*input)),
71                predicate,
72            },
73
74            PhysicalPlan::Project { input, columns } => PhysicalPlan::Project {
75                input: Box::new(self.traverse(*input)),
76                columns,
77            },
78
79            PhysicalPlan::HashJoin {
80                left,
81                right,
82                condition,
83                join_type,
84            } => PhysicalPlan::HashJoin {
85                left: Box::new(self.traverse(*left)),
86                right: Box::new(self.traverse(*right)),
87                condition,
88                join_type,
89            },
90
91            PhysicalPlan::SortMergeJoin {
92                left,
93                right,
94                condition,
95                join_type,
96            } => PhysicalPlan::SortMergeJoin {
97                left: Box::new(self.traverse(*left)),
98                right: Box::new(self.traverse(*right)),
99                condition,
100                join_type,
101            },
102
103            PhysicalPlan::NestedLoopJoin {
104                left,
105                right,
106                condition,
107                join_type,
108            } => PhysicalPlan::NestedLoopJoin {
109                left: Box::new(self.traverse(*left)),
110                right: Box::new(self.traverse(*right)),
111                condition,
112                join_type,
113            },
114
115            PhysicalPlan::IndexNestedLoopJoin {
116                outer,
117                inner_table,
118                inner_index,
119                condition,
120                join_type,
121            } => PhysicalPlan::IndexNestedLoopJoin {
122                outer: Box::new(self.traverse(*outer)),
123                inner_table,
124                inner_index,
125                condition,
126                join_type,
127            },
128
129            PhysicalPlan::HashAggregate {
130                input,
131                group_by,
132                aggregates,
133            } => PhysicalPlan::HashAggregate {
134                input: Box::new(self.traverse(*input)),
135                group_by,
136                aggregates,
137            },
138
139            PhysicalPlan::Limit {
140                input,
141                limit,
142                offset,
143            } => PhysicalPlan::Limit {
144                input: Box::new(self.traverse(*input)),
145                limit,
146                offset,
147            },
148
149            PhysicalPlan::CrossProduct { left, right } => PhysicalPlan::CrossProduct {
150                left: Box::new(self.traverse(*left)),
151                right: Box::new(self.traverse(*right)),
152            },
153
154            PhysicalPlan::NoOp { input } => PhysicalPlan::NoOp {
155                input: Box::new(self.traverse(*input)),
156            },
157
158            PhysicalPlan::TopN {
159                input,
160                order_by,
161                limit,
162                offset,
163            } => {
164                let optimized_input = self.traverse(*input);
165
166                // Try to optimize TopN(TableScan) -> IndexScan with limit
167                if let Some(optimized) =
168                    self.try_optimize_topn_table_scan(&optimized_input, &order_by, limit, offset)
169                {
170                    return optimized;
171                }
172
173                // No optimization possible, keep the TopN
174                PhysicalPlan::TopN {
175                    input: Box::new(optimized_input),
176                    order_by,
177                    limit,
178                    offset,
179                }
180            }
181
182            // Leaf nodes - no transformation
183            plan @ (PhysicalPlan::TableScan { .. }
184            | PhysicalPlan::IndexScan { .. }
185            | PhysicalPlan::IndexGet { .. }
186            | PhysicalPlan::IndexInGet { .. }
187            | PhysicalPlan::GinIndexScan { .. }
188            | PhysicalPlan::GinIndexScanMulti { .. }
189            | PhysicalPlan::Empty) => plan,
190        }
191    }
192
193    /// Tries to replace TableScan + Sort with IndexScan.
194    fn try_optimize_table_scan(
195        &self,
196        plan: &PhysicalPlan,
197        order_by: &[(Expr, SortOrder)],
198    ) -> Option<PhysicalPlan> {
199        // Find a TableScan that can be optimized
200        let table_scan = self.find_table_scan(plan)?;
201
202        // Extract column names from order_by
203        let order_columns: Vec<String> = order_by
204            .iter()
205            .filter_map(|(expr, _)| self.extract_column_name(expr))
206            .collect();
207
208        if order_columns.len() != order_by.len() {
209            return None; // Not all order_by expressions are simple columns
210        }
211
212        // Find an index that matches the order_by columns
213        let column_refs: Vec<&str> = order_columns.iter().map(|s| s.as_str()).collect();
214        let index = self.ctx.find_index(&table_scan, &column_refs)?;
215
216        // Check if the sort order matches (natural or reversed)
217        let is_reverse = self.check_order_match(order_by, &index.columns)?;
218
219        // Create IndexScan with appropriate range (full scan)
220        Some(PhysicalPlan::IndexScan {
221            table: table_scan,
222            index: index.name.clone(),
223            range_start: None,
224            range_end: None,
225            include_start: true,
226            include_end: true,
227            limit: None,
228            offset: None,
229            reverse: is_reverse,
230        })
231    }
232
233    /// Tries to replace TopN(TableScan) with IndexScan(limit, offset).
234    /// This enables true LIMIT pushdown to the storage layer.
235    fn try_optimize_topn_table_scan(
236        &self,
237        plan: &PhysicalPlan,
238        order_by: &[(Expr, SortOrder)],
239        limit: usize,
240        offset: usize,
241    ) -> Option<PhysicalPlan> {
242        // Find a TableScan that can be optimized
243        let table_scan = self.find_table_scan(plan)?;
244
245        // Extract column names from order_by
246        let order_columns: Vec<String> = order_by
247            .iter()
248            .filter_map(|(expr, _)| self.extract_column_name(expr))
249            .collect();
250
251        if order_columns.len() != order_by.len() {
252            return None; // Not all order_by expressions are simple columns
253        }
254
255        // Find an index that matches the order_by columns
256        let column_refs: Vec<&str> = order_columns.iter().map(|s| s.as_str()).collect();
257        let index = self.ctx.find_index(&table_scan, &column_refs)?;
258
259        // Check if the sort order matches (natural or reversed)
260        let is_reverse = self.check_order_match(order_by, &index.columns)?;
261
262        // Create IndexScan with limit and offset pushed down
263        Some(PhysicalPlan::IndexScan {
264            table: table_scan,
265            index: index.name.clone(),
266            range_start: None,
267            range_end: None,
268            include_start: true,
269            include_end: true,
270            limit: Some(limit),
271            offset: Some(offset),
272            reverse: is_reverse,
273        })
274    }
275
276    /// Tries to optimize an existing IndexScan by setting its reverse flag.
277    fn try_optimize_index_scan(
278        &self,
279        plan: PhysicalPlan,
280        order_by: &[(Expr, SortOrder)],
281    ) -> Option<PhysicalPlan> {
282        // Find an IndexScan in the plan
283        if let PhysicalPlan::IndexScan {
284            table,
285            index,
286            range_start,
287            range_end,
288            include_start,
289            include_end,
290            limit,
291            offset,
292            ..
293        } = &plan
294        {
295            // Get index info
296            let index_info = self.ctx.find_index(table, &[])?;
297
298            // Check if this index matches the order_by
299            let order_columns: Vec<String> = order_by
300                .iter()
301                .filter_map(|(expr, _)| self.extract_column_name(expr))
302                .collect();
303
304            if order_columns.len() != order_by.len() {
305                return None;
306            }
307
308            // Check if columns match
309            if !index_info
310                .columns
311                .iter()
312                .zip(order_columns.iter())
313                .all(|(a, b)| a == b)
314            {
315                return None;
316            }
317
318            // Check if the sort order matches (natural or reversed)
319            let is_reverse = self.check_order_match(order_by, &index_info.columns)?;
320
321            // Return IndexScan with correct reverse flag
322            return Some(PhysicalPlan::IndexScan {
323                table: table.clone(),
324                index: index.clone(),
325                range_start: range_start.clone(),
326                range_end: range_end.clone(),
327                include_start: *include_start,
328                include_end: *include_end,
329                limit: *limit,
330                offset: *offset,
331                reverse: is_reverse,
332            });
333        }
334
335        None
336    }
337
338    /// Finds a TableScan in the plan (only if it's directly accessible).
339    fn find_table_scan(&self, plan: &PhysicalPlan) -> Option<String> {
340        match plan {
341            PhysicalPlan::TableScan { table } => Some(table.clone()),
342            // Can look through Filter and Project nodes
343            PhysicalPlan::Filter { input, .. } | PhysicalPlan::Project { input, .. } => {
344                self.find_table_scan(input)
345            }
346            // Stop at nodes that would disrupt ordering
347            _ => None,
348        }
349    }
350
351    /// Extracts the column name from a column expression.
352    fn extract_column_name(&self, expr: &Expr) -> Option<String> {
353        match expr {
354            Expr::Column(col_ref) => Some(col_ref.column.clone()),
355            _ => None,
356        }
357    }
358
359    /// Checks if the order_by matches the index columns (natural or reversed).
360    /// Returns Some(true) for reversed, Some(false) for natural, None for no match.
361    fn check_order_match(
362        &self,
363        order_by: &[(Expr, SortOrder)],
364        index_columns: &[String],
365    ) -> Option<bool> {
366        if order_by.len() > index_columns.len() {
367            return None;
368        }
369
370        // Check if all columns match
371        for (i, (expr, _)) in order_by.iter().enumerate() {
372            let col_name = self.extract_column_name(expr)?;
373            if col_name != index_columns[i] {
374                return None;
375            }
376        }
377
378        // Check if all orders are the same (all ASC or all DESC)
379        let first_order = order_by.first().map(|(_, o)| o)?;
380        let all_same = order_by.iter().all(|(_, o)| o == first_order);
381
382        if !all_same {
383            return None; // Mixed orders not supported
384        }
385
386        // ASC = natural order, DESC = reversed
387        Some(*first_order == SortOrder::Desc)
388    }
389}
390
391#[cfg(test)]
392mod tests {
393    use super::*;
394    use crate::ast::Expr;
395    use crate::context::{IndexInfo, TableStats};
396
397    fn create_test_context() -> ExecutionContext {
398        let mut ctx = ExecutionContext::new();
399
400        ctx.register_table(
401            "users",
402            TableStats {
403                row_count: 1000,
404                is_sorted: false,
405                indexes: alloc::vec![
406                    IndexInfo::new("idx_id", alloc::vec!["id".into()], true),
407                    IndexInfo::new("idx_name", alloc::vec!["name".into()], false),
408                ],
409            },
410        );
411
412        ctx
413    }
414
415    #[test]
416    fn test_sort_to_index_scan() {
417        let ctx = create_test_context();
418        let pass = OrderByIndexPass::new(&ctx);
419
420        // Create: Sort(id ASC) -> TableScan(users)
421        let plan = PhysicalPlan::Sort {
422            input: Box::new(PhysicalPlan::table_scan("users")),
423            order_by: alloc::vec![(Expr::column("users", "id", 0), SortOrder::Asc)],
424        };
425
426        let result = pass.optimize(plan);
427
428        // Should become IndexScan
429        assert!(matches!(result, PhysicalPlan::IndexScan { .. }));
430        if let PhysicalPlan::IndexScan { table, index, .. } = result {
431            assert_eq!(table, "users");
432            assert_eq!(index, "idx_id");
433        }
434    }
435
436    #[test]
437    fn test_sort_no_matching_index() {
438        let ctx = create_test_context();
439        let pass = OrderByIndexPass::new(&ctx);
440
441        // Create: Sort(email ASC) -> TableScan(users)
442        // No index on 'email'
443        let plan = PhysicalPlan::Sort {
444            input: Box::new(PhysicalPlan::table_scan("users")),
445            order_by: alloc::vec![(Expr::column("users", "email", 2), SortOrder::Asc)],
446        };
447
448        let result = pass.optimize(plan);
449
450        // Should remain as Sort
451        assert!(matches!(result, PhysicalPlan::Sort { .. }));
452    }
453
454    #[test]
455    fn test_sort_with_filter() {
456        let ctx = create_test_context();
457        let pass = OrderByIndexPass::new(&ctx);
458
459        // Create: Sort(id ASC) -> Filter -> TableScan(users)
460        let plan = PhysicalPlan::Sort {
461            input: Box::new(PhysicalPlan::Filter {
462                input: Box::new(PhysicalPlan::table_scan("users")),
463                predicate: Expr::gt(Expr::column("users", "age", 1), Expr::literal(18i64)),
464            }),
465            order_by: alloc::vec![(Expr::column("users", "id", 0), SortOrder::Asc)],
466        };
467
468        let result = pass.optimize(plan);
469
470        // Should become IndexScan (Filter is transparent for this optimization)
471        assert!(matches!(result, PhysicalPlan::IndexScan { .. }));
472    }
473
474    #[test]
475    fn test_sort_after_join_not_optimized() {
476        let ctx = create_test_context();
477        let pass = OrderByIndexPass::new(&ctx);
478
479        // Create: Sort -> HashJoin -> (TableScan, TableScan)
480        // Joins disrupt ordering, so this shouldn't be optimized
481        let plan = PhysicalPlan::Sort {
482            input: Box::new(PhysicalPlan::HashJoin {
483                left: Box::new(PhysicalPlan::table_scan("users")),
484                right: Box::new(PhysicalPlan::table_scan("orders")),
485                condition: Expr::eq(
486                    Expr::column("users", "id", 0),
487                    Expr::column("orders", "user_id", 0),
488                ),
489                join_type: crate::ast::JoinType::Inner,
490            }),
491            order_by: alloc::vec![(Expr::column("users", "id", 0), SortOrder::Asc)],
492        };
493
494        let result = pass.optimize(plan);
495
496        // Should remain as Sort (join disrupts ordering)
497        assert!(matches!(result, PhysicalPlan::Sort { .. }));
498    }
499
500    #[test]
501    fn test_check_order_match() {
502        let ctx = create_test_context();
503        let pass = OrderByIndexPass::new(&ctx);
504
505        let index_columns = alloc::vec!["id".into(), "name".into()];
506
507        // Natural order (ASC)
508        let order_by = alloc::vec![(Expr::column("t", "id", 0), SortOrder::Asc)];
509        assert_eq!(pass.check_order_match(&order_by, &index_columns), Some(false));
510
511        // Reversed order (DESC)
512        let order_by = alloc::vec![(Expr::column("t", "id", 0), SortOrder::Desc)];
513        assert_eq!(pass.check_order_match(&order_by, &index_columns), Some(true));
514
515        // Column mismatch
516        let order_by = alloc::vec![(Expr::column("t", "email", 0), SortOrder::Asc)];
517        assert_eq!(pass.check_order_match(&order_by, &index_columns), None);
518    }
519
520    #[test]
521    fn test_sort_desc_to_index_scan_reverse() {
522        let ctx = create_test_context();
523        let pass = OrderByIndexPass::new(&ctx);
524
525        // Create: Sort(id DESC) -> TableScan(users)
526        let plan = PhysicalPlan::Sort {
527            input: Box::new(PhysicalPlan::table_scan("users")),
528            order_by: alloc::vec![(Expr::column("users", "id", 0), SortOrder::Desc)],
529        };
530
531        let result = pass.optimize(plan);
532
533        // Should become IndexScan with reverse=true
534        assert!(matches!(result, PhysicalPlan::IndexScan { .. }));
535        if let PhysicalPlan::IndexScan { table, index, reverse, .. } = result {
536            assert_eq!(table, "users");
537            assert_eq!(index, "idx_id");
538            assert!(reverse, "IndexScan should have reverse=true for DESC ordering");
539        }
540    }
541
542    #[test]
543    fn test_topn_desc_to_index_scan_reverse() {
544        let ctx = create_test_context();
545        let pass = OrderByIndexPass::new(&ctx);
546
547        // Create: TopN(id DESC, limit=10, offset=5) -> TableScan(users)
548        let plan = PhysicalPlan::TopN {
549            input: Box::new(PhysicalPlan::table_scan("users")),
550            order_by: alloc::vec![(Expr::column("users", "id", 0), SortOrder::Desc)],
551            limit: 10,
552            offset: 5,
553        };
554
555        let result = pass.optimize(plan);
556
557        // Should become IndexScan with reverse=true, limit=10, offset=5
558        assert!(matches!(result, PhysicalPlan::IndexScan { .. }));
559        if let PhysicalPlan::IndexScan { table, index, reverse, limit, offset, .. } = result {
560            assert_eq!(table, "users");
561            assert_eq!(index, "idx_id");
562            assert!(reverse, "IndexScan should have reverse=true for DESC ordering");
563            assert_eq!(limit, Some(10));
564            assert_eq!(offset, Some(5));
565        }
566    }
567
568    #[test]
569    fn test_sort_asc_to_index_scan_forward() {
570        let ctx = create_test_context();
571        let pass = OrderByIndexPass::new(&ctx);
572
573        // Create: Sort(id ASC) -> TableScan(users)
574        let plan = PhysicalPlan::Sort {
575            input: Box::new(PhysicalPlan::table_scan("users")),
576            order_by: alloc::vec![(Expr::column("users", "id", 0), SortOrder::Asc)],
577        };
578
579        let result = pass.optimize(plan);
580
581        // Should become IndexScan with reverse=false
582        assert!(matches!(result, PhysicalPlan::IndexScan { .. }));
583        if let PhysicalPlan::IndexScan { reverse, .. } = result {
584            assert!(!reverse, "IndexScan should have reverse=false for ASC ordering");
585        }
586    }
587}