quill-sql 0.2.1

An educational Rust relational database (RDBMS) inspired by CMU 15445
Documentation
use crate::catalog::Schema;
use std::sync::Arc;

use crate::expression::{BinaryExpr, BinaryOp, Expr};
use crate::plan::logical_plan::{
    Aggregate, CreateIndex, CreateTable, DropIndex, DropTable, EmptyRelation, Filter, Insert, Join,
    Limit, LogicalPlan, Project, Sort, TableScan, Values,
};

use crate::execution::physical_plan::{
    PhysicalAggregate, PhysicalAnalyze, PhysicalCreateIndex, PhysicalCreateTable, PhysicalDelete,
    PhysicalDropIndex, PhysicalDropTable, PhysicalEmpty, PhysicalFilter, PhysicalInsert,
    PhysicalLimit, PhysicalNestedLoopJoin, PhysicalPlan, PhysicalProject, PhysicalSeqScan,
    PhysicalSort, PhysicalUpdate, PhysicalValues,
};

#[derive(Debug, Default, Clone, Copy)]
pub struct PhysicalPlanner;

impl PhysicalPlanner {
    pub fn new() -> Self {
        Self
    }
}

impl PhysicalPlanner {
    pub fn create_physical_plan(&self, logical_plan: LogicalPlan) -> PhysicalPlan {
        let logical_plan = Arc::new(logical_plan);
        self.build_plan(logical_plan)
    }

