1use crate::ast::{
16 BinaryOperator, Expression, FromClause, JoinClause, JoinType,
17 SelectColumn, SelectStatement, Statement, TableReference,
18};
19use aegis_common::DataType;
20use std::collections::HashMap;
21use std::sync::Arc;
22use thiserror::Error;
23
24#[derive(Debug, Error)]
29pub enum PlannerError {
30 #[error("Table not found: {0}")]
31 TableNotFound(String),
32
33 #[error("Column not found: {0}")]
34 ColumnNotFound(String),
35
36 #[error("Unsupported operation: {0}")]
37 UnsupportedOperation(String),
38
39 #[error("Internal planner error: {0}")]
40 Internal(String),
41}
42
43pub type PlannerResult<T> = Result<T, PlannerError>;
44
45#[derive(Debug, Clone)]
51pub struct QueryPlan {
52 pub root: PlanNode,
53 pub estimated_cost: f64,
54 pub estimated_rows: u64,
55}
56
57#[derive(Debug, Clone)]
59pub enum PlanNode {
60 Scan(ScanNode),
61 Filter(FilterNode),
62 Project(ProjectNode),
63 Join(JoinNode),
64 Aggregate(AggregateNode),
65 Sort(SortNode),
66 Limit(LimitNode),
67 Empty,
68 CreateTable(CreateTableNode),
70 DropTable(DropTableNode),
71 AlterTable(AlterTableNode),
72 CreateIndex(CreateIndexNode),
73 DropIndex(DropIndexNode),
74 Insert(InsertNode),
76 Update(UpdateNode),
77 Delete(DeleteNode),
78}
79
80#[derive(Debug, Clone)]
86pub struct ScanNode {
87 pub table_name: String,
88 pub alias: Option<String>,
89 pub columns: Vec<String>,
90 pub index_scan: Option<IndexScan>,
91}
92
93#[derive(Debug, Clone)]
95pub struct IndexScan {
96 pub index_name: String,
97 pub key_range: KeyRange,
98}
99
100#[derive(Debug, Clone)]
102pub struct KeyRange {
103 pub start: Option<PlanExpression>,
104 pub start_inclusive: bool,
105 pub end: Option<PlanExpression>,
106 pub end_inclusive: bool,
107}
108
109#[derive(Debug, Clone)]
111pub struct FilterNode {
112 pub input: Box<PlanNode>,
113 pub predicate: PlanExpression,
114}
115
116#[derive(Debug, Clone)]
118pub struct ProjectNode {
119 pub input: Box<PlanNode>,
120 pub expressions: Vec<ProjectionExpr>,
121}
122
123#[derive(Debug, Clone)]
125pub struct ProjectionExpr {
126 pub expr: PlanExpression,
127 pub alias: Option<String>,
128}
129
130#[derive(Debug, Clone)]
132pub struct JoinNode {
133 pub left: Box<PlanNode>,
134 pub right: Box<PlanNode>,
135 pub join_type: PlanJoinType,
136 pub condition: Option<PlanExpression>,
137 pub strategy: JoinStrategy,
138}
139
140#[derive(Debug, Clone, Copy, PartialEq, Eq)]
142pub enum PlanJoinType {
143 Inner,
144 Left,
145 Right,
146 Full,
147 Cross,
148}
149
150#[derive(Debug, Clone, Copy, PartialEq, Eq)]
152pub enum JoinStrategy {
153 NestedLoop,
154 HashJoin,
155 MergeJoin,
156}
157
158#[derive(Debug, Clone)]
160pub struct AggregateNode {
161 pub input: Box<PlanNode>,
162 pub group_by: Vec<PlanExpression>,
163 pub aggregates: Vec<AggregateExpr>,
164}
165
166#[derive(Debug, Clone)]
168pub struct AggregateExpr {
169 pub function: AggregateFunction,
170 pub argument: Option<PlanExpression>,
171 pub distinct: bool,
172 pub alias: Option<String>,
173}
174
175#[derive(Debug, Clone, Copy, PartialEq, Eq)]
177pub enum AggregateFunction {
178 Count,
179 Sum,
180 Avg,
181 Min,
182 Max,
183}
184
185#[derive(Debug, Clone)]
187pub struct SortNode {
188 pub input: Box<PlanNode>,
189 pub order_by: Vec<SortKey>,
190}
191
192#[derive(Debug, Clone)]
194pub struct SortKey {
195 pub expr: PlanExpression,
196 pub ascending: bool,
197 pub nulls_first: bool,
198}
199
200#[derive(Debug, Clone)]
202pub struct LimitNode {
203 pub input: Box<PlanNode>,
204 pub limit: Option<u64>,
205 pub offset: Option<u64>,
206}
207
208#[derive(Debug, Clone)]
214pub struct CreateTableNode {
215 pub table_name: String,
216 pub columns: Vec<CreateColumnDef>,
217 pub constraints: Vec<CreateTableConstraint>,
218 pub if_not_exists: bool,
219}
220
221#[derive(Debug, Clone)]
223pub struct CreateColumnDef {
224 pub name: String,
225 pub data_type: DataType,
226 pub nullable: bool,
227 pub default: Option<PlanExpression>,
228 pub primary_key: bool,
229 pub unique: bool,
230}
231
232#[derive(Debug, Clone)]
234pub enum CreateTableConstraint {
235 PrimaryKey { columns: Vec<String> },
236 Unique { columns: Vec<String> },
237 ForeignKey {
238 columns: Vec<String>,
239 ref_table: String,
240 ref_columns: Vec<String>,
241 },
242 Check { expression: PlanExpression },
243}
244
245#[derive(Debug, Clone)]
247pub struct DropTableNode {
248 pub table_name: String,
249 pub if_exists: bool,
250}
251
252#[derive(Debug, Clone)]
254pub struct AlterTableNode {
255 pub table_name: String,
256 pub operations: Vec<PlanAlterOperation>,
257}
258
259#[derive(Debug, Clone)]
261pub enum PlanAlterOperation {
262 AddColumn(CreateColumnDef),
263 DropColumn { name: String, if_exists: bool },
264 RenameColumn { old_name: String, new_name: String },
265 AlterColumn { name: String, data_type: Option<DataType>, set_not_null: Option<bool>, set_default: Option<Option<PlanExpression>> },
266 RenameTable { new_name: String },
267 AddConstraint(CreateTableConstraint),
268 DropConstraint { name: String },
269}
270
271#[derive(Debug, Clone)]
273pub struct CreateIndexNode {
274 pub index_name: String,
275 pub table_name: String,
276 pub columns: Vec<String>,
277 pub unique: bool,
278 pub if_not_exists: bool,
279}
280
281#[derive(Debug, Clone)]
283pub struct DropIndexNode {
284 pub index_name: String,
285 pub if_exists: bool,
286}
287
288#[derive(Debug, Clone)]
294pub enum InsertPlanSource {
295 Values(Vec<Vec<PlanExpression>>),
297 Query(Box<PlanNode>),
299}
300
301#[derive(Debug, Clone)]
303pub struct InsertNode {
304 pub table_name: String,
305 pub columns: Vec<String>,
306 pub source: InsertPlanSource,
307}
308
309#[derive(Debug, Clone)]
311pub struct UpdateNode {
312 pub table_name: String,
313 pub assignments: Vec<(String, PlanExpression)>,
314 pub where_clause: Option<PlanExpression>,
315}
316
317#[derive(Debug, Clone)]
319pub struct DeleteNode {
320 pub table_name: String,
321 pub where_clause: Option<PlanExpression>,
322}
323
324#[derive(Debug, Clone)]
330pub enum PlanExpression {
331 Column {
332 table: Option<String>,
333 name: String,
334 data_type: DataType,
335 },
336 Literal(PlanLiteral),
337 BinaryOp {
338 left: Box<PlanExpression>,
339 op: PlanBinaryOp,
340 right: Box<PlanExpression>,
341 },
342 UnaryOp {
343 op: PlanUnaryOp,
344 expr: Box<PlanExpression>,
345 },
346 Function {
347 name: String,
348 args: Vec<PlanExpression>,
349 return_type: DataType,
350 },
351 Cast {
352 expr: Box<PlanExpression>,
353 target_type: DataType,
354 },
355 IsNull {
356 expr: Box<PlanExpression>,
357 negated: bool,
358 },
359 Case {
361 operand: Option<Box<PlanExpression>>,
362 conditions: Vec<(PlanExpression, PlanExpression)>,
363 else_result: Option<Box<PlanExpression>>,
364 },
365 InList {
367 expr: Box<PlanExpression>,
368 list: Vec<PlanExpression>,
369 negated: bool,
370 },
371 Between {
373 expr: Box<PlanExpression>,
374 low: Box<PlanExpression>,
375 high: Box<PlanExpression>,
376 negated: bool,
377 },
378 Like {
380 expr: Box<PlanExpression>,
381 pattern: Box<PlanExpression>,
382 negated: bool,
383 },
384 InSubquery {
386 expr: Box<PlanExpression>,
387 subquery: Box<PlanNode>,
388 negated: bool,
389 },
390 Exists {
392 subquery: Box<PlanNode>,
393 negated: bool,
394 },
395 ScalarSubquery(Box<PlanNode>),
397 Placeholder(usize),
399}
400
401#[derive(Debug, Clone)]
403pub enum PlanLiteral {
404 Null,
405 Boolean(bool),
406 Integer(i64),
407 Float(f64),
408 String(String),
409}
410
411#[derive(Debug, Clone, Copy, PartialEq, Eq)]
413pub enum PlanBinaryOp {
414 Add,
415 Subtract,
416 Multiply,
417 Divide,
418 Modulo,
419 Equal,
420 NotEqual,
421 LessThan,
422 LessThanOrEqual,
423 GreaterThan,
424 GreaterThanOrEqual,
425 And,
426 Or,
427 Concat,
428}
429
430#[derive(Debug, Clone, Copy, PartialEq, Eq)]
432pub enum PlanUnaryOp {
433 Not,
434 Negative,
435}
436
437#[derive(Debug, Clone, Default)]
443pub struct PlannerSchema {
444 tables: HashMap<String, TableInfo>,
445 indexes: HashMap<String, Vec<IndexInfo>>,
446}
447
448#[derive(Debug, Clone)]
450pub struct TableInfo {
451 pub name: String,
452 pub columns: Vec<ColumnInfo>,
453 pub row_count: u64,
454}
455
456#[derive(Debug, Clone)]
458pub struct ColumnInfo {
459 pub name: String,
460 pub data_type: DataType,
461 pub nullable: bool,
462}
463
464#[derive(Debug, Clone)]
466pub struct IndexInfo {
467 pub name: String,
468 pub columns: Vec<String>,
469 pub unique: bool,
470}
471
472impl PlannerSchema {
473 pub fn new() -> Self {
474 Self::default()
475 }
476
477 pub fn add_table(&mut self, table: TableInfo) {
478 self.tables.insert(table.name.clone(), table);
479 }
480
481 pub fn add_index(&mut self, table: &str, index: IndexInfo) {
482 self.indexes
483 .entry(table.to_string())
484 .or_default()
485 .push(index);
486 }
487
488 pub fn get_table(&self, name: &str) -> Option<&TableInfo> {
489 self.tables.get(name)
490 }
491
492 pub fn get_indexes(&self, table: &str) -> Option<&Vec<IndexInfo>> {
493 self.indexes.get(table)
494 }
495}
496
497pub struct Planner {
503 schema: Arc<PlannerSchema>,
504}
505
506impl Planner {
507 pub fn new(schema: Arc<PlannerSchema>) -> Self {
508 Self { schema }
509 }
510
511 pub fn plan(&self, statement: &Statement) -> PlannerResult<QueryPlan> {
513 match statement {
514 Statement::Select(select) => self.plan_select(select),
515 Statement::Insert(insert) => self.plan_insert(insert),
516 Statement::Update(update) => self.plan_update(update),
517 Statement::Delete(delete) => self.plan_delete(delete),
518 Statement::CreateTable(create) => self.plan_create_table(create),
519 Statement::DropTable(drop) => self.plan_drop_table(drop),
520 Statement::AlterTable(alter) => self.plan_alter_table(alter),
521 Statement::CreateIndex(create) => self.plan_create_index(create),
522 Statement::DropIndex(drop) => self.plan_drop_index(drop),
523 Statement::Begin | Statement::Commit | Statement::Rollback => {
524 Ok(QueryPlan {
526 root: PlanNode::Empty,
527 estimated_cost: 0.0,
528 estimated_rows: 0,
529 })
530 }
531 }
532 }
533
534 fn plan_create_table(&self, create: &crate::ast::CreateTableStatement) -> PlannerResult<QueryPlan> {
536 let mut columns: Vec<CreateColumnDef> = Vec::new();
537 for col in &create.columns {
538 let primary_key = col.constraints.iter().any(|c| matches!(c, crate::ast::ColumnConstraint::PrimaryKey));
539 let unique = col.constraints.iter().any(|c| matches!(c, crate::ast::ColumnConstraint::Unique));
540
541 let default = match &col.default {
542 Some(e) => Some(self.plan_expression(e)?),
543 None => None,
544 };
545
546 columns.push(CreateColumnDef {
547 name: col.name.clone(),
548 data_type: col.data_type.clone(),
549 nullable: col.nullable,
550 default,
551 primary_key,
552 unique,
553 });
554 }
555
556 let constraints: Vec<CreateTableConstraint> = create.constraints.iter().map(|c| {
557 match c {
558 crate::ast::TableConstraint::PrimaryKey { columns } => {
559 Ok(CreateTableConstraint::PrimaryKey { columns: columns.clone() })
560 }
561 crate::ast::TableConstraint::Unique { columns } => {
562 Ok(CreateTableConstraint::Unique { columns: columns.clone() })
563 }
564 crate::ast::TableConstraint::ForeignKey { columns, ref_table, ref_columns } => {
565 Ok(CreateTableConstraint::ForeignKey {
566 columns: columns.clone(),
567 ref_table: ref_table.clone(),
568 ref_columns: ref_columns.clone(),
569 })
570 }
571 crate::ast::TableConstraint::Check { expression } => {
572 Ok(CreateTableConstraint::Check {
573 expression: self.plan_expression(expression)?,
574 })
575 }
576 }
577 }).collect::<PlannerResult<Vec<_>>>()?;
578
579 Ok(QueryPlan {
580 root: PlanNode::CreateTable(CreateTableNode {
581 table_name: create.name.clone(),
582 columns,
583 constraints,
584 if_not_exists: create.if_not_exists,
585 }),
586 estimated_cost: 1.0,
587 estimated_rows: 0,
588 })
589 }
590
591 fn plan_drop_table(&self, drop: &crate::ast::DropTableStatement) -> PlannerResult<QueryPlan> {
593 Ok(QueryPlan {
594 root: PlanNode::DropTable(DropTableNode {
595 table_name: drop.name.clone(),
596 if_exists: drop.if_exists,
597 }),
598 estimated_cost: 1.0,
599 estimated_rows: 0,
600 })
601 }
602
603 fn plan_alter_table(&self, alter: &crate::ast::AlterTableStatement) -> PlannerResult<QueryPlan> {
605 let operations = alter.operations.iter().map(|op| {
606 match op {
607 crate::ast::AlterTableOperation::AddColumn(col) => {
608 Ok(PlanAlterOperation::AddColumn(CreateColumnDef {
609 name: col.name.clone(),
610 data_type: col.data_type.clone(),
611 nullable: col.nullable,
612 default: col.default.as_ref().map(|e| self.plan_expression(e)).transpose()?,
613 primary_key: false,
614 unique: false,
615 }))
616 }
617 crate::ast::AlterTableOperation::DropColumn { name, if_exists } => {
618 Ok(PlanAlterOperation::DropColumn {
619 name: name.clone(),
620 if_exists: *if_exists,
621 })
622 }
623 crate::ast::AlterTableOperation::RenameColumn { old_name, new_name } => {
624 Ok(PlanAlterOperation::RenameColumn {
625 old_name: old_name.clone(),
626 new_name: new_name.clone(),
627 })
628 }
629 crate::ast::AlterTableOperation::AlterColumn { name, data_type, set_not_null, set_default } => {
630 let default_expr = match set_default {
631 Some(Some(expr)) => Some(Some(self.plan_expression(expr)?)),
632 Some(None) => Some(None),
633 None => None,
634 };
635 Ok(PlanAlterOperation::AlterColumn {
636 name: name.clone(),
637 data_type: data_type.clone(),
638 set_not_null: *set_not_null,
639 set_default: default_expr,
640 })
641 }
642 crate::ast::AlterTableOperation::RenameTable { new_name } => {
643 Ok(PlanAlterOperation::RenameTable {
644 new_name: new_name.clone(),
645 })
646 }
647 crate::ast::AlterTableOperation::AddConstraint(constraint) => {
648 let plan_constraint = match constraint {
649 crate::ast::TableConstraint::PrimaryKey { columns } => {
650 CreateTableConstraint::PrimaryKey { columns: columns.clone() }
651 }
652 crate::ast::TableConstraint::Unique { columns } => {
653 CreateTableConstraint::Unique { columns: columns.clone() }
654 }
655 crate::ast::TableConstraint::ForeignKey { columns, ref_table, ref_columns } => {
656 CreateTableConstraint::ForeignKey {
657 columns: columns.clone(),
658 ref_table: ref_table.clone(),
659 ref_columns: ref_columns.clone(),
660 }
661 }
662 crate::ast::TableConstraint::Check { expression } => {
663 CreateTableConstraint::Check {
664 expression: self.plan_expression(expression)?,
665 }
666 }
667 };
668 Ok(PlanAlterOperation::AddConstraint(plan_constraint))
669 }
670 crate::ast::AlterTableOperation::DropConstraint { name } => {
671 Ok(PlanAlterOperation::DropConstraint {
672 name: name.clone(),
673 })
674 }
675 }
676 }).collect::<PlannerResult<Vec<_>>>()?;
677
678 Ok(QueryPlan {
679 root: PlanNode::AlterTable(AlterTableNode {
680 table_name: alter.name.clone(),
681 operations,
682 }),
683 estimated_cost: 1.0,
684 estimated_rows: 0,
685 })
686 }
687
688 fn plan_create_index(&self, create: &crate::ast::CreateIndexStatement) -> PlannerResult<QueryPlan> {
690 Ok(QueryPlan {
691 root: PlanNode::CreateIndex(CreateIndexNode {
692 index_name: create.name.clone(),
693 table_name: create.table.clone(),
694 columns: create.columns.clone(),
695 unique: create.unique,
696 if_not_exists: create.if_not_exists,
697 }),
698 estimated_cost: 1.0,
699 estimated_rows: 0,
700 })
701 }
702
703 fn plan_drop_index(&self, drop: &crate::ast::DropIndexStatement) -> PlannerResult<QueryPlan> {
705 Ok(QueryPlan {
706 root: PlanNode::DropIndex(DropIndexNode {
707 index_name: drop.name.clone(),
708 if_exists: drop.if_exists,
709 }),
710 estimated_cost: 1.0,
711 estimated_rows: 0,
712 })
713 }
714
715 fn plan_insert(&self, insert: &crate::ast::InsertStatement) -> PlannerResult<QueryPlan> {
717 let columns = insert.columns.clone().unwrap_or_default();
718
719 let (source, estimated_rows) = match &insert.source {
720 crate::ast::InsertSource::Values(rows) => {
721 let values = rows.iter().map(|row| {
722 row.iter().map(|expr| self.plan_expression(expr)).collect::<PlannerResult<Vec<_>>>()
723 }).collect::<PlannerResult<Vec<_>>>()?;
724 let num_rows = values.len() as u64;
725 (InsertPlanSource::Values(values), num_rows)
726 }
727 crate::ast::InsertSource::Query(select) => {
728 let subquery_plan = self.plan_select(select)?;
730 let estimated_rows = subquery_plan.estimated_rows;
731 (InsertPlanSource::Query(Box::new(subquery_plan.root)), estimated_rows)
732 }
733 };
734
735 Ok(QueryPlan {
736 root: PlanNode::Insert(InsertNode {
737 table_name: insert.table.clone(),
738 columns,
739 source,
740 }),
741 estimated_cost: estimated_rows as f64,
742 estimated_rows,
743 })
744 }
745
746 fn plan_update(&self, update: &crate::ast::UpdateStatement) -> PlannerResult<QueryPlan> {
748 let assignments: Vec<(String, PlanExpression)> = update.assignments.iter()
749 .map(|a| Ok((a.column.clone(), self.plan_expression(&a.value)?)))
750 .collect::<PlannerResult<Vec<_>>>()?;
751
752 let where_clause = update.where_clause.as_ref()
753 .map(|e| self.plan_expression(e))
754 .transpose()?;
755
756 let estimated_rows = self.schema.get_table(&update.table)
758 .map(|t| t.row_count)
759 .unwrap_or(100);
760
761 Ok(QueryPlan {
762 root: PlanNode::Update(UpdateNode {
763 table_name: update.table.clone(),
764 assignments,
765 where_clause,
766 }),
767 estimated_cost: estimated_rows as f64,
768 estimated_rows,
769 })
770 }
771
772 fn plan_delete(&self, delete: &crate::ast::DeleteStatement) -> PlannerResult<QueryPlan> {
774 let where_clause = delete.where_clause.as_ref()
775 .map(|e| self.plan_expression(e))
776 .transpose()?;
777
778 let estimated_rows = self.schema.get_table(&delete.table)
780 .map(|t| t.row_count)
781 .unwrap_or(100);
782
783 Ok(QueryPlan {
784 root: PlanNode::Delete(DeleteNode {
785 table_name: delete.table.clone(),
786 where_clause,
787 }),
788 estimated_cost: estimated_rows as f64,
789 estimated_rows,
790 })
791 }
792
793 fn plan_select(&self, select: &SelectStatement) -> PlannerResult<QueryPlan> {
795 let mut plan = self.plan_from_clause(&select.from)?;
796
797 if let Some(ref where_clause) = select.where_clause {
798 let predicate = self.plan_expression(where_clause)?;
799 plan = PlanNode::Filter(FilterNode {
800 input: Box::new(plan),
801 predicate,
802 });
803 }
804
805 if !select.group_by.is_empty() || self.has_aggregates(&select.columns) {
806 plan = self.plan_aggregation(plan, select)?;
807 }
808
809 if let Some(ref having) = select.having {
810 let predicate = self.plan_expression(having)?;
811 plan = PlanNode::Filter(FilterNode {
812 input: Box::new(plan),
813 predicate,
814 });
815 }
816
817 plan = self.plan_projection(plan, &select.columns)?;
818
819 if !select.order_by.is_empty() {
820 let order_by: Vec<SortKey> = select
821 .order_by
822 .iter()
823 .map(|item| {
824 Ok(SortKey {
825 expr: self.plan_expression(&item.expression)?,
826 ascending: item.ascending,
827 nulls_first: item.nulls_first.unwrap_or(!item.ascending),
828 })
829 })
830 .collect::<PlannerResult<Vec<_>>>()?;
831
832 plan = PlanNode::Sort(SortNode {
833 input: Box::new(plan),
834 order_by,
835 });
836 }
837
838 if select.limit.is_some() || select.offset.is_some() {
839 plan = PlanNode::Limit(LimitNode {
840 input: Box::new(plan),
841 limit: select.limit,
842 offset: select.offset,
843 });
844 }
845
846 let (estimated_cost, estimated_rows) = self.estimate_cost(&plan);
847
848 Ok(QueryPlan {
849 root: plan,
850 estimated_cost,
851 estimated_rows,
852 })
853 }
854
855 fn plan_from_clause(&self, from: &Option<FromClause>) -> PlannerResult<PlanNode> {
857 let from = match from {
858 Some(f) => f,
859 None => return Ok(PlanNode::Empty),
860 };
861
862 let mut plan = self.plan_table_reference(&from.source)?;
863
864 for join in &from.joins {
865 plan = self.plan_join(plan, join)?;
866 }
867
868 Ok(plan)
869 }
870
871 fn plan_table_reference(&self, table_ref: &TableReference) -> PlannerResult<PlanNode> {
873 match table_ref {
874 TableReference::Table { name, alias } => {
875 let columns = if let Some(info) = self.schema.get_table(name) {
876 info.columns.iter().map(|c| c.name.clone()).collect()
877 } else {
878 Vec::new()
879 };
880
881 Ok(PlanNode::Scan(ScanNode {
882 table_name: name.clone(),
883 alias: alias.clone(),
884 columns,
885 index_scan: None,
886 }))
887 }
888 TableReference::Subquery { query, alias: _ } => self.plan_select(query).map(|p| p.root),
889 }
890 }
891
892 fn plan_join(&self, left: PlanNode, join: &JoinClause) -> PlannerResult<PlanNode> {
894 let right = self.plan_table_reference(&join.table)?;
895 let condition = join
896 .condition
897 .as_ref()
898 .map(|c| self.plan_expression(c))
899 .transpose()?;
900
901 let join_type = match join.join_type {
902 JoinType::Inner => PlanJoinType::Inner,
903 JoinType::Left => PlanJoinType::Left,
904 JoinType::Right => PlanJoinType::Right,
905 JoinType::Full => PlanJoinType::Full,
906 JoinType::Cross => PlanJoinType::Cross,
907 };
908
909 let strategy = self.choose_join_strategy(&left, &right, &condition);
910
911 Ok(PlanNode::Join(JoinNode {
912 left: Box::new(left),
913 right: Box::new(right),
914 join_type,
915 condition,
916 strategy,
917 }))
918 }
919
920 fn choose_join_strategy(
922 &self,
923 _left: &PlanNode,
924 _right: &PlanNode,
925 condition: &Option<PlanExpression>,
926 ) -> JoinStrategy {
927 if condition.is_none() {
928 return JoinStrategy::NestedLoop;
929 }
930
931 JoinStrategy::HashJoin
932 }
933
934 fn has_aggregates(&self, columns: &[SelectColumn]) -> bool {
936 columns.iter().any(|col| match col {
937 SelectColumn::Expression { expr, .. } => self.expression_has_aggregate(expr),
938 _ => false,
939 })
940 }
941
942 fn expression_has_aggregate(&self, expr: &Expression) -> bool {
944 match expr {
945 Expression::Function { name, .. } => {
946 matches!(
947 name.to_uppercase().as_str(),
948 "COUNT" | "SUM" | "AVG" | "MIN" | "MAX"
949 )
950 }
951 Expression::BinaryOp { left, right, .. } => {
952 self.expression_has_aggregate(left) || self.expression_has_aggregate(right)
953 }
954 _ => false,
955 }
956 }
957
958 fn plan_aggregation(
960 &self,
961 input: PlanNode,
962 select: &SelectStatement,
963 ) -> PlannerResult<PlanNode> {
964 let group_by: Vec<PlanExpression> = select
965 .group_by
966 .iter()
967 .map(|e| self.plan_expression(e))
968 .collect::<PlannerResult<Vec<_>>>()?;
969
970 let mut aggregates = Vec::new();
971 for col in &select.columns {
972 if let SelectColumn::Expression { expr, alias } = col {
973 if let Some(agg) = self.extract_aggregate(expr, alias.clone())? {
974 aggregates.push(agg);
975 }
976 }
977 }
978
979 Ok(PlanNode::Aggregate(AggregateNode {
980 input: Box::new(input),
981 group_by,
982 aggregates,
983 }))
984 }
985
986 fn extract_aggregate(
988 &self,
989 expr: &Expression,
990 alias: Option<String>,
991 ) -> PlannerResult<Option<AggregateExpr>> {
992 match expr {
993 Expression::Function {
994 name,
995 args,
996 distinct,
997 } => {
998 let func = match name.to_uppercase().as_str() {
999 "COUNT" => AggregateFunction::Count,
1000 "SUM" => AggregateFunction::Sum,
1001 "AVG" => AggregateFunction::Avg,
1002 "MIN" => AggregateFunction::Min,
1003 "MAX" => AggregateFunction::Max,
1004 _ => return Ok(None),
1005 };
1006
1007 let argument = if args.is_empty() {
1008 None
1009 } else {
1010 Some(self.plan_expression(&args[0])?)
1011 };
1012
1013 Ok(Some(AggregateExpr {
1014 function: func,
1015 argument,
1016 distinct: *distinct,
1017 alias,
1018 }))
1019 }
1020 _ => Ok(None),
1021 }
1022 }
1023
1024 fn plan_projection(
1026 &self,
1027 input: PlanNode,
1028 columns: &[SelectColumn],
1029 ) -> PlannerResult<PlanNode> {
1030 let mut expressions = Vec::new();
1031
1032 for col in columns {
1033 match col {
1034 SelectColumn::AllColumns => {
1035 expressions.push(ProjectionExpr {
1036 expr: PlanExpression::Column {
1037 table: None,
1038 name: "*".to_string(),
1039 data_type: DataType::Any,
1040 },
1041 alias: None,
1042 });
1043 }
1044 SelectColumn::TableAllColumns(table) => {
1045 expressions.push(ProjectionExpr {
1046 expr: PlanExpression::Column {
1047 table: Some(table.clone()),
1048 name: "*".to_string(),
1049 data_type: DataType::Any,
1050 },
1051 alias: None,
1052 });
1053 }
1054 SelectColumn::Expression { expr, alias } => {
1055 expressions.push(ProjectionExpr {
1056 expr: self.plan_expression(expr)?,
1057 alias: alias.clone(),
1058 });
1059 }
1060 }
1061 }
1062
1063 Ok(PlanNode::Project(ProjectNode {
1064 input: Box::new(input),
1065 expressions,
1066 }))
1067 }
1068
1069 fn plan_expression(&self, expr: &Expression) -> PlannerResult<PlanExpression> {
1071 match expr {
1072 Expression::Literal(lit) => Ok(PlanExpression::Literal(match lit {
1073 crate::ast::Literal::Null => PlanLiteral::Null,
1074 crate::ast::Literal::Boolean(b) => PlanLiteral::Boolean(*b),
1075 crate::ast::Literal::Integer(i) => PlanLiteral::Integer(*i),
1076 crate::ast::Literal::Float(f) => PlanLiteral::Float(*f),
1077 crate::ast::Literal::String(s) => PlanLiteral::String(s.clone()),
1078 })),
1079
1080 Expression::Column(col_ref) => Ok(PlanExpression::Column {
1081 table: col_ref.table.clone(),
1082 name: col_ref.column.clone(),
1083 data_type: DataType::Any,
1084 }),
1085
1086 Expression::BinaryOp { left, op, right } => {
1087 let plan_op = match op {
1088 BinaryOperator::Add => PlanBinaryOp::Add,
1089 BinaryOperator::Subtract => PlanBinaryOp::Subtract,
1090 BinaryOperator::Multiply => PlanBinaryOp::Multiply,
1091 BinaryOperator::Divide => PlanBinaryOp::Divide,
1092 BinaryOperator::Modulo => PlanBinaryOp::Modulo,
1093 BinaryOperator::Equal => PlanBinaryOp::Equal,
1094 BinaryOperator::NotEqual => PlanBinaryOp::NotEqual,
1095 BinaryOperator::LessThan => PlanBinaryOp::LessThan,
1096 BinaryOperator::LessThanOrEqual => PlanBinaryOp::LessThanOrEqual,
1097 BinaryOperator::GreaterThan => PlanBinaryOp::GreaterThan,
1098 BinaryOperator::GreaterThanOrEqual => PlanBinaryOp::GreaterThanOrEqual,
1099 BinaryOperator::And => PlanBinaryOp::And,
1100 BinaryOperator::Or => PlanBinaryOp::Or,
1101 BinaryOperator::Concat => PlanBinaryOp::Concat,
1102 };
1103
1104 Ok(PlanExpression::BinaryOp {
1105 left: Box::new(self.plan_expression(left)?),
1106 op: plan_op,
1107 right: Box::new(self.plan_expression(right)?),
1108 })
1109 }
1110
1111 Expression::UnaryOp { op, expr } => {
1112 let plan_op = match op {
1113 crate::ast::UnaryOperator::Not => PlanUnaryOp::Not,
1114 crate::ast::UnaryOperator::Negative => PlanUnaryOp::Negative,
1115 crate::ast::UnaryOperator::Positive => {
1116 return self.plan_expression(expr);
1117 }
1118 };
1119
1120 Ok(PlanExpression::UnaryOp {
1121 op: plan_op,
1122 expr: Box::new(self.plan_expression(expr)?),
1123 })
1124 }
1125
1126 Expression::Function { name, args, .. } => {
1127 let planned_args: Vec<PlanExpression> = args
1128 .iter()
1129 .map(|a| self.plan_expression(a))
1130 .collect::<PlannerResult<Vec<_>>>()?;
1131
1132 Ok(PlanExpression::Function {
1133 name: name.clone(),
1134 args: planned_args,
1135 return_type: DataType::Any,
1136 })
1137 }
1138
1139 Expression::IsNull { expr, negated } => Ok(PlanExpression::IsNull {
1140 expr: Box::new(self.plan_expression(expr)?),
1141 negated: *negated,
1142 }),
1143
1144 Expression::Cast { expr, data_type } => Ok(PlanExpression::Cast {
1145 expr: Box::new(self.plan_expression(expr)?),
1146 target_type: data_type.clone(),
1147 }),
1148
1149 Expression::Case { operand, conditions, else_result } => {
1150 let planned_operand = operand
1151 .as_ref()
1152 .map(|e| self.plan_expression(e))
1153 .transpose()?
1154 .map(Box::new);
1155
1156 let planned_conditions = conditions
1157 .iter()
1158 .map(|(cond, result)| {
1159 Ok((self.plan_expression(cond)?, self.plan_expression(result)?))
1160 })
1161 .collect::<PlannerResult<Vec<_>>>()?;
1162
1163 let planned_else = else_result
1164 .as_ref()
1165 .map(|e| self.plan_expression(e))
1166 .transpose()?
1167 .map(Box::new);
1168
1169 Ok(PlanExpression::Case {
1170 operand: planned_operand,
1171 conditions: planned_conditions,
1172 else_result: planned_else,
1173 })
1174 }
1175
1176 Expression::InList { expr, list, negated } => {
1177 let planned_list = list
1178 .iter()
1179 .map(|e| self.plan_expression(e))
1180 .collect::<PlannerResult<Vec<_>>>()?;
1181
1182 Ok(PlanExpression::InList {
1183 expr: Box::new(self.plan_expression(expr)?),
1184 list: planned_list,
1185 negated: *negated,
1186 })
1187 }
1188
1189 Expression::Between { expr, low, high, negated } => Ok(PlanExpression::Between {
1190 expr: Box::new(self.plan_expression(expr)?),
1191 low: Box::new(self.plan_expression(low)?),
1192 high: Box::new(self.plan_expression(high)?),
1193 negated: *negated,
1194 }),
1195
1196 Expression::Like { expr, pattern, negated } => Ok(PlanExpression::Like {
1197 expr: Box::new(self.plan_expression(expr)?),
1198 pattern: Box::new(self.plan_expression(pattern)?),
1199 negated: *negated,
1200 }),
1201
1202 Expression::InSubquery { expr, subquery, negated } => {
1203 let subquery_plan = self.plan_select(subquery)?;
1204 Ok(PlanExpression::InSubquery {
1205 expr: Box::new(self.plan_expression(expr)?),
1206 subquery: Box::new(subquery_plan.root),
1207 negated: *negated,
1208 })
1209 }
1210
1211 Expression::Exists { subquery, negated } => {
1212 let subquery_plan = self.plan_select(subquery)?;
1213 Ok(PlanExpression::Exists {
1214 subquery: Box::new(subquery_plan.root),
1215 negated: *negated,
1216 })
1217 }
1218
1219 Expression::Subquery(subquery) => {
1220 let subquery_plan = self.plan_select(subquery)?;
1221 Ok(PlanExpression::ScalarSubquery(Box::new(subquery_plan.root)))
1222 }
1223
1224 Expression::Placeholder(idx) => Ok(PlanExpression::Placeholder(*idx)),
1225 }
1226 }
1227
1228 fn estimate_cost(&self, node: &PlanNode) -> (f64, u64) {
1230 match node {
1231 PlanNode::Scan(scan) => {
1232 let rows = self
1233 .schema
1234 .get_table(&scan.table_name)
1235 .map(|t| t.row_count)
1236 .unwrap_or(1000);
1237 let cost = rows as f64 * 1.0;
1238 (cost, rows)
1239 }
1240
1241 PlanNode::Filter(filter) => {
1242 let (input_cost, input_rows) = self.estimate_cost(&filter.input);
1243 let selectivity = 0.1;
1244 let rows = (input_rows as f64 * selectivity) as u64;
1245 let cost = input_cost + input_rows as f64 * 0.1;
1246 (cost, rows.max(1))
1247 }
1248
1249 PlanNode::Project(project) => {
1250 let (input_cost, input_rows) = self.estimate_cost(&project.input);
1251 let cost = input_cost + input_rows as f64 * 0.01;
1252 (cost, input_rows)
1253 }
1254
1255 PlanNode::Join(join) => {
1256 let (left_cost, left_rows) = self.estimate_cost(&join.left);
1257 let (right_cost, right_rows) = self.estimate_cost(&join.right);
1258
1259 let rows = match join.join_type {
1260 PlanJoinType::Cross => left_rows * right_rows,
1261 _ => (left_rows + right_rows) / 2,
1262 };
1263
1264 let join_cost = match join.strategy {
1265 JoinStrategy::NestedLoop => left_rows as f64 * right_rows as f64,
1266 JoinStrategy::HashJoin => {
1267 left_rows as f64 + right_rows as f64 + right_rows as f64 * 1.2
1268 }
1269 JoinStrategy::MergeJoin => {
1270 left_rows as f64 * 2.0_f64.log2()
1271 + right_rows as f64 * 2.0_f64.log2()
1272 + (left_rows + right_rows) as f64
1273 }
1274 };
1275
1276 (left_cost + right_cost + join_cost, rows)
1277 }
1278
1279 PlanNode::Aggregate(agg) => {
1280 let (input_cost, input_rows) = self.estimate_cost(&agg.input);
1281 let groups = if agg.group_by.is_empty() {
1282 1
1283 } else {
1284 (input_rows as f64).sqrt() as u64
1285 };
1286 let cost = input_cost + input_rows as f64 * 0.5;
1287 (cost, groups.max(1))
1288 }
1289
1290 PlanNode::Sort(sort) => {
1291 let (input_cost, input_rows) = self.estimate_cost(&sort.input);
1292 let sort_cost = input_rows as f64 * (input_rows as f64).log2();
1293 (input_cost + sort_cost, input_rows)
1294 }
1295
1296 PlanNode::Limit(limit) => {
1297 let (input_cost, input_rows) = self.estimate_cost(&limit.input);
1298 let rows = limit.limit.unwrap_or(input_rows).min(input_rows);
1299 (input_cost, rows)
1300 }
1301
1302 PlanNode::Empty => (0.0, 1),
1303
1304 PlanNode::CreateTable(_) => (1.0, 0),
1306 PlanNode::DropTable(_) => (1.0, 0),
1307 PlanNode::AlterTable(_) => (1.0, 0),
1308 PlanNode::CreateIndex(_) => (1.0, 0),
1309 PlanNode::DropIndex(_) => (1.0, 0),
1310
1311 PlanNode::Insert(insert) => {
1313 let rows = match &insert.source {
1314 InsertPlanSource::Values(values) => values.len() as u64,
1315 InsertPlanSource::Query(subquery) => {
1316 let (_, estimated_rows) = self.estimate_cost(subquery);
1317 estimated_rows
1318 }
1319 };
1320 (rows as f64, rows)
1321 }
1322 PlanNode::Update(update) => {
1323 let rows = self.schema.get_table(&update.table_name)
1324 .map(|t| t.row_count)
1325 .unwrap_or(100);
1326 (rows as f64, rows)
1327 }
1328 PlanNode::Delete(delete) => {
1329 let rows = self.schema.get_table(&delete.table_name)
1330 .map(|t| t.row_count)
1331 .unwrap_or(100);
1332 (rows as f64, rows)
1333 }
1334 }
1335 }
1336}
1337
1338#[cfg(test)]
1343mod tests {
1344 use super::*;
1345 use crate::ast::{ColumnRef, FromClause, Literal, OrderByItem, TableReference};
1346
1347 fn create_test_schema() -> Arc<PlannerSchema> {
1348 let mut schema = PlannerSchema::new();
1349
1350 schema.add_table(TableInfo {
1351 name: "users".to_string(),
1352 columns: vec![
1353 ColumnInfo {
1354 name: "id".to_string(),
1355 data_type: DataType::Integer,
1356 nullable: false,
1357 },
1358 ColumnInfo {
1359 name: "name".to_string(),
1360 data_type: DataType::Text,
1361 nullable: false,
1362 },
1363 ColumnInfo {
1364 name: "email".to_string(),
1365 data_type: DataType::Text,
1366 nullable: true,
1367 },
1368 ],
1369 row_count: 10000,
1370 });
1371
1372 schema.add_table(TableInfo {
1373 name: "orders".to_string(),
1374 columns: vec![
1375 ColumnInfo {
1376 name: "id".to_string(),
1377 data_type: DataType::Integer,
1378 nullable: false,
1379 },
1380 ColumnInfo {
1381 name: "user_id".to_string(),
1382 data_type: DataType::Integer,
1383 nullable: false,
1384 },
1385 ColumnInfo {
1386 name: "amount".to_string(),
1387 data_type: DataType::Float,
1388 nullable: false,
1389 },
1390 ],
1391 row_count: 50000,
1392 });
1393
1394 Arc::new(schema)
1395 }
1396
1397 #[test]
1398 fn test_plan_simple_select() {
1399 let schema = create_test_schema();
1400 let planner = Planner::new(schema);
1401
1402 let select = SelectStatement {
1403 distinct: false,
1404 columns: vec![SelectColumn::AllColumns],
1405 from: Some(FromClause {
1406 source: TableReference::Table {
1407 name: "users".to_string(),
1408 alias: None,
1409 },
1410 joins: vec![],
1411 }),
1412 where_clause: None,
1413 group_by: vec![],
1414 having: None,
1415 order_by: vec![],
1416 limit: None,
1417 offset: None,
1418 };
1419
1420 let plan = planner.plan(&Statement::Select(select)).unwrap();
1421
1422 assert!(plan.estimated_rows > 0);
1423 assert!(plan.estimated_cost > 0.0);
1424 }
1425
1426 #[test]
1427 fn test_plan_select_with_filter() {
1428 let schema = create_test_schema();
1429 let planner = Planner::new(schema);
1430
1431 let select = SelectStatement {
1432 distinct: false,
1433 columns: vec![SelectColumn::AllColumns],
1434 from: Some(FromClause {
1435 source: TableReference::Table {
1436 name: "users".to_string(),
1437 alias: None,
1438 },
1439 joins: vec![],
1440 }),
1441 where_clause: Some(Expression::BinaryOp {
1442 left: Box::new(Expression::Column(ColumnRef {
1443 table: None,
1444 column: "id".to_string(),
1445 })),
1446 op: BinaryOperator::Equal,
1447 right: Box::new(Expression::Literal(Literal::Integer(1))),
1448 }),
1449 group_by: vec![],
1450 having: None,
1451 order_by: vec![],
1452 limit: None,
1453 offset: None,
1454 };
1455
1456 let plan = planner.plan(&Statement::Select(select)).unwrap();
1457
1458 match &plan.root {
1459 PlanNode::Project(p) => match p.input.as_ref() {
1460 PlanNode::Filter(_) => {}
1461 _ => panic!("Expected filter node"),
1462 },
1463 _ => panic!("Expected project node"),
1464 }
1465 }
1466
1467 #[test]
1468 fn test_plan_select_with_order_by() {
1469 let schema = create_test_schema();
1470 let planner = Planner::new(schema);
1471
1472 let select = SelectStatement {
1473 distinct: false,
1474 columns: vec![SelectColumn::AllColumns],
1475 from: Some(FromClause {
1476 source: TableReference::Table {
1477 name: "users".to_string(),
1478 alias: None,
1479 },
1480 joins: vec![],
1481 }),
1482 where_clause: None,
1483 group_by: vec![],
1484 having: None,
1485 order_by: vec![OrderByItem {
1486 expression: Expression::Column(ColumnRef {
1487 table: None,
1488 column: "name".to_string(),
1489 }),
1490 ascending: true,
1491 nulls_first: None,
1492 }],
1493 limit: Some(10),
1494 offset: None,
1495 };
1496
1497 let plan = planner.plan(&Statement::Select(select)).unwrap();
1498
1499 match &plan.root {
1500 PlanNode::Limit(l) => match l.input.as_ref() {
1501 PlanNode::Sort(_) => {}
1502 _ => panic!("Expected sort node"),
1503 },
1504 _ => panic!("Expected limit node"),
1505 }
1506 }
1507}