Skip to main content

quill_sql/plan/physical_planner/
planner.rs

1use crate::catalog::{Catalog, CatalogIndex, Schema};
2use std::ops::Bound;
3use std::rc::Rc;
4use std::sync::Arc;
5
6use crate::expression::{BinaryExpr, BinaryOp, ColumnExpr, Expr, Literal};
7use crate::plan::logical_plan::{
8    Aggregate, CreateIndex, CreateTable, DropIndex, DropTable, EmptyRelation, Filter, Insert, Join,
9    Limit, LogicalPlan, Project, Sort, TableScan, Values,
10};
11use crate::storage::tuple::Tuple;
12use crate::utils::scalar::ScalarValue;
13
14use crate::execution::physical_plan::{
15    PhysicalAggregate, PhysicalAnalyze, PhysicalCreateIndex, PhysicalCreateTable, PhysicalDelete,
16    PhysicalDropIndex, PhysicalDropTable, PhysicalEmpty, PhysicalFilter, PhysicalIndexScan,
17    PhysicalInsert, PhysicalLimit, PhysicalNestedLoopJoin, PhysicalPlan, PhysicalProject,
18    PhysicalSeqScan, PhysicalSort, PhysicalUpdate, PhysicalValues,
19};
20
21#[derive(Debug, Default, Clone, Copy)]
22pub struct PhysicalPlanner<'a> {
23    catalog: Option<&'a Catalog>,
24}
25
26impl<'a> PhysicalPlanner<'a> {
27    pub fn new() -> Self {
28        Self { catalog: None }
29    }
30
31    pub fn with_catalog(catalog: &'a Catalog) -> Self {
32        Self {
33            catalog: Some(catalog),
34        }
35    }
36}
37
38impl<'a> PhysicalPlanner<'a> {
39    pub fn create_physical_plan(&self, logical_plan: LogicalPlan) -> PhysicalPlan {
40        let logical_plan = Arc::new(logical_plan);
41        self.build_plan(logical_plan)
42    }
43
44    fn build_plan(&self, logical_plan: Arc<LogicalPlan>) -> PhysicalPlan {
45        let plan = match logical_plan.as_ref() {
46            LogicalPlan::CreateTable(CreateTable {
47                name,
48                columns,
49                if_not_exists,
50            }) => PhysicalPlan::CreateTable(PhysicalCreateTable::new(
51                name.clone(),
52                Schema::new(columns.clone()),
53                *if_not_exists,
54            )),
55            LogicalPlan::CreateIndex(CreateIndex {
56                index_name,
57                table,
58                table_schema,
59                columns,
60            }) => PhysicalPlan::CreateIndex(PhysicalCreateIndex::new(
61                index_name.clone(),
62                table.clone(),
63                table_schema.clone(),
64                columns.clone(),
65            )),
66            LogicalPlan::DropTable(DropTable { name, if_exists }) => {
67                PhysicalPlan::DropTable(PhysicalDropTable::new(name.clone(), *if_exists))
68            }
69            LogicalPlan::DropIndex(DropIndex {
70                name,
71                schema,
72                catalog,
73                if_exists,
74            }) => PhysicalPlan::DropIndex(PhysicalDropIndex::new(
75                name.clone(),
76                schema.clone(),
77                catalog.clone(),
78                *if_exists,
79            )),
80            LogicalPlan::Insert(Insert {
81                table,
82                table_schema,
83                projected_schema,
84                input,
85            }) => {
86                let input_physical_plan = self.build_plan(input.clone());
87                PhysicalPlan::Insert(PhysicalInsert::new(
88                    table.clone(),
89                    table_schema.clone(),
90                    projected_schema.clone(),
91                    Rc::new(input_physical_plan),
92                ))
93            }
94            LogicalPlan::Values(Values { schema, values }) => {
95                PhysicalPlan::Values(PhysicalValues::new(schema.clone(), values.clone()))
96            }
97            LogicalPlan::Project(Project {
98                exprs,
99                input,
100                schema,
101            }) => {
102                let input_physical_plan = self.build_plan(input.clone());
103                PhysicalPlan::Project(PhysicalProject::new(
104                    exprs.clone(),
105                    schema.clone(),
106                    Rc::new(input_physical_plan),
107                ))
108            }
109            LogicalPlan::Filter(Filter { predicate, input }) => {
110                let input_physical_plan = self.build_plan(input.clone());
111                PhysicalPlan::Filter(PhysicalFilter::new(
112                    predicate.clone(),
113                    Rc::new(input_physical_plan),
114                ))
115            }
116            LogicalPlan::TableScan(scan) => self.build_table_scan(scan),
117            LogicalPlan::Limit(Limit {
118                limit,
119                offset,
120                input,
121            }) => {
122                let input_physical_plan = self.build_plan((*input).clone());
123                PhysicalPlan::Limit(PhysicalLimit::new(
124                    *limit,
125                    *offset,
126                    Rc::new(input_physical_plan),
127                ))
128            }
129            LogicalPlan::Join(Join {
130                left,
131                right,
132                join_type,
133                condition,
134                schema,
135            }) => {
136                let left_physical_plan = self.build_plan((*left).clone());
137                let right_physical_plan = self.build_plan((*right).clone());
138                PhysicalPlan::NestedLoopJoin(PhysicalNestedLoopJoin::new(
139                    *join_type,
140                    condition.clone(),
141                    Rc::new(left_physical_plan),
142                    Rc::new(right_physical_plan),
143                    schema.clone(),
144                ))
145            }
146            LogicalPlan::Sort(Sort {
147                order_by: expr,
148                ref input,
149                limit: _,
150            }) => {
151                // TODO limit
152                let input_physical_plan = self.build_plan(Arc::clone(input));
153                PhysicalPlan::Sort(PhysicalSort::new(
154                    expr.clone(),
155                    Rc::new(input_physical_plan),
156                ))
157            }
158            LogicalPlan::EmptyRelation(EmptyRelation {
159                produce_one_row,
160                schema,
161            }) => PhysicalPlan::Empty(PhysicalEmpty::new(
162                if *produce_one_row { 1 } else { 0 },
163                schema.clone(),
164            )),
165            LogicalPlan::Aggregate(Aggregate {
166                input,
167                group_exprs,
168                aggr_exprs,
169                schema,
170            }) => {
171                let input_physical_plan = self.build_plan(Arc::clone(input));
172                PhysicalPlan::Aggregate(PhysicalAggregate::new(
173                    Rc::new(input_physical_plan),
174                    group_exprs.clone(),
175                    aggr_exprs.clone(),
176                    schema.clone(),
177                ))
178            }
179            LogicalPlan::Update(update) => PhysicalPlan::Update(PhysicalUpdate::new(
180                update.table.clone(),
181                update.table_schema.clone(),
182                update.assignments.clone(),
183                update.selection.clone(),
184            )),
185            LogicalPlan::Delete(delete) => PhysicalPlan::Delete(PhysicalDelete::new(
186                delete.table.clone(),
187                delete.table_schema.clone(),
188                delete.selection.clone(),
189            )),
190            LogicalPlan::Analyze(analyze) => {
191                PhysicalPlan::Analyze(PhysicalAnalyze::new(analyze.table.clone()))
192            }
193            LogicalPlan::BeginTransaction(_)
194            | LogicalPlan::CommitTransaction
195            | LogicalPlan::RollbackTransaction
196            | LogicalPlan::SetTransaction { .. } => {
197                PhysicalPlan::Empty(PhysicalEmpty::new(0, Schema::empty().into()))
198            }
199        };
200        plan
201    }
202
203    fn build_table_scan(&self, scan: &TableScan) -> PhysicalPlan {
204        let mut plan = self
205            .new_index_scan(scan)
206            .unwrap_or_else(|| self.new_seq_scan(scan));
207
208        if let Some(limit_value) = scan.limit {
209            plan = PhysicalPlan::Limit(PhysicalLimit::new(Some(limit_value), 0, Rc::new(plan)));
210        }
211
212        if let Some(predicate) = conjunction(&scan.filters) {
213            plan = PhysicalPlan::Filter(PhysicalFilter::new(predicate, Rc::new(plan)));
214        }
215
216        plan
217    }
218}
219
220impl<'a> PhysicalPlanner<'a> {
221    fn new_seq_scan(&self, scan: &TableScan) -> PhysicalPlan {
222        let op = PhysicalSeqScan::new(scan.table_ref.clone(), scan.table_schema.clone());
223        PhysicalPlan::SeqScan(op)
224    }
225
226    fn new_index_scan(&self, scan: &TableScan) -> Option<PhysicalPlan> {
227        let catalog = self.catalog?;
228        let indexes = catalog.table_indexes(&scan.table_ref).ok()?;
229        indexes.into_iter().find_map(|index| {
230            let bounds = bounds_for_index(&index, &scan.filters)?;
231            Some(PhysicalPlan::IndexScan(PhysicalIndexScan::new(
232                scan.table_ref.clone(),
233                index.name,
234                scan.table_schema.clone(),
235                bounds,
236            )))
237        })
238    }
239}
240
241fn conjunction(predicates: &[Expr]) -> Option<Expr> {
242    let mut iter = predicates.iter();
243    let first = iter.next()?.clone();
244    Some(iter.fold(first, |acc, expr| {
245        Expr::Binary(BinaryExpr {
246            left: Box::new(acc),
247            op: BinaryOp::And,
248            right: Box::new(expr.clone()),
249        })
250    }))
251}
252
253fn bounds_for_index(
254    index: &CatalogIndex,
255    filters: &[Expr],
256) -> Option<(Bound<Tuple>, Bound<Tuple>)> {
257    if index.key_schema.column_count() > 1 {
258        return composite_equality_bounds(index, filters);
259    }
260    let column = index.key_schema.columns[0].clone();
261    let mut lower = Bound::Unbounded;
262    let mut upper = Bound::Unbounded;
263    let mut matched = false;
264    for predicate in flattened_conjuncts(filters) {
265        if let Some((next_lower, next_upper)) =
266            bounds_from_predicate(predicate, column.name.as_str(), &index.key_schema)
267        {
268            lower = merge_lower(lower, next_lower);
269            upper = merge_upper(upper, next_upper);
270            matched = true;
271        }
272    }
273    matched.then_some((lower, upper))
274}
275
276fn composite_equality_bounds(
277    index: &CatalogIndex,
278    filters: &[Expr],
279) -> Option<(Bound<Tuple>, Bound<Tuple>)> {
280    let mut values = vec![None; index.key_schema.column_count()];
281    for predicate in flattened_conjuncts(filters) {
282        for (idx, column) in index.key_schema.columns.iter().enumerate() {
283            let Some(value) =
284                equality_literal_from_predicate(predicate, column.name.as_str(), column.data_type)
285            else {
286                continue;
287            };
288            if values[idx]
289                .as_ref()
290                .is_some_and(|existing| existing != &value)
291            {
292                return None;
293            }
294            values[idx] = Some(value);
295        }
296    }
297
298    let values = values.into_iter().collect::<Option<Vec<_>>>()?;
299    let key = Tuple::new(index.key_schema.clone(), values);
300    Some((Bound::Included(key.clone()), Bound::Included(key)))
301}
302
303fn flattened_conjuncts(filters: &[Expr]) -> Vec<&Expr> {
304    let mut out = Vec::new();
305    for filter in filters {
306        flatten_conjunct(filter, &mut out);
307    }
308    out
309}
310
311fn flatten_conjunct<'a>(expr: &'a Expr, out: &mut Vec<&'a Expr>) {
312    if let Expr::Binary(BinaryExpr {
313        left,
314        op: BinaryOp::And,
315        right,
316    }) = expr
317    {
318        flatten_conjunct(left, out);
319        flatten_conjunct(right, out);
320    } else {
321        out.push(expr);
322    }
323}
324
325fn bounds_from_predicate(
326    predicate: &Expr,
327    column_name: &str,
328    key_schema: &crate::catalog::SchemaRef,
329) -> Option<(Bound<Tuple>, Bound<Tuple>)> {
330    let Expr::Binary(BinaryExpr { left, op, right }) = predicate else {
331        return None;
332    };
333    if let Some(value) = column_literal(left, right, column_name, key_schema) {
334        return bounds_for_op(*op, value);
335    }
336    if let Some(value) = column_literal(right, left, column_name, key_schema) {
337        return bounds_for_op(invert_comparison(*op)?, value);
338    }
339    None
340}
341
342fn column_literal(
343    column_expr: &Expr,
344    literal_expr: &Expr,
345    column_name: &str,
346    key_schema: &crate::catalog::SchemaRef,
347) -> Option<Tuple> {
348    let data_type = key_schema.columns[0].data_type;
349    let value = column_literal_value(column_expr, literal_expr, column_name, data_type)?;
350    Some(Tuple::new(key_schema.clone(), vec![value]))
351}
352
353fn equality_literal_from_predicate(
354    predicate: &Expr,
355    column_name: &str,
356    data_type: crate::catalog::DataType,
357) -> Option<ScalarValue> {
358    let Expr::Binary(BinaryExpr { left, op, right }) = predicate else {
359        return None;
360    };
361    if *op != BinaryOp::Eq {
362        return None;
363    }
364    column_literal_value(left, right, column_name, data_type)
365        .or_else(|| column_literal_value(right, left, column_name, data_type))
366}
367
368fn column_literal_value(
369    column_expr: &Expr,
370    literal_expr: &Expr,
371    column_name: &str,
372    data_type: crate::catalog::DataType,
373) -> Option<ScalarValue> {
374    let Expr::Column(ColumnExpr { name, .. }) = column_expr else {
375        return None;
376    };
377    if !name.eq_ignore_ascii_case(column_name) {
378        return None;
379    }
380    let Expr::Literal(Literal { value }) = literal_expr else {
381        return None;
382    };
383    value.cast_to(&data_type).ok()
384}
385
386fn bounds_for_op(op: BinaryOp, value: Tuple) -> Option<(Bound<Tuple>, Bound<Tuple>)> {
387    match op {
388        BinaryOp::Eq => Some((Bound::Included(value.clone()), Bound::Included(value))),
389        BinaryOp::Gt => Some((Bound::Excluded(value), Bound::Unbounded)),
390        BinaryOp::GtEq => Some((Bound::Included(value), Bound::Unbounded)),
391        BinaryOp::Lt => Some((Bound::Unbounded, Bound::Excluded(value))),
392        BinaryOp::LtEq => Some((Bound::Unbounded, Bound::Included(value))),
393        _ => None,
394    }
395}
396
397fn invert_comparison(op: BinaryOp) -> Option<BinaryOp> {
398    match op {
399        BinaryOp::Eq => Some(BinaryOp::Eq),
400        BinaryOp::Gt => Some(BinaryOp::Lt),
401        BinaryOp::GtEq => Some(BinaryOp::LtEq),
402        BinaryOp::Lt => Some(BinaryOp::Gt),
403        BinaryOp::LtEq => Some(BinaryOp::GtEq),
404        _ => None,
405    }
406}
407
408fn merge_lower(current: Bound<Tuple>, next: Bound<Tuple>) -> Bound<Tuple> {
409    match (current, next) {
410        (Bound::Unbounded, bound) | (bound, Bound::Unbounded) => bound,
411        (left, right) => {
412            let left_tuple = bound_tuple(&left);
413            let right_tuple = bound_tuple(&right);
414            match left_tuple.cmp(right_tuple) {
415                std::cmp::Ordering::Less => right,
416                std::cmp::Ordering::Greater => left,
417                std::cmp::Ordering::Equal => {
418                    if matches!(left, Bound::Excluded(_)) || matches!(right, Bound::Excluded(_)) {
419                        Bound::Excluded(left_tuple.clone())
420                    } else {
421                        Bound::Included(left_tuple.clone())
422                    }
423                }
424            }
425        }
426    }
427}
428
429fn merge_upper(current: Bound<Tuple>, next: Bound<Tuple>) -> Bound<Tuple> {
430    match (current, next) {
431        (Bound::Unbounded, bound) | (bound, Bound::Unbounded) => bound,
432        (left, right) => {
433            let left_tuple = bound_tuple(&left);
434            let right_tuple = bound_tuple(&right);
435            match left_tuple.cmp(right_tuple) {
436                std::cmp::Ordering::Less => left,
437                std::cmp::Ordering::Greater => right,
438                std::cmp::Ordering::Equal => {
439                    if matches!(left, Bound::Excluded(_)) || matches!(right, Bound::Excluded(_)) {
440                        Bound::Excluded(left_tuple.clone())
441                    } else {
442                        Bound::Included(left_tuple.clone())
443                    }
444                }
445            }
446        }
447    }
448}
449
450fn bound_tuple(bound: &Bound<Tuple>) -> &Tuple {
451    match bound {
452        Bound::Included(tuple) | Bound::Excluded(tuple) => tuple,
453        Bound::Unbounded => unreachable!("unbounded handled before bound_tuple"),
454    }
455}