use super::optimizer::PhysicalOptimizerRule;
use crate::{
error::Result,
physical_plan::{merge::MergeExec, Distribution},
};
use std::sync::Arc;
pub struct AddMergeExec {}
impl AddMergeExec {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}
impl PhysicalOptimizerRule for AddMergeExec {
fn optimize(
&self,
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
config: &crate::execution::context::ExecutionConfig,
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
if plan.children().is_empty() {
Ok(plan.clone())
} else {
let children = plan
.children()
.iter()
.map(|child| self.optimize(child.clone(), config))
.collect::<Result<Vec<_>>>()?;
match plan.required_child_distribution() {
Distribution::UnspecifiedDistribution => plan.with_new_children(children),
Distribution::SinglePartition => plan.with_new_children(
children
.iter()
.map(|child| {
if child.output_partitioning().partition_count() == 1 {
child.clone()
} else {
Arc::new(MergeExec::new(child.clone()))
}
})
.collect(),
),
}
}
}
fn name(&self) -> &str {
"add_merge_exec"
}
}