use crate::catalog::Schema;
use std::sync::Arc;
use crate::expression::{BinaryExpr, BinaryOp, Expr};
use crate::plan::logical_plan::{
Aggregate, CreateIndex, CreateTable, DropIndex, DropTable, EmptyRelation, Filter, Insert, Join,
Limit, LogicalPlan, Project, Sort, TableScan, Values,
};
use crate::execution::physical_plan::{
PhysicalAggregate, PhysicalAnalyze, PhysicalCreateIndex, PhysicalCreateTable, PhysicalDelete,
PhysicalDropIndex, PhysicalDropTable, PhysicalEmpty, PhysicalFilter, PhysicalInsert,
PhysicalLimit, PhysicalNestedLoopJoin, PhysicalPlan, PhysicalProject, PhysicalSeqScan,
PhysicalSort, PhysicalUpdate, PhysicalValues,
};
/// Stateless planner that lowers a `LogicalPlan` tree into a `PhysicalPlan` tree.
///
/// The type carries no data, so it is free to copy and can be created with
/// either `PhysicalPlanner::new()` or `PhysicalPlanner::default()`.
#[derive(Clone, Copy, Debug, Default)]
pub struct PhysicalPlanner;
impl PhysicalPlanner {
pub fn new() -> Self {
Self
}
}
impl PhysicalPlanner {
/// Lowers an owned `LogicalPlan` into a `PhysicalPlan`.
///
/// The logical plan is moved into an `Arc` because the recursive
/// `build_plan` step takes its input as `Arc<LogicalPlan>`.
pub fn create_physical_plan(&self, logical_plan: LogicalPlan) -> PhysicalPlan {
    self.build_plan(Arc::new(logical_plan))
}
fn build_plan(&self, logical_plan: Arc<LogicalPlan>) -> PhysicalPlan {
let plan = match logical_plan.as_ref() {
LogicalPlan::CreateTable(CreateTable {
name,
columns,
if_not_exists,
}) => PhysicalPlan::CreateTable(PhysicalCreateTable::new(
name.clone(),
Schema::new(columns.clone()),
*if_not_exists,
)),
LogicalPlan::CreateIndex(CreateIndex {
index_name,
table,
table_schema,
columns,
}) => PhysicalPlan::CreateIndex(PhysicalCreateIndex::new(
index_name.clone(),
table.clone(),
table_schema.clone(),
columns.clone(),
)),
LogicalPlan::DropTable(DropTable { name, if_exists }) => {
PhysicalPlan::DropTable(PhysicalDropTable::new(name.clone(), *if_exists))
}
LogicalPlan::DropIndex(DropIndex {
name,
schema,
catalog,
if_exists,
}) => PhysicalPlan::DropIndex(PhysicalDropIndex::new(
name.clone(),
schema.clone(),
catalog.clone(),
*if_exists,
)),
LogicalPlan::Insert(Insert {
table,
table_schema,
projected_schema,
input,
}) => {
let input_physical_plan = self.build_plan(input.clone());
PhysicalPlan::Insert(PhysicalInsert::new(
table.clone(),
table_schema.clone(),
projected_schema.clone(),
Arc::new(input_physical_plan),
))
}
LogicalPlan::Values(Values { schema, values }) => {
PhysicalPlan::Values(PhysicalValues::new(schema.clone(), values.clone()))
}
LogicalPlan::Project(Project {
exprs,
input,
schema,
}) => {
let input_physical_plan = self.build_plan(input.clone());
PhysicalPlan::Project(PhysicalProject::new(
exprs.clone(),
schema.clone(),
Arc::new(input_physical_plan),
))
}
LogicalPlan::Filter(Filter { predicate, input }) => {
let input_physical_plan = self.build_plan(input.clone());
PhysicalPlan::Filter(PhysicalFilter::new(
predicate.clone(),
Arc::new(input_physical_plan),
))
}
LogicalPlan::TableScan(scan) => self.build_table_scan(scan),
LogicalPlan::Limit(Limit {
limit,
offset,
input,
}) => {
let input_physical_plan = self.build_plan((*input).clone());
PhysicalPlan::Limit(PhysicalLimit::new(
*limit,
*offset,
Arc::new(input_physical_plan),
))
}
LogicalPlan::Join(Join {
left,
right,
join_type,
condition,
schema,
}) => {
let left_physical_plan = self.build_plan((*left).clone());
let right_physical_plan = self.build_plan((*right).clone());
PhysicalPlan::NestedLoopJoin(PhysicalNestedLoopJoin::new(
*join_type,
condition.clone(),
Arc::new(left_physical_plan),
Arc::new(right_physical_plan),
schema.clone(),
))
}
LogicalPlan::Sort(Sort {
order_by: expr,
ref input,
limit: _,
}) => {
let input_physical_plan = self.build_plan(Arc::clone(input));
PhysicalPlan::Sort(PhysicalSort::new(
expr.clone(),
Arc::new(input_physical_plan),
))
}
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row,
schema,
}) => PhysicalPlan::Empty(PhysicalEmpty::new(
if *produce_one_row { 1 } else { 0 },
schema.clone(),
)),
LogicalPlan::Aggregate(Aggregate {
input,
group_exprs,
aggr_exprs,
schema,
}) => {
let input_physical_plan = self.build_plan(Arc::clone(input));
PhysicalPlan::Aggregate(PhysicalAggregate::new(
Arc::new(input_physical_plan),
group_exprs.clone(),
aggr_exprs.clone(),
schema.clone(),
))
}
LogicalPlan::Update(update) => PhysicalPlan::Update(PhysicalUpdate::new(
update.table.clone(),
update.table_schema.clone(),
update.assignments.clone(),
update.selection.clone(),
)),
LogicalPlan::Delete(delete) => PhysicalPlan::Delete(PhysicalDelete::new(
delete.table.clone(),
delete.table_schema.clone(),
delete.selection.clone(),
)),
LogicalPlan::Analyze(analyze) => {
PhysicalPlan::Analyze(PhysicalAnalyze::new(analyze.table.clone()))
}
LogicalPlan::BeginTransaction(_)
| LogicalPlan::CommitTransaction
| LogicalPlan::RollbackTransaction
| LogicalPlan::SetTransaction { .. } => {
PhysicalPlan::Empty(PhysicalEmpty::new(0, Schema::empty().into()))
}
};
plan
}
/// Lowers a `TableScan` into a sequential scan wrapped by the scan's
/// pushed-down filters and limit.
///
/// The resulting tree is `Limit(Filter(SeqScan))`, with the `Filter` and
/// `Limit` layers present only when `scan.filters` is non-empty and
/// `scan.limit` is `Some`, respectively.
///
/// The filter must sit *below* the limit: the limit caps the number of rows
/// that satisfy the predicates. Limiting first would truncate the unfiltered
/// input and could drop matching rows that appear later in the table.
fn build_table_scan(&self, scan: &TableScan) -> PhysicalPlan {
    let mut plan = self.new_seq_scan(scan);

    // All pushed-down predicates are AND-ed into a single filter expression.
    if let Some(predicate) = conjunction(&scan.filters) {
        plan = PhysicalPlan::Filter(PhysicalFilter::new(predicate, Arc::new(plan)));
    }

    // Applied last so that it counts only rows that passed the filter.
    if let Some(limit_value) = scan.limit {
        plan = PhysicalPlan::Limit(PhysicalLimit::new(Some(limit_value), 0, Arc::new(plan)));
    }

    plan
}
}
impl PhysicalPlanner {
    /// Builds a bare sequential-scan operator for the table referenced by `scan`.
    ///
    /// Only `scan.table_ref` and `scan.table_schema` are used; any filters or
    /// limit carried by `scan` are not applied here.
    fn new_seq_scan(&self, scan: &TableScan) -> PhysicalPlan {
        let table = scan.table_ref.clone();
        let schema = scan.table_schema.clone();
        PhysicalPlan::SeqScan(PhysicalSeqScan::new(table, schema))
    }
}
/// Combines `predicates` into a single expression joined by `AND`.
///
/// Returns `None` for an empty slice and a clone of the sole element for a
/// one-element slice. For longer slices the result is nested to the left:
/// `((p0 AND p1) AND p2) ...`.
fn conjunction(predicates: &[Expr]) -> Option<Expr> {
    predicates.iter().cloned().reduce(|lhs, rhs| {
        Expr::Binary(BinaryExpr {
            left: Box::new(lhs),
            op: BinaryOp::And,
            right: Box::new(rhs),
        })
    })
}