use crate::error::DbxResult;
use crate::sql::optimizer::OptimizationRule;
use crate::sql::planner::LogicalPlan;
use crate::storage::metadata::MetadataRegistry;
use std::sync::Arc;
/// Optimizer rule that fills in the `ros_files` of every `LogicalPlan::Scan`
/// with the file paths of the scanned table's partitions that live on the
/// `StorageTier::DiskROS` tier, as recorded in a [`MetadataRegistry`].
pub struct TierPruningRule {
    /// Shared registry consulted for each scanned table's partitions.
    registry: Arc<MetadataRegistry>,
}
impl TierPruningRule {
pub fn new(registry: Arc<MetadataRegistry>) -> Self {
Self { registry }
}
}
impl OptimizationRule for TierPruningRule {
fn name(&self) -> &str {
"TierPruning"
}
fn apply(&self, plan: LogicalPlan) -> DbxResult<LogicalPlan> {
self.prune(plan)
}
}
impl TierPruningRule {
    /// Recursively rewrites `plan`, replacing the `ros_files` list of every
    /// `LogicalPlan::Scan` with the file paths of that table's partitions whose
    /// tier is `StorageTier::DiskROS`.
    ///
    /// Details:
    /// - Any `ros_files` already present on a scan are discarded and rebuilt.
    /// - If the table is not found in the registry, the scan gets an empty list.
    /// - `Project`, `Filter`, `Sort`, `Limit`, `Aggregate` and `Join` nodes are
    ///   rebuilt with their child plan(s) pruned.
    /// - Every other variant is returned unchanged. Its children, if it has
    ///   any, are not visited.
    ///
    /// NOTE(review): the order of `ros_files` follows the iteration order of
    /// the table's partition map. If that map is hash-based, the order is
    /// unspecified. Confirm that nothing downstream depends on it.
    ///
    /// # Errors
    /// Propagates any error returned while pruning a child plan. The function
    /// does not create errors of its own.
    fn prune(&self, plan: LogicalPlan) -> DbxResult<LogicalPlan> {
        match plan {
            LogicalPlan::Scan {
                table,
                columns,
                filter,
                ros_files: _,
            } => {
                // Select DiskROS partitions while borrowing and clone only the
                // file path of each match. The previous version cloned every
                // partition record in full just to read these two fields.
                let ros_files = self
                    .registry
                    .tables
                    .get(&table)
                    .map(|t| {
                        t.partitions
                            .iter()
                            .filter(|kv| {
                                kv.value().tier == crate::storage::metadata::StorageTier::DiskROS
                            })
                            .map(|kv| kv.value().file_path.clone())
                            .collect::<Vec<_>>()
                    })
                    .unwrap_or_default();
                Ok(LogicalPlan::Scan {
                    table,
                    columns,
                    filter,
                    ros_files,
                })
            }
            LogicalPlan::Project { input, projections } => Ok(LogicalPlan::Project {
                input: Box::new(self.prune(*input)?),
                projections,
            }),
            LogicalPlan::Filter { input, predicate } => Ok(LogicalPlan::Filter {
                input: Box::new(self.prune(*input)?),
                predicate,
            }),
            LogicalPlan::Sort { input, order_by } => Ok(LogicalPlan::Sort {
                input: Box::new(self.prune(*input)?),
                order_by,
            }),
            LogicalPlan::Limit {
                input,
                count,
                offset,
            } => Ok(LogicalPlan::Limit {
                input: Box::new(self.prune(*input)?),
                count,
                offset,
            }),
            LogicalPlan::Aggregate {
                input,
                group_by,
                aggregates,
                mode,
            } => Ok(LogicalPlan::Aggregate {
                input: Box::new(self.prune(*input)?),
                group_by,
                aggregates,
                mode,
            }),
            LogicalPlan::Join {
                left,
                right,
                join_type,
                on,
            } => Ok(LogicalPlan::Join {
                left: Box::new(self.prune(*left)?),
                right: Box::new(self.prune(*right)?),
                join_type,
                on,
            }),
            // Remaining variants pass through untouched; their subtrees (if
            // any) are not pruned.
            other => Ok(other),
        }
    }
}