quill_sql/plan/physical_planner/
physical_planner.rs

1use crate::catalog::{Catalog, Schema, DEFAULT_SCHEMA_NAME};
2use std::sync::Arc;
3
4use crate::plan::logical_plan::{
5    Aggregate, CreateIndex, CreateTable, DropIndex, DropTable, EmptyRelation, Filter, Insert, Join,
6    Limit, LogicalPlan, Project, Sort, TableScan, Values,
7};
8
9use crate::execution::physical_plan::{
10    PhysicalAggregate, PhysicalCreateIndex, PhysicalCreateTable, PhysicalDelete, PhysicalDropIndex,
11    PhysicalDropTable, PhysicalEmpty, PhysicalFilter, PhysicalIndexScan, PhysicalInsert,
12    PhysicalLimit, PhysicalNestedLoopJoin, PhysicalPlan, PhysicalProject, PhysicalSeqScan,
13    PhysicalSort, PhysicalUpdate, PhysicalValues,
14};
15
16pub struct PhysicalPlanner<'a> {
17    pub catalog: &'a Catalog,
18}
19
20impl PhysicalPlanner<'_> {
21    pub fn create_physical_plan(&self, logical_plan: LogicalPlan) -> PhysicalPlan {
22        let logical_plan = Arc::new(logical_plan);
23        self.build_plan(logical_plan)
24    }
25
26    fn build_plan(&self, logical_plan: Arc<LogicalPlan>) -> PhysicalPlan {
27        let plan = match logical_plan.as_ref() {
28            LogicalPlan::CreateTable(CreateTable {
29                name,
30                columns,
31                if_not_exists,
32            }) => PhysicalPlan::CreateTable(PhysicalCreateTable::new(
33                name.clone(),
34                Schema::new(columns.clone()),
35                *if_not_exists,
36            )),
37            LogicalPlan::CreateIndex(CreateIndex {
38                index_name,
39                table,
40                table_schema,
41                columns,
42            }) => PhysicalPlan::CreateIndex(PhysicalCreateIndex::new(
43                index_name.clone(),
44                table.clone(),
45                table_schema.clone(),
46                columns.clone(),
47            )),
48            LogicalPlan::DropTable(DropTable { name, if_exists }) => {
49                PhysicalPlan::DropTable(PhysicalDropTable::new(name.clone(), *if_exists))
50            }
51            LogicalPlan::DropIndex(DropIndex {
52                name,
53                schema,
54                catalog,
55                if_exists,
56            }) => PhysicalPlan::DropIndex(PhysicalDropIndex::new(
57                name.clone(),
58                schema.clone(),
59                catalog.clone(),
60                *if_exists,
61            )),
62            LogicalPlan::Insert(Insert {
63                table,
64                table_schema,
65                projected_schema,
66                input,
67            }) => {
68                let input_physical_plan = self.build_plan(input.clone());
69                PhysicalPlan::Insert(PhysicalInsert::new(
70                    table.clone(),
71                    table_schema.clone(),
72                    projected_schema.clone(),
73                    Arc::new(input_physical_plan),
74                ))
75            }
76            LogicalPlan::Values(Values { schema, values }) => {
77                PhysicalPlan::Values(PhysicalValues::new(schema.clone(), values.clone()))
78            }
79            LogicalPlan::Project(Project {
80                exprs,
81                input,
82                schema,
83            }) => {
84                let input_physical_plan = self.build_plan(input.clone());
85                PhysicalPlan::Project(PhysicalProject::new(
86                    exprs.clone(),
87                    schema.clone(),
88                    Arc::new(input_physical_plan),
89                ))
90            }
91            LogicalPlan::Filter(Filter { predicate, input }) => {
92                let input_physical_plan = self.build_plan(input.clone());
93                PhysicalPlan::Filter(PhysicalFilter::new(
94                    predicate.clone(),
95                    Arc::new(input_physical_plan),
96                ))
97            }
98            LogicalPlan::TableScan(TableScan {
99                table_ref,
100                table_schema,
101                filters: _,
102                limit: _,
103                streaming_hint,
104            }) => {
105                // TODO fix testing
106                if let Some(catalog_table) = self
107                    .catalog
108                    .schemas
109                    .get(table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME))
110                    .unwrap()
111                    .tables
112                    .get(table_ref.table())
113                {
114                    if !catalog_table.indexes.is_empty() {
115                        PhysicalPlan::IndexScan(PhysicalIndexScan::new(
116                            table_ref.clone(),
117                            catalog_table.indexes.keys().next().unwrap().clone(),
118                            table_schema.clone(),
119                            ..,
120                        ))
121                    } else {
122                        {
123                            let mut op =
124                                PhysicalSeqScan::new(table_ref.clone(), table_schema.clone());
125                            op.streaming_hint = *streaming_hint;
126                            PhysicalPlan::SeqScan(op)
127                        }
128                    }
129                } else {
130                    {
131                        let mut op = PhysicalSeqScan::new(table_ref.clone(), table_schema.clone());
132                        op.streaming_hint = *streaming_hint;
133                        PhysicalPlan::SeqScan(op)
134                    }
135                }
136            }
137            LogicalPlan::Limit(Limit {
138                limit,
139                offset,
140                input,
141            }) => {
142                let input_physical_plan = self.build_plan((*input).clone());
143                PhysicalPlan::Limit(PhysicalLimit::new(
144                    *limit,
145                    *offset,
146                    Arc::new(input_physical_plan),
147                ))
148            }
149            LogicalPlan::Join(Join {
150                left,
151                right,
152                join_type,
153                condition,
154                schema,
155            }) => {
156                let left_physical_plan = self.build_plan((*left).clone());
157                let right_physical_plan = self.build_plan((*right).clone());
158                PhysicalPlan::NestedLoopJoin(PhysicalNestedLoopJoin::new(
159                    *join_type,
160                    condition.clone(),
161                    Arc::new(left_physical_plan),
162                    Arc::new(right_physical_plan),
163                    schema.clone(),
164                ))
165            }
166            LogicalPlan::Sort(Sort {
167                order_by: expr,
168                ref input,
169                limit: _,
170            }) => {
171                // TODO limit
172                let input_physical_plan = self.build_plan(Arc::clone(input));
173                PhysicalPlan::Sort(PhysicalSort::new(
174                    expr.clone(),
175                    Arc::new(input_physical_plan),
176                ))
177            }
178            LogicalPlan::EmptyRelation(EmptyRelation {
179                produce_one_row,
180                schema,
181            }) => PhysicalPlan::Empty(PhysicalEmpty::new(
182                if *produce_one_row { 1 } else { 0 },
183                schema.clone(),
184            )),
185            LogicalPlan::Aggregate(Aggregate {
186                input,
187                group_exprs,
188                aggr_exprs,
189                schema,
190            }) => {
191                let input_physical_plan = self.build_plan(Arc::clone(input));
192                PhysicalPlan::Aggregate(PhysicalAggregate::new(
193                    Arc::new(input_physical_plan),
194                    group_exprs.clone(),
195                    aggr_exprs.clone(),
196                    schema.clone(),
197                ))
198            }
199            LogicalPlan::Update(update) => PhysicalPlan::Update(PhysicalUpdate::new(
200                update.table.clone(),
201                update.table_schema.clone(),
202                update.assignments.clone(),
203                update.selection.clone(),
204            )),
205            LogicalPlan::Delete(delete) => PhysicalPlan::Delete(PhysicalDelete::new(
206                delete.table.clone(),
207                delete.table_schema.clone(),
208                delete.selection.clone(),
209            )),
210            LogicalPlan::BeginTransaction(_)
211            | LogicalPlan::CommitTransaction
212            | LogicalPlan::RollbackTransaction
213            | LogicalPlan::SetTransaction { .. } => {
214                PhysicalPlan::Empty(PhysicalEmpty::new(0, Schema::empty().into()))
215            }
216        };
217        plan
218    }
219}