1use crate::catalog::{Catalog, CatalogIndex, Schema};
2use std::ops::Bound;
3use std::rc::Rc;
4use std::sync::Arc;
5
6use crate::expression::{BinaryExpr, BinaryOp, ColumnExpr, Expr, Literal};
7use crate::plan::logical_plan::{
8 Aggregate, CreateIndex, CreateTable, DropIndex, DropTable, EmptyRelation, Filter, Insert, Join,
9 Limit, LogicalPlan, Project, Sort, TableScan, Values,
10};
11use crate::storage::tuple::Tuple;
12use crate::utils::scalar::ScalarValue;
13
14use crate::execution::physical_plan::{
15 PhysicalAggregate, PhysicalAnalyze, PhysicalCreateIndex, PhysicalCreateTable, PhysicalDelete,
16 PhysicalDropIndex, PhysicalDropTable, PhysicalEmpty, PhysicalFilter, PhysicalIndexScan,
17 PhysicalInsert, PhysicalLimit, PhysicalNestedLoopJoin, PhysicalPlan, PhysicalProject,
18 PhysicalSeqScan, PhysicalSort, PhysicalUpdate, PhysicalValues,
19};
20
21#[derive(Debug, Default, Clone, Copy)]
22pub struct PhysicalPlanner<'a> {
23 catalog: Option<&'a Catalog>,
24}
25
26impl<'a> PhysicalPlanner<'a> {
27 pub fn new() -> Self {
28 Self { catalog: None }
29 }
30
31 pub fn with_catalog(catalog: &'a Catalog) -> Self {
32 Self {
33 catalog: Some(catalog),
34 }
35 }
36}
37
38impl<'a> PhysicalPlanner<'a> {
39 pub fn create_physical_plan(&self, logical_plan: LogicalPlan) -> PhysicalPlan {
40 let logical_plan = Arc::new(logical_plan);
41 self.build_plan(logical_plan)
42 }
43
44 fn build_plan(&self, logical_plan: Arc<LogicalPlan>) -> PhysicalPlan {
45 let plan = match logical_plan.as_ref() {
46 LogicalPlan::CreateTable(CreateTable {
47 name,
48 columns,
49 if_not_exists,
50 }) => PhysicalPlan::CreateTable(PhysicalCreateTable::new(
51 name.clone(),
52 Schema::new(columns.clone()),
53 *if_not_exists,
54 )),
55 LogicalPlan::CreateIndex(CreateIndex {
56 index_name,
57 table,
58 table_schema,
59 columns,
60 }) => PhysicalPlan::CreateIndex(PhysicalCreateIndex::new(
61 index_name.clone(),
62 table.clone(),
63 table_schema.clone(),
64 columns.clone(),
65 )),
66 LogicalPlan::DropTable(DropTable { name, if_exists }) => {
67 PhysicalPlan::DropTable(PhysicalDropTable::new(name.clone(), *if_exists))
68 }
69 LogicalPlan::DropIndex(DropIndex {
70 name,
71 schema,
72 catalog,
73 if_exists,
74 }) => PhysicalPlan::DropIndex(PhysicalDropIndex::new(
75 name.clone(),
76 schema.clone(),
77 catalog.clone(),
78 *if_exists,
79 )),
80 LogicalPlan::Insert(Insert {
81 table,
82 table_schema,
83 projected_schema,
84 input,
85 }) => {
86 let input_physical_plan = self.build_plan(input.clone());
87 PhysicalPlan::Insert(PhysicalInsert::new(
88 table.clone(),
89 table_schema.clone(),
90 projected_schema.clone(),
91 Rc::new(input_physical_plan),
92 ))
93 }
94 LogicalPlan::Values(Values { schema, values }) => {
95 PhysicalPlan::Values(PhysicalValues::new(schema.clone(), values.clone()))
96 }
97 LogicalPlan::Project(Project {
98 exprs,
99 input,
100 schema,
101 }) => {
102 let input_physical_plan = self.build_plan(input.clone());
103 PhysicalPlan::Project(PhysicalProject::new(
104 exprs.clone(),
105 schema.clone(),
106 Rc::new(input_physical_plan),
107 ))
108 }
109 LogicalPlan::Filter(Filter { predicate, input }) => {
110 let input_physical_plan = self.build_plan(input.clone());
111 PhysicalPlan::Filter(PhysicalFilter::new(
112 predicate.clone(),
113 Rc::new(input_physical_plan),
114 ))
115 }
116 LogicalPlan::TableScan(scan) => self.build_table_scan(scan),
117 LogicalPlan::Limit(Limit {
118 limit,
119 offset,
120 input,
121 }) => {
122 let input_physical_plan = self.build_plan((*input).clone());
123 PhysicalPlan::Limit(PhysicalLimit::new(
124 *limit,
125 *offset,
126 Rc::new(input_physical_plan),
127 ))
128 }
129 LogicalPlan::Join(Join {
130 left,
131 right,
132 join_type,
133 condition,
134 schema,
135 }) => {
136 let left_physical_plan = self.build_plan((*left).clone());
137 let right_physical_plan = self.build_plan((*right).clone());
138 PhysicalPlan::NestedLoopJoin(PhysicalNestedLoopJoin::new(
139 *join_type,
140 condition.clone(),
141 Rc::new(left_physical_plan),
142 Rc::new(right_physical_plan),
143 schema.clone(),
144 ))
145 }
146 LogicalPlan::Sort(Sort {
147 order_by: expr,
148 ref input,
149 limit: _,
150 }) => {
151 let input_physical_plan = self.build_plan(Arc::clone(input));
153 PhysicalPlan::Sort(PhysicalSort::new(
154 expr.clone(),
155 Rc::new(input_physical_plan),
156 ))
157 }
158 LogicalPlan::EmptyRelation(EmptyRelation {
159 produce_one_row,
160 schema,
161 }) => PhysicalPlan::Empty(PhysicalEmpty::new(
162 if *produce_one_row { 1 } else { 0 },
163 schema.clone(),
164 )),
165 LogicalPlan::Aggregate(Aggregate {
166 input,
167 group_exprs,
168 aggr_exprs,
169 schema,
170 }) => {
171 let input_physical_plan = self.build_plan(Arc::clone(input));
172 PhysicalPlan::Aggregate(PhysicalAggregate::new(
173 Rc::new(input_physical_plan),
174 group_exprs.clone(),
175 aggr_exprs.clone(),
176 schema.clone(),
177 ))
178 }
179 LogicalPlan::Update(update) => PhysicalPlan::Update(PhysicalUpdate::new(
180 update.table.clone(),
181 update.table_schema.clone(),
182 update.assignments.clone(),
183 update.selection.clone(),
184 )),
185 LogicalPlan::Delete(delete) => PhysicalPlan::Delete(PhysicalDelete::new(
186 delete.table.clone(),
187 delete.table_schema.clone(),
188 delete.selection.clone(),
189 )),
190 LogicalPlan::Analyze(analyze) => {
191 PhysicalPlan::Analyze(PhysicalAnalyze::new(analyze.table.clone()))
192 }
193 LogicalPlan::BeginTransaction(_)
194 | LogicalPlan::CommitTransaction
195 | LogicalPlan::RollbackTransaction
196 | LogicalPlan::SetTransaction { .. } => {
197 PhysicalPlan::Empty(PhysicalEmpty::new(0, Schema::empty().into()))
198 }
199 };
200 plan
201 }
202
203 fn build_table_scan(&self, scan: &TableScan) -> PhysicalPlan {
204 let mut plan = self
205 .new_index_scan(scan)
206 .unwrap_or_else(|| self.new_seq_scan(scan));
207
208 if let Some(limit_value) = scan.limit {
209 plan = PhysicalPlan::Limit(PhysicalLimit::new(Some(limit_value), 0, Rc::new(plan)));
210 }
211
212 if let Some(predicate) = conjunction(&scan.filters) {
213 plan = PhysicalPlan::Filter(PhysicalFilter::new(predicate, Rc::new(plan)));
214 }
215
216 plan
217 }
218}
219
220impl<'a> PhysicalPlanner<'a> {
221 fn new_seq_scan(&self, scan: &TableScan) -> PhysicalPlan {
222 let op = PhysicalSeqScan::new(scan.table_ref.clone(), scan.table_schema.clone());
223 PhysicalPlan::SeqScan(op)
224 }
225
226 fn new_index_scan(&self, scan: &TableScan) -> Option<PhysicalPlan> {
227 let catalog = self.catalog?;
228 let indexes = catalog.table_indexes(&scan.table_ref).ok()?;
229 indexes.into_iter().find_map(|index| {
230 let bounds = bounds_for_index(&index, &scan.filters)?;
231 Some(PhysicalPlan::IndexScan(PhysicalIndexScan::new(
232 scan.table_ref.clone(),
233 index.name,
234 scan.table_schema.clone(),
235 bounds,
236 )))
237 })
238 }
239}
240
241fn conjunction(predicates: &[Expr]) -> Option<Expr> {
242 let mut iter = predicates.iter();
243 let first = iter.next()?.clone();
244 Some(iter.fold(first, |acc, expr| {
245 Expr::Binary(BinaryExpr {
246 left: Box::new(acc),
247 op: BinaryOp::And,
248 right: Box::new(expr.clone()),
249 })
250 }))
251}
252
253fn bounds_for_index(
254 index: &CatalogIndex,
255 filters: &[Expr],
256) -> Option<(Bound<Tuple>, Bound<Tuple>)> {
257 if index.key_schema.column_count() > 1 {
258 return composite_equality_bounds(index, filters);
259 }
260 let column = index.key_schema.columns[0].clone();
261 let mut lower = Bound::Unbounded;
262 let mut upper = Bound::Unbounded;
263 let mut matched = false;
264 for predicate in flattened_conjuncts(filters) {
265 if let Some((next_lower, next_upper)) =
266 bounds_from_predicate(predicate, column.name.as_str(), &index.key_schema)
267 {
268 lower = merge_lower(lower, next_lower);
269 upper = merge_upper(upper, next_upper);
270 matched = true;
271 }
272 }
273 matched.then_some((lower, upper))
274}
275
276fn composite_equality_bounds(
277 index: &CatalogIndex,
278 filters: &[Expr],
279) -> Option<(Bound<Tuple>, Bound<Tuple>)> {
280 let mut values = vec![None; index.key_schema.column_count()];
281 for predicate in flattened_conjuncts(filters) {
282 for (idx, column) in index.key_schema.columns.iter().enumerate() {
283 let Some(value) =
284 equality_literal_from_predicate(predicate, column.name.as_str(), column.data_type)
285 else {
286 continue;
287 };
288 if values[idx]
289 .as_ref()
290 .is_some_and(|existing| existing != &value)
291 {
292 return None;
293 }
294 values[idx] = Some(value);
295 }
296 }
297
298 let values = values.into_iter().collect::<Option<Vec<_>>>()?;
299 let key = Tuple::new(index.key_schema.clone(), values);
300 Some((Bound::Included(key.clone()), Bound::Included(key)))
301}
302
303fn flattened_conjuncts(filters: &[Expr]) -> Vec<&Expr> {
304 let mut out = Vec::new();
305 for filter in filters {
306 flatten_conjunct(filter, &mut out);
307 }
308 out
309}
310
311fn flatten_conjunct<'a>(expr: &'a Expr, out: &mut Vec<&'a Expr>) {
312 if let Expr::Binary(BinaryExpr {
313 left,
314 op: BinaryOp::And,
315 right,
316 }) = expr
317 {
318 flatten_conjunct(left, out);
319 flatten_conjunct(right, out);
320 } else {
321 out.push(expr);
322 }
323}
324
325fn bounds_from_predicate(
326 predicate: &Expr,
327 column_name: &str,
328 key_schema: &crate::catalog::SchemaRef,
329) -> Option<(Bound<Tuple>, Bound<Tuple>)> {
330 let Expr::Binary(BinaryExpr { left, op, right }) = predicate else {
331 return None;
332 };
333 if let Some(value) = column_literal(left, right, column_name, key_schema) {
334 return bounds_for_op(*op, value);
335 }
336 if let Some(value) = column_literal(right, left, column_name, key_schema) {
337 return bounds_for_op(invert_comparison(*op)?, value);
338 }
339 None
340}
341
342fn column_literal(
343 column_expr: &Expr,
344 literal_expr: &Expr,
345 column_name: &str,
346 key_schema: &crate::catalog::SchemaRef,
347) -> Option<Tuple> {
348 let data_type = key_schema.columns[0].data_type;
349 let value = column_literal_value(column_expr, literal_expr, column_name, data_type)?;
350 Some(Tuple::new(key_schema.clone(), vec![value]))
351}
352
353fn equality_literal_from_predicate(
354 predicate: &Expr,
355 column_name: &str,
356 data_type: crate::catalog::DataType,
357) -> Option<ScalarValue> {
358 let Expr::Binary(BinaryExpr { left, op, right }) = predicate else {
359 return None;
360 };
361 if *op != BinaryOp::Eq {
362 return None;
363 }
364 column_literal_value(left, right, column_name, data_type)
365 .or_else(|| column_literal_value(right, left, column_name, data_type))
366}
367
368fn column_literal_value(
369 column_expr: &Expr,
370 literal_expr: &Expr,
371 column_name: &str,
372 data_type: crate::catalog::DataType,
373) -> Option<ScalarValue> {
374 let Expr::Column(ColumnExpr { name, .. }) = column_expr else {
375 return None;
376 };
377 if !name.eq_ignore_ascii_case(column_name) {
378 return None;
379 }
380 let Expr::Literal(Literal { value }) = literal_expr else {
381 return None;
382 };
383 value.cast_to(&data_type).ok()
384}
385
386fn bounds_for_op(op: BinaryOp, value: Tuple) -> Option<(Bound<Tuple>, Bound<Tuple>)> {
387 match op {
388 BinaryOp::Eq => Some((Bound::Included(value.clone()), Bound::Included(value))),
389 BinaryOp::Gt => Some((Bound::Excluded(value), Bound::Unbounded)),
390 BinaryOp::GtEq => Some((Bound::Included(value), Bound::Unbounded)),
391 BinaryOp::Lt => Some((Bound::Unbounded, Bound::Excluded(value))),
392 BinaryOp::LtEq => Some((Bound::Unbounded, Bound::Included(value))),
393 _ => None,
394 }
395}
396
397fn invert_comparison(op: BinaryOp) -> Option<BinaryOp> {
398 match op {
399 BinaryOp::Eq => Some(BinaryOp::Eq),
400 BinaryOp::Gt => Some(BinaryOp::Lt),
401 BinaryOp::GtEq => Some(BinaryOp::LtEq),
402 BinaryOp::Lt => Some(BinaryOp::Gt),
403 BinaryOp::LtEq => Some(BinaryOp::GtEq),
404 _ => None,
405 }
406}
407
408fn merge_lower(current: Bound<Tuple>, next: Bound<Tuple>) -> Bound<Tuple> {
409 match (current, next) {
410 (Bound::Unbounded, bound) | (bound, Bound::Unbounded) => bound,
411 (left, right) => {
412 let left_tuple = bound_tuple(&left);
413 let right_tuple = bound_tuple(&right);
414 match left_tuple.cmp(right_tuple) {
415 std::cmp::Ordering::Less => right,
416 std::cmp::Ordering::Greater => left,
417 std::cmp::Ordering::Equal => {
418 if matches!(left, Bound::Excluded(_)) || matches!(right, Bound::Excluded(_)) {
419 Bound::Excluded(left_tuple.clone())
420 } else {
421 Bound::Included(left_tuple.clone())
422 }
423 }
424 }
425 }
426 }
427}
428
429fn merge_upper(current: Bound<Tuple>, next: Bound<Tuple>) -> Bound<Tuple> {
430 match (current, next) {
431 (Bound::Unbounded, bound) | (bound, Bound::Unbounded) => bound,
432 (left, right) => {
433 let left_tuple = bound_tuple(&left);
434 let right_tuple = bound_tuple(&right);
435 match left_tuple.cmp(right_tuple) {
436 std::cmp::Ordering::Less => left,
437 std::cmp::Ordering::Greater => right,
438 std::cmp::Ordering::Equal => {
439 if matches!(left, Bound::Excluded(_)) || matches!(right, Bound::Excluded(_)) {
440 Bound::Excluded(left_tuple.clone())
441 } else {
442 Bound::Included(left_tuple.clone())
443 }
444 }
445 }
446 }
447 }
448}
449
450fn bound_tuple(bound: &Bound<Tuple>) -> &Tuple {
451 match bound {
452 Bound::Included(tuple) | Bound::Excluded(tuple) => tuple,
453 Bound::Unbounded => unreachable!("unbounded handled before bound_tuple"),
454 }
455}