use super::OptimizationRule;
use crate::error::DbxResult;
use crate::sql::planner::{AggregateMode, LogicalPlan};
pub struct DistributedPushdownRule;
impl OptimizationRule for DistributedPushdownRule {
fn name(&self) -> &str {
"DistributedPushdown"
}
fn apply(&self, plan: LogicalPlan) -> DbxResult<LogicalPlan> {
self.push_down(plan)
}
}
impl DistributedPushdownRule {
fn push_down(&self, plan: LogicalPlan) -> DbxResult<LogicalPlan> {
match plan {
LogicalPlan::Project { input, projections } => Ok(LogicalPlan::Project {
input: Box::new(self.push_down(*input)?),
projections,
}),
LogicalPlan::Filter { input, predicate } => Ok(LogicalPlan::Filter {
input: Box::new(self.push_down(*input)?),
predicate,
}),
LogicalPlan::Sort { input, order_by } => Ok(LogicalPlan::Sort {
input: Box::new(self.push_down(*input)?),
order_by,
}),
LogicalPlan::Limit {
input,
count,
offset,
} => Ok(LogicalPlan::Limit {
input: Box::new(self.push_down(*input)?),
count,
offset,
}),
LogicalPlan::Aggregate {
input,
group_by,
aggregates,
mode,
} => {
let pushed_input = self.push_down(*input)?;
if mode == AggregateMode::Simple {
let partial_agg = LogicalPlan::Aggregate {
input: Box::new(pushed_input),
group_by: group_by.clone(),
aggregates: aggregates.clone(),
mode: AggregateMode::Partial,
};
Ok(LogicalPlan::Aggregate {
input: Box::new(partial_agg),
group_by,
aggregates,
mode: AggregateMode::Final,
})
} else {
Ok(LogicalPlan::Aggregate {
input: Box::new(pushed_input),
group_by,
aggregates,
mode,
})
}
}
LogicalPlan::Join {
left,
right,
join_type,
on,
} => Ok(LogicalPlan::Join {
left: Box::new(self.push_down(*left)?),
right: Box::new(self.push_down(*right)?),
join_type,
on,
}),
other => Ok(other),
}
}
}