    fn build_plan(&self, logical_plan: Arc<LogicalPlan>) -> PhysicalPlan {
        let plan = match logical_plan.as_ref() {
            LogicalPlan::CreateTable(CreateTable {
                name,
                columns,
                if_not_exists,
            }) => PhysicalPlan::CreateTable(PhysicalCreateTable::new(
                name.clone(),
                Schema::new(columns.clone()),
                *if_not_exists,
            )),
            LogicalPlan::CreateIndex(CreateIndex {
                index_name,
                table,
                table_schema,
                columns,
            }) => PhysicalPlan::CreateIndex(PhysicalCreateIndex::new(
                index_name.clone(),
                table.clone(),
                table_schema.clone(),
                columns.clone(),
            )),
            LogicalPlan::DropTable(DropTable { name, if_exists }) => {
                PhysicalPlan::DropTable(PhysicalDropTable::new(name.clone(), *if_exists))
            }
            LogicalPlan::DropIndex(DropIndex {
                name,
                schema,
                catalog,
                if_exists,
            }) => PhysicalPlan::DropIndex(PhysicalDropIndex::new(
                name.clone(),
                schema.clone(),
                catalog.clone(),
                *if_exists,
            )),
            LogicalPlan::Insert(Insert {
                table,
                table_schema,
                projected_schema,
                input,
            }) => {
                let input_physical_plan = self.build_plan(input.clone());
                PhysicalPlan::Insert(PhysicalInsert::new(
                    table.clone(),
                    table_schema.clone(),
                    projected_schema.clone(),
                    Arc::new(input_physical_plan),
                ))
            }
            LogicalPlan::Values(Values { schema, values }) => {
                PhysicalPlan::Values(PhysicalValues::new(schema.clone(), values.clone()))
            }
            LogicalPlan::Project(Project {
                exprs,
                input,
                schema,
            }) => {
                let input_physical_plan = self.build_plan(input.clone());
                PhysicalPlan::Project(PhysicalProject::new(
                    exprs.clone(),
                    schema.clone(),
                    Arc::new(input_physical_plan),
                ))
            }
            LogicalPlan::Filter(Filter { predicate, input }) => {
                let input_physical_plan = self.build_plan(input.clone());
                PhysicalPlan::Filter(PhysicalFilter::new(
                    predicate.clone(),
                    Arc::new(input_physical_plan),
                ))
            }
            LogicalPlan::TableScan(scan) => self.build_table_scan(scan),
            LogicalPlan::Limit(Limit {
                limit,
                offset,
                input,
            }) => {
                let input_physical_plan = self.build_plan((*input).clone());
                PhysicalPlan::Limit(PhysicalLimit::new(
                    *limit,
                    *offset,
                    Arc::new(input_physical_plan),
                ))
            }
            LogicalPlan::Join(Join {
                left,
                right,
                join_type,
                condition,
                schema,
            }) => {
                let left_physical_plan = self.build_plan((*left).clone());
                let right_physical_plan = self.build_plan((*right).clone());
                PhysicalPlan::NestedLoopJoin(PhysicalNestedLoopJoin::new(
                    *join_type,
                    condition.clone(),
                    Arc::new(left_physical_plan),
                    Arc::new(right_physical_plan),
                    schema.clone(),
                ))
            }
            LogicalPlan::Sort(Sort {
                order_by: expr,
                ref input,
                limit: _,
            }) => {
                // TODO limit
                let input_physical_plan = self.build_plan(Arc::clone(input));
                PhysicalPlan::Sort(PhysicalSort::new(
                    expr.clone(),
                    Arc::new(input_physical_plan),
                ))
            }
            LogicalPlan::EmptyRelation(EmptyRelation {
                produce_one_row,
                schema,
            }) => PhysicalPlan::Empty(PhysicalEmpty::new(
                if *produce_one_row { 1 } else { 0 },
                schema.clone(),
            )),
            LogicalPlan::Aggregate(Aggregate {
                input,
                group_exprs,
                aggr_exprs,
                schema,
            }) => {
                let input_physical_plan = self.build_plan(Arc::clone(input));
                PhysicalPlan::Aggregate(PhysicalAggregate::new(
                    Arc::new(input_physical_plan),
                    group_exprs.clone(),
                    aggr_exprs.clone(),
                    schema.clone(),
                ))
            }
            LogicalPlan::Update(update) => PhysicalPlan::Update(PhysicalUpdate::new(
                update.table.clone(),
                update.table_schema.clone(),
                update.assignments.clone(),
                update.selection.clone(),
            )),
            LogicalPlan::Delete(delete) => PhysicalPlan::Delete(PhysicalDelete::new(
                delete.table.clone(),
                delete.table_schema.clone(),
                delete.selection.clone(),
            )),
            LogicalPlan::Analyze(analyze) => {
                PhysicalPlan::Analyze(PhysicalAnalyze::new(analyze.table.clone()))
            }
            LogicalPlan::BeginTransaction(_)
            | LogicalPlan::CommitTransaction
            | LogicalPlan::RollbackTransaction
            | LogicalPlan::SetTransaction { .. } => {
                PhysicalPlan::Empty(PhysicalEmpty::new(0, Schema::empty().into()))
            }
        };
        plan
    }

    fn build_table_scan(&self, scan: &TableScan) -> PhysicalPlan {
        let mut plan = self.new_seq_scan(scan);

        if let Some(limit_value) = scan.limit {
            plan = PhysicalPlan::Limit(PhysicalLimit::new(Some(limit_value), 0, Arc::new(plan)));
        }

        if let Some(predicate) = conjunction(&scan.filters) {
            plan = PhysicalPlan::Filter(PhysicalFilter::new(predicate, Arc::new(plan)));
        }

        plan
    }
}

impl PhysicalPlanner {
    fn new_seq_scan(&self, scan: &TableScan) -> PhysicalPlan {
        let op = PhysicalSeqScan::new(scan.table_ref.clone(), scan.table_schema.clone());
        PhysicalPlan::SeqScan(op)
    }
}

fn conjunction(predicates: &[Expr]) -> Option<Expr> {
    let mut iter = predicates.iter();
    let first = iter.next()?.clone();
    Some(iter.fold(first, |acc, expr| {
        Expr::Binary(BinaryExpr {
            left: Box::new(acc),
            op: BinaryOp::And,
            right: Box::new(expr.clone()),
        })
    }))
}