1use crate::ast::{Expr, SortOrder};
20use crate::context::ExecutionContext;
21use crate::planner::PhysicalPlan;
22use alloc::boxed::Box;
23use alloc::string::String;
24use alloc::vec::Vec;
25
26pub struct OrderByIndexPass<'a> {
28 ctx: &'a ExecutionContext,
29}
30
31impl<'a> OrderByIndexPass<'a> {
32 pub fn new(ctx: &'a ExecutionContext) -> Self {
34 Self { ctx }
35 }
36
37 pub fn optimize(&self, plan: PhysicalPlan) -> PhysicalPlan {
39 self.traverse(plan)
40 }
41
42 fn traverse(&self, plan: PhysicalPlan) -> PhysicalPlan {
43 match plan {
44 PhysicalPlan::Sort { input, order_by } => {
45 let optimized_input = self.traverse(*input);
46
47 if let Some(optimized) =
49 self.try_optimize_table_scan(&optimized_input, &order_by)
50 {
51 return optimized;
52 }
53
54 if let Some(optimized) =
56 self.try_optimize_index_scan(optimized_input.clone(), &order_by)
57 {
58 return optimized;
59 }
60
61 PhysicalPlan::Sort {
63 input: Box::new(optimized_input),
64 order_by,
65 }
66 }
67
68 PhysicalPlan::Filter { input, predicate } => PhysicalPlan::Filter {
70 input: Box::new(self.traverse(*input)),
71 predicate,
72 },
73
74 PhysicalPlan::Project { input, columns } => PhysicalPlan::Project {
75 input: Box::new(self.traverse(*input)),
76 columns,
77 },
78
79 PhysicalPlan::HashJoin {
80 left,
81 right,
82 condition,
83 join_type,
84 } => PhysicalPlan::HashJoin {
85 left: Box::new(self.traverse(*left)),
86 right: Box::new(self.traverse(*right)),
87 condition,
88 join_type,
89 },
90
91 PhysicalPlan::SortMergeJoin {
92 left,
93 right,
94 condition,
95 join_type,
96 } => PhysicalPlan::SortMergeJoin {
97 left: Box::new(self.traverse(*left)),
98 right: Box::new(self.traverse(*right)),
99 condition,
100 join_type,
101 },
102
103 PhysicalPlan::NestedLoopJoin {
104 left,
105 right,
106 condition,
107 join_type,
108 } => PhysicalPlan::NestedLoopJoin {
109 left: Box::new(self.traverse(*left)),
110 right: Box::new(self.traverse(*right)),
111 condition,
112 join_type,
113 },
114
115 PhysicalPlan::IndexNestedLoopJoin {
116 outer,
117 inner_table,
118 inner_index,
119 condition,
120 join_type,
121 } => PhysicalPlan::IndexNestedLoopJoin {
122 outer: Box::new(self.traverse(*outer)),
123 inner_table,
124 inner_index,
125 condition,
126 join_type,
127 },
128
129 PhysicalPlan::HashAggregate {
130 input,
131 group_by,
132 aggregates,
133 } => PhysicalPlan::HashAggregate {
134 input: Box::new(self.traverse(*input)),
135 group_by,
136 aggregates,
137 },
138
139 PhysicalPlan::Limit {
140 input,
141 limit,
142 offset,
143 } => PhysicalPlan::Limit {
144 input: Box::new(self.traverse(*input)),
145 limit,
146 offset,
147 },
148
149 PhysicalPlan::CrossProduct { left, right } => PhysicalPlan::CrossProduct {
150 left: Box::new(self.traverse(*left)),
151 right: Box::new(self.traverse(*right)),
152 },
153
154 PhysicalPlan::NoOp { input } => PhysicalPlan::NoOp {
155 input: Box::new(self.traverse(*input)),
156 },
157
158 PhysicalPlan::TopN {
159 input,
160 order_by,
161 limit,
162 offset,
163 } => {
164 let optimized_input = self.traverse(*input);
165
166 if let Some(optimized) =
168 self.try_optimize_topn_table_scan(&optimized_input, &order_by, limit, offset)
169 {
170 return optimized;
171 }
172
173 PhysicalPlan::TopN {
175 input: Box::new(optimized_input),
176 order_by,
177 limit,
178 offset,
179 }
180 }
181
182 plan @ (PhysicalPlan::TableScan { .. }
184 | PhysicalPlan::IndexScan { .. }
185 | PhysicalPlan::IndexGet { .. }
186 | PhysicalPlan::IndexInGet { .. }
187 | PhysicalPlan::GinIndexScan { .. }
188 | PhysicalPlan::GinIndexScanMulti { .. }
189 | PhysicalPlan::Empty) => plan,
190 }
191 }
192
193 fn try_optimize_table_scan(
195 &self,
196 plan: &PhysicalPlan,
197 order_by: &[(Expr, SortOrder)],
198 ) -> Option<PhysicalPlan> {
199 let table_scan = self.find_table_scan(plan)?;
201
202 let order_columns: Vec<String> = order_by
204 .iter()
205 .filter_map(|(expr, _)| self.extract_column_name(expr))
206 .collect();
207
208 if order_columns.len() != order_by.len() {
209 return None; }
211
212 let column_refs: Vec<&str> = order_columns.iter().map(|s| s.as_str()).collect();
214 let index = self.ctx.find_index(&table_scan, &column_refs)?;
215
216 let is_reverse = self.check_order_match(order_by, &index.columns)?;
218
219 Some(PhysicalPlan::IndexScan {
221 table: table_scan,
222 index: index.name.clone(),
223 range_start: None,
224 range_end: None,
225 include_start: true,
226 include_end: true,
227 limit: None,
228 offset: None,
229 reverse: is_reverse,
230 })
231 }
232
233 fn try_optimize_topn_table_scan(
236 &self,
237 plan: &PhysicalPlan,
238 order_by: &[(Expr, SortOrder)],
239 limit: usize,
240 offset: usize,
241 ) -> Option<PhysicalPlan> {
242 let table_scan = self.find_table_scan(plan)?;
244
245 let order_columns: Vec<String> = order_by
247 .iter()
248 .filter_map(|(expr, _)| self.extract_column_name(expr))
249 .collect();
250
251 if order_columns.len() != order_by.len() {
252 return None; }
254
255 let column_refs: Vec<&str> = order_columns.iter().map(|s| s.as_str()).collect();
257 let index = self.ctx.find_index(&table_scan, &column_refs)?;
258
259 let is_reverse = self.check_order_match(order_by, &index.columns)?;
261
262 Some(PhysicalPlan::IndexScan {
264 table: table_scan,
265 index: index.name.clone(),
266 range_start: None,
267 range_end: None,
268 include_start: true,
269 include_end: true,
270 limit: Some(limit),
271 offset: Some(offset),
272 reverse: is_reverse,
273 })
274 }
275
276 fn try_optimize_index_scan(
278 &self,
279 plan: PhysicalPlan,
280 order_by: &[(Expr, SortOrder)],
281 ) -> Option<PhysicalPlan> {
282 if let PhysicalPlan::IndexScan {
284 table,
285 index,
286 range_start,
287 range_end,
288 include_start,
289 include_end,
290 limit,
291 offset,
292 ..
293 } = &plan
294 {
295 let index_info = self.ctx.find_index(table, &[])?;
297
298 let order_columns: Vec<String> = order_by
300 .iter()
301 .filter_map(|(expr, _)| self.extract_column_name(expr))
302 .collect();
303
304 if order_columns.len() != order_by.len() {
305 return None;
306 }
307
308 if !index_info
310 .columns
311 .iter()
312 .zip(order_columns.iter())
313 .all(|(a, b)| a == b)
314 {
315 return None;
316 }
317
318 let is_reverse = self.check_order_match(order_by, &index_info.columns)?;
320
321 return Some(PhysicalPlan::IndexScan {
323 table: table.clone(),
324 index: index.clone(),
325 range_start: range_start.clone(),
326 range_end: range_end.clone(),
327 include_start: *include_start,
328 include_end: *include_end,
329 limit: *limit,
330 offset: *offset,
331 reverse: is_reverse,
332 });
333 }
334
335 None
336 }
337
338 fn find_table_scan(&self, plan: &PhysicalPlan) -> Option<String> {
340 match plan {
341 PhysicalPlan::TableScan { table } => Some(table.clone()),
342 PhysicalPlan::Filter { input, .. } | PhysicalPlan::Project { input, .. } => {
344 self.find_table_scan(input)
345 }
346 _ => None,
348 }
349 }
350
351 fn extract_column_name(&self, expr: &Expr) -> Option<String> {
353 match expr {
354 Expr::Column(col_ref) => Some(col_ref.column.clone()),
355 _ => None,
356 }
357 }
358
359 fn check_order_match(
362 &self,
363 order_by: &[(Expr, SortOrder)],
364 index_columns: &[String],
365 ) -> Option<bool> {
366 if order_by.len() > index_columns.len() {
367 return None;
368 }
369
370 for (i, (expr, _)) in order_by.iter().enumerate() {
372 let col_name = self.extract_column_name(expr)?;
373 if col_name != index_columns[i] {
374 return None;
375 }
376 }
377
378 let first_order = order_by.first().map(|(_, o)| o)?;
380 let all_same = order_by.iter().all(|(_, o)| o == first_order);
381
382 if !all_same {
383 return None; }
385
386 Some(*first_order == SortOrder::Desc)
388 }
389}
390
391#[cfg(test)]
392mod tests {
393 use super::*;
394 use crate::ast::Expr;
395 use crate::context::{IndexInfo, TableStats};
396
397 fn create_test_context() -> ExecutionContext {
398 let mut ctx = ExecutionContext::new();
399
400 ctx.register_table(
401 "users",
402 TableStats {
403 row_count: 1000,
404 is_sorted: false,
405 indexes: alloc::vec![
406 IndexInfo::new("idx_id", alloc::vec!["id".into()], true),
407 IndexInfo::new("idx_name", alloc::vec!["name".into()], false),
408 ],
409 },
410 );
411
412 ctx
413 }
414
415 #[test]
416 fn test_sort_to_index_scan() {
417 let ctx = create_test_context();
418 let pass = OrderByIndexPass::new(&ctx);
419
420 let plan = PhysicalPlan::Sort {
422 input: Box::new(PhysicalPlan::table_scan("users")),
423 order_by: alloc::vec![(Expr::column("users", "id", 0), SortOrder::Asc)],
424 };
425
426 let result = pass.optimize(plan);
427
428 assert!(matches!(result, PhysicalPlan::IndexScan { .. }));
430 if let PhysicalPlan::IndexScan { table, index, .. } = result {
431 assert_eq!(table, "users");
432 assert_eq!(index, "idx_id");
433 }
434 }
435
436 #[test]
437 fn test_sort_no_matching_index() {
438 let ctx = create_test_context();
439 let pass = OrderByIndexPass::new(&ctx);
440
441 let plan = PhysicalPlan::Sort {
444 input: Box::new(PhysicalPlan::table_scan("users")),
445 order_by: alloc::vec![(Expr::column("users", "email", 2), SortOrder::Asc)],
446 };
447
448 let result = pass.optimize(plan);
449
450 assert!(matches!(result, PhysicalPlan::Sort { .. }));
452 }
453
454 #[test]
455 fn test_sort_with_filter() {
456 let ctx = create_test_context();
457 let pass = OrderByIndexPass::new(&ctx);
458
459 let plan = PhysicalPlan::Sort {
461 input: Box::new(PhysicalPlan::Filter {
462 input: Box::new(PhysicalPlan::table_scan("users")),
463 predicate: Expr::gt(Expr::column("users", "age", 1), Expr::literal(18i64)),
464 }),
465 order_by: alloc::vec![(Expr::column("users", "id", 0), SortOrder::Asc)],
466 };
467
468 let result = pass.optimize(plan);
469
470 assert!(matches!(result, PhysicalPlan::IndexScan { .. }));
472 }
473
474 #[test]
475 fn test_sort_after_join_not_optimized() {
476 let ctx = create_test_context();
477 let pass = OrderByIndexPass::new(&ctx);
478
479 let plan = PhysicalPlan::Sort {
482 input: Box::new(PhysicalPlan::HashJoin {
483 left: Box::new(PhysicalPlan::table_scan("users")),
484 right: Box::new(PhysicalPlan::table_scan("orders")),
485 condition: Expr::eq(
486 Expr::column("users", "id", 0),
487 Expr::column("orders", "user_id", 0),
488 ),
489 join_type: crate::ast::JoinType::Inner,
490 }),
491 order_by: alloc::vec![(Expr::column("users", "id", 0), SortOrder::Asc)],
492 };
493
494 let result = pass.optimize(plan);
495
496 assert!(matches!(result, PhysicalPlan::Sort { .. }));
498 }
499
500 #[test]
501 fn test_check_order_match() {
502 let ctx = create_test_context();
503 let pass = OrderByIndexPass::new(&ctx);
504
505 let index_columns = alloc::vec!["id".into(), "name".into()];
506
507 let order_by = alloc::vec![(Expr::column("t", "id", 0), SortOrder::Asc)];
509 assert_eq!(pass.check_order_match(&order_by, &index_columns), Some(false));
510
511 let order_by = alloc::vec![(Expr::column("t", "id", 0), SortOrder::Desc)];
513 assert_eq!(pass.check_order_match(&order_by, &index_columns), Some(true));
514
515 let order_by = alloc::vec![(Expr::column("t", "email", 0), SortOrder::Asc)];
517 assert_eq!(pass.check_order_match(&order_by, &index_columns), None);
518 }
519
520 #[test]
521 fn test_sort_desc_to_index_scan_reverse() {
522 let ctx = create_test_context();
523 let pass = OrderByIndexPass::new(&ctx);
524
525 let plan = PhysicalPlan::Sort {
527 input: Box::new(PhysicalPlan::table_scan("users")),
528 order_by: alloc::vec![(Expr::column("users", "id", 0), SortOrder::Desc)],
529 };
530
531 let result = pass.optimize(plan);
532
533 assert!(matches!(result, PhysicalPlan::IndexScan { .. }));
535 if let PhysicalPlan::IndexScan { table, index, reverse, .. } = result {
536 assert_eq!(table, "users");
537 assert_eq!(index, "idx_id");
538 assert!(reverse, "IndexScan should have reverse=true for DESC ordering");
539 }
540 }
541
542 #[test]
543 fn test_topn_desc_to_index_scan_reverse() {
544 let ctx = create_test_context();
545 let pass = OrderByIndexPass::new(&ctx);
546
547 let plan = PhysicalPlan::TopN {
549 input: Box::new(PhysicalPlan::table_scan("users")),
550 order_by: alloc::vec![(Expr::column("users", "id", 0), SortOrder::Desc)],
551 limit: 10,
552 offset: 5,
553 };
554
555 let result = pass.optimize(plan);
556
557 assert!(matches!(result, PhysicalPlan::IndexScan { .. }));
559 if let PhysicalPlan::IndexScan { table, index, reverse, limit, offset, .. } = result {
560 assert_eq!(table, "users");
561 assert_eq!(index, "idx_id");
562 assert!(reverse, "IndexScan should have reverse=true for DESC ordering");
563 assert_eq!(limit, Some(10));
564 assert_eq!(offset, Some(5));
565 }
566 }
567
568 #[test]
569 fn test_sort_asc_to_index_scan_forward() {
570 let ctx = create_test_context();
571 let pass = OrderByIndexPass::new(&ctx);
572
573 let plan = PhysicalPlan::Sort {
575 input: Box::new(PhysicalPlan::table_scan("users")),
576 order_by: alloc::vec![(Expr::column("users", "id", 0), SortOrder::Asc)],
577 };
578
579 let result = pass.optimize(plan);
580
581 assert!(matches!(result, PhysicalPlan::IndexScan { .. }));
583 if let PhysicalPlan::IndexScan { reverse, .. } = result {
584 assert!(!reverse, "IndexScan should have reverse=false for ASC ordering");
585 }
586 }
587}