quill_sql/plan/physical_planner/
physical_planner.rs1use crate::catalog::Schema;
2use std::sync::Arc;
3
4use crate::expression::{BinaryExpr, BinaryOp, Expr};
5use crate::plan::logical_plan::{
6 Aggregate, CreateIndex, CreateTable, DropIndex, DropTable, EmptyRelation, Filter, Insert, Join,
7 Limit, LogicalPlan, Project, Sort, TableScan, Values,
8};
9
10use crate::execution::physical_plan::{
11 PhysicalAggregate, PhysicalAnalyze, PhysicalCreateIndex, PhysicalCreateTable, PhysicalDelete,
12 PhysicalDropIndex, PhysicalDropTable, PhysicalEmpty, PhysicalFilter, PhysicalInsert,
13 PhysicalLimit, PhysicalNestedLoopJoin, PhysicalPlan, PhysicalProject, PhysicalSeqScan,
14 PhysicalSort, PhysicalUpdate, PhysicalValues,
15};
16
17#[derive(Debug, Default, Clone, Copy)]
18pub struct PhysicalPlanner;
19
20impl PhysicalPlanner {
21 pub fn new() -> Self {
22 Self
23 }
24}
25
26impl PhysicalPlanner {
27 pub fn create_physical_plan(&self, logical_plan: LogicalPlan) -> PhysicalPlan {
28 let logical_plan = Arc::new(logical_plan);
29 self.build_plan(logical_plan)
30 }
31
32 fn build_plan(&self, logical_plan: Arc<LogicalPlan>) -> PhysicalPlan {
33 let plan = match logical_plan.as_ref() {
34 LogicalPlan::CreateTable(CreateTable {
35 name,
36 columns,
37 if_not_exists,
38 }) => PhysicalPlan::CreateTable(PhysicalCreateTable::new(
39 name.clone(),
40 Schema::new(columns.clone()),
41 *if_not_exists,
42 )),
43 LogicalPlan::CreateIndex(CreateIndex {
44 index_name,
45 table,
46 table_schema,
47 columns,
48 }) => PhysicalPlan::CreateIndex(PhysicalCreateIndex::new(
49 index_name.clone(),
50 table.clone(),
51 table_schema.clone(),
52 columns.clone(),
53 )),
54 LogicalPlan::DropTable(DropTable { name, if_exists }) => {
55 PhysicalPlan::DropTable(PhysicalDropTable::new(name.clone(), *if_exists))
56 }
57 LogicalPlan::DropIndex(DropIndex {
58 name,
59 schema,
60 catalog,
61 if_exists,
62 }) => PhysicalPlan::DropIndex(PhysicalDropIndex::new(
63 name.clone(),
64 schema.clone(),
65 catalog.clone(),
66 *if_exists,
67 )),
68 LogicalPlan::Insert(Insert {
69 table,
70 table_schema,
71 projected_schema,
72 input,
73 }) => {
74 let input_physical_plan = self.build_plan(input.clone());
75 PhysicalPlan::Insert(PhysicalInsert::new(
76 table.clone(),
77 table_schema.clone(),
78 projected_schema.clone(),
79 Arc::new(input_physical_plan),
80 ))
81 }
82 LogicalPlan::Values(Values { schema, values }) => {
83 PhysicalPlan::Values(PhysicalValues::new(schema.clone(), values.clone()))
84 }
85 LogicalPlan::Project(Project {
86 exprs,
87 input,
88 schema,
89 }) => {
90 let input_physical_plan = self.build_plan(input.clone());
91 PhysicalPlan::Project(PhysicalProject::new(
92 exprs.clone(),
93 schema.clone(),
94 Arc::new(input_physical_plan),
95 ))
96 }
97 LogicalPlan::Filter(Filter { predicate, input }) => {
98 let input_physical_plan = self.build_plan(input.clone());
99 PhysicalPlan::Filter(PhysicalFilter::new(
100 predicate.clone(),
101 Arc::new(input_physical_plan),
102 ))
103 }
104 LogicalPlan::TableScan(scan) => self.build_table_scan(scan),
105 LogicalPlan::Limit(Limit {
106 limit,
107 offset,
108 input,
109 }) => {
110 let input_physical_plan = self.build_plan((*input).clone());
111 PhysicalPlan::Limit(PhysicalLimit::new(
112 *limit,
113 *offset,
114 Arc::new(input_physical_plan),
115 ))
116 }
117 LogicalPlan::Join(Join {
118 left,
119 right,
120 join_type,
121 condition,
122 schema,
123 }) => {
124 let left_physical_plan = self.build_plan((*left).clone());
125 let right_physical_plan = self.build_plan((*right).clone());
126 PhysicalPlan::NestedLoopJoin(PhysicalNestedLoopJoin::new(
127 *join_type,
128 condition.clone(),
129 Arc::new(left_physical_plan),
130 Arc::new(right_physical_plan),
131 schema.clone(),
132 ))
133 }
134 LogicalPlan::Sort(Sort {
135 order_by: expr,
136 ref input,
137 limit: _,
138 }) => {
139 let input_physical_plan = self.build_plan(Arc::clone(input));
141 PhysicalPlan::Sort(PhysicalSort::new(
142 expr.clone(),
143 Arc::new(input_physical_plan),
144 ))
145 }
146 LogicalPlan::EmptyRelation(EmptyRelation {
147 produce_one_row,
148 schema,
149 }) => PhysicalPlan::Empty(PhysicalEmpty::new(
150 if *produce_one_row { 1 } else { 0 },
151 schema.clone(),
152 )),
153 LogicalPlan::Aggregate(Aggregate {
154 input,
155 group_exprs,
156 aggr_exprs,
157 schema,
158 }) => {
159 let input_physical_plan = self.build_plan(Arc::clone(input));
160 PhysicalPlan::Aggregate(PhysicalAggregate::new(
161 Arc::new(input_physical_plan),
162 group_exprs.clone(),
163 aggr_exprs.clone(),
164 schema.clone(),
165 ))
166 }
167 LogicalPlan::Update(update) => PhysicalPlan::Update(PhysicalUpdate::new(
168 update.table.clone(),
169 update.table_schema.clone(),
170 update.assignments.clone(),
171 update.selection.clone(),
172 )),
173 LogicalPlan::Delete(delete) => PhysicalPlan::Delete(PhysicalDelete::new(
174 delete.table.clone(),
175 delete.table_schema.clone(),
176 delete.selection.clone(),
177 )),
178 LogicalPlan::Analyze(analyze) => {
179 PhysicalPlan::Analyze(PhysicalAnalyze::new(analyze.table.clone()))
180 }
181 LogicalPlan::BeginTransaction(_)
182 | LogicalPlan::CommitTransaction
183 | LogicalPlan::RollbackTransaction
184 | LogicalPlan::SetTransaction { .. } => {
185 PhysicalPlan::Empty(PhysicalEmpty::new(0, Schema::empty().into()))
186 }
187 };
188 plan
189 }
190
191 fn build_table_scan(&self, scan: &TableScan) -> PhysicalPlan {
192 let mut plan = self.new_seq_scan(scan);
193
194 if let Some(limit_value) = scan.limit {
195 plan = PhysicalPlan::Limit(PhysicalLimit::new(Some(limit_value), 0, Arc::new(plan)));
196 }
197
198 if let Some(predicate) = conjunction(&scan.filters) {
199 plan = PhysicalPlan::Filter(PhysicalFilter::new(predicate, Arc::new(plan)));
200 }
201
202 plan
203 }
204}
205
206impl PhysicalPlanner {
207 fn new_seq_scan(&self, scan: &TableScan) -> PhysicalPlan {
208 let op = PhysicalSeqScan::new(scan.table_ref.clone(), scan.table_schema.clone());
209 PhysicalPlan::SeqScan(op)
210 }
211}
212
213fn conjunction(predicates: &[Expr]) -> Option<Expr> {
214 let mut iter = predicates.iter();
215 let first = iter.next()?.clone();
216 Some(iter.fold(first, |acc, expr| {
217 Expr::Binary(BinaryExpr {
218 left: Box::new(acc),
219 op: BinaryOp::And,
220 right: Box::new(expr.clone()),
221 })
222 }))
223}