quill_sql/plan/physical_planner/
physical_planner.rs1use crate::catalog::{Catalog, Schema, DEFAULT_SCHEMA_NAME};
2use std::sync::Arc;
3
4use crate::plan::logical_plan::{
5 Aggregate, CreateIndex, CreateTable, DropIndex, DropTable, EmptyRelation, Filter, Insert, Join,
6 Limit, LogicalPlan, Project, Sort, TableScan, Values,
7};
8
9use crate::execution::physical_plan::{
10 PhysicalAggregate, PhysicalCreateIndex, PhysicalCreateTable, PhysicalDelete, PhysicalDropIndex,
11 PhysicalDropTable, PhysicalEmpty, PhysicalFilter, PhysicalIndexScan, PhysicalInsert,
12 PhysicalLimit, PhysicalNestedLoopJoin, PhysicalPlan, PhysicalProject, PhysicalSeqScan,
13 PhysicalSort, PhysicalUpdate, PhysicalValues,
14};
15
16pub struct PhysicalPlanner<'a> {
17 pub catalog: &'a Catalog,
18}
19
20impl PhysicalPlanner<'_> {
21 pub fn create_physical_plan(&self, logical_plan: LogicalPlan) -> PhysicalPlan {
22 let logical_plan = Arc::new(logical_plan);
23 self.build_plan(logical_plan)
24 }
25
26 fn build_plan(&self, logical_plan: Arc<LogicalPlan>) -> PhysicalPlan {
27 let plan = match logical_plan.as_ref() {
28 LogicalPlan::CreateTable(CreateTable {
29 name,
30 columns,
31 if_not_exists,
32 }) => PhysicalPlan::CreateTable(PhysicalCreateTable::new(
33 name.clone(),
34 Schema::new(columns.clone()),
35 *if_not_exists,
36 )),
37 LogicalPlan::CreateIndex(CreateIndex {
38 index_name,
39 table,
40 table_schema,
41 columns,
42 }) => PhysicalPlan::CreateIndex(PhysicalCreateIndex::new(
43 index_name.clone(),
44 table.clone(),
45 table_schema.clone(),
46 columns.clone(),
47 )),
48 LogicalPlan::DropTable(DropTable { name, if_exists }) => {
49 PhysicalPlan::DropTable(PhysicalDropTable::new(name.clone(), *if_exists))
50 }
51 LogicalPlan::DropIndex(DropIndex {
52 name,
53 schema,
54 catalog,
55 if_exists,
56 }) => PhysicalPlan::DropIndex(PhysicalDropIndex::new(
57 name.clone(),
58 schema.clone(),
59 catalog.clone(),
60 *if_exists,
61 )),
62 LogicalPlan::Insert(Insert {
63 table,
64 table_schema,
65 projected_schema,
66 input,
67 }) => {
68 let input_physical_plan = self.build_plan(input.clone());
69 PhysicalPlan::Insert(PhysicalInsert::new(
70 table.clone(),
71 table_schema.clone(),
72 projected_schema.clone(),
73 Arc::new(input_physical_plan),
74 ))
75 }
76 LogicalPlan::Values(Values { schema, values }) => {
77 PhysicalPlan::Values(PhysicalValues::new(schema.clone(), values.clone()))
78 }
79 LogicalPlan::Project(Project {
80 exprs,
81 input,
82 schema,
83 }) => {
84 let input_physical_plan = self.build_plan(input.clone());
85 PhysicalPlan::Project(PhysicalProject::new(
86 exprs.clone(),
87 schema.clone(),
88 Arc::new(input_physical_plan),
89 ))
90 }
91 LogicalPlan::Filter(Filter { predicate, input }) => {
92 let input_physical_plan = self.build_plan(input.clone());
93 PhysicalPlan::Filter(PhysicalFilter::new(
94 predicate.clone(),
95 Arc::new(input_physical_plan),
96 ))
97 }
98 LogicalPlan::TableScan(TableScan {
99 table_ref,
100 table_schema,
101 filters: _,
102 limit: _,
103 streaming_hint,
104 }) => {
105 if let Some(catalog_table) = self
107 .catalog
108 .schemas
109 .get(table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME))
110 .unwrap()
111 .tables
112 .get(table_ref.table())
113 {
114 if !catalog_table.indexes.is_empty() {
115 PhysicalPlan::IndexScan(PhysicalIndexScan::new(
116 table_ref.clone(),
117 catalog_table.indexes.keys().next().unwrap().clone(),
118 table_schema.clone(),
119 ..,
120 ))
121 } else {
122 {
123 let mut op =
124 PhysicalSeqScan::new(table_ref.clone(), table_schema.clone());
125 op.streaming_hint = *streaming_hint;
126 PhysicalPlan::SeqScan(op)
127 }
128 }
129 } else {
130 {
131 let mut op = PhysicalSeqScan::new(table_ref.clone(), table_schema.clone());
132 op.streaming_hint = *streaming_hint;
133 PhysicalPlan::SeqScan(op)
134 }
135 }
136 }
137 LogicalPlan::Limit(Limit {
138 limit,
139 offset,
140 input,
141 }) => {
142 let input_physical_plan = self.build_plan((*input).clone());
143 PhysicalPlan::Limit(PhysicalLimit::new(
144 *limit,
145 *offset,
146 Arc::new(input_physical_plan),
147 ))
148 }
149 LogicalPlan::Join(Join {
150 left,
151 right,
152 join_type,
153 condition,
154 schema,
155 }) => {
156 let left_physical_plan = self.build_plan((*left).clone());
157 let right_physical_plan = self.build_plan((*right).clone());
158 PhysicalPlan::NestedLoopJoin(PhysicalNestedLoopJoin::new(
159 *join_type,
160 condition.clone(),
161 Arc::new(left_physical_plan),
162 Arc::new(right_physical_plan),
163 schema.clone(),
164 ))
165 }
166 LogicalPlan::Sort(Sort {
167 order_by: expr,
168 ref input,
169 limit: _,
170 }) => {
171 let input_physical_plan = self.build_plan(Arc::clone(input));
173 PhysicalPlan::Sort(PhysicalSort::new(
174 expr.clone(),
175 Arc::new(input_physical_plan),
176 ))
177 }
178 LogicalPlan::EmptyRelation(EmptyRelation {
179 produce_one_row,
180 schema,
181 }) => PhysicalPlan::Empty(PhysicalEmpty::new(
182 if *produce_one_row { 1 } else { 0 },
183 schema.clone(),
184 )),
185 LogicalPlan::Aggregate(Aggregate {
186 input,
187 group_exprs,
188 aggr_exprs,
189 schema,
190 }) => {
191 let input_physical_plan = self.build_plan(Arc::clone(input));
192 PhysicalPlan::Aggregate(PhysicalAggregate::new(
193 Arc::new(input_physical_plan),
194 group_exprs.clone(),
195 aggr_exprs.clone(),
196 schema.clone(),
197 ))
198 }
199 LogicalPlan::Update(update) => PhysicalPlan::Update(PhysicalUpdate::new(
200 update.table.clone(),
201 update.table_schema.clone(),
202 update.assignments.clone(),
203 update.selection.clone(),
204 )),
205 LogicalPlan::Delete(delete) => PhysicalPlan::Delete(PhysicalDelete::new(
206 delete.table.clone(),
207 delete.table_schema.clone(),
208 delete.selection.clone(),
209 )),
210 LogicalPlan::BeginTransaction(_)
211 | LogicalPlan::CommitTransaction
212 | LogicalPlan::RollbackTransaction
213 | LogicalPlan::SetTransaction { .. } => {
214 PhysicalPlan::Empty(PhysicalEmpty::new(0, Schema::empty().into()))
215 }
216 };
217 plan
218 }
219}