use datafusion_common::tree_node::Transformed;
use std::sync::Arc;
use super::optimizer::PhysicalOptimizerRule;
use crate::config::ConfigOptions;
use crate::datasource::physical_plan::{CsvExec, ParquetExec};
use crate::error::Result;
use crate::physical_plan::Partitioning::*;
use crate::physical_plan::{
repartition::RepartitionExec, with_new_children_if_necessary, ExecutionPlan,
};
/// Physical optimizer rule that increases parallelism by introducing
/// round-robin `RepartitionExec` nodes (or splitting file scans into byte
/// ranges) wherever doing so cannot change the query result.
#[derive(Default)]
pub struct Repartition {}
impl Repartition {
    /// Creates a new [`Repartition`] optimizer rule.
    ///
    /// The rule carries no state; `Repartition::default()` is equivalent.
    // Previously this carried `#[allow(missing_docs)]`; documenting the
    // constructor is preferable to suppressing the lint.
    pub fn new() -> Self {
        Self {}
    }
}
/// Recursively walks `plan` bottom-up and increases its output partitioning
/// up to `target_partitions`, either by splitting Parquet/CSV scans into byte
/// ranges or by inserting a round-robin `RepartitionExec`.
///
/// Parameters:
/// * `target_partitions` - desired number of output partitions
/// * `plan` - the (sub)plan currently being optimized
/// * `is_root` - true only for the top node; the root itself is never
///   repartitioned so the plan's final partitioning is preserved
/// * `can_reorder` - true if this node's output ordering may be destroyed
/// * `would_benefit` - true if the *parent* node benefits from its input
///   having more partitions
/// * `repartition_file_scans` / `repartition_file_min_size` - enable and tune
///   repartitioning file scans in place instead of adding a `RepartitionExec`
///
/// Returns `Transformed::Yes` when this subtree was modified.
fn optimize_partitions(
target_partitions: usize,
plan: Arc<dyn ExecutionPlan>,
is_root: bool,
can_reorder: bool,
would_benefit: bool,
repartition_file_scans: bool,
repartition_file_min_size: usize,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
// Recurse into children first; leaves pass through unchanged.
let new_plan = if plan.children().is_empty() {
Transformed::No(plan)
} else {
let children = plan
.children()
.iter()
.enumerate()
.map(|(idx, child)| {
// A child's ordering may be destroyed when (a) it has no ordering to
// destroy, or (b) this node requires no input ordering AND either the
// parent already allows reordering or this node does not maintain the
// child's order anyway.
let required_input_ordering =
plan_has_required_input_ordering(plan.as_ref());
let can_reorder_child = child.output_ordering().is_none()
|| (!required_input_ordering
&& (can_reorder || !plan.maintains_input_order()[idx]));
optimize_partitions(
target_partitions,
child.clone(),
false, can_reorder_child,
plan.benefits_from_input_partitioning(),
repartition_file_scans,
repartition_file_min_size,
)
.map(Transformed::into)
})
.collect::<Result<_>>()?;
with_new_children_if_necessary(plan, children)?
};
let (new_plan, transformed) = new_plan.into_pair();
// Repartitioning only helps when the node currently yields fewer than
// `target_partitions` partitions. Hash partitioning is never overridden:
// it was introduced deliberately (e.g. for joins/aggregations).
let mut could_repartition = match new_plan.output_partitioning() {
RoundRobinBatch(x) => x < target_partitions,
UnknownPartitioning(x) => x < target_partitions,
Hash(_, _) => false,
};
// With exact statistics, skip repartitioning when there is at most one row.
let stats = new_plan.statistics();
if stats.is_exact {
could_repartition = could_repartition
&& stats.num_rows.map(|num_rows| num_rows > 1).unwrap_or(true);
}
// Never repartition the root node itself; that would alter the plan's
// final output partitioning.
if is_root {
could_repartition = false;
}
let repartition_allowed = would_benefit && could_repartition && can_reorder;
if !repartition_allowed {
// No change at this level; still report whether a descendant changed.
return Ok(if transformed {
Transformed::Yes(new_plan)
} else {
Transformed::No(new_plan)
});
}
// When enabled, prefer splitting file scans into ranges over inserting a
// RepartitionExec node on top of them.
if let Some(parquet_exec) = new_plan.as_any().downcast_ref::<ParquetExec>() {
if repartition_file_scans {
return Ok(Transformed::Yes(Arc::new(
parquet_exec
.get_repartitioned(target_partitions, repartition_file_min_size),
)));
}
}
if let Some(csv_exec) = new_plan.as_any().downcast_ref::<CsvExec>() {
if repartition_file_scans {
// CSV repartitioning returns None when the scan cannot be split
// (e.g. compressed files — see parallelization_compressed_csv test);
// in that case fall through to the RepartitionExec below.
let repartitioned_exec_option =
csv_exec.get_repartitioned(target_partitions, repartition_file_min_size);
if let Some(repartitioned_exec) = repartitioned_exec_option {
return Ok(Transformed::Yes(Arc::new(repartitioned_exec)));
}
}
}
// Fallback: wrap the node in an explicit round-robin RepartitionExec.
Ok(Transformed::Yes(Arc::new(RepartitionExec::try_new(
new_plan,
RoundRobinBatch(target_partitions),
)?)))
}
/// Returns true if `plan` imposes a required sort order on at least one of
/// its inputs.
fn plan_has_required_input_ordering(plan: &dyn ExecutionPlan) -> bool {
    plan.required_input_ordering()
        .into_iter()
        .any(|requirement| requirement.is_some())
}
impl PhysicalOptimizerRule for Repartition {
    /// Runs the repartitioning pass over `plan`.
    ///
    /// The pass is skipped entirely when round-robin repartitioning is
    /// disabled in the config, or when only one target partition is
    /// configured (repartitioning to a single partition can never help).
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let target_partitions = config.execution.target_partitions;
        let enabled = config.optimizer.enable_round_robin_repartition;
        let repartition_file_scans = config.optimizer.repartition_file_scans;
        let repartition_file_min_size = config.optimizer.repartition_file_min_size;
        if !enabled || target_partitions == 1 {
            Ok(plan)
        } else {
            // Root invariants: the root node itself is never repartitioned
            // (`is_root`), its output order may only be destroyed if it has
            // none (`can_reorder`), and it has no parent that could benefit
            // from extra partitions (`would_benefit` is false).
            let is_root = true;
            let can_reorder = plan.output_ordering().is_none();
            let would_benefit = false;
            // Fix: move `plan` directly instead of the previous redundant
            // `plan.clone()` — the Arc is not used after this call.
            optimize_partitions(
                target_partitions,
                plan,
                is_root,
                can_reorder,
                would_benefit,
                repartition_file_scans,
                repartition_file_min_size,
            )
            .map(Transformed::into)
        }
    }

    fn name(&self) -> &str {
        "repartition"
    }

    /// This rule only reorganizes partitioning; it never changes the schema.
    fn schema_check(&self) -> bool {
        true
    }
}
// Test-only constructor (runs once before any test via `ctor`) that installs
// env_logger so optimizer log output is visible under RUST_LOG.
#[cfg(test)]
#[ctor::ctor]
fn init() {
// Ignore the error returned if a global logger was already installed.
let _ = env_logger::try_init();
}
// End-to-end tests for the `Repartition` rule. Each test builds a small
// physical plan, runs this rule followed by `EnforceDistribution` and
// `EnforceSorting`, and asserts on the rendered (indented) plan text.
#[cfg(test)]
mod tests {
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use super::*;
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
use crate::physical_optimizer::sort_enforcement::EnforceSorting;
use crate::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
use crate::physical_plan::expressions::{col, PhysicalSortExpr};
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::{displayable, DisplayAs, DisplayFormatType, Statistics};
use datafusion_physical_expr::PhysicalSortRequirement;
// Schema with a single nullable boolean column "c1", shared by every scan.
fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)]))
}
// Builds a scan config over one file ("x", 100 bytes) or two file groups
// ("x" and "y"); `sorted` declares an output ordering on c1.
fn scan_config(sorted: bool, single_file: bool) -> FileScanConfig {
let sort_exprs = vec![PhysicalSortExpr {
expr: col("c1", &schema()).unwrap(),
options: SortOptions::default(),
}];
let file_groups = if single_file {
vec![vec![PartitionedFile::new("x".to_string(), 100)]]
} else {
vec![
vec![PartitionedFile::new("x".to_string(), 100)],
vec![PartitionedFile::new("y".to_string(), 200)],
]
};
FileScanConfig {
object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
file_schema: schema(),
file_groups,
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: if sorted { vec![sort_exprs] } else { vec![] },
infinite_source: false,
}
}
// Leaf constructors: unsorted, single-partition scans.
fn parquet_exec() -> Arc<ParquetExec> {
Arc::new(ParquetExec::new(scan_config(false, true), None, None))
}
fn csv_exec() -> Arc<CsvExec> {
Arc::new(CsvExec::new(
scan_config(false, true),
false,
b',',
FileCompressionType::UNCOMPRESSED,
))
}
// Leaf constructors: unsorted scans with two file groups (two partitions).
fn parquet_exec_two_partitions() -> Arc<ParquetExec> {
Arc::new(ParquetExec::new(scan_config(false, false), None, None))
}
fn csv_exec_two_partitions() -> Arc<CsvExec> {
Arc::new(CsvExec::new(
scan_config(false, false),
false,
b',',
FileCompressionType::UNCOMPRESSED,
))
}
// Leaf constructors: scans declaring an output ordering on c1.
fn parquet_exec_sorted() -> Arc<ParquetExec> {
Arc::new(ParquetExec::new(scan_config(true, true), None, None))
}
fn csv_exec_sorted() -> Arc<CsvExec> {
Arc::new(CsvExec::new(
scan_config(true, true),
false,
b',',
FileCompressionType::UNCOMPRESSED,
))
}
// Sorted scan with two file groups.
fn parquet_exec_multiple_sorted() -> Arc<ParquetExec> {
Arc::new(ParquetExec::new(scan_config(true, false), None, None))
}
// Plan-node builder helpers used to compose the test plans below; all sort
// and filter expressions operate on column c1.
fn sort_preserving_merge_exec(
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
let expr = vec![PhysicalSortExpr {
expr: col("c1", &schema()).unwrap(),
options: arrow::compute::SortOptions::default(),
}];
Arc::new(SortPreservingMergeExec::new(expr, input))
}
fn filter_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(FilterExec::try_new(col("c1", &schema()).unwrap(), input).unwrap())
}
fn sort_exec(
input: Arc<dyn ExecutionPlan>,
preserve_partitioning: bool,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = vec![PhysicalSortExpr {
expr: col("c1", &schema()).unwrap(),
options: SortOptions::default(),
}];
let new_sort = SortExec::new(sort_exprs, input)
.with_preserve_partitioning(preserve_partitioning);
Arc::new(new_sort)
}
fn projection_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
let exprs = vec![(col("c1", &schema()).unwrap(), "c1".to_string())];
Arc::new(ProjectionExec::try_new(exprs, input).unwrap())
}
// Two-stage aggregate (Partial then Final) with no group keys/aggregates.
fn aggregate(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
let schema = schema();
Arc::new(
AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![],
vec![],
vec![],
Arc::new(
AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![],
vec![],
vec![],
input,
schema.clone(),
)
.unwrap(),
),
schema,
)
.unwrap(),
)
}
// GlobalLimit(skip=0) over LocalLimit, both fetching 100 rows.
fn limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(GlobalLimitExec::new(
Arc::new(LocalLimitExec::new(input, 100)),
0,
Some(100),
))
}
// Same as `limit_exec` but with a non-zero skip of 5 rows.
fn limit_exec_with_skip(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(GlobalLimitExec::new(
Arc::new(LocalLimitExec::new(input, 100)),
5,
Some(100),
))
}
fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
Arc::new(UnionExec::new(input))
}
fn sort_required_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(SortRequiredExec::new(input))
}
// Splits a rendered plan into trimmed, non-empty lines for comparison.
fn trim_plan_display(plan: &str) -> Vec<&str> {
plan.split('\n')
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.collect()
}
// Runs Repartition + EnforceDistribution + EnforceSorting over $PLAN and
// asserts the displayed plan matches $EXPECTED_LINES. The short form uses
// 10 target partitions with file-scan repartitioning disabled; the long form
// takes target partitions, the repartition_file_scans flag, and the minimum
// file size for splitting.
macro_rules! assert_optimized {
($EXPECTED_LINES: expr, $PLAN: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, 10, false, 1024);
};
($EXPECTED_LINES: expr, $PLAN: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
let mut config = ConfigOptions::new();
config.execution.target_partitions = $TARGET_PARTITIONS;
config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE;
let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(Repartition::new()),
Arc::new(EnforceDistribution::new()),
Arc::new(EnforceSorting::new()),
];
let optimized = optimizers.into_iter().fold($PLAN, |plan, optimizer| {
optimizer.optimize(plan, &config).unwrap()
});
let plan = displayable(optimized.as_ref()).indent(true).to_string();
let actual_lines = trim_plan_display(&plan);
assert_eq!(
&expected_lines, &actual_lines,
"\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
expected_lines, actual_lines
);
};
}
// --- Tests: inserting RepartitionExec nodes (file-scan splitting off) ---
#[test]
fn added_repartition_to_single_partition() -> Result<()> {
let plan = aggregate(parquet_exec());
let expected = [
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
Ok(())
}
#[test]
fn repartition_deepest_node() -> Result<()> {
let plan = aggregate(filter_exec(parquet_exec()));
let expected = &[
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
Ok(())
}
#[test]
fn repartition_unsorted_limit() -> Result<()> {
let plan = limit_exec(filter_exec(parquet_exec()));
let expected = &[
"GlobalLimitExec: skip=0, fetch=100",
"CoalescePartitionsExec",
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
Ok(())
}
#[test]
fn repartition_unsorted_limit_with_skip() -> Result<()> {
let plan = limit_exec_with_skip(filter_exec(parquet_exec()));
let expected = &[
"GlobalLimitExec: skip=5, fetch=100",
"CoalescePartitionsExec",
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
Ok(())
}
// A sorted limit must not gain a repartition below the sort's consumer.
#[test]
fn repartition_sorted_limit() -> Result<()> {
let plan = limit_exec(sort_exec(parquet_exec(), false));
let expected = &[
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
"SortExec: expr=[c1@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
Ok(())
}
#[test]
fn repartition_sorted_limit_with_filter() -> Result<()> {
let plan = limit_exec(filter_exec(sort_exec(parquet_exec(), false)));
let expected = &[
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
"SortExec: expr=[c1@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
Ok(())
}
// Limits bound the repartitioning: repartition above/below, not across them.
#[test]
fn repartition_ignores_limit() -> Result<()> {
let plan = aggregate(limit_exec(filter_exec(limit_exec(parquet_exec()))));
let expected = &[
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"GlobalLimitExec: skip=0, fetch=100",
"CoalescePartitionsExec",
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
Ok(())
}
#[test]
fn repartition_ignores_limit_with_skip() -> Result<()> {
let plan = aggregate(limit_exec_with_skip(filter_exec(limit_exec(
parquet_exec(),
))));
let expected = &[
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"GlobalLimitExec: skip=5, fetch=100",
"CoalescePartitionsExec",
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
Ok(())
}
#[test]
fn repartition_ignores_union() -> Result<()> {
let plan = union_exec(vec![parquet_exec(); 5]);
let expected = &[
"UnionExec",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
Ok(())
}
// --- Tests: interaction with sort ordering requirements ---
#[test]
fn repartition_through_sort_preserving_merge() -> Result<()> {
let plan = sort_preserving_merge_exec(parquet_exec());
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"SortExec: expr=[c1@0 ASC]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
Ok(())
}
#[test]
fn repartition_ignores_sort_preserving_merge() -> Result<()> {
let plan = sort_preserving_merge_exec(parquet_exec_multiple_sorted());
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];
assert_optimized!(expected, plan);
Ok(())
}
#[test]
fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> {
let input = union_exec(vec![parquet_exec_sorted(); 2]);
let plan = sort_preserving_merge_exec(input);
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"UnionExec",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];
assert_optimized!(expected, plan);
Ok(())
}
#[test]
fn repartition_does_not_destroy_sort() -> Result<()> {
let plan = sort_required_exec(parquet_exec_sorted());
let expected = &[
"SortRequiredExec",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];
assert_optimized!(expected, plan);
Ok(())
}
// Only the unsorted union branch is repartitioned; the sorted one is kept.
#[test]
fn repartition_does_not_destroy_sort_more_complex() -> Result<()> {
let input1 = sort_required_exec(parquet_exec_sorted());
let input2 = filter_exec(parquet_exec());
let plan = union_exec(vec![input1, input2]);
let expected = &[
"UnionExec",
"SortRequiredExec",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
Ok(())
}
#[test]
fn repartition_transitively_with_projection() -> Result<()> {
let plan = sort_preserving_merge_exec(projection_exec(parquet_exec()));
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"SortExec: expr=[c1@0 ASC]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[c1@0 as c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
Ok(())
}
#[test]
fn repartition_ignores_transitively_with_projection() -> Result<()> {
let plan =
sort_preserving_merge_exec(projection_exec(parquet_exec_multiple_sorted()));
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];
assert_optimized!(expected, plan);
Ok(())
}
#[test]
fn repartition_transitively_past_sort_with_projection() -> Result<()> {
let plan =
sort_preserving_merge_exec(sort_exec(projection_exec(parquet_exec()), true));
let expected = &[
"SortExec: expr=[c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
Ok(())
}
#[test]
fn repartition_transitively_past_sort_with_filter() -> Result<()> {
let plan =
sort_preserving_merge_exec(sort_exec(filter_exec(parquet_exec()), true));
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"SortExec: expr=[c1@0 ASC]",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
Ok(())
}
#[test]
fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> {
let plan = sort_preserving_merge_exec(sort_exec(
projection_exec(filter_exec(parquet_exec())),
true,
));
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"SortExec: expr=[c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
assert_optimized!(expected, plan);
Ok(())
}
// --- Tests: repartitioning file scans in place (repartition_file_scans on) ---
#[test]
fn parallelization_single_partition() -> Result<()> {
let plan_parquet = aggregate(parquet_exec());
let plan_csv = aggregate(csv_exec());
let expected_parquet = [
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"ParquetExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[c1]",
];
let expected_csv = [
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"CsvExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[c1], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, 2, true, 10);
assert_optimized!(expected_csv, plan_csv, 2, true, 10);
Ok(())
}
// Compressed CSV cannot be split by byte range; it falls back to a
// RepartitionExec, while uncompressed CSV is split in place.
#[test]
fn parallelization_compressed_csv() -> Result<()> {
let compression_types = [
FileCompressionType::GZIP,
FileCompressionType::BZIP2,
FileCompressionType::XZ,
FileCompressionType::ZSTD,
FileCompressionType::UNCOMPRESSED,
];
let expected_not_partitioned = [
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
"CsvExec: file_groups={1 group: [[x]]}, projection=[c1], has_header=false",
];
let expected_partitioned = [
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"CsvExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[c1], has_header=false",
];
for compression_type in compression_types {
let expected = if compression_type.is_compressed() {
&expected_not_partitioned[..]
} else {
&expected_partitioned[..]
};
let plan = aggregate(Arc::new(CsvExec::new(
scan_config(false, true),
false,
b',',
compression_type,
)));
assert_optimized!(expected, plan, 2, true, 10);
}
Ok(())
}
#[test]
fn parallelization_two_partitions() -> Result<()> {
let plan_parquet = aggregate(parquet_exec_two_partitions());
let plan_csv = aggregate(csv_exec_two_partitions());
let expected_parquet = [
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[c1]",
];
let expected_csv = [
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"CsvExec: file_groups={2 groups: [[x], [y]]}, projection=[c1], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, 2, true, 10);
assert_optimized!(expected_csv, plan_csv, 2, true, 10);
Ok(())
}
#[test]
fn parallelization_two_partitions_into_four() -> Result<()> {
let plan_parquet = aggregate(parquet_exec_two_partitions());
let plan_csv = aggregate(csv_exec_two_partitions());
let expected_parquet = [
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"ParquetExec: file_groups={4 groups: [[x:0..75], [x:75..100, y:0..50], [y:50..125], [y:125..200]]}, projection=[c1]",
];
let expected_csv = [
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"CsvExec: file_groups={4 groups: [[x:0..75], [x:75..100, y:0..50], [y:50..125], [y:125..200]]}, projection=[c1], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, 4, true, 10);
assert_optimized!(expected_csv, plan_csv, 4, true, 10);
Ok(())
}
#[test]
fn parallelization_sorted_limit() -> Result<()> {
let plan_parquet = limit_exec(sort_exec(parquet_exec(), false));
let plan_csv = limit_exec(sort_exec(csv_exec(), false));
let expected_parquet = &[
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
"SortExec: expr=[c1@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
let expected_csv = &[
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
"SortExec: expr=[c1@0 ASC]",
"CsvExec: file_groups={1 group: [[x]]}, projection=[c1], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, 2, true, 10);
assert_optimized!(expected_csv, plan_csv, 2, true, 10);
Ok(())
}
#[test]
fn parallelization_limit_with_filter() -> Result<()> {
let plan_parquet = limit_exec(filter_exec(sort_exec(parquet_exec(), false)));
let plan_csv = limit_exec(filter_exec(sort_exec(csv_exec(), false)));
let expected_parquet = &[
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
"SortExec: expr=[c1@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
let expected_csv = &[
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
"SortExec: expr=[c1@0 ASC]",
"CsvExec: file_groups={1 group: [[x]]}, projection=[c1], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, 2, true, 10);
assert_optimized!(expected_csv, plan_csv, 2, true, 10);
Ok(())
}
#[test]
fn parallelization_ignores_limit() -> Result<()> {
let plan_parquet = aggregate(limit_exec(filter_exec(limit_exec(parquet_exec()))));
let plan_csv = aggregate(limit_exec(filter_exec(limit_exec(csv_exec()))));
let expected_parquet = &[
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
"GlobalLimitExec: skip=0, fetch=100",
"CoalescePartitionsExec",
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
let expected_csv = &[
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
"GlobalLimitExec: skip=0, fetch=100",
"CoalescePartitionsExec",
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
"CsvExec: file_groups={1 group: [[x]]}, projection=[c1], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, 2, true, 10);
assert_optimized!(expected_csv, plan_csv, 2, true, 10);
Ok(())
}
#[test]
fn parallelization_union_inputs() -> Result<()> {
let plan_parquet = union_exec(vec![parquet_exec(); 5]);
let plan_csv = union_exec(vec![csv_exec(); 5]);
let expected_parquet = &[
"UnionExec",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1]",
];
let expected_csv = &[
"UnionExec",
"CsvExec: file_groups={1 group: [[x]]}, projection=[c1], has_header=false",
"CsvExec: file_groups={1 group: [[x]]}, projection=[c1], has_header=false",
"CsvExec: file_groups={1 group: [[x]]}, projection=[c1], has_header=false",
"CsvExec: file_groups={1 group: [[x]]}, projection=[c1], has_header=false",
"CsvExec: file_groups={1 group: [[x]]}, projection=[c1], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, 2, true, 10);
assert_optimized!(expected_csv, plan_csv, 2, true, 10);
Ok(())
}
#[test]
fn parallelization_prior_to_sort_preserving_merge() -> Result<()> {
let plan_parquet = sort_preserving_merge_exec(parquet_exec_sorted());
let plan_csv = sort_preserving_merge_exec(csv_exec_sorted());
let expected_parquet = &[
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];
let expected_csv = &[
"CsvExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, 2, true, 10);
assert_optimized!(expected_csv, plan_csv, 2, true, 10);
Ok(())
}
#[test]
fn parallelization_sort_preserving_merge_with_union() -> Result<()> {
let input_parquet = union_exec(vec![parquet_exec_sorted(); 2]);
let input_csv = union_exec(vec![csv_exec_sorted(); 2]);
let plan_parquet = sort_preserving_merge_exec(input_parquet);
let plan_csv = sort_preserving_merge_exec(input_csv);
let expected_parquet = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"UnionExec",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];
let expected_csv = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"UnionExec",
"CsvExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC], has_header=false",
"CsvExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, 2, true, 10);
assert_optimized!(expected_csv, plan_csv, 2, true, 10);
Ok(())
}
#[test]
fn parallelization_does_not_destroy_sort() -> Result<()> {
let plan_parquet = sort_required_exec(parquet_exec_sorted());
let plan_csv = sort_required_exec(csv_exec_sorted());
let expected_parquet = &[
"SortRequiredExec",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];
let expected_csv = &[
"SortRequiredExec",
"CsvExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, 2, true, 10);
assert_optimized!(expected_csv, plan_csv, 2, true, 10);
Ok(())
}
#[test]
fn parallelization_ignores_transitively_with_projection() -> Result<()> {
let plan_parquet =
sort_preserving_merge_exec(projection_exec(parquet_exec_sorted()));
let plan_csv = sort_preserving_merge_exec(projection_exec(csv_exec_sorted()));
let expected_parquet = &[
"ProjectionExec: expr=[c1@0 as c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC]",
];
let expected_csv = &[
"ProjectionExec: expr=[c1@0 as c1]",
"CsvExec: file_groups={1 group: [[x]]}, projection=[c1], output_ordering=[c1@0 ASC], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, 2, true, 10);
assert_optimized!(expected_csv, plan_csv, 2, true, 10);
Ok(())
}
// Test-only operator that declares its own output ordering as a required
// input ordering, used to verify the rule does not repartition (and thereby
// destroy) an ordered input that a parent depends on.
#[derive(Debug)]
struct SortRequiredExec {
input: Arc<dyn ExecutionPlan>,
}
impl SortRequiredExec {
fn new(input: Arc<dyn ExecutionPlan>) -> Self {
Self { input }
}
}
impl DisplayAs for SortRequiredExec {
fn fmt_as(
&self,
_t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
write!(f, "SortRequiredExec")
}
}
impl ExecutionPlan for SortRequiredExec {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema(&self) -> SchemaRef {
self.input.schema()
}
fn output_partitioning(&self) -> crate::physical_plan::Partitioning {
self.input.output_partitioning()
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.input.output_ordering()
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
}
// Requires the input to arrive in this node's own output ordering.
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
vec![self
.output_ordering()
.map(PhysicalSortRequirement::from_sort_exprs)]
}
fn with_new_children(
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
assert_eq!(children.len(), 1);
let child = children.pop().unwrap();
Ok(Arc::new(Self::new(child)))
}
// Never executed: these tests only inspect plan structure.
fn execute(
&self,
_partition: usize,
_context: Arc<crate::execution::context::TaskContext>,
) -> Result<crate::physical_plan::SendableRecordBatchStream> {
unreachable!();
}
fn statistics(&self) -> Statistics {
self.input.statistics()
}
}
}