quill_sql/plan/physical_planner/
physical_planner.rs

1use crate::catalog::Schema;
2use std::sync::Arc;
3
4use crate::expression::{BinaryExpr, BinaryOp, Expr};
5use crate::plan::logical_plan::{
6    Aggregate, CreateIndex, CreateTable, DropIndex, DropTable, EmptyRelation, Filter, Insert, Join,
7    Limit, LogicalPlan, Project, Sort, TableScan, Values,
8};
9
10use crate::execution::physical_plan::{
11    PhysicalAggregate, PhysicalAnalyze, PhysicalCreateIndex, PhysicalCreateTable, PhysicalDelete,
12    PhysicalDropIndex, PhysicalDropTable, PhysicalEmpty, PhysicalFilter, PhysicalInsert,
13    PhysicalLimit, PhysicalNestedLoopJoin, PhysicalPlan, PhysicalProject, PhysicalSeqScan,
14    PhysicalSort, PhysicalUpdate, PhysicalValues,
15};
16
/// Planner that lowers a [`LogicalPlan`] tree into a [`PhysicalPlan`] tree.
///
/// The type is a field-less unit struct, so it is free to copy and every
/// instance is interchangeable with any other.
#[derive(Clone, Copy, Debug, Default)]
pub struct PhysicalPlanner;
19
20impl PhysicalPlanner {
21    pub fn new() -> Self {
22        Self
23    }
24}
25
26impl PhysicalPlanner {
27    pub fn create_physical_plan(&self, logical_plan: LogicalPlan) -> PhysicalPlan {
28        let logical_plan = Arc::new(logical_plan);
29        self.build_plan(logical_plan)
30    }
31
32    fn build_plan(&self, logical_plan: Arc<LogicalPlan>) -> PhysicalPlan {
33        let plan = match logical_plan.as_ref() {
34            LogicalPlan::CreateTable(CreateTable {
35                name,
36                columns,
37                if_not_exists,
38            }) => PhysicalPlan::CreateTable(PhysicalCreateTable::new(
39                name.clone(),
40                Schema::new(columns.clone()),
41                *if_not_exists,
42            )),
43            LogicalPlan::CreateIndex(CreateIndex {
44                index_name,
45                table,
46                table_schema,
47                columns,
48            }) => PhysicalPlan::CreateIndex(PhysicalCreateIndex::new(
49                index_name.clone(),
50                table.clone(),
51                table_schema.clone(),
52                columns.clone(),
53            )),
54            LogicalPlan::DropTable(DropTable { name, if_exists }) => {
55                PhysicalPlan::DropTable(PhysicalDropTable::new(name.clone(), *if_exists))
56            }
57            LogicalPlan::DropIndex(DropIndex {
58                name,
59                schema,
60                catalog,
61                if_exists,
62            }) => PhysicalPlan::DropIndex(PhysicalDropIndex::new(
63                name.clone(),
64                schema.clone(),
65                catalog.clone(),
66                *if_exists,
67            )),
68            LogicalPlan::Insert(Insert {
69                table,
70                table_schema,
71                projected_schema,
72                input,
73            }) => {
74                let input_physical_plan = self.build_plan(input.clone());
75                PhysicalPlan::Insert(PhysicalInsert::new(
76                    table.clone(),
77                    table_schema.clone(),
78                    projected_schema.clone(),
79                    Arc::new(input_physical_plan),
80                ))
81            }
82            LogicalPlan::Values(Values { schema, values }) => {
83                PhysicalPlan::Values(PhysicalValues::new(schema.clone(), values.clone()))
84            }
85            LogicalPlan::Project(Project {
86                exprs,
87                input,
88                schema,
89            }) => {
90                let input_physical_plan = self.build_plan(input.clone());
91                PhysicalPlan::Project(PhysicalProject::new(
92                    exprs.clone(),
93                    schema.clone(),
94                    Arc::new(input_physical_plan),
95                ))
96            }
97            LogicalPlan::Filter(Filter { predicate, input }) => {
98                let input_physical_plan = self.build_plan(input.clone());
99                PhysicalPlan::Filter(PhysicalFilter::new(
100                    predicate.clone(),
101                    Arc::new(input_physical_plan),
102                ))
103            }
104            LogicalPlan::TableScan(scan) => self.build_table_scan(scan),
105            LogicalPlan::Limit(Limit {
106                limit,
107                offset,
108                input,
109            }) => {
110                let input_physical_plan = self.build_plan((*input).clone());
111                PhysicalPlan::Limit(PhysicalLimit::new(
112                    *limit,
113                    *offset,
114                    Arc::new(input_physical_plan),
115                ))
116            }
117            LogicalPlan::Join(Join {
118                left,
119                right,
120                join_type,
121                condition,
122                schema,
123            }) => {
124                let left_physical_plan = self.build_plan((*left).clone());
125                let right_physical_plan = self.build_plan((*right).clone());
126                PhysicalPlan::NestedLoopJoin(PhysicalNestedLoopJoin::new(
127                    *join_type,
128                    condition.clone(),
129                    Arc::new(left_physical_plan),
130                    Arc::new(right_physical_plan),
131                    schema.clone(),
132                ))
133            }
134            LogicalPlan::Sort(Sort {
135                order_by: expr,
136                ref input,
137                limit: _,
138            }) => {
139                // TODO limit
140                let input_physical_plan = self.build_plan(Arc::clone(input));
141                PhysicalPlan::Sort(PhysicalSort::new(
142                    expr.clone(),
143                    Arc::new(input_physical_plan),
144                ))
145            }
146            LogicalPlan::EmptyRelation(EmptyRelation {
147                produce_one_row,
148                schema,
149            }) => PhysicalPlan::Empty(PhysicalEmpty::new(
150                if *produce_one_row { 1 } else { 0 },
151                schema.clone(),
152            )),
153            LogicalPlan::Aggregate(Aggregate {
154                input,
155                group_exprs,
156                aggr_exprs,
157                schema,
158            }) => {
159                let input_physical_plan = self.build_plan(Arc::clone(input));
160                PhysicalPlan::Aggregate(PhysicalAggregate::new(
161                    Arc::new(input_physical_plan),
162                    group_exprs.clone(),
163                    aggr_exprs.clone(),
164                    schema.clone(),
165                ))
166            }
167            LogicalPlan::Update(update) => PhysicalPlan::Update(PhysicalUpdate::new(
168                update.table.clone(),
169                update.table_schema.clone(),
170                update.assignments.clone(),
171                update.selection.clone(),
172            )),
173            LogicalPlan::Delete(delete) => PhysicalPlan::Delete(PhysicalDelete::new(
174                delete.table.clone(),
175                delete.table_schema.clone(),
176                delete.selection.clone(),
177            )),
178            LogicalPlan::Analyze(analyze) => {
179                PhysicalPlan::Analyze(PhysicalAnalyze::new(analyze.table.clone()))
180            }
181            LogicalPlan::BeginTransaction(_)
182            | LogicalPlan::CommitTransaction
183            | LogicalPlan::RollbackTransaction
184            | LogicalPlan::SetTransaction { .. } => {
185                PhysicalPlan::Empty(PhysicalEmpty::new(0, Schema::empty().into()))
186            }
187        };
188        plan
189    }
190
191    fn build_table_scan(&self, scan: &TableScan) -> PhysicalPlan {
192        let mut plan = self.new_seq_scan(scan);
193
194        if let Some(limit_value) = scan.limit {
195            plan = PhysicalPlan::Limit(PhysicalLimit::new(Some(limit_value), 0, Arc::new(plan)));
196        }
197
198        if let Some(predicate) = conjunction(&scan.filters) {
199            plan = PhysicalPlan::Filter(PhysicalFilter::new(predicate, Arc::new(plan)));
200        }
201
202        plan
203    }
204}
205
206impl PhysicalPlanner {
207    fn new_seq_scan(&self, scan: &TableScan) -> PhysicalPlan {
208        let op = PhysicalSeqScan::new(scan.table_ref.clone(), scan.table_schema.clone());
209        PhysicalPlan::SeqScan(op)
210    }
211}
212
213fn conjunction(predicates: &[Expr]) -> Option<Expr> {
214    let mut iter = predicates.iter();
215    let first = iter.next()?.clone();
216    Some(iter.fold(first, |acc, expr| {
217        Expr::Binary(BinaryExpr {
218            left: Box::new(acc),
219            op: BinaryOp::And,
220            right: Box::new(expr.clone()),
221        })
222    }))
223}