Skip to main content

reddb_server/storage/query/planner/
optimizer.rs

1//! Query Optimizer
2//!
3//! Multi-pass query optimization with pluggable strategies.
4//!
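//! # Optimization Passes
//!
//! The default pipeline built by `QueryOptimizer::new` runs these passes in order:
//!
//! 1. **PredicatePushdown**: Move filters to data sources
//! 2. **ProjectionPushdown**: Eliminate unused columns early
//! 3. **JoinReordering**: Put the smaller input of an inner join on the build
//!    side (a full IDP join-order enumeration is not implemented yet)
//! 4. **IndexSelection**: Choose best indexes for scans
//! 5. **LimitPushdown**: Push LIMIT down through operations
//!
//! `ExpressionSimplification` is not part of the pipeline built in this file.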
12
13use crate::storage::query::ast::{JoinQuery, JoinType, QueryExpr};
14use crate::storage::query::sql_lowering::{effective_table_filter, effective_vector_filter};
15
16/// An optimization pass that transforms query expressions
17pub trait OptimizationPass: Send + Sync {
18    /// Pass name for debugging
19    fn name(&self) -> &str;
20
21    /// Apply the optimization pass
22    fn apply(&self, query: QueryExpr) -> QueryExpr;
23
24    /// Estimated benefit (higher = more important)
25    fn benefit(&self) -> u32;
26}
27
28/// Query optimizer with multiple passes
29pub struct QueryOptimizer {
30    /// Ordered optimization passes
31    passes: Vec<Box<dyn OptimizationPass>>,
32    /// Enable cost-based optimization
33    cost_based: bool,
34}
35
36impl QueryOptimizer {
37    /// Create a new optimizer with default passes
38    pub fn new() -> Self {
39        let passes: Vec<Box<dyn OptimizationPass>> = vec![
40            Box::new(PredicatePushdownPass),
41            Box::new(ProjectionPushdownPass),
42            Box::new(JoinReorderingPass),
43            Box::new(IndexSelectionPass),
44            Box::new(LimitPushdownPass),
45        ];
46
47        Self {
48            passes,
49            cost_based: true,
50        }
51    }
52
53    /// Add a custom optimization pass
54    pub fn add_pass(&mut self, pass: Box<dyn OptimizationPass>) {
55        self.passes.push(pass);
56        // Sort by benefit (highest first)
57        self.passes.sort_by_key(|b| std::cmp::Reverse(b.benefit()));
58    }
59
60    /// Optimize a query expression
61    pub fn optimize(&self, query: QueryExpr) -> (QueryExpr, Vec<String>) {
62        let mut optimized = query;
63        let mut applied_passes = Vec::new();
64
65        for pass in &self.passes {
66            let before = format!("{:?}", optimized);
67            optimized = pass.apply(optimized);
68            let after = format!("{:?}", optimized);
69
70            if before != after {
71                applied_passes.push(pass.name().to_string());
72            }
73        }
74
75        (optimized, applied_passes)
76    }
77
78    /// Optimize with hints
79    pub fn optimize_with_hints(&self, query: QueryExpr, hints: &OptimizationHints) -> QueryExpr {
80        let mut optimized = query;
81
82        for pass in &self.passes {
83            // Check if pass is disabled by hints
84            if hints.disabled_passes.contains(&pass.name().to_string()) {
85                continue;
86            }
87
88            optimized = pass.apply(optimized);
89        }
90
91        optimized
92    }
93}
94
95impl Default for QueryOptimizer {
96    fn default() -> Self {
97        Self::new()
98    }
99}
100
/// Caller-supplied knobs for the optimizer.
///
/// `Default` yields no disabled passes, no forced join order, no forced
/// index, and `no_parallel == false`.
#[derive(Debug, Clone, Default)]
pub struct OptimizationHints {
    /// Names of passes that must not run (compared against
    /// `OptimizationPass::name`).
    pub disabled_passes: Vec<String>,
    /// Explicit join order to enforce, if any.
    pub join_order: Option<Vec<String>>,
    /// Index that must be used, if any.
    pub force_index: Option<String>,
    /// When `true`, parallel execution is turned off.
    pub no_parallel: bool,
}
113
114// =============================================================================
115// Built-in Optimization Passes
116// =============================================================================
117
118/// Push predicates down to data sources
119struct PredicatePushdownPass;
120
121impl OptimizationPass for PredicatePushdownPass {
122    fn name(&self) -> &str {
123        "PredicatePushdown"
124    }
125
126    fn apply(&self, query: QueryExpr) -> QueryExpr {
127        match query {
128            QueryExpr::Join(jq) => self.optimize_join(jq),
129            other => other,
130        }
131    }
132
133    fn benefit(&self) -> u32 {
134        100 // High priority - reduces data early
135    }
136}
137
138impl PredicatePushdownPass {
139    fn optimize_join(&self, query: JoinQuery) -> QueryExpr {
140        // Analyze join condition to find pushable predicates
141        // This is a simplified version - real implementation would analyze
142        // predicate dependencies on join columns
143
144        let left = self.apply(*query.left);
145        let right = self.apply(*query.right);
146
147        QueryExpr::Join(JoinQuery {
148            left: Box::new(left),
149            right: Box::new(right),
150            ..query
151        })
152    }
153}
154
155/// Push projections down to eliminate columns early
156struct ProjectionPushdownPass;
157
158impl OptimizationPass for ProjectionPushdownPass {
159    fn name(&self) -> &str {
160        "ProjectionPushdown"
161    }
162
163    fn apply(&self, query: QueryExpr) -> QueryExpr {
164        match query {
165            QueryExpr::Join(jq) => {
166                // Analyze which columns are actually needed
167                let left = self.apply(*jq.left);
168                let right = self.apply(*jq.right);
169
170                QueryExpr::Join(JoinQuery {
171                    left: Box::new(left),
172                    right: Box::new(right),
173                    ..jq
174                })
175            }
176            QueryExpr::Table(tq) => {
177                // Table projections already use specific column projections
178                // No transformation needed - already efficient
179                QueryExpr::Table(tq)
180            }
181            other => other,
182        }
183    }
184
185    fn benefit(&self) -> u32 {
186        80 // High priority - reduces memory
187    }
188}
189
190/// Reorder joins for optimal execution
191struct JoinReorderingPass;
192
193impl OptimizationPass for JoinReorderingPass {
194    fn name(&self) -> &str {
195        "JoinReordering"
196    }
197
198    fn apply(&self, query: QueryExpr) -> QueryExpr {
199        match query {
200            QueryExpr::Join(jq) => {
201                // For now, just ensure smaller table is on build side
202                // Real IDP algorithm would enumerate join orderings
203                self.optimize_join_order(jq)
204            }
205            other => other,
206        }
207    }
208
209    fn benefit(&self) -> u32 {
210        90 // High priority - join order greatly affects cost
211    }
212}
213
214impl JoinReorderingPass {
215    fn optimize_join_order(&self, query: JoinQuery) -> QueryExpr {
216        // Estimate cardinalities
217        let left_size = Self::estimate_size(&query.left);
218        let right_size = Self::estimate_size(&query.right);
219
220        // For hash join, smaller table should be build side (left)
221        if left_size > right_size && query.join_type == JoinType::Inner {
222            // Swap left and right
223            let JoinQuery {
224                left,
225                right,
226                join_type,
227                on,
228                filter,
229                order_by,
230                limit,
231                offset,
232                return_items,
233                return_,
234            } = query;
235            QueryExpr::Join(JoinQuery {
236                left: right,
237                right: left,
238                join_type,
239                on: swap_condition(on),
240                filter,
241                order_by,
242                limit,
243                offset,
244                return_items,
245                return_,
246            })
247        } else {
248            QueryExpr::Join(query)
249        }
250    }
251
252    fn estimate_size(query: &QueryExpr) -> f64 {
253        match query {
254            QueryExpr::Table(tq) => {
255                let base = 1000.0;
256                if effective_table_filter(tq).is_some() {
257                    base * 0.1
258                } else if tq.limit.is_some() {
259                    tq.limit.unwrap() as f64
260                } else {
261                    base
262                }
263            }
264            QueryExpr::Graph(_) => 100.0,
265            QueryExpr::Join(jq) => {
266                Self::estimate_size(&jq.left) * Self::estimate_size(&jq.right) * 0.1
267            }
268            QueryExpr::Path(_) => 10.0,
269            QueryExpr::Vector(vq) => {
270                // Vector search returns k results
271                if effective_vector_filter(vq).is_some() {
272                    (vq.k as f64).min(100.0)
273                } else {
274                    vq.k as f64
275                }
276            }
277            QueryExpr::Hybrid(hq) => {
278                // Hybrid query combines structured and vector results
279                let structured_size = Self::estimate_size(&hq.structured);
280                let vector_size = hq.vector.k as f64;
281                // Fusion typically reduces to min of both, limited by limit
282                let base = structured_size.min(vector_size);
283                hq.limit.map(|l| base.min(l as f64)).unwrap_or(base)
284            }
285            // DML/DDL/Command statements return minimal result sets
286            QueryExpr::Insert(_)
287            | QueryExpr::Update(_)
288            | QueryExpr::Delete(_)
289            | QueryExpr::CreateTable(_)
290            | QueryExpr::CreateCollection(_)
291            | QueryExpr::CreateVector(_)
292            | QueryExpr::DropTable(_)
293            | QueryExpr::DropGraph(_)
294            | QueryExpr::DropVector(_)
295            | QueryExpr::DropDocument(_)
296            | QueryExpr::DropKv(_)
297            | QueryExpr::DropCollection(_)
298            | QueryExpr::Truncate(_)
299            | QueryExpr::AlterTable(_)
300            | QueryExpr::GraphCommand(_)
301            | QueryExpr::SearchCommand(_)
302            | QueryExpr::CreateIndex(_)
303            | QueryExpr::DropIndex(_)
304            | QueryExpr::ProbabilisticCommand(_)
305            | QueryExpr::Ask(_)
306            | QueryExpr::SetConfig { .. }
307            | QueryExpr::ShowConfig { .. }
308            | QueryExpr::SetSecret { .. }
309            | QueryExpr::DeleteSecret { .. }
310            | QueryExpr::ShowSecrets { .. }
311            | QueryExpr::SetTenant(_)
312            | QueryExpr::ShowTenant
313            | QueryExpr::CreateTimeSeries(_)
314            | QueryExpr::CreateMetric(_)
315            | QueryExpr::AlterMetric(_)
316            | QueryExpr::CreateSlo(_)
317            | QueryExpr::DropTimeSeries(_)
318            | QueryExpr::CreateQueue(_)
319            | QueryExpr::AlterQueue(_)
320            | QueryExpr::DropQueue(_)
321            | QueryExpr::QueueSelect(_)
322            | QueryExpr::QueueCommand(_)
323            | QueryExpr::KvCommand(_)
324            | QueryExpr::ConfigCommand(_)
325            | QueryExpr::CreateTree(_)
326            | QueryExpr::DropTree(_)
327            | QueryExpr::TreeCommand(_)
328            | QueryExpr::ExplainAlter(_)
329            | QueryExpr::TransactionControl(_)
330            | QueryExpr::MaintenanceCommand(_)
331            | QueryExpr::CreateSchema(_)
332            | QueryExpr::DropSchema(_)
333            | QueryExpr::CreateSequence(_)
334            | QueryExpr::DropSequence(_)
335            | QueryExpr::CopyFrom(_)
336            | QueryExpr::CreateView(_)
337            | QueryExpr::DropView(_)
338            | QueryExpr::RefreshMaterializedView(_)
339            | QueryExpr::CreatePolicy(_)
340            | QueryExpr::DropPolicy(_)
341            | QueryExpr::CreateServer(_)
342            | QueryExpr::DropServer(_)
343            | QueryExpr::CreateForeignTable(_)
344            | QueryExpr::DropForeignTable(_)
345            | QueryExpr::Grant(_)
346            | QueryExpr::Revoke(_)
347            | QueryExpr::AlterUser(_)
348            | QueryExpr::CreateIamPolicy { .. }
349            | QueryExpr::DropIamPolicy { .. }
350            | QueryExpr::AttachPolicy { .. }
351            | QueryExpr::DetachPolicy { .. }
352            | QueryExpr::ShowPolicies { .. }
353            | QueryExpr::ShowEffectivePermissions { .. }
354            | QueryExpr::RankOf(_)
355            | QueryExpr::ApproxRankOf(_)
356            | QueryExpr::RankRange(_)
357            | QueryExpr::SimulatePolicy { .. }
358            | QueryExpr::LintPolicy { .. }
359            | QueryExpr::MigratePolicyMode { .. }
360            | QueryExpr::CreateMigration(_)
361            | QueryExpr::ApplyMigration(_)
362            | QueryExpr::RollbackMigration(_)
363            | QueryExpr::ExplainMigration(_)
364            | QueryExpr::EventsBackfill(_)
365            | QueryExpr::EventsBackfillStatus { .. } => 1.0,
366        }
367    }
368}
369
370/// Select optimal indexes for table scans.
371///
372/// Analyzes filter predicates and annotates the query plan with index hints:
373/// - Equality predicates (`col = value`) → prefer Hash index if available
374/// - Low-cardinality equality → prefer Bitmap index
375/// - Range predicates (`col > value`, `BETWEEN`) → prefer B-tree
376/// - Spatial predicates → prefer R-tree
377///
378/// The hints are stored in the TableQuery's alias field as a prefix
379/// (e.g., `__idx:hash:col_name`) which the executor can read to skip
380/// full scans. This is a lightweight approach that avoids adding new
381/// fields to the AST while enabling index-aware execution.
382struct IndexSelectionPass;
383
384impl OptimizationPass for IndexSelectionPass {
385    fn name(&self) -> &str {
386        "IndexSelection"
387    }
388
389    fn apply(&self, query: QueryExpr) -> QueryExpr {
390        match query {
391            QueryExpr::Table(mut tq) => {
392                if let Some(filter) = effective_table_filter(&tq).as_ref() {
393                    if let Some(hint) = Self::analyze_filter(filter) {
394                        // Store index hint in expand metadata for executor
395                        let expand = tq.expand.get_or_insert_with(Default::default);
396                        expand.index_hint = Some(hint);
397                    }
398                }
399                QueryExpr::Table(tq)
400            }
401            other => other,
402        }
403    }
404
405    fn benefit(&self) -> u32 {
406        70
407    }
408}
409
410impl IndexSelectionPass {
411    /// Analyze a filter predicate and return the best index hint
412    fn analyze_filter(filter: &crate::storage::query::ast::Filter) -> Option<IndexHint> {
413        match filter {
414            // Equality on a single column → Hash index candidate
415            crate::storage::query::ast::Filter::Compare { field, op, .. }
416                if *op == crate::storage::query::ast::CompareOp::Eq =>
417            {
418                let col = Self::field_name(field);
419                Some(IndexHint {
420                    method: IndexHintMethod::Hash,
421                    column: col,
422                })
423            }
424            // Range predicates → B-tree candidate
425            crate::storage::query::ast::Filter::Compare {
426                field,
427                op:
428                    crate::storage::query::ast::CompareOp::Lt
429                    | crate::storage::query::ast::CompareOp::Le
430                    | crate::storage::query::ast::CompareOp::Gt
431                    | crate::storage::query::ast::CompareOp::Ge,
432                ..
433            } => {
434                let col = Self::field_name(field);
435                Some(IndexHint {
436                    method: IndexHintMethod::BTree,
437                    column: col,
438                })
439            }
440            // BETWEEN → B-tree candidate
441            crate::storage::query::ast::Filter::Between { field, .. } => {
442                let col = Self::field_name(field);
443                Some(IndexHint {
444                    method: IndexHintMethod::BTree,
445                    column: col,
446                })
447            }
448            // IN with few values → Bitmap candidate
449            crate::storage::query::ast::Filter::In { field, values } if values.len() <= 10 => {
450                let col = Self::field_name(field);
451                Some(IndexHint {
452                    method: IndexHintMethod::Bitmap,
453                    column: col,
454                })
455            }
456            // AND: pick the most selective hint from left or right
457            crate::storage::query::ast::Filter::And(left, right) => {
458                Self::analyze_filter(left).or_else(|| Self::analyze_filter(right))
459            }
460            _ => None,
461        }
462    }
463
464    fn field_name(field: &crate::storage::query::ast::FieldRef) -> String {
465        match field {
466            crate::storage::query::ast::FieldRef::TableColumn { column, .. } => column.clone(),
467            crate::storage::query::ast::FieldRef::NodeProperty { property, .. } => property.clone(),
468            crate::storage::query::ast::FieldRef::EdgeProperty { property, .. } => property.clone(),
469            crate::storage::query::ast::FieldRef::NodeId { alias } => {
470                format!("{}.id", alias)
471            }
472        }
473    }
474}
475
476/// Hint about which index method to prefer for a query
477#[derive(Debug, Clone)]
478pub struct IndexHint {
479    /// Preferred index method
480    pub method: IndexHintMethod,
481    /// Column the index applies to
482    pub column: String,
483}
484
/// Index methods the optimizer can recommend in an `IndexHint`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexHintMethod {
    /// Hash index.
    Hash,
    /// B-tree index.
    BTree,
    /// Bitmap index.
    Bitmap,
    /// Spatial index.
    Spatial,
}
493
494/// Push LIMIT down through operations
495struct LimitPushdownPass;
496
497impl OptimizationPass for LimitPushdownPass {
498    fn name(&self) -> &str {
499        "LimitPushdown"
500    }
501
502    fn apply(&self, query: QueryExpr) -> QueryExpr {
503        match query {
504            QueryExpr::Join(jq) => {
505                // Can push limit through certain joins
506                let left = self.apply(*jq.left);
507                let right = self.apply(*jq.right);
508
509                QueryExpr::Join(JoinQuery {
510                    left: Box::new(left),
511                    right: Box::new(right),
512                    ..jq
513                })
514            }
515            other => other,
516        }
517    }
518
519    fn benefit(&self) -> u32 {
520        60
521    }
522}
523
524// =============================================================================
525// Helper Functions
526// =============================================================================
527
528fn swap_condition(
529    condition: crate::storage::query::ast::JoinCondition,
530) -> crate::storage::query::ast::JoinCondition {
531    crate::storage::query::ast::JoinCondition {
532        left_field: condition.right_field,
533        right_field: condition.left_field,
534    }
535}
536
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::query::ast::{FieldRef, JoinCondition, Projection, TableQuery};

    /// `SELECT *` over `name`, aliased to the table name, with no filter,
    /// grouping, ordering, limit, or offset.
    fn make_table(name: &str) -> TableQuery {
        TableQuery {
            table: name.to_string(),
            source: None,
            alias: Some(name.to_string()),
            select_items: Vec::new(),
            columns: vec![Projection::All],
            where_expr: None,
            filter: None,
            group_by_exprs: Vec::new(),
            group_by: Vec::new(),
            having_expr: None,
            having: None,
            order_by: vec![],
            limit: None,
            limit_param: None,
            offset: None,
            offset_param: None,
            expand: None,
            as_of: None,
            sessionize: None,
        }
    }

    fn make_table_query(name: &str) -> QueryExpr {
        QueryExpr::Table(make_table(name))
    }

    #[test]
    fn test_optimizer_applies_passes() {
        let optimizer = QueryOptimizer::new();
        let query = make_table_query("hosts");

        let (optimized, passes) = optimizer.optimize(query);

        // A bare table scan stays a table scan...
        assert!(matches!(optimized, QueryExpr::Table(_)));
        // ...and with no filter and no join, no pass has anything to rewrite.
        assert!(
            passes.is_empty(),
            "no pass should report a change for a bare table scan, got {:?}",
            passes
        );
    }

    #[test]
    fn test_join_reordering() {
        let optimizer = QueryOptimizer::new();

        // Small input: LIMIT 10.
        let small = QueryExpr::Table(TableQuery {
            alias: None,
            limit: Some(10),
            ..make_table("small")
        });

        // Large input: no limit, no filter.
        let large = QueryExpr::Table(TableQuery {
            alias: None,
            ..make_table("large")
        });

        let join = QueryExpr::Join(JoinQuery {
            left: Box::new(large),
            right: Box::new(small),
            join_type: JoinType::Inner,
            on: JoinCondition {
                left_field: FieldRef::TableColumn {
                    table: "large".to_string(),
                    column: "id".to_string(),
                },
                right_field: FieldRef::TableColumn {
                    table: "small".to_string(),
                    column: "id".to_string(),
                },
            },
            filter: None,
            order_by: Vec::new(),
            limit: None,
            offset: None,
            return_items: Vec::new(),
            return_: Vec::new(),
        });

        let (optimized, passes) = optimizer.optimize(join);

        // Should have applied JoinReordering
        assert!(
            passes.iter().any(|name| name == "JoinReordering"),
            "JoinReordering should be reported as applied, got {:?}",
            passes
        );

        // Fail loudly when the shape is wrong instead of skipping the checks.
        let jq = match optimized {
            QueryExpr::Join(jq) => jq,
            _ => panic!("optimizing a join must yield a join"),
        };

        // Small table should now be on left (build side)
        match &*jq.left {
            QueryExpr::Table(left) => assert_eq!(left.table, "small"),
            _ => panic!("left input of the join should be a table query"),
        }
        match &*jq.right {
            QueryExpr::Table(right) => assert_eq!(right.table, "large"),
            _ => panic!("right input of the join should be a table query"),
        }

        // The join condition must be mirrored together with the inputs.
        assert!(matches!(
            &jq.on.left_field,
            FieldRef::TableColumn { table, .. } if table == "small"
        ));
        assert!(matches!(
            &jq.on.right_field,
            FieldRef::TableColumn { table, .. } if table == "large"
        ));
    }
}
658}