//! Aegis Query Planner (`aegis_query/planner.rs`)
//!
//! Transforms analyzed SQL statements into optimized query plans.
//! Implements cost-based optimization with rule-based transformations.
//!
//! Key Features:
//! - Logical plan generation from AST
//! - Cost-based join ordering
//! - Predicate pushdown optimization
//! - Index selection
//!
//! @version 0.1.0
//! @author AutomataNexus Development Team
14
use crate::ast::{
    BinaryOperator, Expression, FromClause, JoinClause, JoinType,
    SelectColumn, SelectStatement, Statement, TableReference,
};
use aegis_common::DataType;
use std::collections::HashMap;
use std::sync::Arc;
use thiserror::Error;
23
// =============================================================================
// Error Types
// =============================================================================
27
28#[derive(Debug, Error)]
29pub enum PlannerError {
30    #[error("Table not found: {0}")]
31    TableNotFound(String),
32
33    #[error("Column not found: {0}")]
34    ColumnNotFound(String),
35
36    #[error("Unsupported operation: {0}")]
37    UnsupportedOperation(String),
38
39    #[error("Internal planner error: {0}")]
40    Internal(String),
41}
42
43pub type PlannerResult<T> = Result<T, PlannerError>;
44
// =============================================================================
// Query Plan
// =============================================================================
48
49/// A complete query plan ready for execution.
50#[derive(Debug, Clone)]
51pub struct QueryPlan {
52    pub root: PlanNode,
53    pub estimated_cost: f64,
54    pub estimated_rows: u64,
55}
56
57/// A node in the query plan tree.
58#[derive(Debug, Clone)]
59pub enum PlanNode {
60    Scan(ScanNode),
61    Filter(FilterNode),
62    Project(ProjectNode),
63    Join(JoinNode),
64    Aggregate(AggregateNode),
65    Sort(SortNode),
66    Limit(LimitNode),
67    Empty,
68    // DDL nodes
69    CreateTable(CreateTableNode),
70    DropTable(DropTableNode),
71    AlterTable(AlterTableNode),
72    CreateIndex(CreateIndexNode),
73    DropIndex(DropIndexNode),
74    // DML nodes
75    Insert(InsertNode),
76    Update(UpdateNode),
77    Delete(DeleteNode),
78}
79
// =============================================================================
// Plan Node Types
// =============================================================================
83
84/// Table scan operation.
85#[derive(Debug, Clone)]
86pub struct ScanNode {
87    pub table_name: String,
88    pub alias: Option<String>,
89    pub columns: Vec<String>,
90    pub index_scan: Option<IndexScan>,
91}
92
93/// Index scan specification.
94#[derive(Debug, Clone)]
95pub struct IndexScan {
96    pub index_name: String,
97    pub key_range: KeyRange,
98}
99
100/// Key range for index scans.
101#[derive(Debug, Clone)]
102pub struct KeyRange {
103    pub start: Option<PlanExpression>,
104    pub start_inclusive: bool,
105    pub end: Option<PlanExpression>,
106    pub end_inclusive: bool,
107}
108
109/// Filter operation.
110#[derive(Debug, Clone)]
111pub struct FilterNode {
112    pub input: Box<PlanNode>,
113    pub predicate: PlanExpression,
114}
115
116/// Projection operation.
117#[derive(Debug, Clone)]
118pub struct ProjectNode {
119    pub input: Box<PlanNode>,
120    pub expressions: Vec<ProjectionExpr>,
121}
122
123/// A projection expression with optional alias.
124#[derive(Debug, Clone)]
125pub struct ProjectionExpr {
126    pub expr: PlanExpression,
127    pub alias: Option<String>,
128}
129
130/// Join operation.
131#[derive(Debug, Clone)]
132pub struct JoinNode {
133    pub left: Box<PlanNode>,
134    pub right: Box<PlanNode>,
135    pub join_type: PlanJoinType,
136    pub condition: Option<PlanExpression>,
137    pub strategy: JoinStrategy,
138}
139
/// Join type in plan.
///
/// Mirrors the join types of the AST one-to-one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PlanJoinType {
    /// INNER JOIN.
    Inner,
    /// LEFT JOIN.
    Left,
    /// RIGHT JOIN.
    Right,
    /// FULL JOIN.
    Full,
    /// CROSS JOIN.
    Cross,
}
149
/// Physical join strategy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinStrategy {
    /// Nested-loop join.
    NestedLoop,
    /// Hash join.
    HashJoin,
    /// Merge join.
    MergeJoin,
}
157
158/// Aggregation operation.
159#[derive(Debug, Clone)]
160pub struct AggregateNode {
161    pub input: Box<PlanNode>,
162    pub group_by: Vec<PlanExpression>,
163    pub aggregates: Vec<AggregateExpr>,
164}
165
166/// Aggregate expression.
167#[derive(Debug, Clone)]
168pub struct AggregateExpr {
169    pub function: AggregateFunction,
170    pub argument: Option<PlanExpression>,
171    pub distinct: bool,
172    pub alias: Option<String>,
173}
174
/// Aggregate functions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggregateFunction {
    /// COUNT.
    Count,
    /// SUM.
    Sum,
    /// AVG.
    Avg,
    /// MIN.
    Min,
    /// MAX.
    Max,
}
184
185/// Sort operation.
186#[derive(Debug, Clone)]
187pub struct SortNode {
188    pub input: Box<PlanNode>,
189    pub order_by: Vec<SortKey>,
190}
191
192/// Sort key specification.
193#[derive(Debug, Clone)]
194pub struct SortKey {
195    pub expr: PlanExpression,
196    pub ascending: bool,
197    pub nulls_first: bool,
198}
199
200/// Limit operation.
201#[derive(Debug, Clone)]
202pub struct LimitNode {
203    pub input: Box<PlanNode>,
204    pub limit: Option<u64>,
205    pub offset: Option<u64>,
206}
207
// =============================================================================
// DDL Plan Nodes
// =============================================================================
211
212/// CREATE TABLE operation.
213#[derive(Debug, Clone)]
214pub struct CreateTableNode {
215    pub table_name: String,
216    pub columns: Vec<CreateColumnDef>,
217    pub constraints: Vec<CreateTableConstraint>,
218    pub if_not_exists: bool,
219}
220
221/// Column definition for CREATE TABLE.
222#[derive(Debug, Clone)]
223pub struct CreateColumnDef {
224    pub name: String,
225    pub data_type: DataType,
226    pub nullable: bool,
227    pub default: Option<PlanExpression>,
228    pub primary_key: bool,
229    pub unique: bool,
230}
231
232/// Table constraint for CREATE TABLE.
233#[derive(Debug, Clone)]
234pub enum CreateTableConstraint {
235    PrimaryKey { columns: Vec<String> },
236    Unique { columns: Vec<String> },
237    ForeignKey {
238        columns: Vec<String>,
239        ref_table: String,
240        ref_columns: Vec<String>,
241    },
242    Check { expression: PlanExpression },
243}
244
/// DROP TABLE operation.
#[derive(Debug, Clone)]
pub struct DropTableNode {
    /// Name of the table to drop.
    pub table_name: String,
    /// Whether the statement carried IF EXISTS.
    pub if_exists: bool,
}
251
252/// ALTER TABLE operation.
253#[derive(Debug, Clone)]
254pub struct AlterTableNode {
255    pub table_name: String,
256    pub operations: Vec<PlanAlterOperation>,
257}
258
259/// ALTER TABLE operation type.
260#[derive(Debug, Clone)]
261pub enum PlanAlterOperation {
262    AddColumn(CreateColumnDef),
263    DropColumn { name: String, if_exists: bool },
264    RenameColumn { old_name: String, new_name: String },
265    AlterColumn { name: String, data_type: Option<DataType>, set_not_null: Option<bool>, set_default: Option<Option<PlanExpression>> },
266    RenameTable { new_name: String },
267    AddConstraint(CreateTableConstraint),
268    DropConstraint { name: String },
269}
270
/// CREATE INDEX operation.
#[derive(Debug, Clone)]
pub struct CreateIndexNode {
    /// Name of the index to create.
    pub index_name: String,
    /// Table the index is built on.
    pub table_name: String,
    /// Indexed columns, in index order.
    pub columns: Vec<String>,
    /// Whether the index is UNIQUE.
    pub unique: bool,
    /// Whether the statement carried IF NOT EXISTS.
    pub if_not_exists: bool,
}
280
/// DROP INDEX operation.
#[derive(Debug, Clone)]
pub struct DropIndexNode {
    /// Name of the index to drop.
    pub index_name: String,
    /// Whether the statement carried IF EXISTS.
    pub if_exists: bool,
}
287
// =============================================================================
// DML Plan Nodes
// =============================================================================
291
292/// Source for INSERT data in a query plan.
293#[derive(Debug, Clone)]
294pub enum InsertPlanSource {
295    /// Direct values (INSERT INTO ... VALUES ...)
296    Values(Vec<Vec<PlanExpression>>),
297    /// Subquery (INSERT INTO ... SELECT ...)
298    Query(Box<PlanNode>),
299}
300
301/// INSERT operation.
302#[derive(Debug, Clone)]
303pub struct InsertNode {
304    pub table_name: String,
305    pub columns: Vec<String>,
306    pub source: InsertPlanSource,
307}
308
309/// UPDATE operation.
310#[derive(Debug, Clone)]
311pub struct UpdateNode {
312    pub table_name: String,
313    pub assignments: Vec<(String, PlanExpression)>,
314    pub where_clause: Option<PlanExpression>,
315}
316
317/// DELETE operation.
318#[derive(Debug, Clone)]
319pub struct DeleteNode {
320    pub table_name: String,
321    pub where_clause: Option<PlanExpression>,
322}
323
// =============================================================================
// Plan Expressions
// =============================================================================
327
328/// Expression in a query plan.
329#[derive(Debug, Clone)]
330pub enum PlanExpression {
331    Column {
332        table: Option<String>,
333        name: String,
334        data_type: DataType,
335    },
336    Literal(PlanLiteral),
337    BinaryOp {
338        left: Box<PlanExpression>,
339        op: PlanBinaryOp,
340        right: Box<PlanExpression>,
341    },
342    UnaryOp {
343        op: PlanUnaryOp,
344        expr: Box<PlanExpression>,
345    },
346    Function {
347        name: String,
348        args: Vec<PlanExpression>,
349        return_type: DataType,
350    },
351    Cast {
352        expr: Box<PlanExpression>,
353        target_type: DataType,
354    },
355    IsNull {
356        expr: Box<PlanExpression>,
357        negated: bool,
358    },
359    /// CASE expression
360    Case {
361        operand: Option<Box<PlanExpression>>,
362        conditions: Vec<(PlanExpression, PlanExpression)>,
363        else_result: Option<Box<PlanExpression>>,
364    },
365    /// IN list expression (e.g., x IN (1, 2, 3))
366    InList {
367        expr: Box<PlanExpression>,
368        list: Vec<PlanExpression>,
369        negated: bool,
370    },
371    /// BETWEEN expression (e.g., x BETWEEN 1 AND 10)
372    Between {
373        expr: Box<PlanExpression>,
374        low: Box<PlanExpression>,
375        high: Box<PlanExpression>,
376        negated: bool,
377    },
378    /// LIKE pattern matching expression
379    Like {
380        expr: Box<PlanExpression>,
381        pattern: Box<PlanExpression>,
382        negated: bool,
383    },
384    /// IN subquery expression (e.g., x IN (SELECT y FROM t))
385    InSubquery {
386        expr: Box<PlanExpression>,
387        subquery: Box<PlanNode>,
388        negated: bool,
389    },
390    /// EXISTS subquery expression
391    Exists {
392        subquery: Box<PlanNode>,
393        negated: bool,
394    },
395    /// Scalar subquery (returns single value)
396    ScalarSubquery(Box<PlanNode>),
397    /// Placeholder for parameterized queries ($1, $2, etc.)
398    Placeholder(usize),
399}
400
/// Literal values in plan.
#[derive(Debug, Clone)]
pub enum PlanLiteral {
    /// SQL NULL.
    Null,
    /// Boolean literal.
    Boolean(bool),
    /// Integer literal.
    Integer(i64),
    /// Floating-point literal.
    Float(f64),
    /// String literal.
    String(String),
}
410
/// Binary operators in plan.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PlanBinaryOp {
    // Arithmetic
    Add,
    Subtract,
    Multiply,
    Divide,
    Modulo,
    // Comparison
    Equal,
    NotEqual,
    LessThan,
    LessThanOrEqual,
    GreaterThan,
    GreaterThanOrEqual,
    // Logical
    And,
    Or,
    // String
    Concat,
}
429
/// Unary operators in plan.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PlanUnaryOp {
    /// Logical NOT.
    Not,
    /// Arithmetic negation.
    Negative,
}
436
// =============================================================================
// Schema Information
// =============================================================================
440
441/// Schema information for planning.
442#[derive(Debug, Clone, Default)]
443pub struct PlannerSchema {
444    tables: HashMap<String, TableInfo>,
445    indexes: HashMap<String, Vec<IndexInfo>>,
446}
447
448/// Table metadata for planning.
449#[derive(Debug, Clone)]
450pub struct TableInfo {
451    pub name: String,
452    pub columns: Vec<ColumnInfo>,
453    pub row_count: u64,
454}
455
456/// Column metadata for planning.
457#[derive(Debug, Clone)]
458pub struct ColumnInfo {
459    pub name: String,
460    pub data_type: DataType,
461    pub nullable: bool,
462}
463
/// Index metadata for planning.
#[derive(Debug, Clone)]
pub struct IndexInfo {
    /// Index name.
    pub name: String,
    /// Indexed columns, in index order.
    pub columns: Vec<String>,
    /// Whether the index is UNIQUE.
    pub unique: bool,
}
471
472impl PlannerSchema {
473    pub fn new() -> Self {
474        Self::default()
475    }
476
477    pub fn add_table(&mut self, table: TableInfo) {
478        self.tables.insert(table.name.clone(), table);
479    }
480
481    pub fn add_index(&mut self, table: &str, index: IndexInfo) {
482        self.indexes
483            .entry(table.to_string())
484            .or_default()
485            .push(index);
486    }
487
488    pub fn get_table(&self, name: &str) -> Option<&TableInfo> {
489        self.tables.get(name)
490    }
491
492    pub fn get_indexes(&self, table: &str) -> Option<&Vec<IndexInfo>> {
493        self.indexes.get(table)
494    }
495}
496
// =============================================================================
// Planner
// =============================================================================
500
501/// Query planner that transforms AST into execution plans.
502pub struct Planner {
503    schema: Arc<PlannerSchema>,
504}
505
506impl Planner {
507    pub fn new(schema: Arc<PlannerSchema>) -> Self {
508        Self { schema }
509    }
510
511    /// Plan a statement.
512    pub fn plan(&self, statement: &Statement) -> PlannerResult<QueryPlan> {
513        match statement {
514            Statement::Select(select) => self.plan_select(select),
515            Statement::Insert(insert) => self.plan_insert(insert),
516            Statement::Update(update) => self.plan_update(update),
517            Statement::Delete(delete) => self.plan_delete(delete),
518            Statement::CreateTable(create) => self.plan_create_table(create),
519            Statement::DropTable(drop) => self.plan_drop_table(drop),
520            Statement::AlterTable(alter) => self.plan_alter_table(alter),
521            Statement::CreateIndex(create) => self.plan_create_index(create),
522            Statement::DropIndex(drop) => self.plan_drop_index(drop),
523            Statement::Begin | Statement::Commit | Statement::Rollback => {
524                // Transaction control statements return empty plan
525                Ok(QueryPlan {
526                    root: PlanNode::Empty,
527                    estimated_cost: 0.0,
528                    estimated_rows: 0,
529                })
530            }
531        }
532    }
533
534    /// Plan a CREATE TABLE statement.
535    fn plan_create_table(&self, create: &crate::ast::CreateTableStatement) -> PlannerResult<QueryPlan> {
536        let mut columns: Vec<CreateColumnDef> = Vec::new();
537        for col in &create.columns {
538            let primary_key = col.constraints.iter().any(|c| matches!(c, crate::ast::ColumnConstraint::PrimaryKey));
539            let unique = col.constraints.iter().any(|c| matches!(c, crate::ast::ColumnConstraint::Unique));
540
541            let default = match &col.default {
542                Some(e) => Some(self.plan_expression(e)?),
543                None => None,
544            };
545
546            columns.push(CreateColumnDef {
547                name: col.name.clone(),
548                data_type: col.data_type.clone(),
549                nullable: col.nullable,
550                default,
551                primary_key,
552                unique,
553            });
554        }
555
556        let constraints: Vec<CreateTableConstraint> = create.constraints.iter().map(|c| {
557            match c {
558                crate::ast::TableConstraint::PrimaryKey { columns } => {
559                    Ok(CreateTableConstraint::PrimaryKey { columns: columns.clone() })
560                }
561                crate::ast::TableConstraint::Unique { columns } => {
562                    Ok(CreateTableConstraint::Unique { columns: columns.clone() })
563                }
564                crate::ast::TableConstraint::ForeignKey { columns, ref_table, ref_columns } => {
565                    Ok(CreateTableConstraint::ForeignKey {
566                        columns: columns.clone(),
567                        ref_table: ref_table.clone(),
568                        ref_columns: ref_columns.clone(),
569                    })
570                }
571                crate::ast::TableConstraint::Check { expression } => {
572                    Ok(CreateTableConstraint::Check {
573                        expression: self.plan_expression(expression)?,
574                    })
575                }
576            }
577        }).collect::<PlannerResult<Vec<_>>>()?;
578
579        Ok(QueryPlan {
580            root: PlanNode::CreateTable(CreateTableNode {
581                table_name: create.name.clone(),
582                columns,
583                constraints,
584                if_not_exists: create.if_not_exists,
585            }),
586            estimated_cost: 1.0,
587            estimated_rows: 0,
588        })
589    }
590
591    /// Plan a DROP TABLE statement.
592    fn plan_drop_table(&self, drop: &crate::ast::DropTableStatement) -> PlannerResult<QueryPlan> {
593        Ok(QueryPlan {
594            root: PlanNode::DropTable(DropTableNode {
595                table_name: drop.name.clone(),
596                if_exists: drop.if_exists,
597            }),
598            estimated_cost: 1.0,
599            estimated_rows: 0,
600        })
601    }
602
603    /// Plan an ALTER TABLE statement.
604    fn plan_alter_table(&self, alter: &crate::ast::AlterTableStatement) -> PlannerResult<QueryPlan> {
605        let operations = alter.operations.iter().map(|op| {
606            match op {
607                crate::ast::AlterTableOperation::AddColumn(col) => {
608                    Ok(PlanAlterOperation::AddColumn(CreateColumnDef {
609                        name: col.name.clone(),
610                        data_type: col.data_type.clone(),
611                        nullable: col.nullable,
612                        default: col.default.as_ref().map(|e| self.plan_expression(e)).transpose()?,
613                        primary_key: false,
614                        unique: false,
615                    }))
616                }
617                crate::ast::AlterTableOperation::DropColumn { name, if_exists } => {
618                    Ok(PlanAlterOperation::DropColumn {
619                        name: name.clone(),
620                        if_exists: *if_exists,
621                    })
622                }
623                crate::ast::AlterTableOperation::RenameColumn { old_name, new_name } => {
624                    Ok(PlanAlterOperation::RenameColumn {
625                        old_name: old_name.clone(),
626                        new_name: new_name.clone(),
627                    })
628                }
629                crate::ast::AlterTableOperation::AlterColumn { name, data_type, set_not_null, set_default } => {
630                    let default_expr = match set_default {
631                        Some(Some(expr)) => Some(Some(self.plan_expression(expr)?)),
632                        Some(None) => Some(None),
633                        None => None,
634                    };
635                    Ok(PlanAlterOperation::AlterColumn {
636                        name: name.clone(),
637                        data_type: data_type.clone(),
638                        set_not_null: *set_not_null,
639                        set_default: default_expr,
640                    })
641                }
642                crate::ast::AlterTableOperation::RenameTable { new_name } => {
643                    Ok(PlanAlterOperation::RenameTable {
644                        new_name: new_name.clone(),
645                    })
646                }
647                crate::ast::AlterTableOperation::AddConstraint(constraint) => {
648                    let plan_constraint = match constraint {
649                        crate::ast::TableConstraint::PrimaryKey { columns } => {
650                            CreateTableConstraint::PrimaryKey { columns: columns.clone() }
651                        }
652                        crate::ast::TableConstraint::Unique { columns } => {
653                            CreateTableConstraint::Unique { columns: columns.clone() }
654                        }
655                        crate::ast::TableConstraint::ForeignKey { columns, ref_table, ref_columns } => {
656                            CreateTableConstraint::ForeignKey {
657                                columns: columns.clone(),
658                                ref_table: ref_table.clone(),
659                                ref_columns: ref_columns.clone(),
660                            }
661                        }
662                        crate::ast::TableConstraint::Check { expression } => {
663                            CreateTableConstraint::Check {
664                                expression: self.plan_expression(expression)?,
665                            }
666                        }
667                    };
668                    Ok(PlanAlterOperation::AddConstraint(plan_constraint))
669                }
670                crate::ast::AlterTableOperation::DropConstraint { name } => {
671                    Ok(PlanAlterOperation::DropConstraint {
672                        name: name.clone(),
673                    })
674                }
675            }
676        }).collect::<PlannerResult<Vec<_>>>()?;
677
678        Ok(QueryPlan {
679            root: PlanNode::AlterTable(AlterTableNode {
680                table_name: alter.name.clone(),
681                operations,
682            }),
683            estimated_cost: 1.0,
684            estimated_rows: 0,
685        })
686    }
687
688    /// Plan a CREATE INDEX statement.
689    fn plan_create_index(&self, create: &crate::ast::CreateIndexStatement) -> PlannerResult<QueryPlan> {
690        Ok(QueryPlan {
691            root: PlanNode::CreateIndex(CreateIndexNode {
692                index_name: create.name.clone(),
693                table_name: create.table.clone(),
694                columns: create.columns.clone(),
695                unique: create.unique,
696                if_not_exists: create.if_not_exists,
697            }),
698            estimated_cost: 1.0,
699            estimated_rows: 0,
700        })
701    }
702
703    /// Plan a DROP INDEX statement.
704    fn plan_drop_index(&self, drop: &crate::ast::DropIndexStatement) -> PlannerResult<QueryPlan> {
705        Ok(QueryPlan {
706            root: PlanNode::DropIndex(DropIndexNode {
707                index_name: drop.name.clone(),
708                if_exists: drop.if_exists,
709            }),
710            estimated_cost: 1.0,
711            estimated_rows: 0,
712        })
713    }
714
715    /// Plan an INSERT statement.
716    fn plan_insert(&self, insert: &crate::ast::InsertStatement) -> PlannerResult<QueryPlan> {
717        let columns = insert.columns.clone().unwrap_or_default();
718
719        let (source, estimated_rows) = match &insert.source {
720            crate::ast::InsertSource::Values(rows) => {
721                let values = rows.iter().map(|row| {
722                    row.iter().map(|expr| self.plan_expression(expr)).collect::<PlannerResult<Vec<_>>>()
723                }).collect::<PlannerResult<Vec<_>>>()?;
724                let num_rows = values.len() as u64;
725                (InsertPlanSource::Values(values), num_rows)
726            }
727            crate::ast::InsertSource::Query(select) => {
728                // Plan the SELECT subquery
729                let subquery_plan = self.plan_select(select)?;
730                let estimated_rows = subquery_plan.estimated_rows;
731                (InsertPlanSource::Query(Box::new(subquery_plan.root)), estimated_rows)
732            }
733        };
734
735        Ok(QueryPlan {
736            root: PlanNode::Insert(InsertNode {
737                table_name: insert.table.clone(),
738                columns,
739                source,
740            }),
741            estimated_cost: estimated_rows as f64,
742            estimated_rows,
743        })
744    }
745
746    /// Plan an UPDATE statement.
747    fn plan_update(&self, update: &crate::ast::UpdateStatement) -> PlannerResult<QueryPlan> {
748        let assignments: Vec<(String, PlanExpression)> = update.assignments.iter()
749            .map(|a| Ok((a.column.clone(), self.plan_expression(&a.value)?)))
750            .collect::<PlannerResult<Vec<_>>>()?;
751
752        let where_clause = update.where_clause.as_ref()
753            .map(|e| self.plan_expression(e))
754            .transpose()?;
755
756        // Estimate rows based on table info
757        let estimated_rows = self.schema.get_table(&update.table)
758            .map(|t| t.row_count)
759            .unwrap_or(100);
760
761        Ok(QueryPlan {
762            root: PlanNode::Update(UpdateNode {
763                table_name: update.table.clone(),
764                assignments,
765                where_clause,
766            }),
767            estimated_cost: estimated_rows as f64,
768            estimated_rows,
769        })
770    }
771
772    /// Plan a DELETE statement.
773    fn plan_delete(&self, delete: &crate::ast::DeleteStatement) -> PlannerResult<QueryPlan> {
774        let where_clause = delete.where_clause.as_ref()
775            .map(|e| self.plan_expression(e))
776            .transpose()?;
777
778        // Estimate rows based on table info
779        let estimated_rows = self.schema.get_table(&delete.table)
780            .map(|t| t.row_count)
781            .unwrap_or(100);
782
783        Ok(QueryPlan {
784            root: PlanNode::Delete(DeleteNode {
785                table_name: delete.table.clone(),
786                where_clause,
787            }),
788            estimated_cost: estimated_rows as f64,
789            estimated_rows,
790        })
791    }
792
793    /// Plan a SELECT statement.
794    fn plan_select(&self, select: &SelectStatement) -> PlannerResult<QueryPlan> {
795        let mut plan = self.plan_from_clause(&select.from)?;
796
797        if let Some(ref where_clause) = select.where_clause {
798            let predicate = self.plan_expression(where_clause)?;
799            plan = PlanNode::Filter(FilterNode {
800                input: Box::new(plan),
801                predicate,
802            });
803        }
804
805        if !select.group_by.is_empty() || self.has_aggregates(&select.columns) {
806            plan = self.plan_aggregation(plan, select)?;
807        }
808
809        if let Some(ref having) = select.having {
810            let predicate = self.plan_expression(having)?;
811            plan = PlanNode::Filter(FilterNode {
812                input: Box::new(plan),
813                predicate,
814            });
815        }
816
817        plan = self.plan_projection(plan, &select.columns)?;
818
819        if !select.order_by.is_empty() {
820            let order_by: Vec<SortKey> = select
821                .order_by
822                .iter()
823                .map(|item| {
824                    Ok(SortKey {
825                        expr: self.plan_expression(&item.expression)?,
826                        ascending: item.ascending,
827                        nulls_first: item.nulls_first.unwrap_or(!item.ascending),
828                    })
829                })
830                .collect::<PlannerResult<Vec<_>>>()?;
831
832            plan = PlanNode::Sort(SortNode {
833                input: Box::new(plan),
834                order_by,
835            });
836        }
837
838        if select.limit.is_some() || select.offset.is_some() {
839            plan = PlanNode::Limit(LimitNode {
840                input: Box::new(plan),
841                limit: select.limit,
842                offset: select.offset,
843            });
844        }
845
846        let (estimated_cost, estimated_rows) = self.estimate_cost(&plan);
847
848        Ok(QueryPlan {
849            root: plan,
850            estimated_cost,
851            estimated_rows,
852        })
853    }
854
855    /// Plan FROM clause.
856    fn plan_from_clause(&self, from: &Option<FromClause>) -> PlannerResult<PlanNode> {
857        let from = match from {
858            Some(f) => f,
859            None => return Ok(PlanNode::Empty),
860        };
861
862        let mut plan = self.plan_table_reference(&from.source)?;
863
864        for join in &from.joins {
865            plan = self.plan_join(plan, join)?;
866        }
867
868        Ok(plan)
869    }
870
871    /// Plan a table reference.
872    fn plan_table_reference(&self, table_ref: &TableReference) -> PlannerResult<PlanNode> {
873        match table_ref {
874            TableReference::Table { name, alias } => {
875                let columns = if let Some(info) = self.schema.get_table(name) {
876                    info.columns.iter().map(|c| c.name.clone()).collect()
877                } else {
878                    Vec::new()
879                };
880
881                Ok(PlanNode::Scan(ScanNode {
882                    table_name: name.clone(),
883                    alias: alias.clone(),
884                    columns,
885                    index_scan: None,
886                }))
887            }
888            TableReference::Subquery { query, alias: _ } => self.plan_select(query).map(|p| p.root),
889        }
890    }
891
892    /// Plan a join.
893    fn plan_join(&self, left: PlanNode, join: &JoinClause) -> PlannerResult<PlanNode> {
894        let right = self.plan_table_reference(&join.table)?;
895        let condition = join
896            .condition
897            .as_ref()
898            .map(|c| self.plan_expression(c))
899            .transpose()?;
900
901        let join_type = match join.join_type {
902            JoinType::Inner => PlanJoinType::Inner,
903            JoinType::Left => PlanJoinType::Left,
904            JoinType::Right => PlanJoinType::Right,
905            JoinType::Full => PlanJoinType::Full,
906            JoinType::Cross => PlanJoinType::Cross,
907        };
908
909        let strategy = self.choose_join_strategy(&left, &right, &condition);
910
911        Ok(PlanNode::Join(JoinNode {
912            left: Box::new(left),
913            right: Box::new(right),
914            join_type,
915            condition,
916            strategy,
917        }))
918    }
919
920    /// Choose join strategy based on cost estimation.
921    fn choose_join_strategy(
922        &self,
923        _left: &PlanNode,
924        _right: &PlanNode,
925        condition: &Option<PlanExpression>,
926    ) -> JoinStrategy {
927        if condition.is_none() {
928            return JoinStrategy::NestedLoop;
929        }
930
931        JoinStrategy::HashJoin
932    }
933
934    /// Check if select columns contain aggregates.
935    fn has_aggregates(&self, columns: &[SelectColumn]) -> bool {
936        columns.iter().any(|col| match col {
937            SelectColumn::Expression { expr, .. } => self.expression_has_aggregate(expr),
938            _ => false,
939        })
940    }
941
942    /// Check if expression contains aggregate function.
943    fn expression_has_aggregate(&self, expr: &Expression) -> bool {
944        match expr {
945            Expression::Function { name, .. } => {
946                matches!(
947                    name.to_uppercase().as_str(),
948                    "COUNT" | "SUM" | "AVG" | "MIN" | "MAX"
949                )
950            }
951            Expression::BinaryOp { left, right, .. } => {
952                self.expression_has_aggregate(left) || self.expression_has_aggregate(right)
953            }
954            _ => false,
955        }
956    }
957
958    /// Plan aggregation.
959    fn plan_aggregation(
960        &self,
961        input: PlanNode,
962        select: &SelectStatement,
963    ) -> PlannerResult<PlanNode> {
964        let group_by: Vec<PlanExpression> = select
965            .group_by
966            .iter()
967            .map(|e| self.plan_expression(e))
968            .collect::<PlannerResult<Vec<_>>>()?;
969
970        let mut aggregates = Vec::new();
971        for col in &select.columns {
972            if let SelectColumn::Expression { expr, alias } = col {
973                if let Some(agg) = self.extract_aggregate(expr, alias.clone())? {
974                    aggregates.push(agg);
975                }
976            }
977        }
978
979        Ok(PlanNode::Aggregate(AggregateNode {
980            input: Box::new(input),
981            group_by,
982            aggregates,
983        }))
984    }
985
986    /// Extract aggregate from expression.
987    fn extract_aggregate(
988        &self,
989        expr: &Expression,
990        alias: Option<String>,
991    ) -> PlannerResult<Option<AggregateExpr>> {
992        match expr {
993            Expression::Function {
994                name,
995                args,
996                distinct,
997            } => {
998                let func = match name.to_uppercase().as_str() {
999                    "COUNT" => AggregateFunction::Count,
1000                    "SUM" => AggregateFunction::Sum,
1001                    "AVG" => AggregateFunction::Avg,
1002                    "MIN" => AggregateFunction::Min,
1003                    "MAX" => AggregateFunction::Max,
1004                    _ => return Ok(None),
1005                };
1006
1007                let argument = if args.is_empty() {
1008                    None
1009                } else {
1010                    Some(self.plan_expression(&args[0])?)
1011                };
1012
1013                Ok(Some(AggregateExpr {
1014                    function: func,
1015                    argument,
1016                    distinct: *distinct,
1017                    alias,
1018                }))
1019            }
1020            _ => Ok(None),
1021        }
1022    }
1023
1024    /// Plan projection.
1025    fn plan_projection(
1026        &self,
1027        input: PlanNode,
1028        columns: &[SelectColumn],
1029    ) -> PlannerResult<PlanNode> {
1030        let mut expressions = Vec::new();
1031
1032        for col in columns {
1033            match col {
1034                SelectColumn::AllColumns => {
1035                    expressions.push(ProjectionExpr {
1036                        expr: PlanExpression::Column {
1037                            table: None,
1038                            name: "*".to_string(),
1039                            data_type: DataType::Any,
1040                        },
1041                        alias: None,
1042                    });
1043                }
1044                SelectColumn::TableAllColumns(table) => {
1045                    expressions.push(ProjectionExpr {
1046                        expr: PlanExpression::Column {
1047                            table: Some(table.clone()),
1048                            name: "*".to_string(),
1049                            data_type: DataType::Any,
1050                        },
1051                        alias: None,
1052                    });
1053                }
1054                SelectColumn::Expression { expr, alias } => {
1055                    expressions.push(ProjectionExpr {
1056                        expr: self.plan_expression(expr)?,
1057                        alias: alias.clone(),
1058                    });
1059                }
1060            }
1061        }
1062
1063        Ok(PlanNode::Project(ProjectNode {
1064            input: Box::new(input),
1065            expressions,
1066        }))
1067    }
1068
1069    /// Convert AST expression to plan expression.
1070    fn plan_expression(&self, expr: &Expression) -> PlannerResult<PlanExpression> {
1071        match expr {
1072            Expression::Literal(lit) => Ok(PlanExpression::Literal(match lit {
1073                crate::ast::Literal::Null => PlanLiteral::Null,
1074                crate::ast::Literal::Boolean(b) => PlanLiteral::Boolean(*b),
1075                crate::ast::Literal::Integer(i) => PlanLiteral::Integer(*i),
1076                crate::ast::Literal::Float(f) => PlanLiteral::Float(*f),
1077                crate::ast::Literal::String(s) => PlanLiteral::String(s.clone()),
1078            })),
1079
1080            Expression::Column(col_ref) => Ok(PlanExpression::Column {
1081                table: col_ref.table.clone(),
1082                name: col_ref.column.clone(),
1083                data_type: DataType::Any,
1084            }),
1085
1086            Expression::BinaryOp { left, op, right } => {
1087                let plan_op = match op {
1088                    BinaryOperator::Add => PlanBinaryOp::Add,
1089                    BinaryOperator::Subtract => PlanBinaryOp::Subtract,
1090                    BinaryOperator::Multiply => PlanBinaryOp::Multiply,
1091                    BinaryOperator::Divide => PlanBinaryOp::Divide,
1092                    BinaryOperator::Modulo => PlanBinaryOp::Modulo,
1093                    BinaryOperator::Equal => PlanBinaryOp::Equal,
1094                    BinaryOperator::NotEqual => PlanBinaryOp::NotEqual,
1095                    BinaryOperator::LessThan => PlanBinaryOp::LessThan,
1096                    BinaryOperator::LessThanOrEqual => PlanBinaryOp::LessThanOrEqual,
1097                    BinaryOperator::GreaterThan => PlanBinaryOp::GreaterThan,
1098                    BinaryOperator::GreaterThanOrEqual => PlanBinaryOp::GreaterThanOrEqual,
1099                    BinaryOperator::And => PlanBinaryOp::And,
1100                    BinaryOperator::Or => PlanBinaryOp::Or,
1101                    BinaryOperator::Concat => PlanBinaryOp::Concat,
1102                };
1103
1104                Ok(PlanExpression::BinaryOp {
1105                    left: Box::new(self.plan_expression(left)?),
1106                    op: plan_op,
1107                    right: Box::new(self.plan_expression(right)?),
1108                })
1109            }
1110
1111            Expression::UnaryOp { op, expr } => {
1112                let plan_op = match op {
1113                    crate::ast::UnaryOperator::Not => PlanUnaryOp::Not,
1114                    crate::ast::UnaryOperator::Negative => PlanUnaryOp::Negative,
1115                    crate::ast::UnaryOperator::Positive => {
1116                        return self.plan_expression(expr);
1117                    }
1118                };
1119
1120                Ok(PlanExpression::UnaryOp {
1121                    op: plan_op,
1122                    expr: Box::new(self.plan_expression(expr)?),
1123                })
1124            }
1125
1126            Expression::Function { name, args, .. } => {
1127                let planned_args: Vec<PlanExpression> = args
1128                    .iter()
1129                    .map(|a| self.plan_expression(a))
1130                    .collect::<PlannerResult<Vec<_>>>()?;
1131
1132                Ok(PlanExpression::Function {
1133                    name: name.clone(),
1134                    args: planned_args,
1135                    return_type: DataType::Any,
1136                })
1137            }
1138
1139            Expression::IsNull { expr, negated } => Ok(PlanExpression::IsNull {
1140                expr: Box::new(self.plan_expression(expr)?),
1141                negated: *negated,
1142            }),
1143
1144            Expression::Cast { expr, data_type } => Ok(PlanExpression::Cast {
1145                expr: Box::new(self.plan_expression(expr)?),
1146                target_type: data_type.clone(),
1147            }),
1148
1149            Expression::Case { operand, conditions, else_result } => {
1150                let planned_operand = operand
1151                    .as_ref()
1152                    .map(|e| self.plan_expression(e))
1153                    .transpose()?
1154                    .map(Box::new);
1155
1156                let planned_conditions = conditions
1157                    .iter()
1158                    .map(|(cond, result)| {
1159                        Ok((self.plan_expression(cond)?, self.plan_expression(result)?))
1160                    })
1161                    .collect::<PlannerResult<Vec<_>>>()?;
1162
1163                let planned_else = else_result
1164                    .as_ref()
1165                    .map(|e| self.plan_expression(e))
1166                    .transpose()?
1167                    .map(Box::new);
1168
1169                Ok(PlanExpression::Case {
1170                    operand: planned_operand,
1171                    conditions: planned_conditions,
1172                    else_result: planned_else,
1173                })
1174            }
1175
1176            Expression::InList { expr, list, negated } => {
1177                let planned_list = list
1178                    .iter()
1179                    .map(|e| self.plan_expression(e))
1180                    .collect::<PlannerResult<Vec<_>>>()?;
1181
1182                Ok(PlanExpression::InList {
1183                    expr: Box::new(self.plan_expression(expr)?),
1184                    list: planned_list,
1185                    negated: *negated,
1186                })
1187            }
1188
1189            Expression::Between { expr, low, high, negated } => Ok(PlanExpression::Between {
1190                expr: Box::new(self.plan_expression(expr)?),
1191                low: Box::new(self.plan_expression(low)?),
1192                high: Box::new(self.plan_expression(high)?),
1193                negated: *negated,
1194            }),
1195
1196            Expression::Like { expr, pattern, negated } => Ok(PlanExpression::Like {
1197                expr: Box::new(self.plan_expression(expr)?),
1198                pattern: Box::new(self.plan_expression(pattern)?),
1199                negated: *negated,
1200            }),
1201
1202            Expression::InSubquery { expr, subquery, negated } => {
1203                let subquery_plan = self.plan_select(subquery)?;
1204                Ok(PlanExpression::InSubquery {
1205                    expr: Box::new(self.plan_expression(expr)?),
1206                    subquery: Box::new(subquery_plan.root),
1207                    negated: *negated,
1208                })
1209            }
1210
1211            Expression::Exists { subquery, negated } => {
1212                let subquery_plan = self.plan_select(subquery)?;
1213                Ok(PlanExpression::Exists {
1214                    subquery: Box::new(subquery_plan.root),
1215                    negated: *negated,
1216                })
1217            }
1218
1219            Expression::Subquery(subquery) => {
1220                let subquery_plan = self.plan_select(subquery)?;
1221                Ok(PlanExpression::ScalarSubquery(Box::new(subquery_plan.root)))
1222            }
1223
1224            Expression::Placeholder(idx) => Ok(PlanExpression::Placeholder(*idx)),
1225        }
1226    }
1227
1228    /// Estimate cost and row count for a plan.
1229    fn estimate_cost(&self, node: &PlanNode) -> (f64, u64) {
1230        match node {
1231            PlanNode::Scan(scan) => {
1232                let rows = self
1233                    .schema
1234                    .get_table(&scan.table_name)
1235                    .map(|t| t.row_count)
1236                    .unwrap_or(1000);
1237                let cost = rows as f64 * 1.0;
1238                (cost, rows)
1239            }
1240
1241            PlanNode::Filter(filter) => {
1242                let (input_cost, input_rows) = self.estimate_cost(&filter.input);
1243                let selectivity = 0.1;
1244                let rows = (input_rows as f64 * selectivity) as u64;
1245                let cost = input_cost + input_rows as f64 * 0.1;
1246                (cost, rows.max(1))
1247            }
1248
1249            PlanNode::Project(project) => {
1250                let (input_cost, input_rows) = self.estimate_cost(&project.input);
1251                let cost = input_cost + input_rows as f64 * 0.01;
1252                (cost, input_rows)
1253            }
1254
1255            PlanNode::Join(join) => {
1256                let (left_cost, left_rows) = self.estimate_cost(&join.left);
1257                let (right_cost, right_rows) = self.estimate_cost(&join.right);
1258
1259                let rows = match join.join_type {
1260                    PlanJoinType::Cross => left_rows * right_rows,
1261                    _ => (left_rows + right_rows) / 2,
1262                };
1263
1264                let join_cost = match join.strategy {
1265                    JoinStrategy::NestedLoop => left_rows as f64 * right_rows as f64,
1266                    JoinStrategy::HashJoin => {
1267                        left_rows as f64 + right_rows as f64 + right_rows as f64 * 1.2
1268                    }
1269                    JoinStrategy::MergeJoin => {
1270                        left_rows as f64 * 2.0_f64.log2()
1271                            + right_rows as f64 * 2.0_f64.log2()
1272                            + (left_rows + right_rows) as f64
1273                    }
1274                };
1275
1276                (left_cost + right_cost + join_cost, rows)
1277            }
1278
1279            PlanNode::Aggregate(agg) => {
1280                let (input_cost, input_rows) = self.estimate_cost(&agg.input);
1281                let groups = if agg.group_by.is_empty() {
1282                    1
1283                } else {
1284                    (input_rows as f64).sqrt() as u64
1285                };
1286                let cost = input_cost + input_rows as f64 * 0.5;
1287                (cost, groups.max(1))
1288            }
1289
1290            PlanNode::Sort(sort) => {
1291                let (input_cost, input_rows) = self.estimate_cost(&sort.input);
1292                let sort_cost = input_rows as f64 * (input_rows as f64).log2();
1293                (input_cost + sort_cost, input_rows)
1294            }
1295
1296            PlanNode::Limit(limit) => {
1297                let (input_cost, input_rows) = self.estimate_cost(&limit.input);
1298                let rows = limit.limit.unwrap_or(input_rows).min(input_rows);
1299                (input_cost, rows)
1300            }
1301
1302            PlanNode::Empty => (0.0, 1),
1303
1304            // DDL operations have minimal cost
1305            PlanNode::CreateTable(_) => (1.0, 0),
1306            PlanNode::DropTable(_) => (1.0, 0),
1307            PlanNode::AlterTable(_) => (1.0, 0),
1308            PlanNode::CreateIndex(_) => (1.0, 0),
1309            PlanNode::DropIndex(_) => (1.0, 0),
1310
1311            // DML operations
1312            PlanNode::Insert(insert) => {
1313                let rows = match &insert.source {
1314                    InsertPlanSource::Values(values) => values.len() as u64,
1315                    InsertPlanSource::Query(subquery) => {
1316                        let (_, estimated_rows) = self.estimate_cost(subquery);
1317                        estimated_rows
1318                    }
1319                };
1320                (rows as f64, rows)
1321            }
1322            PlanNode::Update(update) => {
1323                let rows = self.schema.get_table(&update.table_name)
1324                    .map(|t| t.row_count)
1325                    .unwrap_or(100);
1326                (rows as f64, rows)
1327            }
1328            PlanNode::Delete(delete) => {
1329                let rows = self.schema.get_table(&delete.table_name)
1330                    .map(|t| t.row_count)
1331                    .unwrap_or(100);
1332                (rows as f64, rows)
1333            }
1334        }
1335    }
1336}
1337
1338// =============================================================================
1339// Tests
1340// =============================================================================
1341
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::{ColumnRef, FromClause, Literal, OrderByItem, TableReference};

    /// Shorthand for a schema column definition.
    fn column_info(name: &str, data_type: DataType, nullable: bool) -> ColumnInfo {
        ColumnInfo {
            name: name.to_string(),
            data_type,
            nullable,
        }
    }

    /// Schema with a 10,000-row `users` table and a 50,000-row `orders` table.
    fn create_test_schema() -> Arc<PlannerSchema> {
        let mut schema = PlannerSchema::new();

        schema.add_table(TableInfo {
            name: "users".to_string(),
            columns: vec![
                column_info("id", DataType::Integer, false),
                column_info("name", DataType::Text, false),
                column_info("email", DataType::Text, true),
            ],
            row_count: 10000,
        });

        schema.add_table(TableInfo {
            name: "orders".to_string(),
            columns: vec![
                column_info("id", DataType::Integer, false),
                column_info("user_id", DataType::Integer, false),
                column_info("amount", DataType::Float, false),
            ],
            row_count: 50000,
        });

        Arc::new(schema)
    }

    /// `SELECT * FROM users` with no other clauses; the tests override
    /// individual fields through struct-update syntax.
    fn select_all_from_users() -> SelectStatement {
        SelectStatement {
            distinct: false,
            columns: vec![SelectColumn::AllColumns],
            from: Some(FromClause {
                source: TableReference::Table {
                    name: "users".to_string(),
                    alias: None,
                },
                joins: vec![],
            }),
            where_clause: None,
            group_by: vec![],
            having: None,
            order_by: vec![],
            limit: None,
            offset: None,
        }
    }

    /// Unqualified column reference expression.
    fn column_ref(name: &str) -> Expression {
        Expression::Column(ColumnRef {
            table: None,
            column: name.to_string(),
        })
    }

    #[test]
    fn test_plan_simple_select() {
        let planner = Planner::new(create_test_schema());

        let statement = Statement::Select(select_all_from_users());
        let plan = planner.plan(&statement).unwrap();

        assert!(plan.estimated_rows > 0);
        assert!(plan.estimated_cost > 0.0);
    }

    #[test]
    fn test_plan_select_with_filter() {
        let planner = Planner::new(create_test_schema());

        // SELECT * FROM users WHERE id = 1
        let select = SelectStatement {
            where_clause: Some(Expression::BinaryOp {
                left: Box::new(column_ref("id")),
                op: BinaryOperator::Equal,
                right: Box::new(Expression::Literal(Literal::Integer(1))),
            }),
            ..select_all_from_users()
        };

        let plan = planner.plan(&Statement::Select(select)).unwrap();

        // The filter must sit directly underneath the projection.
        let project = match &plan.root {
            PlanNode::Project(project) => project,
            _ => panic!("Expected project node"),
        };
        if !matches!(project.input.as_ref(), PlanNode::Filter(_)) {
            panic!("Expected filter node");
        }
    }

    #[test]
    fn test_plan_select_with_order_by() {
        let planner = Planner::new(create_test_schema());

        // SELECT * FROM users ORDER BY name ASC LIMIT 10
        let select = SelectStatement {
            order_by: vec![OrderByItem {
                expression: column_ref("name"),
                ascending: true,
                nulls_first: None,
            }],
            limit: Some(10),
            ..select_all_from_users()
        };

        let plan = planner.plan(&Statement::Select(select)).unwrap();

        // The limit must be the root, with the sort directly underneath it.
        let limit = match &plan.root {
            PlanNode::Limit(limit) => limit,
            _ => panic!("Expected limit node"),
        };
        if !matches!(limit.input.as_ref(), PlanNode::Sort(_)) {
            panic!("Expected sort node");
        }
    }
}