rustlite_core/query/
planner.rs

1/// Query planner and optimizer
2///
3/// Converts AST into optimized physical execution plans.
4use super::ast::*;
5use std::fmt;
6
7/// Physical query plan
8#[derive(Debug, Clone)]
9pub struct PhysicalPlan {
10    pub root: PhysicalOperator,
11}
12
13/// Physical operators for query execution
14#[derive(Debug, Clone)]
15pub enum PhysicalOperator {
16    /// Full table scan
17    TableScan { table: String },
18    /// Index scan with exact match
19    IndexScan {
20        table: String,
21        index: String,
22        key: Vec<u8>,
23    },
24    /// Index range scan
25    IndexRangeScan {
26        table: String,
27        index: String,
28        start: Option<Vec<u8>>,
29        end: Option<Vec<u8>>,
30    },
31    /// Filter rows based on predicate
32    Filter {
33        input: Box<PhysicalOperator>,
34        condition: Expression,
35    },
36    /// Sort rows
37    Sort {
38        input: Box<PhysicalOperator>,
39        columns: Vec<OrderByColumn>,
40    },
41    /// Limit number of results
42    Limit {
43        input: Box<PhysicalOperator>,
44        count: usize,
45        offset: usize,
46    },
47    /// Project columns (SELECT specific columns)
48    Project {
49        input: Box<PhysicalOperator>,
50        columns: Vec<SelectColumn>,
51    },
52    /// Hash join (inner, left, right, full)
53    HashJoin {
54        left: Box<PhysicalOperator>,
55        right: Box<PhysicalOperator>,
56        join_type: JoinType,
57        condition: Expression,
58    },
59    /// GROUP BY with optional aggregation
60    GroupBy {
61        input: Box<PhysicalOperator>,
62        group_columns: Vec<String>,
63        aggregates: Vec<SelectColumn>,
64        having: Option<Expression>,
65    },
66    /// Aggregation (COUNT, SUM, AVG, MIN, MAX) without grouping
67    Aggregate {
68        input: Box<PhysicalOperator>,
69        aggregates: Vec<SelectColumn>,
70    },
71}
72
73/// Query planner
74pub struct Planner {
75    /// Available indexes for optimization
76    available_indexes: Vec<IndexMetadata>,
77}
78
79/// Metadata about available indexes
80#[derive(Debug, Clone)]
81pub struct IndexMetadata {
82    pub name: String,
83    pub table: String,
84    pub index_type: String, // "BTree" or "Hash"
85}
86
87impl Planner {
88    /// Create a new planner
89    pub fn new() -> Self {
90        Self {
91            available_indexes: Vec::new(),
92        }
93    }
94
95    /// Create planner with known indexes
96    pub fn with_indexes(indexes: Vec<IndexMetadata>) -> Self {
97        Self {
98            available_indexes: indexes,
99        }
100    }
101
102    /// Plan a query
103    pub fn plan(&self, query: &Query) -> Result<PhysicalPlan, PlanError> {
104        // Start with base table access
105        let mut plan = self.plan_table_access(&query.from)?;
106
107        // Apply WHERE clause (predicate pushdown)
108        if let Some(ref where_clause) = query.where_clause {
109            plan = self.apply_filter(plan, &where_clause.condition)?;
110        }
111
112        // Check if we have aggregates or GROUP BY
113        let has_aggregates = query
114            .select
115            .columns
116            .iter()
117            .any(|col| matches!(col, SelectColumn::Aggregate { .. }));
118
119        // Apply GROUP BY and aggregation if needed
120        if let Some(ref group_by) = query.group_by {
121            // For GROUP BY, we need:
122            // 1. All columns used in GROUP BY
123            // 2. All columns referenced in aggregate functions
124            // We'll just pass through all columns (TableScan) and let GroupBy handle it
125
126            // Apply GROUP BY with aggregates
127            plan = PhysicalOperator::GroupBy {
128                input: Box::new(plan),
129                group_columns: group_by.columns.clone(),
130                aggregates: query.select.columns.clone(),
131                having: query.having.as_ref().map(|h| h.condition.clone()),
132            };
133        } else if has_aggregates {
134            // For aggregation without GROUP BY, we also need all referenced columns
135            // Pass through TableScan directly
136
137            // Aggregation without GROUP BY
138            plan = PhysicalOperator::Aggregate {
139                input: Box::new(plan),
140                aggregates: query.select.columns.clone(),
141            };
142        } else {
143            // No aggregates - normal projection
144            plan = PhysicalOperator::Project {
145                input: Box::new(plan),
146                columns: query.select.columns.clone(),
147            };
148        }
149
150        // Apply ORDER BY
151        if let Some(ref order_by) = query.order_by {
152            plan = PhysicalOperator::Sort {
153                input: Box::new(plan),
154                columns: order_by.columns.clone(),
155            };
156        }
157
158        // Apply LIMIT
159        if let Some(ref limit) = query.limit {
160            plan = PhysicalOperator::Limit {
161                input: Box::new(plan),
162                count: limit.count,
163                offset: limit.offset.unwrap_or(0),
164            };
165        }
166
167        Ok(PhysicalPlan { root: plan })
168    }
169
170    fn plan_table_access(&self, from: &FromClause) -> Result<PhysicalOperator, PlanError> {
171        let mut plan = PhysicalOperator::TableScan {
172            table: from.table.clone(),
173        };
174
175        // Plan JOINs
176        for join in &from.joins {
177            let right = PhysicalOperator::TableScan {
178                table: join.table.clone(),
179            };
180
181            plan = PhysicalOperator::HashJoin {
182                left: Box::new(plan),
183                right: Box::new(right),
184                join_type: join.join_type.clone(),
185                condition: join.condition.clone(),
186            };
187        }
188
189        Ok(plan)
190    }
191
192    fn apply_filter(
193        &self,
194        input: PhysicalOperator,
195        condition: &Expression,
196    ) -> Result<PhysicalOperator, PlanError> {
197        // Try to use index if available
198        if let Some(index_scan) = self.try_index_scan(condition) {
199            return Ok(index_scan);
200        }
201
202        // Otherwise, use filter operator
203        Ok(PhysicalOperator::Filter {
204            input: Box::new(input),
205            condition: condition.clone(),
206        })
207    }
208
209    fn try_index_scan(&self, condition: &Expression) -> Option<PhysicalOperator> {
210        // Check if condition can use an index
211        match condition {
212            Expression::BinaryOp { left, op, right } => {
213                // Extract column name and value
214                let (column, value) = match (left.as_ref(), right.as_ref()) {
215                    (Expression::Column(col), Expression::Literal(lit)) => (col, lit),
216                    _ => return None,
217                };
218
219                // Find matching index
220                for index in &self.available_indexes {
221                    // Simplified: assume index name contains column name
222                    if index.name.contains(column) {
223                        match index.index_type.as_str() {
224                            "Hash" if *op == BinaryOperator::Eq => {
225                                // Use hash index for exact match
226                                return Some(PhysicalOperator::IndexScan {
227                                    table: index.table.clone(),
228                                    index: index.name.clone(),
229                                    key: literal_to_bytes(value),
230                                });
231                            }
232                            "BTree" => {
233                                // Use B-Tree index for range queries
234                                match op {
235                                    BinaryOperator::Eq => {
236                                        return Some(PhysicalOperator::IndexScan {
237                                            table: index.table.clone(),
238                                            index: index.name.clone(),
239                                            key: literal_to_bytes(value),
240                                        });
241                                    }
242                                    BinaryOperator::Lt | BinaryOperator::Le => {
243                                        return Some(PhysicalOperator::IndexRangeScan {
244                                            table: index.table.clone(),
245                                            index: index.name.clone(),
246                                            start: None,
247                                            end: Some(literal_to_bytes(value)),
248                                        });
249                                    }
250                                    BinaryOperator::Gt | BinaryOperator::Ge => {
251                                        return Some(PhysicalOperator::IndexRangeScan {
252                                            table: index.table.clone(),
253                                            index: index.name.clone(),
254                                            start: Some(literal_to_bytes(value)),
255                                            end: None,
256                                        });
257                                    }
258                                    _ => {}
259                                }
260                            }
261                            _ => {}
262                        }
263                    }
264                }
265            }
266            Expression::Between { expr, min, max } => {
267                // Extract column name
268                let column = match expr.as_ref() {
269                    Expression::Column(col) => col,
270                    _ => return None,
271                };
272
273                // Find matching B-Tree index
274                for index in &self.available_indexes {
275                    if index.index_type == "BTree" && index.name.contains(column) {
276                        let start = match min.as_ref() {
277                            Expression::Literal(lit) => Some(literal_to_bytes(lit)),
278                            _ => None,
279                        };
280                        let end = match max.as_ref() {
281                            Expression::Literal(lit) => Some(literal_to_bytes(lit)),
282                            _ => None,
283                        };
284
285                        return Some(PhysicalOperator::IndexRangeScan {
286                            table: index.table.clone(),
287                            index: index.name.clone(),
288                            start,
289                            end,
290                        });
291                    }
292                }
293            }
294            _ => {}
295        }
296
297        None
298    }
299}
300
301impl Default for Planner {
302    fn default() -> Self {
303        Self::new()
304    }
305}
306
307/// Convert literal to bytes for index lookup
308fn literal_to_bytes(literal: &Literal) -> Vec<u8> {
309    match literal {
310        Literal::Integer(i) => i.to_le_bytes().to_vec(),
311        Literal::Float(f) => f.to_le_bytes().to_vec(),
312        Literal::String(s) => s.as_bytes().to_vec(),
313        Literal::Boolean(b) => vec![if *b { 1 } else { 0 }],
314        Literal::Null => vec![],
315    }
316}
317
318/// Planning errors
319#[derive(Debug, Clone)]
320pub enum PlanError {
321    UnsupportedOperation(String),
322    InvalidExpression(String),
323}
324
325impl fmt::Display for PlanError {
326    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
327        match self {
328            PlanError::UnsupportedOperation(op) => write!(f, "Unsupported operation: {}", op),
329            PlanError::InvalidExpression(expr) => write!(f, "Invalid expression: {}", expr),
330        }
331    }
332}
333
334impl std::error::Error for PlanError {}
335
336impl fmt::Display for PhysicalPlan {
337    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
338        write!(f, "{}", self.root)
339    }
340}
341
342impl fmt::Display for PhysicalOperator {
343    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344        match self {
345            PhysicalOperator::TableScan { table } => write!(f, "TableScan({})", table),
346            PhysicalOperator::IndexScan { table, index, .. } => {
347                write!(f, "IndexScan({}.{})", table, index)
348            }
349            PhysicalOperator::IndexRangeScan { table, index, .. } => {
350                write!(f, "IndexRangeScan({}.{})", table, index)
351            }
352            PhysicalOperator::Filter { input, condition } => {
353                write!(f, "Filter({}) -> {}", condition, input)
354            }
355            PhysicalOperator::Sort { input, columns } => {
356                write!(f, "Sort(")?;
357                for (i, col) in columns.iter().enumerate() {
358                    if i > 0 {
359                        write!(f, ", ")?;
360                    }
361                    write!(f, "{}", col)?;
362                }
363                write!(f, ") -> {}", input)
364            }
365            PhysicalOperator::Limit {
366                input,
367                count,
368                offset,
369            } => {
370                write!(f, "Limit({}, {}) -> {}", count, offset, input)
371            }
372            PhysicalOperator::Project { input, columns } => {
373                write!(f, "Project(")?;
374                for (i, col) in columns.iter().enumerate() {
375                    if i > 0 {
376                        write!(f, ", ")?;
377                    }
378                    write!(f, "{}", col)?;
379                }
380                write!(f, ") -> {}", input)
381            }
382            PhysicalOperator::HashJoin {
383                left,
384                right,
385                join_type,
386                ..
387            } => {
388                write!(f, "{}Join({} x {})", join_type, left, right)
389            }
390            PhysicalOperator::GroupBy {
391                input,
392                group_columns,
393                aggregates,
394                having,
395            } => {
396                write!(f, "GroupBy(")?;
397                for (i, col) in group_columns.iter().enumerate() {
398                    if i > 0 {
399                        write!(f, ", ")?;
400                    }
401                    write!(f, "{}", col)?;
402                }
403                if !aggregates.is_empty() {
404                    write!(f, " | ")?;
405                    for (i, agg) in aggregates.iter().enumerate() {
406                        if i > 0 {
407                            write!(f, ", ")?;
408                        }
409                        write!(f, "{}", agg)?;
410                    }
411                }
412                if let Some(h) = having {
413                    write!(f, " HAVING {}", h)?;
414                }
415                write!(f, ") -> {}", input)
416            }
417            PhysicalOperator::Aggregate { input, aggregates } => {
418                write!(f, "Aggregate(")?;
419                for (i, agg) in aggregates.iter().enumerate() {
420                    if i > 0 {
421                        write!(f, ", ")?;
422                    }
423                    write!(f, "{}", agg)?;
424                }
425                write!(f, ") -> {}", input)
426            }
427        }
428    }
429}
430
431#[cfg(test)]
432mod tests {
433    use super::*;
434    use crate::query::parser::Parser;
435
436    #[test]
437    fn test_simple_plan() {
438        let mut parser = Parser::new("SELECT * FROM users").unwrap();
439        let query = parser.parse().unwrap();
440
441        let planner = Planner::new();
442        let plan = planner.plan(&query).unwrap();
443
444        // Should have Project -> TableScan
445        match plan.root {
446            PhysicalOperator::Project { input, .. } => match *input {
447                PhysicalOperator::TableScan { .. } => {}
448                _ => panic!("Expected TableScan"),
449            },
450            _ => panic!("Expected Project"),
451        }
452    }
453
454    #[test]
455    fn test_filter_plan() {
456        let mut parser = Parser::new("SELECT * FROM users WHERE age > 18").unwrap();
457        let query = parser.parse().unwrap();
458
459        let planner = Planner::new();
460        let plan = planner.plan(&query).unwrap();
461
462        // Should have Project -> Filter -> TableScan
463        match plan.root {
464            PhysicalOperator::Project { input, .. } => match *input {
465                PhysicalOperator::Filter { .. } => {}
466                _ => panic!("Expected Filter"),
467            },
468            _ => panic!("Expected Project"),
469        }
470    }
471
472    #[test]
473    fn test_order_by_plan() {
474        let mut parser = Parser::new("SELECT * FROM users ORDER BY name").unwrap();
475        let query = parser.parse().unwrap();
476
477        let planner = Planner::new();
478        let plan = planner.plan(&query).unwrap();
479
480        // Should have Sort somewhere in the plan
481        let plan_str = format!("{}", plan);
482        assert!(plan_str.contains("Sort"));
483    }
484
485    #[test]
486    fn test_limit_plan() {
487        let mut parser = Parser::new("SELECT * FROM users LIMIT 10").unwrap();
488        let query = parser.parse().unwrap();
489
490        let planner = Planner::new();
491        let plan = planner.plan(&query).unwrap();
492
493        // Should have Limit somewhere in the plan
494        let plan_str = format!("{}", plan);
495        assert!(plan_str.contains("Limit"));
496    }
497}