1use crate::ast::{JoinQuery, JoinType, QueryExpr};
14use crate::sql_lowering::{effective_table_filter, effective_vector_filter};
15
/// A single rewrite step that [`QueryOptimizer`] can run over a query expression.
///
/// Implementors must be `Send + Sync`, as required by the trait bound.
pub trait OptimizationPass: Send + Sync {
    /// Name of the pass.
    ///
    /// `QueryOptimizer::optimize` reports this name for passes that changed the
    /// query, and `QueryOptimizer::optimize_with_hints` compares it against
    /// `OptimizationHints::disabled_passes`.
    fn name(&self) -> &str;

    /// Consumes `query` and returns the expression that replaces it.
    fn apply(&self, query: QueryExpr) -> QueryExpr;

    /// Relative priority of the pass.
    ///
    /// `QueryOptimizer::add_pass` sorts the pipeline by this value in
    /// descending order, so larger values run earlier.
    fn benefit(&self) -> u32;
}
27
/// Runs a pipeline of [`OptimizationPass`] objects over a query expression.
pub struct QueryOptimizer {
    /// Passes in the order in which `optimize` / `optimize_with_hints` apply them.
    passes: Vec<Box<dyn OptimizationPass>>,
    /// Set to `true` by `new()`.
    // NOTE(review): this flag is not read anywhere in the visible part of the
    // file — confirm whether it is still needed.
    cost_based: bool,
}
35
36impl QueryOptimizer {
37 pub fn new() -> Self {
39 let passes: Vec<Box<dyn OptimizationPass>> = vec![
40 Box::new(PredicatePushdownPass),
41 Box::new(ProjectionPushdownPass),
42 Box::new(JoinReorderingPass),
43 Box::new(IndexSelectionPass),
44 Box::new(LimitPushdownPass),
45 ];
46
47 Self {
48 passes,
49 cost_based: true,
50 }
51 }
52
53 pub fn add_pass(&mut self, pass: Box<dyn OptimizationPass>) {
55 self.passes.push(pass);
56 self.passes.sort_by_key(|b| std::cmp::Reverse(b.benefit()));
58 }
59
60 pub fn optimize(&self, query: QueryExpr) -> (QueryExpr, Vec<String>) {
62 let mut optimized = query;
63 let mut applied_passes = Vec::new();
64
65 for pass in &self.passes {
66 let before = format!("{:?}", optimized);
67 optimized = pass.apply(optimized);
68 let after = format!("{:?}", optimized);
69
70 if before != after {
71 applied_passes.push(pass.name().to_string());
72 }
73 }
74
75 (optimized, applied_passes)
76 }
77
78 pub fn optimize_with_hints(&self, query: QueryExpr, hints: &OptimizationHints) -> QueryExpr {
80 let mut optimized = query;
81
82 for pass in &self.passes {
83 if hints.disabled_passes.contains(&pass.name().to_string()) {
85 continue;
86 }
87
88 optimized = pass.apply(optimized);
89 }
90
91 optimized
92 }
93}
94
95impl Default for QueryOptimizer {
96 fn default() -> Self {
97 Self::new()
98 }
99}
100
/// Caller-supplied hints for `QueryOptimizer::optimize_with_hints`.
#[derive(Debug, Clone, Default)]
pub struct OptimizationHints {
    /// Names of passes to skip; compared against `OptimizationPass::name()`.
    pub disabled_passes: Vec<String>,
    // NOTE(review): the three fields below are not read anywhere in the visible
    // part of this file — their meaning is presumed from their names; confirm
    // against the consumers.
    /// Presumably an explicit join order to use — TODO confirm.
    pub join_order: Option<Vec<String>>,
    /// Presumably the name of an index to force — TODO confirm.
    pub force_index: Option<String>,
    /// Presumably disables parallel execution — TODO confirm.
    pub no_parallel: bool,
}
113
/// Optimization pass registered under the name `"PredicatePushdown"`.
///
/// As implemented in this file it walks join trees and rebuilds each join from
/// its visited children; no predicate is actually relocated.
struct PredicatePushdownPass;
120
121impl OptimizationPass for PredicatePushdownPass {
122 fn name(&self) -> &str {
123 "PredicatePushdown"
124 }
125
126 fn apply(&self, query: QueryExpr) -> QueryExpr {
127 match query {
128 QueryExpr::Join(jq) => self.optimize_join(jq),
129 other => other,
130 }
131 }
132
133 fn benefit(&self) -> u32 {
134 100 }
136}
137
138impl PredicatePushdownPass {
139 fn optimize_join(&self, query: JoinQuery) -> QueryExpr {
140 let left = self.apply(*query.left);
145 let right = self.apply(*query.right);
146
147 QueryExpr::Join(JoinQuery {
148 left: Box::new(left),
149 right: Box::new(right),
150 ..query
151 })
152 }
153}
154
/// Optimization pass registered under the name `"ProjectionPushdown"`.
///
/// As implemented in this file it walks join trees and returns table queries
/// unchanged; no projection is actually rewritten.
struct ProjectionPushdownPass;
157
158impl OptimizationPass for ProjectionPushdownPass {
159 fn name(&self) -> &str {
160 "ProjectionPushdown"
161 }
162
163 fn apply(&self, query: QueryExpr) -> QueryExpr {
164 match query {
165 QueryExpr::Join(jq) => {
166 let left = self.apply(*jq.left);
168 let right = self.apply(*jq.right);
169
170 QueryExpr::Join(JoinQuery {
171 left: Box::new(left),
172 right: Box::new(right),
173 ..jq
174 })
175 }
176 QueryExpr::Table(tq) => {
177 QueryExpr::Table(tq)
180 }
181 other => other,
182 }
183 }
184
185 fn benefit(&self) -> u32 {
186 80 }
188}
189
/// Optimization pass registered under the name `"JoinReordering"`.
///
/// For a top-level inner join it swaps the two inputs when the estimated size of
/// the left input exceeds that of the right input.
struct JoinReorderingPass;
192
193impl OptimizationPass for JoinReorderingPass {
194 fn name(&self) -> &str {
195 "JoinReordering"
196 }
197
198 fn apply(&self, query: QueryExpr) -> QueryExpr {
199 match query {
200 QueryExpr::Join(jq) => {
201 self.optimize_join_order(jq)
204 }
205 other => other,
206 }
207 }
208
209 fn benefit(&self) -> u32 {
210 90 }
212}
213
214impl JoinReorderingPass {
215 fn optimize_join_order(&self, query: JoinQuery) -> QueryExpr {
216 let left_size = Self::estimate_size(&query.left);
218 let right_size = Self::estimate_size(&query.right);
219
220 if left_size > right_size && query.join_type == JoinType::Inner {
222 let JoinQuery {
224 left,
225 right,
226 join_type,
227 on,
228 filter,
229 order_by,
230 limit,
231 offset,
232 return_items,
233 return_,
234 } = query;
235 QueryExpr::Join(JoinQuery {
236 left: right,
237 right: left,
238 join_type,
239 on: swap_condition(on),
240 filter,
241 order_by,
242 limit,
243 offset,
244 return_items,
245 return_,
246 })
247 } else {
248 QueryExpr::Join(query)
249 }
250 }
251
252 fn estimate_size(query: &QueryExpr) -> f64 {
253 match query {
254 QueryExpr::Table(tq) => {
255 let base = 1000.0;
256 if effective_table_filter(tq).is_some() {
257 base * 0.1
258 } else if tq.limit.is_some() {
259 tq.limit.unwrap() as f64
260 } else {
261 base
262 }
263 }
264 QueryExpr::Graph(_) => 100.0,
265 QueryExpr::Join(jq) => {
266 Self::estimate_size(&jq.left) * Self::estimate_size(&jq.right) * 0.1
267 }
268 QueryExpr::Path(_) => 10.0,
269 QueryExpr::Vector(vq) => {
270 if effective_vector_filter(vq).is_some() {
272 (vq.k as f64).min(100.0)
273 } else {
274 vq.k as f64
275 }
276 }
277 QueryExpr::Hybrid(hq) => {
278 let structured_size = Self::estimate_size(&hq.structured);
280 let vector_size = hq.vector.k as f64;
281 let base = structured_size.min(vector_size);
283 hq.limit.map(|l| base.min(l as f64)).unwrap_or(base)
284 }
285 QueryExpr::Insert(_)
287 | QueryExpr::Update(_)
288 | QueryExpr::Delete(_)
289 | QueryExpr::CreateTable(_)
290 | QueryExpr::CreateCollection(_)
291 | QueryExpr::CreateVector(_)
292 | QueryExpr::DropTable(_)
293 | QueryExpr::DropGraph(_)
294 | QueryExpr::DropVector(_)
295 | QueryExpr::DropDocument(_)
296 | QueryExpr::DropKv(_)
297 | QueryExpr::DropCollection(_)
298 | QueryExpr::Truncate(_)
299 | QueryExpr::AlterTable(_)
300 | QueryExpr::GraphCommand(_)
301 | QueryExpr::SearchCommand(_)
302 | QueryExpr::CreateIndex(_)
303 | QueryExpr::DropIndex(_)
304 | QueryExpr::ProbabilisticCommand(_)
305 | QueryExpr::Ask(_)
306 | QueryExpr::SetConfig { .. }
307 | QueryExpr::ShowConfig { .. }
308 | QueryExpr::SetSecret { .. }
309 | QueryExpr::DeleteSecret { .. }
310 | QueryExpr::ShowSecrets { .. }
311 | QueryExpr::SetTenant(_)
312 | QueryExpr::ShowTenant
313 | QueryExpr::CreateTimeSeries(_)
314 | QueryExpr::CreateMetric(_)
315 | QueryExpr::AlterMetric(_)
316 | QueryExpr::CreateSlo(_)
317 | QueryExpr::DropTimeSeries(_)
318 | QueryExpr::CreateQueue(_)
319 | QueryExpr::AlterQueue(_)
320 | QueryExpr::DropQueue(_)
321 | QueryExpr::QueueSelect(_)
322 | QueryExpr::QueueCommand(_)
323 | QueryExpr::KvCommand(_)
324 | QueryExpr::ConfigCommand(_)
325 | QueryExpr::CreateTree(_)
326 | QueryExpr::DropTree(_)
327 | QueryExpr::TreeCommand(_)
328 | QueryExpr::ExplainAlter(_)
329 | QueryExpr::TransactionControl(_)
330 | QueryExpr::MaintenanceCommand(_)
331 | QueryExpr::CreateSchema(_)
332 | QueryExpr::DropSchema(_)
333 | QueryExpr::CreateSequence(_)
334 | QueryExpr::DropSequence(_)
335 | QueryExpr::CopyFrom(_)
336 | QueryExpr::CreateView(_)
337 | QueryExpr::DropView(_)
338 | QueryExpr::RefreshMaterializedView(_)
339 | QueryExpr::CreatePolicy(_)
340 | QueryExpr::DropPolicy(_)
341 | QueryExpr::CreateServer(_)
342 | QueryExpr::DropServer(_)
343 | QueryExpr::CreateForeignTable(_)
344 | QueryExpr::DropForeignTable(_)
345 | QueryExpr::Grant(_)
346 | QueryExpr::Revoke(_)
347 | QueryExpr::AlterUser(_)
348 | QueryExpr::CreateUser(_)
349 | QueryExpr::CreateIamPolicy { .. }
350 | QueryExpr::DropIamPolicy { .. }
351 | QueryExpr::AttachPolicy { .. }
352 | QueryExpr::DetachPolicy { .. }
353 | QueryExpr::ShowPolicies { .. }
354 | QueryExpr::ShowEffectivePermissions { .. }
355 | QueryExpr::RankOf(_)
356 | QueryExpr::ApproxRankOf(_)
357 | QueryExpr::RankRange(_)
358 | QueryExpr::SimulatePolicy { .. }
359 | QueryExpr::LintPolicy { .. }
360 | QueryExpr::MigratePolicyMode { .. }
361 | QueryExpr::CreateMigration(_)
362 | QueryExpr::ApplyMigration(_)
363 | QueryExpr::RollbackMigration(_)
364 | QueryExpr::ExplainMigration(_)
365 | QueryExpr::EventsBackfill(_)
366 | QueryExpr::EventsBackfillStatus { .. } => 1.0,
367 }
368 }
369}
370
/// Optimization pass registered under the name `"IndexSelection"`.
///
/// For a top-level table query it derives an `IndexHint` from the query's
/// effective filter and stores it in the query's `expand.index_hint`.
struct IndexSelectionPass;
384
385impl OptimizationPass for IndexSelectionPass {
386 fn name(&self) -> &str {
387 "IndexSelection"
388 }
389
390 fn apply(&self, query: QueryExpr) -> QueryExpr {
391 match query {
392 QueryExpr::Table(mut tq) => {
393 if let Some(filter) = effective_table_filter(&tq).as_ref() {
394 if let Some(hint) = Self::analyze_filter(filter) {
395 let expand = tq.expand.get_or_insert_with(Default::default);
397 expand.index_hint = Some(hint);
398 }
399 }
400 QueryExpr::Table(tq)
401 }
402 other => other,
403 }
404 }
405
406 fn benefit(&self) -> u32 {
407 70
408 }
409}
410
411impl IndexSelectionPass {
412 fn analyze_filter(filter: &crate::ast::Filter) -> Option<IndexHint> {
414 match filter {
415 crate::ast::Filter::Compare { field, op, .. } if *op == crate::ast::CompareOp::Eq => {
417 let col = Self::field_name(field);
418 Some(IndexHint {
419 method: IndexHintMethod::Hash,
420 column: col,
421 })
422 }
423 crate::ast::Filter::Compare {
425 field,
426 op:
427 crate::ast::CompareOp::Lt
428 | crate::ast::CompareOp::Le
429 | crate::ast::CompareOp::Gt
430 | crate::ast::CompareOp::Ge,
431 ..
432 } => {
433 let col = Self::field_name(field);
434 Some(IndexHint {
435 method: IndexHintMethod::BTree,
436 column: col,
437 })
438 }
439 crate::ast::Filter::Between { field, .. } => {
441 let col = Self::field_name(field);
442 Some(IndexHint {
443 method: IndexHintMethod::BTree,
444 column: col,
445 })
446 }
447 crate::ast::Filter::In { field, values } if values.len() <= 10 => {
449 let col = Self::field_name(field);
450 Some(IndexHint {
451 method: IndexHintMethod::Bitmap,
452 column: col,
453 })
454 }
455 crate::ast::Filter::And(left, right) => {
457 Self::analyze_filter(left).or_else(|| Self::analyze_filter(right))
458 }
459 _ => None,
460 }
461 }
462
463 fn field_name(field: &crate::ast::FieldRef) -> String {
464 match field {
465 crate::ast::FieldRef::TableColumn { column, .. } => column.clone(),
466 crate::ast::FieldRef::NodeProperty { property, .. } => property.clone(),
467 crate::ast::FieldRef::EdgeProperty { property, .. } => property.clone(),
468 crate::ast::FieldRef::NodeId { alias } => {
469 format!("{}.id", alias)
470 }
471 }
472 }
473}
474
475pub use reddb_types::index_hint::{IndexHint, IndexHintMethod};
480
/// Optimization pass registered under the name `"LimitPushdown"`.
///
/// As implemented in this file it walks join trees and rebuilds each join from
/// its visited children; no limit is actually moved.
struct LimitPushdownPass;
483
484impl OptimizationPass for LimitPushdownPass {
485 fn name(&self) -> &str {
486 "LimitPushdown"
487 }
488
489 fn apply(&self, query: QueryExpr) -> QueryExpr {
490 match query {
491 QueryExpr::Join(jq) => {
492 let left = self.apply(*jq.left);
494 let right = self.apply(*jq.right);
495
496 QueryExpr::Join(JoinQuery {
497 left: Box::new(left),
498 right: Box::new(right),
499 ..jq
500 })
501 }
502 other => other,
503 }
504 }
505
506 fn benefit(&self) -> u32 {
507 60
508 }
509}
510
511fn swap_condition(condition: crate::ast::JoinCondition) -> crate::ast::JoinCondition {
516 crate::ast::JoinCondition {
517 left_field: condition.right_field,
518 right_field: condition.left_field,
519 }
520}
521
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::{
        CompareOp, DistanceMetric, FieldRef, Filter, FusionStrategy, JoinCondition, Projection,
        TableQuery,
    };
    use reddb_types::Value;

    /// Builds an unaliased `SELECT *` over `name` with no filter, limit or ordering.
    fn bare_table(name: &str) -> TableQuery {
        TableQuery {
            table: name.to_string(),
            source: None,
            alias: None,
            select_items: Vec::new(),
            columns: vec![Projection::All],
            where_expr: None,
            filter: None,
            group_by_exprs: Vec::new(),
            group_by: Vec::new(),
            having_expr: None,
            having: None,
            order_by: vec![],
            limit: None,
            limit_param: None,
            offset: None,
            offset_param: None,
            expand: None,
            as_of: None,
            sessionize: None,
            distinct: false,
        }
    }

    /// Same as `bare_table`, aliased to its own name and wrapped in `QueryExpr`.
    fn make_table_query(name: &str) -> QueryExpr {
        let mut table = bare_table(name);
        table.alias = Some(name.to_string());
        QueryExpr::Table(table)
    }

    /// Inner join `large JOIN small ON large.id = small.id` with `large` on the left.
    fn join_large_with_small(large: QueryExpr, small: QueryExpr) -> QueryExpr {
        QueryExpr::Join(JoinQuery {
            left: Box::new(large),
            right: Box::new(small),
            join_type: JoinType::Inner,
            on: JoinCondition {
                left_field: FieldRef::TableColumn {
                    table: "large".to_string(),
                    column: "id".to_string(),
                },
                right_field: FieldRef::TableColumn {
                    table: "small".to_string(),
                    column: "id".to_string(),
                },
            },
            filter: None,
            order_by: Vec::new(),
            limit: None,
            offset: None,
            return_items: Vec::new(),
            return_: Vec::new(),
        })
    }

    #[test]
    fn test_optimizer_applies_passes() {
        let optimizer = QueryOptimizer::new();
        let query = make_table_query("hosts");

        let (optimized, passes) = optimizer.optimize(query);
        assert!(matches!(optimized, QueryExpr::Table(_)));
        // A plain table scan without a filter gives no pass anything to rewrite,
        // so none may be reported as applied.
        assert!(
            passes.is_empty(),
            "no pass should report a change, got {passes:?}"
        );
    }

    #[test]
    fn test_join_reordering() {
        let optimizer = QueryOptimizer::new();

        let mut small = bare_table("small");
        small.limit = Some(10);
        let large = bare_table("large");

        let join = join_large_with_small(QueryExpr::Table(large), QueryExpr::Table(small));

        let (optimized, passes) = optimizer.optimize(join);

        assert!(passes.iter().any(|pass| pass == "JoinReordering"));

        // Fail loudly on an unexpected shape instead of silently skipping the
        // assertions below.
        let QueryExpr::Join(jq) = optimized else {
            panic!("expected join query");
        };
        let QueryExpr::Table(left) = jq.left.as_ref() else {
            panic!("expected table on left side");
        };

        assert_eq!(left.table, "small");
        assert!(matches!(
            &jq.on.left_field,
            FieldRef::TableColumn { table, column } if table == "small" && column == "id"
        ));
    }

    #[test]
    fn optimize_with_hints_can_disable_join_reordering() {
        let optimizer = QueryOptimizer::new();

        let large = make_table_query("large");
        let mut small_table = TableQuery::new("small");
        small_table.limit = Some(1);
        let small = QueryExpr::Table(small_table);

        let join = join_large_with_small(large, small);
        let hints = OptimizationHints {
            disabled_passes: vec!["JoinReordering".to_string()],
            ..OptimizationHints::default()
        };

        let optimized = optimizer.optimize_with_hints(join, &hints);
        let QueryExpr::Join(join) = optimized else {
            panic!("expected join query");
        };
        let QueryExpr::Table(left) = join.left.as_ref() else {
            panic!("expected table on left side");
        };

        assert_eq!(left.table, "large");
        assert!(matches!(
            &join.on.left_field,
            FieldRef::TableColumn { table, column } if table == "large" && column == "id"
        ));
    }

    #[test]
    fn optimizer_sets_index_hint_on_table_filters() {
        let optimizer = QueryOptimizer::new();
        let mut table = TableQuery::new("hosts");
        table.filter = Some(Filter::Compare {
            field: FieldRef::column("", "host_id"),
            op: CompareOp::Eq,
            value: Value::Integer(7),
        });

        let (optimized, passes) = optimizer.optimize(QueryExpr::Table(table));
        let QueryExpr::Table(table) = optimized else {
            panic!("expected table query");
        };
        let hint = table
            .expand
            .and_then(|expand| expand.index_hint)
            .expect("expected optimizer index hint");

        assert!(passes.iter().any(|pass| pass == "IndexSelection"));
        assert_eq!(hint.method, IndexHintMethod::Hash);
        assert_eq!(hint.column, "host_id");
    }

    #[test]
    fn index_selection_analyzes_supported_filter_shapes() {
        // BETWEEN maps to a B-tree hint on the property name.
        let range = IndexSelectionPass::analyze_filter(&Filter::Between {
            field: FieldRef::node_prop("n", "score"),
            low: Value::Integer(1),
            high: Value::Integer(9),
        })
        .expect("expected range hint");
        assert_eq!(range.method, IndexHintMethod::BTree);
        assert_eq!(range.column, "score");

        // A short IN list maps to a bitmap hint.
        let bitmap = IndexSelectionPass::analyze_filter(&Filter::In {
            field: FieldRef::edge_prop("e", "kind"),
            values: vec![Value::text("http"), Value::text("ssh")],
        })
        .expect("expected bitmap hint");
        assert_eq!(bitmap.method, IndexHintMethod::Bitmap);
        assert_eq!(bitmap.column, "kind");

        // An IN list with more than 10 values yields no hint.
        assert!(IndexSelectionPass::analyze_filter(&Filter::In {
            field: FieldRef::column("", "status"),
            values: (0..11).map(Value::Integer).collect(),
        })
        .is_none());

        // AND falls back to the right operand when the left one yields nothing.
        let fallback_right = IndexSelectionPass::analyze_filter(&Filter::And(
            Box::new(Filter::IsNull(FieldRef::column("", "deleted_at"))),
            Box::new(Filter::Compare {
                field: FieldRef::node_id("n"),
                op: CompareOp::Eq,
                value: Value::Integer(1),
            }),
        ))
        .expect("expected right-side AND hint");
        assert_eq!(fallback_right.method, IndexHintMethod::Hash);
        assert_eq!(fallback_right.column, "n.id");
    }
}