//! Repartition optimizer that introduces repartition nodes to increase
//! the level of parallelism available when executing a physical plan.

use std::sync::Arc;

use super::optimizer::PhysicalOptimizerRule;
use crate::physical_plan::{
    empty::EmptyExec, repartition::RepartitionExec, ExecutionPlan,
};
use crate::physical_plan::{Distribution, Partitioning::*};
use crate::{error::Result, execution::context::ExecutionConfig};
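
/// Optimizer rule that introduces `RepartitionExec` nodes to increase the
/// level of parallelism available to downstream operators.
///
/// A minimal usage sketch, mirroring the tests below (the construction of
/// `input_plan` is assumed):
///
/// ```ignore
/// let rule = Repartition::new();
/// let config = ExecutionConfig::new().with_concurrency(8);
/// let optimized = rule.optimize(input_plan, &config)?;
/// ```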
pub struct Repartition {}

impl Repartition {
    #[allow(missing_docs)]
    pub fn new() -> Self {
        Self {}
    }
}
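
/// Walk the plan bottom-up and wrap nodes in a round-robin `RepartitionExec`
/// wherever that raises the number of partitions towards `concurrency`.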
fn optimize_concurrency(
    concurrency: usize,
    requires_single_partition: bool,
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
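    // Recurse into the children first so that any repartition nodes are
    // added as deep in the plan as possible.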
    let new_plan = if plan.children().is_empty() {
        // Leaf node: nothing below it to rewrite.
        plan.clone()
    } else {
        let children = plan
            .children()
            .iter()
            .map(|child| {
                optimize_concurrency(
                    concurrency,
                    plan.required_child_distribution() == Distribution::SinglePartition,
                    child.clone(),
                )
            })
            .collect::<Result<_>>()?;
        plan.with_new_children(children)?
    };
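
    // Decide whether the (possibly rewritten) node would benefit from an
    // additional round-robin repartition on top of it.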
    let perform_repartition = match new_plan.output_partitioning() {
        // Repartition when the node exposes fewer partitions than requested.
        RoundRobinBatch(x) => x < concurrency,
        UnknownPartitioning(x) => x < concurrency,
        // Never repartition after a hash partitioning, as downstream
        // operators likely depend on that distribution.
        Hash(_, _) => false,
    };
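
    // Repartitioning an `EmptyExec` is never useful, so it is skipped below.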
    let is_empty_exec = plan.as_any().downcast_ref::<EmptyExec>().is_some();

    if perform_repartition && !requires_single_partition && !is_empty_exec {
        Ok(Arc::new(RepartitionExec::try_new(
            new_plan,
            RoundRobinBatch(concurrency),
        )?))
    } else {
        Ok(new_plan)
    }
}

impl PhysicalOptimizerRule for Repartition {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ExecutionConfig,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // With a concurrency of 1 there is nothing to parallelize, so leave
        // the plan untouched.
        if config.concurrency == 1 {
            Ok(plan)
        } else {
            // The root is treated as requiring a single partition so that no
            // repartition node ends up above the final operator.
            optimize_concurrency(config.concurrency, true, plan)
        }
    }

    fn name(&self) -> &str {
        "repartition"
    }
}

#[cfg(test)]
mod tests {
    use arrow::datatypes::Schema;

    use super::*;
    use crate::datasource::datasource::Statistics;
    use crate::physical_plan::parquet::{ParquetExec, ParquetPartition};
    use crate::physical_plan::projection::ProjectionExec;

    #[test]
    fn added_repartition_to_single_partition() -> Result<()> {
        // A projection over a single-partition parquet scan: the scan should
        // be repartitioned up to the configured concurrency.
        let parquet_project = ProjectionExec::try_new(
            vec![],
            Arc::new(ParquetExec::new(
                vec![ParquetPartition {
                    filenames: vec!["x".to_string()],
                    statistics: Statistics::default(),
                }],
                Schema::empty(),
                None,
                None,
                2048,
                None,
            )),
        )?;

        let optimizer = Repartition {};

        let optimized = optimizer.optimize(
            Arc::new(parquet_project),
            &ExecutionConfig::new().with_concurrency(10),
        )?;

        assert_eq!(
            optimized.children()[0]
                .output_partitioning()
                .partition_count(),
            10
        );

        Ok(())
    }

    #[test]
    fn repartition_deepest_node() -> Result<()> {
        // With nested projections, the repartition node should be inserted
        // at the deepest possible position, just above the scan.
        let parquet_project = ProjectionExec::try_new(
            vec![],
            Arc::new(ProjectionExec::try_new(
                vec![],
                Arc::new(ParquetExec::new(
                    vec![ParquetPartition {
                        filenames: vec!["x".to_string()],
                        statistics: Statistics::default(),
                    }],
                    Schema::empty(),
                    None,
                    None,
                    2048,
                    None,
                )),
            )?),
        )?;

        let optimizer = Repartition {};

        let optimized = optimizer.optimize(
            Arc::new(parquet_project),
            &ExecutionConfig::new().with_concurrency(10),
        )?;

        // The node directly below the root projection is not a repartition...
        assert!(optimized.children()[0]
            .as_any()
            .downcast_ref::<RepartitionExec>()
            .is_none());
        // ...but the node below the inner projection is.
        assert!(optimized.children()[0].children()[0]
            .as_any()
            .downcast_ref::<RepartitionExec>()
            .is_some());

        Ok(())
    }
}