use std::fmt::Debug;
use std::sync::Arc;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::utils::{
add_sort_above_with_check, is_coalesce_partitions, is_repartition,
is_sort_preserving_merge,
};
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::joins::{
CrossJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec,
};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::tree_node::PlanContext;
use crate::physical_plan::union::{can_interleave, InterleaveExec, UnionExec};
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning};
use arrow::compute::SortOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::expressions::{Column, NoOp};
use datafusion_physical_expr::utils::map_columns_before_projection;
use datafusion_physical_expr::{
physical_exprs_equal, EquivalenceProperties, PhysicalExpr, PhysicalExprRef,
};
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
use datafusion_physical_plan::ExecutionPlanProperties;
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use itertools::izip;
/// Physical optimizer rule that enforces the distribution requirements of
/// each operator in the plan, inserting repartition / merge operators where
/// needed (see [`PhysicalOptimizerRule`] impl below for the two phases).
#[derive(Default)]
pub struct EnforceDistribution {}
impl EnforceDistribution {
    /// Creates a new [`EnforceDistribution`] rule (the rule is stateless).
    #[allow(missing_docs)]
    pub fn new() -> Self {
        Self {}
    }
}
impl PhysicalOptimizerRule for EnforceDistribution {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let top_down_join_key_reordering = config.optimizer.top_down_join_key_reordering;
        // Phase 1: reorder join keys. Either top-down, pushing each parent's
        // key requirements into joins (`adjust_input_keys_ordering`), or
        // bottom-up, matching join keys to the partitioning that the join
        // inputs already have (`reorder_join_keys_to_inputs`).
        let adjusted = if top_down_join_key_reordering {
            let plan_requirements = PlanWithKeyRequirements::new_default(plan);
            let adjusted = plan_requirements
                .transform_down(adjust_input_keys_ordering)
                .data()?;
            adjusted.plan
        } else {
            plan.transform_up(|plan| {
                Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
            })
            .data()?
        };
        // Phase 2: walk the plan bottom-up and insert whatever repartition /
        // merge operators are required to satisfy each operator's declared
        // distribution requirements.
        let distribution_context = DistributionContext::new_default(adjusted);
        let distribution_context = distribution_context
            .transform_up(|distribution_context| {
                ensure_distribution(distribution_context, config)
            })
            .data()?;
        Ok(distribution_context.plan)
    }
    fn name(&self) -> &str {
        "EnforceDistribution"
    }
    fn schema_check(&self) -> bool {
        true
    }
}
/// Pushes the parent's hash-key requirement (`requirements.data`) down to the
/// children of `requirements.plan`, reordering join keys where that lets a
/// join satisfy the requirement without an extra repartition. Operators that
/// change distribution (repartition, coalesce, window) stop the propagation.
fn adjust_input_keys_ordering(
    mut requirements: PlanWithKeyRequirements,
) -> Result<Transformed<PlanWithKeyRequirements>> {
    let plan = requirements.plan.clone();
    if let Some(HashJoinExec {
        left,
        right,
        on,
        filter,
        join_type,
        projection,
        mode,
        null_equals_null,
        ..
    }) = plan.as_any().downcast_ref::<HashJoinExec>()
    {
        match mode {
            PartitionMode::Partitioned => {
                // Rebuilds the hash join with reordered equi-join conditions
                // (sort options are irrelevant for hash joins: empty vec).
                let join_constructor = |new_conditions: (
                    Vec<(PhysicalExprRef, PhysicalExprRef)>,
                    Vec<SortOptions>,
                )| {
                    HashJoinExec::try_new(
                        left.clone(),
                        right.clone(),
                        new_conditions.0,
                        filter.clone(),
                        join_type,
                        projection.clone(),
                        PartitionMode::Partitioned,
                        *null_equals_null,
                    )
                    .map(|e| Arc::new(e) as _)
                };
                return reorder_partitioned_join_keys(
                    requirements,
                    on,
                    vec![],
                    &join_constructor,
                )
                .map(Transformed::yes);
            }
            PartitionMode::CollectLeft => {
                // Only the probe (right) side is partitioned; adapt the
                // requirement to the right child's schema where possible.
                requirements.children[1].data = match join_type {
                    JoinType::Inner | JoinType::Right => shift_right_required(
                        &requirements.data,
                        left.schema().fields().len(),
                    )
                    .unwrap_or_default(),
                    JoinType::RightSemi | JoinType::RightAnti => {
                        requirements.data.clone()
                    }
                    JoinType::Left
                    | JoinType::LeftSemi
                    | JoinType::LeftAnti
                    | JoinType::Full => vec![],
                };
            }
            PartitionMode::Auto => {
                // Partition mode not decided yet: drop the requirement.
                requirements.data.clear();
            }
        }
    } else if let Some(CrossJoinExec { left, .. }) =
        plan.as_any().downcast_ref::<CrossJoinExec>()
    {
        // Cross join output is left columns followed by right columns, so
        // shift the requirement into the right child's column space.
        let left_columns_len = left.schema().fields().len();
        requirements.children[1].data =
            shift_right_required(&requirements.data, left_columns_len)
                .unwrap_or_default();
    } else if let Some(SortMergeJoinExec {
        left,
        right,
        on,
        filter,
        join_type,
        sort_options,
        null_equals_null,
        ..
    }) = plan.as_any().downcast_ref::<SortMergeJoinExec>()
    {
        // Rebuilds the sort-merge join with reordered join keys and the
        // matching (identically permuted) sort options.
        let join_constructor = |new_conditions: (
            Vec<(PhysicalExprRef, PhysicalExprRef)>,
            Vec<SortOptions>,
        )| {
            SortMergeJoinExec::try_new(
                left.clone(),
                right.clone(),
                new_conditions.0,
                filter.clone(),
                *join_type,
                new_conditions.1,
                *null_equals_null,
            )
            .map(|e| Arc::new(e) as _)
        };
        return reorder_partitioned_join_keys(
            requirements,
            on,
            sort_options.clone(),
            &join_constructor,
        )
        .map(Transformed::yes);
    } else if let Some(aggregate_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
        if !requirements.data.is_empty() {
            // Only a FinalPartitioned aggregate can be reordered to satisfy
            // the parent requirement; otherwise the requirement stops here.
            if aggregate_exec.mode() == &AggregateMode::FinalPartitioned {
                return reorder_aggregate_keys(requirements, aggregate_exec)
                    .map(Transformed::yes);
            } else {
                requirements.data.clear();
            }
        } else {
            return Ok(Transformed::no(requirements));
        }
    } else if let Some(proj) = plan.as_any().downcast_ref::<ProjectionExec>() {
        let expr = proj.expr();
        // Push the requirement through the projection by mapping output
        // columns back to input columns; drop it if any key is not mappable.
        let new_required = map_columns_before_projection(&requirements.data, expr);
        if new_required.len() == requirements.data.len() {
            requirements.children[0].data = new_required;
        } else {
            requirements.data.clear();
        }
    } else if plan.as_any().downcast_ref::<RepartitionExec>().is_some()
        || plan
            .as_any()
            .downcast_ref::<CoalescePartitionsExec>()
            .is_some()
        || plan.as_any().downcast_ref::<WindowAggExec>().is_some()
    {
        // These operators change the distribution; the requirement does not
        // propagate through them.
        requirements.data.clear();
    } else {
        // Default: pass the parent requirement to every child unchanged.
        for child in requirements.children.iter_mut() {
            child.data.clone_from(&requirements.data);
        }
    }
    Ok(Transformed::yes(requirements))
}
/// Tries to reorder the equi-join keys of `join_plan` so they line up with the
/// parent's required keys, rebuilding the join via `join_constructor` when a
/// real reorder happens, then records the (possibly reordered) keys as the
/// requirements for both children.
///
/// `sort_options` must parallel `on` (it is permuted the same way); pass an
/// empty vec for joins without per-key sort options (e.g. hash joins).
fn reorder_partitioned_join_keys<F>(
    mut join_plan: PlanWithKeyRequirements,
    on: &[(PhysicalExprRef, PhysicalExprRef)],
    sort_options: Vec<SortOptions>,
    join_constructor: &F,
) -> Result<PlanWithKeyRequirements>
where
    F: Fn(
        (Vec<(PhysicalExprRef, PhysicalExprRef)>, Vec<SortOptions>),
    ) -> Result<Arc<dyn ExecutionPlan>>,
{
    let parent_required = &join_plan.data;
    let join_key_pairs = extract_join_keys(on);
    let eq_properties = join_plan.plan.equivalence_properties();
    let (
        JoinKeyPairs {
            left_keys,
            right_keys,
        },
        positions,
    ) = try_reorder(join_key_pairs, parent_required, eq_properties);
    if let Some(positions) = positions {
        // A non-empty permutation means the keys actually moved; an empty
        // vec from `try_reorder` means "already matches", so skip rebuilding.
        if !positions.is_empty() {
            let new_join_on = new_join_conditions(&left_keys, &right_keys);
            let new_sort_options = (0..sort_options.len())
                .map(|idx| sort_options[positions[idx]])
                .collect();
            join_plan.plan = join_constructor((new_join_on, new_sort_options))?;
        }
    }
    join_plan.children[0].data = left_keys;
    join_plan.children[1].data = right_keys;
    Ok(join_plan)
}
/// When the parent requires the group-by columns of a `FinalPartitioned`
/// aggregate in a different (but complete) order, rebuilds the underlying
/// Partial + FinalPartitioned aggregate pair with the reordered group-by
/// keys and adds a `ProjectionExec` on top to restore the original output
/// column order. Returns `agg_node` unchanged when no reorder applies.
fn reorder_aggregate_keys(
    mut agg_node: PlanWithKeyRequirements,
    agg_exec: &AggregateExec,
) -> Result<PlanWithKeyRequirements> {
    let parent_required = &agg_node.data;
    // Columns produced by the aggregate's group-by, in output order.
    let output_columns = agg_exec
        .group_expr()
        .expr()
        .iter()
        .enumerate()
        .map(|(index, (_, name))| Column::new(name, index))
        .collect::<Vec<_>>();
    let output_exprs = output_columns
        .iter()
        .map(|c| Arc::new(c.clone()) as _)
        .collect::<Vec<_>>();
    // Only attempt a reorder when the requirement covers exactly the group-by
    // columns, there are no null expressions, and the order actually differs.
    if parent_required.len() == output_exprs.len()
        && agg_exec.group_expr().null_expr().is_empty()
        && !physical_exprs_equal(&output_exprs, parent_required)
    {
        if let Some(positions) = expected_expr_positions(&output_exprs, parent_required) {
            if let Some(agg_exec) =
                agg_exec.input().as_any().downcast_ref::<AggregateExec>()
            {
                if matches!(agg_exec.mode(), &AggregateMode::Partial) {
                    // Rebuild the partial aggregate with permuted group exprs.
                    let group_exprs = agg_exec.group_expr().expr();
                    let new_group_exprs = positions
                        .into_iter()
                        .map(|idx| group_exprs[idx].clone())
                        .collect();
                    let partial_agg = Arc::new(AggregateExec::try_new(
                        AggregateMode::Partial,
                        PhysicalGroupBy::new_single(new_group_exprs),
                        agg_exec.aggr_expr().to_vec(),
                        agg_exec.filter_expr().to_vec(),
                        agg_exec.input().clone(),
                        agg_exec.input_schema.clone(),
                    )?);
                    // The final aggregate groups on the partial aggregate's
                    // output columns, keeping the original expression names.
                    let group_exprs = partial_agg.group_expr().expr();
                    let new_group_by = PhysicalGroupBy::new_single(
                        partial_agg
                            .output_group_expr()
                            .into_iter()
                            .enumerate()
                            .map(|(idx, expr)| (expr, group_exprs[idx].1.clone()))
                            .collect(),
                    );
                    let new_final_agg = Arc::new(AggregateExec::try_new(
                        AggregateMode::FinalPartitioned,
                        new_group_by,
                        agg_exec.aggr_expr().to_vec(),
                        agg_exec.filter_expr().to_vec(),
                        partial_agg.clone(),
                        agg_exec.input_schema(),
                    )?);
                    agg_node.plan = new_final_agg.clone();
                    agg_node.data.clear();
                    agg_node.children = vec![PlanWithKeyRequirements::new(
                        partial_agg as _,
                        vec![],
                        agg_node.children.swap_remove(0).children,
                    )];
                    // Projection restoring the original output column order:
                    // first the (reordered) group-by columns by name, ...
                    let agg_schema = new_final_agg.schema();
                    let mut proj_exprs = output_columns
                        .iter()
                        .map(|col| {
                            let name = col.name();
                            let index = agg_schema.index_of(name)?;
                            Ok((Arc::new(Column::new(name, index)) as _, name.to_owned()))
                        })
                        .collect::<Result<Vec<_>>>()?;
                    // ... then the remaining (aggregate result) columns as-is.
                    let agg_fields = agg_schema.fields();
                    for (idx, field) in
                        agg_fields.iter().enumerate().skip(output_columns.len())
                    {
                        let name = field.name();
                        let plan = Arc::new(Column::new(name, idx)) as _;
                        proj_exprs.push((plan, name.clone()))
                    }
                    return ProjectionExec::try_new(proj_exprs, new_final_agg).map(|p| {
                        PlanWithKeyRequirements::new(Arc::new(p), vec![], vec![agg_node])
                    });
                }
            }
        }
    }
    Ok(agg_node)
}
/// Translates the parent's required key expressions (expressed against a
/// join's combined output schema) into the right child's column space by
/// subtracting `left_columns_len` from every column index.
///
/// Returns `None` if any required expression is not a plain column, or
/// refers to a left-side column (index below `left_columns_len`).
fn shift_right_required(
    parent_required: &[Arc<dyn PhysicalExpr>],
    left_columns_len: usize,
) -> Option<Vec<Arc<dyn PhysicalExpr>>> {
    let mut shifted: Vec<Arc<dyn PhysicalExpr>> =
        Vec::with_capacity(parent_required.len());
    for expr in parent_required {
        // Bail out on the first non-column or left-side expression.
        let column = expr.as_any().downcast_ref::<Column>()?;
        let shifted_index = column.index().checked_sub(left_columns_len)?;
        shifted.push(Arc::new(Column::new(column.name(), shifted_index)) as _);
    }
    Some(shifted)
}
/// Bottom-up variant of join-key reordering: reorders the equi-join keys of a
/// partitioned `HashJoinExec` or a `SortMergeJoinExec` to match the existing
/// hash partitioning of its inputs, avoiding extra repartitions. Returns the
/// plan unchanged when no beneficial reordering is found.
pub(crate) fn reorder_join_keys_to_inputs(
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let plan_any = plan.as_any();
    if let Some(HashJoinExec {
        left,
        right,
        on,
        filter,
        join_type,
        projection,
        mode,
        null_equals_null,
        ..
    }) = plan_any.downcast_ref::<HashJoinExec>()
    {
        // Only partitioned hash joins care about input hash partitioning.
        if matches!(mode, PartitionMode::Partitioned) {
            let (join_keys, positions) = reorder_current_join_keys(
                extract_join_keys(on),
                Some(left.output_partitioning()),
                Some(right.output_partitioning()),
                left.equivalence_properties(),
                right.equivalence_properties(),
            );
            // Non-empty positions => keys actually moved; rebuild the join.
            if positions.map_or(false, |idxs| !idxs.is_empty()) {
                let JoinKeyPairs {
                    left_keys,
                    right_keys,
                } = join_keys;
                let new_join_on = new_join_conditions(&left_keys, &right_keys);
                return Ok(Arc::new(HashJoinExec::try_new(
                    left.clone(),
                    right.clone(),
                    new_join_on,
                    filter.clone(),
                    join_type,
                    projection.clone(),
                    PartitionMode::Partitioned,
                    *null_equals_null,
                )?));
            }
        }
    } else if let Some(SortMergeJoinExec {
        left,
        right,
        on,
        filter,
        join_type,
        sort_options,
        null_equals_null,
        ..
    }) = plan_any.downcast_ref::<SortMergeJoinExec>()
    {
        let (join_keys, positions) = reorder_current_join_keys(
            extract_join_keys(on),
            Some(left.output_partitioning()),
            Some(right.output_partitioning()),
            left.equivalence_properties(),
            right.equivalence_properties(),
        );
        if let Some(positions) = positions {
            if !positions.is_empty() {
                let JoinKeyPairs {
                    left_keys,
                    right_keys,
                } = join_keys;
                let new_join_on = new_join_conditions(&left_keys, &right_keys);
                // Sort options must follow the same permutation as the keys.
                let new_sort_options = (0..sort_options.len())
                    .map(|idx| sort_options[positions[idx]])
                    .collect();
                return SortMergeJoinExec::try_new(
                    left.clone(),
                    right.clone(),
                    new_join_on,
                    filter.clone(),
                    *join_type,
                    new_sort_options,
                    *null_equals_null,
                )
                .map(|smj| Arc::new(smj) as _);
            }
        }
    }
    Ok(plan)
}
/// Reorders `join_keys` to match the hash partitioning of the left input
/// first; if that produces no permutation, retries against the right input
/// (by recursing with `left_partition` set to `None`). Non-hash partitionings
/// yield no reordering.
fn reorder_current_join_keys(
    join_keys: JoinKeyPairs,
    left_partition: Option<&Partitioning>,
    right_partition: Option<&Partitioning>,
    left_equivalence_properties: &EquivalenceProperties,
    right_equivalence_properties: &EquivalenceProperties,
) -> (JoinKeyPairs, Option<Vec<usize>>) {
    match (left_partition, right_partition) {
        (Some(Partitioning::Hash(left_exprs, _)), _) => {
            match try_reorder(join_keys, left_exprs, left_equivalence_properties) {
                // Left side gave no match: fall back to the right side.
                (join_keys, None) => reorder_current_join_keys(
                    join_keys,
                    None,
                    right_partition,
                    left_equivalence_properties,
                    right_equivalence_properties,
                ),
                result => result,
            }
        }
        (_, Some(Partitioning::Hash(right_exprs, _))) => {
            try_reorder(join_keys, right_exprs, right_equivalence_properties)
        }
        _ => (join_keys, None),
    }
}
/// Tries to reorder `join_keys` so the left (or right) keys match `expected`.
///
/// Returns the (possibly reordered) keys together with:
/// - `Some(vec![])` when the keys already match `expected` directly or after
///   normalization through the equivalence groups (no reorder needed);
/// - `Some(positions)` with the applied permutation when a reorder was found;
/// - `None` when no match is possible.
fn try_reorder(
    join_keys: JoinKeyPairs,
    expected: &[Arc<dyn PhysicalExpr>],
    equivalence_properties: &EquivalenceProperties,
) -> (JoinKeyPairs, Option<Vec<usize>>) {
    let eq_groups = equivalence_properties.eq_group();
    let mut normalized_expected = vec![];
    let mut normalized_left_keys = vec![];
    let mut normalized_right_keys = vec![];
    // A reorder only makes sense when the key counts line up.
    if join_keys.left_keys.len() != expected.len() {
        return (join_keys, None);
    }
    if physical_exprs_equal(expected, &join_keys.left_keys)
        || physical_exprs_equal(expected, &join_keys.right_keys)
    {
        // Already in the expected order.
        return (join_keys, Some(vec![]));
    } else if !eq_groups.is_empty() {
        // Reuse the `eq_groups` binding instead of re-fetching
        // `equivalence_properties.eq_group()`; compare again after
        // normalizing every expression through its equivalence class.
        normalized_expected = expected
            .iter()
            .map(|e| eq_groups.normalize_expr(e.clone()))
            .collect();
        normalized_left_keys = join_keys
            .left_keys
            .iter()
            .map(|e| eq_groups.normalize_expr(e.clone()))
            .collect();
        normalized_right_keys = join_keys
            .right_keys
            .iter()
            .map(|e| eq_groups.normalize_expr(e.clone()))
            .collect();
        if physical_exprs_equal(&normalized_expected, &normalized_left_keys)
            || physical_exprs_equal(&normalized_expected, &normalized_right_keys)
        {
            return (join_keys, Some(vec![]));
        }
    }
    // Look for a permutation that produces the expected order, trying the raw
    // expressions first and the normalized forms as a fallback.
    let Some(positions) = expected_expr_positions(&join_keys.left_keys, expected)
        .or_else(|| expected_expr_positions(&join_keys.right_keys, expected))
        .or_else(|| expected_expr_positions(&normalized_left_keys, &normalized_expected))
        .or_else(|| {
            expected_expr_positions(&normalized_right_keys, &normalized_expected)
        })
    else {
        return (join_keys, None);
    };
    // Apply the same permutation to both sides to keep the pairs aligned.
    let mut new_left_keys = vec![];
    let mut new_right_keys = vec![];
    for pos in positions.iter() {
        new_left_keys.push(join_keys.left_keys[*pos].clone());
        new_right_keys.push(join_keys.right_keys[*pos].clone());
    }
    let pairs = JoinKeyPairs {
        left_keys: new_left_keys,
        right_keys: new_right_keys,
    };
    (pairs, Some(positions))
}
/// For each expression in `expected`, finds its position in `current`,
/// returning the resulting index vector. Returns `None` when either slice is
/// empty or some expected expression is absent. Matched slots are overwritten
/// with a `NoOp` so duplicate expressions map to distinct positions.
fn expected_expr_positions(
    current: &[Arc<dyn PhysicalExpr>],
    expected: &[Arc<dyn PhysicalExpr>],
) -> Option<Vec<usize>> {
    if current.is_empty() || expected.is_empty() {
        return None;
    }
    let mut remaining = current.to_vec();
    expected
        .iter()
        .map(|expr| {
            let found = remaining.iter().position(|candidate| candidate.eq(expr))?;
            // Consume the slot so a repeated expression matches a new index.
            remaining[found] = Arc::new(NoOp::new());
            Some(found)
        })
        .collect()
}
fn extract_join_keys(on: &[(PhysicalExprRef, PhysicalExprRef)]) -> JoinKeyPairs {
let (left_keys, right_keys) = on
.iter()
.map(|(l, r)| (l.clone() as _, r.clone() as _))
.unzip();
JoinKeyPairs {
left_keys,
right_keys,
}
}
/// Pairs the reordered left / right keys back into equi-join conditions.
fn new_join_conditions(
    new_left_keys: &[Arc<dyn PhysicalExpr>],
    new_right_keys: &[Arc<dyn PhysicalExpr>],
) -> Vec<(PhysicalExprRef, PhysicalExprRef)> {
    let mut conditions = Vec::with_capacity(new_left_keys.len());
    for (left, right) in new_left_keys.iter().zip(new_right_keys) {
        conditions.push((left.clone(), right.clone()));
    }
    conditions
}
/// Adds an order-preserving round-robin `RepartitionExec` on top of `input`
/// when it has fewer than `n_target` partitions; otherwise returns `input`
/// unchanged.
fn add_roundrobin_on_top(
    input: DistributionContext,
    n_target: usize,
) -> Result<DistributionContext> {
    // Already at (or above) the target partition count: nothing to add.
    if input.plan.output_partitioning().partition_count() >= n_target {
        return Ok(input);
    }
    let repartition =
        RepartitionExec::try_new(input.plan.clone(), Partitioning::RoundRobinBatch(n_target))?
            .with_preserve_order();
    Ok(DistributionContext::new(
        Arc::new(repartition) as _,
        true,
        vec![input],
    ))
}
/// Adds an order-preserving hash `RepartitionExec` on `hash_exprs` on top of
/// `input` unless the input's partitioning already satisfies the hash
/// distribution with at least `n_target` partitions (or both sides are a
/// single partition).
fn add_hash_on_top(
    input: DistributionContext,
    hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
    n_target: usize,
) -> Result<DistributionContext> {
    let current_partitions = input.plan.output_partitioning().partition_count();
    // One partition on both sides: hashing would be a no-op.
    if n_target == 1 && current_partitions == 1 {
        return Ok(input);
    }
    let dist = Distribution::HashPartitioned(hash_exprs);
    let already_satisfied = input
        .plan
        .output_partitioning()
        .satisfy(&dist, input.plan.equivalence_properties());
    if already_satisfied && n_target <= current_partitions {
        Ok(input)
    } else {
        let partitioning = dist.create_partitioning(n_target);
        let repartition = RepartitionExec::try_new(input.plan.clone(), partitioning)?
            .with_preserve_order();
        Ok(DistributionContext::new(
            Arc::new(repartition) as _,
            true,
            vec![input],
        ))
    }
}
fn add_spm_on_top(input: DistributionContext) -> DistributionContext {
if input.plan.output_partitioning().partition_count() > 1 {
let should_preserve_ordering = input.plan.output_ordering().is_some();
let new_plan = if should_preserve_ordering {
Arc::new(SortPreservingMergeExec::new(
input.plan.output_ordering().unwrap_or(&[]).to_vec(),
input.plan.clone(),
)) as _
} else {
Arc::new(CoalescePartitionsExec::new(input.plan.clone())) as _
};
DistributionContext::new(new_plan, true, vec![input])
} else {
input
}
}
/// Strips any chain of distribution-changing operators (repartition,
/// coalesce-partitions, sort-preserving-merge) off the top of the context,
/// returning the first node that is none of those.
fn remove_dist_changing_operators(
    mut distribution_context: DistributionContext,
) -> Result<DistributionContext> {
    loop {
        let plan = &distribution_context.plan;
        let dist_changing = is_repartition(plan)
            || is_coalesce_partitions(plan)
            || is_sort_preserving_merge(plan);
        if !dist_changing {
            break;
        }
        // All of these operators are single-child: descend into it.
        distribution_context = distribution_context.children.swap_remove(0);
    }
    Ok(distribution_context)
}
/// Recursively replaces order-preserving operators with their plain
/// counterparts: `SortPreservingMergeExec` becomes `CoalescePartitionsExec`,
/// and an order-preserving `RepartitionExec` is rebuilt without order
/// preservation. Recursion only descends into children whose `data` flag is
/// set (i.e. those connected to a distribution-changing operator).
fn replace_order_preserving_variants(
    mut context: DistributionContext,
) -> Result<DistributionContext> {
    // First rewrite flagged children bottom-up.
    context.children = context
        .children
        .into_iter()
        .map(|child| {
            if child.data {
                replace_order_preserving_variants(child)
            } else {
                Ok(child)
            }
        })
        .collect::<Result<Vec<_>>>()?;
    if is_sort_preserving_merge(&context.plan) {
        let child_plan = context.children[0].plan.clone();
        context.plan = Arc::new(CoalescePartitionsExec::new(child_plan));
        return Ok(context);
    } else if let Some(repartition) =
        context.plan.as_any().downcast_ref::<RepartitionExec>()
    {
        if repartition.preserve_order() {
            // Rebuild with the same partitioning, dropping order preservation.
            context.plan = Arc::new(RepartitionExec::try_new(
                context.children[0].plan.clone(),
                repartition.partitioning().clone(),
            )?);
            return Ok(context);
        }
    }
    // Nothing replaced at this node: refresh the plan from the (possibly
    // rewritten) children.
    context.update_plan_from_children()
}
/// Core of the rule: for each child of `dist_context.plan`, inserts the
/// repartition / merge operators needed to satisfy the plan's declared
/// distribution requirements, honoring the relevant config knobs
/// (`target_partitions`, round-robin repartitioning, file-scan
/// repartitioning, order-preservation preferences).
fn ensure_distribution(
    dist_context: DistributionContext,
    config: &ConfigOptions,
) -> Result<Transformed<DistributionContext>> {
    let dist_context = update_children(dist_context)?;
    // Leaf nodes have no children to repartition.
    if dist_context.plan.children().is_empty() {
        return Ok(Transformed::no(dist_context));
    }
    let target_partitions = config.execution.target_partitions;
    let enable_round_robin = config.optimizer.enable_round_robin_repartition;
    let repartition_file_scans = config.optimizer.repartition_file_scans;
    let batch_size = config.execution.batch_size;
    let is_unbounded = dist_context.plan.execution_mode().is_unbounded();
    // For unbounded inputs (or when configured) prefer keeping the existing
    // sort order over replacing order-preserving operators.
    let order_preserving_variants_desirable =
        is_unbounded || config.optimizer.prefer_existing_sort;
    // Peel off any distribution-changing operators already on top; they are
    // re-added below only if actually required.
    let DistributionContext {
        mut plan,
        data,
        children,
    } = remove_dist_changing_operators(dist_context)?;
    // Window operators may have a better-fitting variant for the input's
    // current ordering; swap it in before computing requirements.
    if let Some(exec) = plan.as_any().downcast_ref::<WindowAggExec>() {
        if let Some(updated_window) = get_best_fitting_window(
            exec.window_expr(),
            exec.input(),
            &exec.partition_keys,
        )? {
            plan = updated_window;
        }
    } else if let Some(exec) = plan.as_any().downcast_ref::<BoundedWindowAggExec>() {
        if let Some(updated_window) = get_best_fitting_window(
            exec.window_expr(),
            exec.input(),
            &exec.partition_keys,
        )? {
            plan = updated_window;
        }
    };
    // Process each child against the plan's per-child requirements.
    let children = izip!(
        children.into_iter(),
        plan.required_input_distribution().iter(),
        plan.required_input_ordering().iter(),
        plan.benefits_from_input_partitioning(),
        plan.maintains_input_order()
    )
    .map(
        |(mut child, requirement, required_input_ordering, would_benefit, maintains)| {
            // Repartitioning only pays off when there is more than a batch of
            // rows (or when the row count is unknown).
            let num_rows = child.plan.statistics()?.num_rows;
            let repartition_beneficial_stats = if num_rows.is_exact().unwrap_or(false) {
                num_rows
                    .get_value()
                    .map(|value| value > &batch_size)
                    .unwrap()
            } else {
                true
            };
            let add_roundrobin = enable_round_robin
                && (would_benefit && repartition_beneficial_stats)
                && child.plan.output_partitioning().partition_count() < target_partitions;
            // Let file scans split themselves into more partitions if allowed.
            if repartition_file_scans && repartition_beneficial_stats {
                if let Some(new_child) =
                    child.plan.repartitioned(target_partitions, config)?
                {
                    child.plan = new_child;
                }
            }
            // Satisfy the distribution requirement for this child.
            match requirement {
                Distribution::SinglePartition => {
                    child = add_spm_on_top(child);
                }
                Distribution::HashPartitioned(exprs) => {
                    if add_roundrobin {
                        // Fan out first so hashing gets the target parallelism.
                        child = add_roundrobin_on_top(child, target_partitions)?;
                    }
                    child = add_hash_on_top(child, exprs.to_vec(), target_partitions)?;
                }
                Distribution::UnspecifiedDistribution => {
                    if add_roundrobin {
                        child = add_roundrobin_on_top(child, target_partitions)?;
                    }
                }
            };
            // Reconcile the ordering requirement with the operators we added.
            if let Some(required_input_ordering) = required_input_ordering {
                let ordering_satisfied = child
                    .plan
                    .equivalence_properties()
                    .ordering_satisfy_requirement(required_input_ordering);
                if (!ordering_satisfied || !order_preserving_variants_desirable)
                    && child.data
                {
                    child = replace_order_preserving_variants(child)?;
                    // Replacement may have destroyed an ordering that was
                    // satisfied before; restore it with an explicit sort.
                    if ordering_satisfied {
                        child = add_sort_above_with_check(
                            child,
                            required_input_ordering.to_vec(),
                            None,
                        );
                    }
                }
                child.data = false;
            } else {
                match requirement {
                    Distribution::SinglePartition | Distribution::HashPartitioned(_) => {
                        child = replace_order_preserving_variants(child)?;
                    }
                    Distribution::UnspecifiedDistribution => {
                        if !maintains || plan.as_any().is::<OutputRequirementExec>() {
                            child = replace_order_preserving_variants(child)?;
                        }
                    }
                }
            }
            Ok(child)
        },
    )
    .collect::<Result<Vec<_>>>()?;
    let children_plans = children.iter().map(|c| c.plan.clone()).collect::<Vec<_>>();
    // A union whose inputs can interleave is replaced by InterleaveExec
    // (unless the user prefers keeping the union).
    plan = if plan.as_any().is::<UnionExec>()
        && !config.optimizer.prefer_existing_union
        && can_interleave(children_plans.iter())
    {
        Arc::new(InterleaveExec::try_new(children_plans)?)
    } else {
        plan.with_new_children(children_plans)?
    };
    Ok(Transformed::yes(DistributionContext::new(
        plan, data, children,
    )))
}
/// Plan tree annotated with a boolean flag tracking whether the node is (or
/// is connected to) a distribution-changing operator; see `update_children`.
type DistributionContext = PlanContext<bool>;
/// Recomputes the `data` flag of each direct child of `dist_context`: true
/// when the child is a (non-unknown) repartition, a merge/coalesce, a leaf,
/// or propagates the flag from one of its own children through an
/// unspecified-distribution requirement. The node's own flag is reset.
fn update_children(mut dist_context: DistributionContext) -> Result<DistributionContext> {
    for child_context in dist_context.children.iter_mut() {
        let child_plan_any = child_context.plan.as_any();
        child_context.data =
            if let Some(repartition) = child_plan_any.downcast_ref::<RepartitionExec>() {
                // Unknown partitioning does not count as a real repartition.
                !matches!(
                    repartition.partitioning(),
                    Partitioning::UnknownPartitioning(_)
                )
            } else {
                child_plan_any.is::<SortPreservingMergeExec>()
                    || child_plan_any.is::<CoalescePartitionsExec>()
                    || child_context.plan.children().is_empty()
                    || child_context.children[0].data
                    || child_context
                        .plan
                        .required_input_distribution()
                        .iter()
                        .zip(child_context.children.iter())
                        .any(|(required_dist, child_context)| {
                            child_context.data
                                && matches!(
                                    required_dist,
                                    Distribution::UnspecifiedDistribution
                                )
                        })
            }
    }
    dist_context.data = false;
    Ok(dist_context)
}
/// Equi-join key expressions, split by join side; the two vectors are always
/// kept the same length and pairwise aligned.
#[derive(Debug, Clone)]
struct JoinKeyPairs {
    left_keys: Vec<Arc<dyn PhysicalExpr>>,
    right_keys: Vec<Arc<dyn PhysicalExpr>>,
}
/// Plan tree annotated with the key expressions the parent requires the node
/// to be hash-partitioned on.
type PlanWithKeyRequirements = PlanContext<Vec<Arc<dyn PhysicalExpr>>>;
#[cfg(feature = "parquet")]
#[cfg(test)]
pub(crate) mod tests {
use std::ops::Deref;
use super::*;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::{CsvExec, FileScanConfig, ParquetExec};
use crate::physical_optimizer::enforce_sorting::EnforceSorting;
use crate::physical_optimizer::test_utils::{
check_integrity, coalesce_partitions_exec, repartition_exec,
};
use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
use crate::physical_plan::expressions::col;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::joins::utils::JoinOn;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{displayable, DisplayAs, DisplayFormatType, Statistics};
use datafusion_physical_optimizer::output_requirements::OutputRequirements;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
use datafusion_physical_expr::{
expressions, expressions::binary, expressions::lit, LexOrdering,
PhysicalSortExpr, PhysicalSortRequirement,
};
use datafusion_physical_plan::PlanProperties;
/// Test-only operator that declares a required input ordering (`expr`) while
/// otherwise passing its input through unchanged.
#[derive(Debug)]
struct SortRequiredExec {
    input: Arc<dyn ExecutionPlan>,
    expr: LexOrdering,
    cache: PlanProperties,
}
impl SortRequiredExec {
    fn new_with_requirement(
        input: Arc<dyn ExecutionPlan>,
        requirement: Vec<PhysicalSortExpr>,
    ) -> Self {
        let cache = Self::compute_properties(&input);
        Self {
            input,
            expr: requirement,
            cache,
        }
    }
    // Plan properties are inherited unchanged from the input plan.
    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
        PlanProperties::new(
            input.equivalence_properties().clone(),
            input.output_partitioning().clone(),
            input.execution_mode(),
        )
    }
}
impl DisplayAs for SortRequiredExec {
    // Renders as `SortRequiredExec: [<sort exprs>]` in plan displays so
    // expected-plan strings in the tests can match it.
    fn fmt_as(
        &self,
        _t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        write!(
            f,
            "SortRequiredExec: [{}]",
            PhysicalSortExpr::format_list(&self.expr)
        )
    }
}
impl ExecutionPlan for SortRequiredExec {
    fn name(&self) -> &'static str {
        "SortRequiredExec"
    }
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
    fn properties(&self) -> &PlanProperties {
        &self.cache
    }
    // Opt out of input repartitioning so the optimizer must preserve order.
    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }
    // Advertise `self.expr` as a required input ordering (None when empty).
    fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
        if self.expr.is_empty() {
            vec![None]
        } else {
            vec![Some(PhysicalSortRequirement::from_sort_exprs(&self.expr))]
        }
    }
    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        assert_eq!(children.len(), 1);
        let child = children.pop().unwrap();
        Ok(Arc::new(Self::new_with_requirement(
            child,
            self.expr.clone(),
        )))
    }
    // Never executed: this operator only exists for optimizer-plan tests.
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<crate::execution::context::TaskContext>,
    ) -> Result<crate::physical_plan::SendableRecordBatchStream> {
        unreachable!();
    }
    fn statistics(&self) -> Result<Statistics> {
        self.input.statistics()
    }
}
/// Shared test schema: five nullable columns `a`–`e`.
pub(crate) fn schema() -> SchemaRef {
    Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Int64, true),
        Field::new("c", DataType::Int64, true),
        Field::new("d", DataType::Int32, true),
        Field::new("e", DataType::Boolean, true),
    ]))
}
/// Single-file parquet scan with no declared output ordering.
fn parquet_exec() -> Arc<ParquetExec> {
    parquet_exec_with_sort(vec![])
}
/// Single-file parquet scan with the given output ordering(s).
pub(crate) fn parquet_exec_with_sort(
    output_ordering: Vec<Vec<PhysicalSortExpr>>,
) -> Arc<ParquetExec> {
    ParquetExec::builder(
        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
            .with_file(PartitionedFile::new("x".to_string(), 100))
            .with_output_ordering(output_ordering),
    )
    .build_arc()
}
/// Two-file-group (multi-partition) parquet scan, unordered.
fn parquet_exec_multiple() -> Arc<ParquetExec> {
    parquet_exec_multiple_sorted(vec![])
}
/// Two-file-group (multi-partition) parquet scan with the given ordering(s).
fn parquet_exec_multiple_sorted(
    output_ordering: Vec<Vec<PhysicalSortExpr>>,
) -> Arc<ParquetExec> {
    ParquetExec::builder(
        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
            .with_file_groups(vec![
                vec![PartitionedFile::new("x".to_string(), 100)],
                vec![PartitionedFile::new("y".to_string(), 100)],
            ])
            .with_output_ordering(output_ordering),
    )
    .build_arc()
}
/// Single-file CSV scan with no declared output ordering.
fn csv_exec() -> Arc<CsvExec> {
    csv_exec_with_sort(vec![])
}
/// Single-file CSV scan with the given output ordering(s).
fn csv_exec_with_sort(output_ordering: Vec<Vec<PhysicalSortExpr>>) -> Arc<CsvExec> {
    Arc::new(
        CsvExec::builder(
            FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
                .with_file(PartitionedFile::new("x".to_string(), 100))
                .with_output_ordering(output_ordering),
        )
        .with_has_header(false)
        .with_delimeter(b',')
        .with_quote(b'"')
        .with_escape(None)
        .with_comment(None)
        .with_newlines_in_values(false)
        .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
        .build(),
    )
}
/// Two-file-group (multi-partition) CSV scan, unordered.
fn csv_exec_multiple() -> Arc<CsvExec> {
    csv_exec_multiple_sorted(vec![])
}
/// Two-file-group (multi-partition) CSV scan with the given ordering(s).
fn csv_exec_multiple_sorted(
    output_ordering: Vec<Vec<PhysicalSortExpr>>,
) -> Arc<CsvExec> {
    Arc::new(
        CsvExec::builder(
            FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
                .with_file_groups(vec![
                    vec![PartitionedFile::new("x".to_string(), 100)],
                    vec![PartitionedFile::new("y".to_string(), 100)],
                ])
                .with_output_ordering(output_ordering),
        )
        .with_has_header(false)
        .with_delimeter(b',')
        .with_quote(b'"')
        .with_escape(None)
        .with_comment(None)
        .with_newlines_in_values(false)
        .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
        .build(),
    )
}
/// Projection of the given `(column, alias)` pairs over `input`.
fn projection_exec_with_alias(
    input: Arc<dyn ExecutionPlan>,
    alias_pairs: Vec<(String, String)>,
) -> Arc<dyn ExecutionPlan> {
    let mut exprs = vec![];
    for (column, alias) in alias_pairs.iter() {
        exprs.push((col(column, &input.schema()).unwrap(), alias.to_string()));
    }
    Arc::new(ProjectionExec::try_new(exprs, input).unwrap())
}
/// Partial + FinalPartitioned aggregate pair grouping on the aliased columns
/// (no aggregate expressions — group-by only).
fn aggregate_exec_with_alias(
    input: Arc<dyn ExecutionPlan>,
    alias_pairs: Vec<(String, String)>,
) -> Arc<dyn ExecutionPlan> {
    let schema = schema();
    let mut group_by_expr: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![];
    for (column, alias) in alias_pairs.iter() {
        group_by_expr
            .push((col(column, &input.schema()).unwrap(), alias.to_string()));
    }
    let group_by = PhysicalGroupBy::new_single(group_by_expr.clone());
    // The final aggregate groups on the partial aggregate's output columns.
    let final_group_by_expr = group_by_expr
        .iter()
        .enumerate()
        .map(|(index, (_col, name))| {
            (
                Arc::new(expressions::Column::new(name, index))
                    as Arc<dyn PhysicalExpr>,
                name.clone(),
            )
        })
        .collect::<Vec<_>>();
    let final_grouping = PhysicalGroupBy::new_single(final_group_by_expr);
    Arc::new(
        AggregateExec::try_new(
            AggregateMode::FinalPartitioned,
            final_grouping,
            vec![],
            vec![],
            Arc::new(
                AggregateExec::try_new(
                    AggregateMode::Partial,
                    group_by,
                    vec![],
                    vec![],
                    input,
                    schema.clone(),
                )
                .unwrap(),
            ),
            schema,
        )
        .unwrap(),
    )
}
/// Partitioned hash join on `join_on` with no filter or projection.
fn hash_join_exec(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
    join_on: &JoinOn,
    join_type: &JoinType,
) -> Arc<dyn ExecutionPlan> {
    Arc::new(
        HashJoinExec::try_new(
            left,
            right,
            join_on.clone(),
            None,
            join_type,
            None,
            PartitionMode::Partitioned,
            false,
        )
        .unwrap(),
    )
}
/// Sort-merge join on `join_on` with default sort options per key.
fn sort_merge_join_exec(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
    join_on: &JoinOn,
    join_type: &JoinType,
) -> Arc<dyn ExecutionPlan> {
    Arc::new(
        SortMergeJoinExec::try_new(
            left,
            right,
            join_on.clone(),
            None,
            *join_type,
            vec![SortOptions::default(); join_on.len()],
            false,
        )
        .unwrap(),
    )
}
/// Filter `c = 0` over `input` (uses the shared test schema).
fn filter_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    let predicate = Arc::new(BinaryExpr::new(
        col("c", &schema()).unwrap(),
        Operator::Eq,
        Arc::new(Literal::new(ScalarValue::Int64(Some(0)))),
    ));
    Arc::new(FilterExec::try_new(predicate, input).unwrap())
}
/// Sort on `sort_exprs`, optionally keeping the input partitioning.
fn sort_exec(
    sort_exprs: Vec<PhysicalSortExpr>,
    input: Arc<dyn ExecutionPlan>,
    preserve_partitioning: bool,
) -> Arc<dyn ExecutionPlan> {
    let new_sort = SortExec::new(sort_exprs, input)
        .with_preserve_partitioning(preserve_partitioning);
    Arc::new(new_sort)
}
/// Sort-preserving merge of `input` on `sort_exprs`.
fn sort_preserving_merge_exec(
    sort_exprs: Vec<PhysicalSortExpr>,
    input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
    Arc::new(SortPreservingMergeExec::new(sort_exprs, input))
}
/// Global limit of 100 rows over a local limit of 100 rows.
fn limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    Arc::new(GlobalLimitExec::new(
        Arc::new(LocalLimitExec::new(input, 100)),
        0,
        Some(100),
    ))
}
/// Union of the given inputs.
fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
    Arc::new(UnionExec::new(input))
}
/// Wraps `input` in the test-only `SortRequiredExec` with `sort_exprs`.
fn sort_required_exec_with_req(
    input: Arc<dyn ExecutionPlan>,
    sort_exprs: LexOrdering,
) -> Arc<dyn ExecutionPlan> {
    Arc::new(SortRequiredExec::new_with_requirement(input, sort_exprs))
}
/// Splits a rendered plan into its non-empty lines, trimming the indentation
/// whitespace from each line.
pub(crate) fn trim_plan_display(plan: &str) -> Vec<&str> {
    let mut lines = Vec::new();
    for raw in plan.split('\n') {
        let trimmed = raw.trim();
        if !trimmed.is_empty() {
            lines.push(trimmed);
        }
    }
    lines
}
/// Runs a single `ensure_distribution` pass over `plan` with a config built
/// from the given knobs, returning the resulting plan.
fn ensure_distribution_helper(
    plan: Arc<dyn ExecutionPlan>,
    target_partitions: usize,
    prefer_existing_sort: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
    let distribution_context = DistributionContext::new_default(plan);
    let mut config = ConfigOptions::new();
    config.execution.target_partitions = target_partitions;
    config.optimizer.enable_round_robin_repartition = true;
    config.optimizer.repartition_file_scans = false;
    config.optimizer.repartition_file_min_size = 1024;
    config.optimizer.prefer_existing_sort = prefer_existing_sort;
    ensure_distribution(distribution_context, &config).map(|item| item.data.plan)
}
/// Asserts that `$PLAN`'s indented display output matches `$EXPECTED_LINES`
/// exactly (used to pin the shape of a plan before optimization).
macro_rules! plans_matches_expected {
    ($EXPECTED_LINES: expr, $PLAN: expr) => {
        let physical_plan = $PLAN;
        let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
        let actual: Vec<&str> = formatted.trim().lines().collect();
        let expected_plan_lines: Vec<&str> = $EXPECTED_LINES
            .iter().map(|s| *s).collect();
        assert_eq!(
            expected_plan_lines, actual,
            "\n**Original Plan Mismatch\n\nexpected:\n\n{expected_plan_lines:#?}\nactual:\n\n{actual:#?}\n\n"
        );
    }
}
macro_rules! assert_optimized {
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 10, false, 1024, false);
};
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024, false);
};
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $PREFER_EXISTING_UNION: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024, $PREFER_EXISTING_UNION);
};
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, $TARGET_PARTITIONS, $REPARTITION_FILE_SCANS, $REPARTITION_FILE_MIN_SIZE, false);
};
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr, $PREFER_EXISTING_UNION: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
let mut config = ConfigOptions::new();
config.execution.target_partitions = $TARGET_PARTITIONS;
config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE;
config.optimizer.prefer_existing_sort = $PREFER_EXISTING_SORT;
config.optimizer.prefer_existing_union = $PREFER_EXISTING_UNION;
let optimizer = OutputRequirements::new_add_mode();
let optimized = optimizer.optimize($PLAN.clone(), &config)?;
{
let adjusted = if config.optimizer.top_down_join_key_reordering {
let plan_requirements =
PlanWithKeyRequirements::new_default($PLAN.clone());
let adjusted = plan_requirements
.transform_down(adjust_input_keys_ordering)
.data()
.and_then(check_integrity)?;
adjusted.plan
} else {
$PLAN.clone().transform_up(|plan| {
Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
})
.data()?
};
DistributionContext::new_default(adjusted)
.transform_up(|distribution_context| {
ensure_distribution(distribution_context, &config)
})
.data()
.and_then(check_integrity)?;
}
let optimized = if $FIRST_ENFORCE_DIST {
let optimizer = EnforceDistribution::new();
let optimized = optimizer.optimize(optimized, &config)?;
let optimizer = EnforceDistribution::new();
let optimized = optimizer.optimize(optimized, &config)?;
let optimizer = EnforceSorting::new();
let optimized = optimizer.optimize(optimized, &config)?;
optimized
} else {
let optimizer = EnforceSorting::new();
let optimized = optimizer.optimize(optimized, &config)?;
let optimizer = EnforceDistribution::new();
let optimized = optimizer.optimize(optimized, &config)?;
let optimizer = EnforceDistribution::new();
let optimized = optimizer.optimize(optimized, &config)?;
optimized
};
let optimizer = OutputRequirements::new_remove_mode();
let optimized = optimizer.optimize(optimized, &config)?;
let plan = displayable(optimized.as_ref()).indent(true).to_string();
let actual_lines = trim_plan_display(&plan);
assert_eq!(
&expected_lines, &actual_lines,
"\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
expected_lines, actual_lines
);
};
}
// Asserts that `$PLAN` renders (indented display, whitespace-trimmed lines)
// exactly to `$EXPECTED_LINES`, without running any optimizer pass first —
// the already-transformed counterpart of `assert_optimized!`.
macro_rules! assert_plan_txt {
($EXPECTED_LINES: expr, $PLAN: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
let plan = displayable($PLAN.as_ref()).indent(true).to_string();
let actual_lines = trim_plan_display(&plan);
assert_eq!(
&expected_lines, &actual_lines,
"\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
expected_lines, actual_lines
);
};
}
#[test]
// Stacks a second hash join on top of a first one for every join type and
// checks where EnforceDistribution inserts Hash repartitions. When the top
// join's key matches the partitioning the bottom join already produces, no
// extra repartition is expected above the bottom join; otherwise a
// Hash([...]) repartition appears between the two joins.
fn multi_hash_joins() -> Result<()> {
let left = parquet_exec();
// Alias every right-side column (a -> a1, ..., e -> e1) so the two join
// inputs have distinct column names.
let alias_pairs: Vec<(String, String)> = vec![
("a".to_string(), "a1".to_string()),
("b".to_string(), "b1".to_string()),
("c".to_string(), "c1".to_string()),
("d".to_string(), "d1".to_string()),
("e".to_string(), "e1".to_string()),
];
let right = projection_exec_with_alias(parquet_exec(), alias_pairs);
let join_types = vec![
JoinType::Inner,
JoinType::Left,
JoinType::Right,
JoinType::Full,
JoinType::LeftSemi,
JoinType::LeftAnti,
JoinType::RightSemi,
JoinType::RightAnti,
];
// Bottom join key: left.a = right.b1.
let join_on = vec![(
Arc::new(Column::new_with_schema("a", &schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
)];
for join_type in join_types {
let join = hash_join_exec(left.clone(), right.clone(), &join_on, &join_type);
let join_plan = format!(
"HashJoinExec: mode=Partitioned, join_type={join_type}, on=[(a@0, b1@1)]"
);
// Case 1: top join keyed on the bottom join's LEFT key (`a`). Only
// applicable to join types whose output contains the left columns.
match join_type {
JoinType::Inner
| JoinType::Left
| JoinType::Right
| JoinType::Full
| JoinType::LeftSemi
| JoinType::LeftAnti => {
let top_join_on = vec![(
Arc::new(Column::new_with_schema("a", &join.schema()).unwrap())
as _,
Arc::new(Column::new_with_schema("c", &schema()).unwrap()) as _,
)];
let top_join = hash_join_exec(
join.clone(),
parquet_exec(),
&top_join_on,
&join_type,
);
let top_join_plan =
format!("HashJoinExec: mode=Partitioned, join_type={join_type}, on=[(a@0, c@2)]");
let expected = match join_type {
// Bottom join already hash-partitions on `a`, so these join
// types need no repartition between the two joins.
JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => vec![
top_join_plan.as_str(),
join_plan.as_str(),
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
// Right/Full joins may emit nulls on the left side, so an extra
// Hash([a@0]) repartition is required above the bottom join.
_ => vec![
top_join_plan.as_str(),
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
join_plan.as_str(),
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
};
// Result must be the same regardless of pass ordering.
assert_optimized!(expected, top_join.clone(), true);
assert_optimized!(expected, top_join, false);
}
JoinType::RightSemi | JoinType::RightAnti => {}
}
// Case 2: top join keyed on the bottom join's RIGHT key (`b1`). Only
// applicable to join types whose output contains the right columns.
match join_type {
JoinType::Inner
| JoinType::Left
| JoinType::Right
| JoinType::Full
| JoinType::RightSemi
| JoinType::RightAnti => {
let top_join_on = vec![(
Arc::new(Column::new_with_schema("b1", &join.schema()).unwrap())
as _,
Arc::new(Column::new_with_schema("c", &schema()).unwrap()) as _,
)];
let top_join =
hash_join_exec(join, parquet_exec(), &top_join_on, &join_type);
// For RightSemi/RightAnti the join outputs only right-side columns,
// so `b1` sits at index 1; otherwise the output is left ++ right
// and `b1` sits at index 6.
let top_join_plan = match join_type {
JoinType::RightSemi | JoinType::RightAnti =>
format!("HashJoinExec: mode=Partitioned, join_type={join_type}, on=[(b1@1, c@2)]"),
_ =>
format!("HashJoinExec: mode=Partitioned, join_type={join_type}, on=[(b1@6, c@2)]"),
};
let expected = match join_type {
// These join types preserve the right-side partitioning on `b1`,
// so no repartition is needed between the joins.
JoinType::Inner | JoinType::Right | JoinType::RightSemi | JoinType::RightAnti =>
vec![
top_join_plan.as_str(),
join_plan.as_str(),
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
// Left/Full joins may emit nulls on the right side, requiring an
// extra Hash([b1@6]) repartition above the bottom join.
_ =>
vec![
top_join_plan.as_str(),
"RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10",
join_plan.as_str(),
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
};
assert_optimized!(expected, top_join.clone(), true);
assert_optimized!(expected, top_join, false);
}
JoinType::LeftSemi | JoinType::LeftAnti => {}
}
}
Ok(())
}
#[test]
// Verifies that a projection aliasing the same source column twice
// (a -> a1, a -> a2) still lets the top join reuse the bottom join's
// Hash([a@0]) partitioning for either alias — no extra repartition appears
// above the projection in either case.
fn multi_joins_after_alias() -> Result<()> {
let left = parquet_exec();
let right = parquet_exec();
// Bottom join: left.a = right.b.
let join_on = vec![(
Arc::new(Column::new_with_schema("a", &schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("b", &schema()).unwrap()) as _,
)];
let join = hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner);
// Project column `a` under two different names.
let alias_pairs: Vec<(String, String)> = vec![
("a".to_string(), "a1".to_string()),
("a".to_string(), "a2".to_string()),
];
let projection = projection_exec_with_alias(join, alias_pairs);
// Top join keyed on the first alias `a1`.
let top_join_on = vec![(
Arc::new(Column::new_with_schema("a1", &projection.schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("c", &schema()).unwrap()) as _,
)];
let top_join = hash_join_exec(
projection.clone(),
right.clone(),
&top_join_on,
&JoinType::Inner,
);
let expected = &[
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a1@0, c@2)]",
"ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]",
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, top_join.clone(), true);
assert_optimized!(expected, top_join, false);
// Same check, but the top join keys on the second alias `a2`.
let top_join_on = vec![(
Arc::new(Column::new_with_schema("a2", &projection.schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("c", &schema()).unwrap()) as _,
)];
let top_join = hash_join_exec(projection, right, &top_join_on, &JoinType::Inner);
let expected = &[
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a2@1, c@2)]",
"ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]",
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, top_join.clone(), true);
assert_optimized!(expected, top_join, false);
Ok(())
}
#[test]
// Chains two projections (c -> c1, then c1 -> a) above a join partitioned on
// the original `a`. Because the top join's key `a` actually traces back to
// column `c`, the bottom join's Hash([a@0]) partitioning cannot be reused and
// a fresh Hash([a@0]) repartition is expected above the projections.
fn multi_joins_after_multi_alias() -> Result<()> {
let left = parquet_exec();
let right = parquet_exec();
let join_on = vec![(
Arc::new(Column::new_with_schema("a", &schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("b", &schema()).unwrap()) as _,
)];
let join = hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner);
// First projection renames c -> c1 (dropping all other columns).
let alias_pairs: Vec<(String, String)> =
vec![("c".to_string(), "c1".to_string())];
let projection = projection_exec_with_alias(join, alias_pairs);
// Second projection renames c1 -> a, shadowing the original join key name.
let alias_pairs: Vec<(String, String)> =
vec![("c1".to_string(), "a".to_string())];
let projection2 = projection_exec_with_alias(projection, alias_pairs);
let top_join_on = vec![(
Arc::new(Column::new_with_schema("a", &projection2.schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("c", &schema()).unwrap()) as _,
)];
let top_join = hash_join_exec(projection2, right, &top_join_on, &JoinType::Inner);
let expected = &[
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, c@2)]",
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"ProjectionExec: expr=[c1@0 as a]",
"ProjectionExec: expr=[c@2 as c1]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]",
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, top_join.clone(), true);
assert_optimized!(expected, top_join, false);
Ok(())
}
#[test]
// Joins two aggregations whose group-by columns are aliases of the same
// source column. The FinalPartitioned aggregates already hash-partition on
// their group keys (a1 / a2), so the join above them needs no additional
// repartition.
fn join_after_agg_alias() -> Result<()> {
// Aggregate the left side grouped by `a` aliased to `a1`.
let left = aggregate_exec_with_alias(
parquet_exec(),
vec![("a".to_string(), "a1".to_string())],
);
// Aggregate the right side grouped by `a` aliased to `a2`.
let right = aggregate_exec_with_alias(
parquet_exec(),
vec![("a".to_string(), "a2".to_string())],
);
let join_on = vec![(
Arc::new(Column::new_with_schema("a1", &left.schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("a2", &right.schema()).unwrap()) as _,
)];
let join = hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner);
// Only the repartitions feeding the aggregates appear; none above them.
let expected = &[
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a1@0, a2@0)]",
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
"RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
"RepartitionExec: partitioning=Hash([a2@0], 10), input_partitions=10",
"AggregateExec: mode=Partial, gby=[a@0 as a2], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, join.clone(), true);
assert_optimized!(expected, join, false);
Ok(())
}
#[test]
// Joins two aggregations on two keys given in (b, a) order while the left
// aggregate groups by (b1, a1) and the right by (b, a). The expected plan
// keeps the join keys in (b1, b), (a1, a) order — matching the aggregates'
// group-by orderings — and inserts a projection on the left to restore the
// join's output column order.
fn hash_join_key_ordering() -> Result<()> {
let left = aggregate_exec_with_alias(
parquet_exec(),
vec![
("a".to_string(), "a1".to_string()),
("b".to_string(), "b1".to_string()),
],
);
// Right aggregate groups by (b, a) — note the reversed order vs. (a, b).
let right = aggregate_exec_with_alias(
parquet_exec(),
vec![
("b".to_string(), "b".to_string()),
("a".to_string(), "a".to_string()),
],
);
// Join keys listed as (b1 = b) then (a1 = a).
let join_on = vec![
(
Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("b", &right.schema()).unwrap()) as _,
),
(
Arc::new(Column::new_with_schema("a1", &left.schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("a", &right.schema()).unwrap()) as _,
),
];
let join = hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner);
let expected = &[
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b1@1, b@0), (a1@0, a@1)]",
"ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
"AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]",
"RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), input_partitions=10",
"AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]",
"RepartitionExec: partitioning=Hash([b@0, a@1], 10), input_partitions=10",
"AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, join.clone(), true);
assert_optimized!(expected, join, false);
Ok(())
}
#[test]
// Builds two bottom joins with their multi-column keys listed in different
// orders — (a, b, c) on the left branch and (c, b, a) on the right — and a
// top join whose keys are given in yet another order (B, C, AA). The expected
// plan shows the optimizer reordering the top join's keys to (B, C, AA) ->
// (b1, c, a1) while each bottom join keeps a single shared (b, c, a) key
// order so both branches can reuse one partitioning.
fn multi_hash_join_key_ordering() -> Result<()> {
let left = parquet_exec();
let alias_pairs: Vec<(String, String)> = vec![
("a".to_string(), "a1".to_string()),
("b".to_string(), "b1".to_string()),
("c".to_string(), "c1".to_string()),
];
let right = projection_exec_with_alias(parquet_exec(), alias_pairs);
// Bottom-left join keys in (a, b, c) order.
let join_on = vec![
(
Arc::new(Column::new_with_schema("a", &schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("a1", &right.schema()).unwrap()) as _,
),
(
Arc::new(Column::new_with_schema("b", &schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
),
(
Arc::new(Column::new_with_schema("c", &schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("c1", &right.schema()).unwrap()) as _,
),
];
let bottom_left_join =
hash_join_exec(left.clone(), right.clone(), &join_on, &JoinType::Inner);
// Re-alias the bottom-left output (note `a` is projected twice: A and AA).
let alias_pairs: Vec<(String, String)> = vec![
("a".to_string(), "A".to_string()),
("a".to_string(), "AA".to_string()),
("b".to_string(), "B".to_string()),
("c".to_string(), "C".to_string()),
];
let bottom_left_projection =
projection_exec_with_alias(bottom_left_join, alias_pairs);
// Bottom-right join uses the same key columns but in (c, b, a) order.
let join_on = vec![
(
Arc::new(Column::new_with_schema("c", &schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("c1", &right.schema()).unwrap()) as _,
),
(
Arc::new(Column::new_with_schema("b", &schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
),
(
Arc::new(Column::new_with_schema("a", &schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("a1", &right.schema()).unwrap()) as _,
),
];
let bottom_right_join =
hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner);
// Top join keys given in (B, C, AA) order.
let top_join_on = vec![
(
Arc::new(
Column::new_with_schema("B", &bottom_left_projection.schema())
.unwrap(),
) as _,
Arc::new(
Column::new_with_schema("b1", &bottom_right_join.schema()).unwrap(),
) as _,
),
(
Arc::new(
Column::new_with_schema("C", &bottom_left_projection.schema())
.unwrap(),
) as _,
Arc::new(
Column::new_with_schema("c", &bottom_right_join.schema()).unwrap(),
) as _,
),
(
Arc::new(
Column::new_with_schema("AA", &bottom_left_projection.schema())
.unwrap(),
) as _,
Arc::new(
Column::new_with_schema("a1", &bottom_right_join.schema()).unwrap(),
) as _,
),
];
let top_join = hash_join_exec(
bottom_left_projection.clone(),
bottom_right_join,
&top_join_on,
&JoinType::Inner,
);
// Filter `c > 1` on top, just to have a non-join root above everything.
let predicate: Arc<dyn PhysicalExpr> = binary(
col("c", top_join.schema().deref())?,
Operator::Gt,
lit(1i64),
top_join.schema().deref(),
)?;
let filter_top_join: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate, top_join)?);
let expected = &[
"FilterExec: c@6 > 1",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(B@2, b1@6), (C@3, c@2), (AA@1, a1@5)]",
"ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), (c@2, c1@2), (a@0, a1@0)]",
"RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), (c@2, c1@2), (a@0, a1@0)]",
"RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, filter_top_join.clone(), true);
assert_optimized!(expected, filter_top_join, false);
Ok(())
}
#[test]
// Calls `reorder_join_keys_to_inputs` directly (no full optimizer run; plans
// are pre-distributed via `ensure_distribution_helper`) and checks that the
// top join's keys are reordered to match its LEFT child's existing
// partitioning — here the bottom-left join partitioned in (a, b, c) order —
// for every join type.
fn reorder_join_keys_to_left_input() -> Result<()> {
let left = parquet_exec();
let alias_pairs: Vec<(String, String)> = vec![
("a".to_string(), "a1".to_string()),
("b".to_string(), "b1".to_string()),
("c".to_string(), "c1".to_string()),
];
let right = projection_exec_with_alias(parquet_exec(), alias_pairs);
// Bottom-left join keys in (a, b, c) order — this is the ordering the top
// join is expected to adopt.
let join_on = vec![
(
Arc::new(Column::new_with_schema("a", &schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("a1", &right.schema()).unwrap()) as _,
),
(
Arc::new(Column::new_with_schema("b", &schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
),
(
Arc::new(Column::new_with_schema("c", &schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("c1", &right.schema()).unwrap()) as _,
),
];
// Pre-apply distribution so the child partitionings are visible to the
// key-reordering logic.
let bottom_left_join = ensure_distribution_helper(
hash_join_exec(left.clone(), right.clone(), &join_on, &JoinType::Inner),
10,
true,
)?;
let alias_pairs: Vec<(String, String)> = vec![
("a".to_string(), "A".to_string()),
("a".to_string(), "AA".to_string()),
("b".to_string(), "B".to_string()),
("c".to_string(), "C".to_string()),
];
let bottom_left_projection =
projection_exec_with_alias(bottom_left_join, alias_pairs);
// Bottom-right join keys in (c, b, a) order.
let join_on = vec![
(
Arc::new(Column::new_with_schema("c", &schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("c1", &right.schema()).unwrap()) as _,
),
(
Arc::new(Column::new_with_schema("b", &schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
),
(
Arc::new(Column::new_with_schema("a", &schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("a1", &right.schema()).unwrap()) as _,
),
];
let bottom_right_join = ensure_distribution_helper(
hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner),
10,
true,
)?;
// Top join keys given in (B, C, AA) order; expected output order is
// (AA, B, C), tracking the left child's (a, b, c) partitioning.
let top_join_on = vec![
(
Arc::new(
Column::new_with_schema("B", &bottom_left_projection.schema())
.unwrap(),
) as _,
Arc::new(
Column::new_with_schema("b1", &bottom_right_join.schema()).unwrap(),
) as _,
),
(
Arc::new(
Column::new_with_schema("C", &bottom_left_projection.schema())
.unwrap(),
) as _,
Arc::new(
Column::new_with_schema("c", &bottom_right_join.schema()).unwrap(),
) as _,
),
(
Arc::new(
Column::new_with_schema("AA", &bottom_left_projection.schema())
.unwrap(),
) as _,
Arc::new(
Column::new_with_schema("a1", &bottom_right_join.schema()).unwrap(),
) as _,
),
];
let join_types = vec![
JoinType::Inner,
JoinType::Left,
JoinType::Right,
JoinType::Full,
JoinType::LeftSemi,
JoinType::LeftAnti,
JoinType::RightSemi,
JoinType::RightAnti,
];
for join_type in join_types {
let top_join = hash_join_exec(
bottom_left_projection.clone(),
bottom_right_join.clone(),
&top_join_on,
&join_type,
);
let top_join_plan =
format!("HashJoinExec: mode=Partitioned, join_type={:?}, on=[(AA@1, a1@5), (B@2, b1@6), (C@3, c@2)]", &join_type);
let reordered = reorder_join_keys_to_inputs(top_join)?;
let expected = &[
top_join_plan.as_str(),
"ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), (b@1, b1@1), (c@2, c1@2)]",
"RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([a1@0, b1@1, c1@2], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), (b@1, b1@1), (a@0, a1@0)]",
"RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_plan_txt!(expected, reordered);
}
Ok(())
}
#[test]
// Counterpart of `reorder_join_keys_to_left_input`: the bottom-left join now
// uses only a two-column key (a, b), so the top join's three keys cannot be
// matched against the left child and are instead reordered to follow the
// RIGHT child's (c, b, a) partitioning — expected order (C, B, AA).
fn reorder_join_keys_to_right_input() -> Result<()> {
let left = parquet_exec();
let alias_pairs: Vec<(String, String)> = vec![
("a".to_string(), "a1".to_string()),
("b".to_string(), "b1".to_string()),
("c".to_string(), "c1".to_string()),
];
let right = projection_exec_with_alias(parquet_exec(), alias_pairs);
// Bottom-left join keys: only (a, b) — too few to anchor the top join.
let join_on = vec![
(
Arc::new(Column::new_with_schema("a", &schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("a1", &right.schema()).unwrap()) as _,
),
(
Arc::new(Column::new_with_schema("b", &schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
),
];
// Pre-apply distribution so child partitionings are established.
let bottom_left_join = ensure_distribution_helper(
hash_join_exec(left.clone(), right.clone(), &join_on, &JoinType::Inner),
10,
true,
)?;
let alias_pairs: Vec<(String, String)> = vec![
("a".to_string(), "A".to_string()),
("a".to_string(), "AA".to_string()),
("b".to_string(), "B".to_string()),
("c".to_string(), "C".to_string()),
];
let bottom_left_projection =
projection_exec_with_alias(bottom_left_join, alias_pairs);
// Bottom-right join keys in (c, b, a) order — the ordering the top join is
// expected to adopt.
let join_on = vec![
(
Arc::new(Column::new_with_schema("c", &schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("c1", &right.schema()).unwrap()) as _,
),
(
Arc::new(Column::new_with_schema("b", &schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
),
(
Arc::new(Column::new_with_schema("a", &schema()).unwrap()) as _,
Arc::new(Column::new_with_schema("a1", &right.schema()).unwrap()) as _,
),
];
let bottom_right_join = ensure_distribution_helper(
hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner),
10,
true,
)?;
// Top join keys given in (B, C, AA) order.
let top_join_on = vec![
(
Arc::new(
Column::new_with_schema("B", &bottom_left_projection.schema())
.unwrap(),
) as _,
Arc::new(
Column::new_with_schema("b1", &bottom_right_join.schema()).unwrap(),
) as _,
),
(
Arc::new(
Column::new_with_schema("C", &bottom_left_projection.schema())
.unwrap(),
) as _,
Arc::new(
Column::new_with_schema("c", &bottom_right_join.schema()).unwrap(),
) as _,
),
(
Arc::new(
Column::new_with_schema("AA", &bottom_left_projection.schema())
.unwrap(),
) as _,
Arc::new(
Column::new_with_schema("a1", &bottom_right_join.schema()).unwrap(),
) as _,
),
];
let join_types = vec![
JoinType::Inner,
JoinType::Left,
JoinType::Right,
JoinType::Full,
JoinType::LeftSemi,
JoinType::LeftAnti,
JoinType::RightSemi,
JoinType::RightAnti,
];
for join_type in join_types {
let top_join = hash_join_exec(
bottom_left_projection.clone(),
bottom_right_join.clone(),
&top_join_on,
&join_type,
);
// Keys reordered to (C, B, AA), matching the right child's (c, b, a).
let top_join_plan =
format!("HashJoinExec: mode=Partitioned, join_type={:?}, on=[(C@3, c@2), (B@2, b1@6), (AA@1, a1@5)]", &join_type);
let reordered = reorder_join_keys_to_inputs(top_join)?;
let expected = &[
top_join_plan.as_str(),
"ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), (b@1, b1@1)]",
"RepartitionExec: partitioning=Hash([a@0, b@1], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([a1@0, b1@1], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), (b@1, b1@1), (a@0, a1@0)]",
"RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_plan_txt!(expected, reordered);
}
Ok(())
}
// Exercises distribution/sort enforcement on stacked SortMergeJoins for every
// join type. The bottom SMJ joins parquet column `a` against the aliased
// right-side column `b1`; a second SMJ is stacked on top of it, keyed on the
// left-side column `a` and — for join types that keep right-side columns —
// also re-tested keyed on the right-side column `b1`.
#[test]
fn multi_smj_joins() -> Result<()> {
    let left = parquet_exec();
    // Alias all right-side columns so names do not collide with the left.
    let alias_pairs: Vec<(String, String)> = vec![
        ("a".to_string(), "a1".to_string()),
        ("b".to_string(), "b1".to_string()),
        ("c".to_string(), "c1".to_string()),
        ("d".to_string(), "d1".to_string()),
        ("e".to_string(), "e1".to_string()),
    ];
    let right = projection_exec_with_alias(parquet_exec(), alias_pairs);
    let join_types = vec![
        JoinType::Inner,
        JoinType::Left,
        JoinType::Right,
        JoinType::Full,
        JoinType::LeftSemi,
        JoinType::LeftAnti,
    ];
    // Bottom join condition: left.a = right.b1.
    let join_on = vec![(
        Arc::new(Column::new_with_schema("a", &schema()).unwrap()) as _,
        Arc::new(Column::new_with_schema("b1", &right.schema()).unwrap()) as _,
    )];
    for join_type in join_types {
        let join =
            sort_merge_join_exec(left.clone(), right.clone(), &join_on, &join_type);
        let join_plan =
            format!("SortMergeJoin: join_type={join_type}, on=[(a@0, b1@1)]");
        // Top join keyed on the bottom join's *left* column `a`.
        let top_join_on = vec![(
            Arc::new(Column::new_with_schema("a", &join.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("c", &schema()).unwrap()) as _,
        )];
        let top_join = sort_merge_join_exec(
            join.clone(),
            parquet_exec(),
            &top_join_on,
            &join_type,
        );
        let top_join_plan =
            format!("SortMergeJoin: join_type={join_type}, on=[(a@0, c@2)]");
        let expected = match join_type {
            // These join types let the top join reuse the bottom join's
            // hash partitioning on `a`, so no repartition/sort pair is
            // inserted between the two joins.
            JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti =>
                vec![
                    top_join_plan.as_str(),
                    join_plan.as_str(),
                    "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
                    "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
                    "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                    "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                    "SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]",
                    "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
                    "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                    "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
                    "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                    "SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
                    "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
                    "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                    "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                ],
            // Right/Full: the bottom join's output does not carry the left
            // side's partitioning, so the top join re-sorts and
            // re-partitions its left input on `a`.
            _ => vec![
                top_join_plan.as_str(),
                "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
                "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
                join_plan.as_str(),
                "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
                "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
                "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                "SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]",
                "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
                "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                "SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
                "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
                "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
            ],
        };
        assert_optimized!(expected, top_join.clone(), true, true);
        // Same scenarios with the sort-enforcement pass run first: sorts
        // sit below the hash repartitions, which then preserve the order
        // (`preserve_order=true, sort_exprs=...`).
        let expected_first_sort_enforcement = match join_type {
            JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti =>
                vec![
                    top_join_plan.as_str(),
                    join_plan.as_str(),
                    "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
                    "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                    "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
                    "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                    "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
                    "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                    "SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]",
                    "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
                    "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                    "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
                    "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                    "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
                    "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                ],
            // Right/Full additionally coalesce the bottom join's output
            // before the single-partition sort on `a`.
            _ => vec![
                top_join_plan.as_str(),
                "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
                "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
                "CoalescePartitionsExec",
                join_plan.as_str(),
                "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
                "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
                "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                "SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]",
                "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
                "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
            ],
        };
        assert_optimized!(expected_first_sort_enforcement, top_join, false, true);
        // For join types that keep right-side columns in the output, also
        // test a top join keyed on the bottom join's *right* column `b1`
        // (index 6 of the join output).
        match join_type {
            JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
                let top_join_on = vec![(
                    Arc::new(Column::new_with_schema("b1", &join.schema()).unwrap())
                        as _,
                    Arc::new(Column::new_with_schema("c", &schema()).unwrap()) as _,
                )];
                let top_join = sort_merge_join_exec(
                    join,
                    parquet_exec(),
                    &top_join_on,
                    &join_type,
                );
                let top_join_plan =
                    format!("SortMergeJoin: join_type={join_type}, on=[(b1@6, c@2)]");
                let expected = match join_type {
                    // Inner/Right preserve the right side's hash
                    // partitioning on `b1`, so the top join sits directly
                    // on the bottom join.
                    JoinType::Inner | JoinType::Right => vec![
                        top_join_plan.as_str(),
                        join_plan.as_str(),
                        "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
                        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
                        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                        "SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]",
                        "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
                        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                        "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
                        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                        "SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
                        "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
                        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                    ],
                    // Left/Full must re-sort and re-partition on b1@6.
                    JoinType::Left | JoinType::Full => vec![
                        top_join_plan.as_str(),
                        "SortExec: expr=[b1@6 ASC], preserve_partitioning=[true]",
                        "RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10",
                        join_plan.as_str(),
                        "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
                        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
                        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                        "SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]",
                        "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
                        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                        "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
                        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                        "SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
                        "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
                        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                    ],
                    // Only the four types above enter this block.
                    _ => unreachable!()
                };
                assert_optimized!(expected, top_join.clone(), true, true);
                let expected_first_sort_enforcement = match join_type {
                    JoinType::Inner | JoinType::Right => vec![
                        top_join_plan.as_str(),
                        join_plan.as_str(),
                        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
                        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                        "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
                        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                        "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
                        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                        "SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]",
                        "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
                        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                        "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
                        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                        "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
                        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                    ],
                    JoinType::Left | JoinType::Full => vec![
                        top_join_plan.as_str(),
                        "RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@6 ASC",
                        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                        "SortExec: expr=[b1@6 ASC], preserve_partitioning=[false]",
                        "CoalescePartitionsExec",
                        join_plan.as_str(),
                        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
                        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                        "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
                        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                        "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
                        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                        "SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]",
                        "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
                        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                        "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
                        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
                        "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
                        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
                    ],
                    _ => unreachable!()
                };
                assert_optimized!(
                    expected_first_sort_enforcement,
                    top_join,
                    false,
                    true
                );
            }
            _ => {}
        }
    }
    Ok(())
}
// Verifies that SortMergeJoin keys are reordered to line up with the
// children's existing hash partitioning: both inputs are aggregates grouped
// in (b, a) order, so the join condition is expected to come out as
// [(b3, b2), (a3, a2)] and reuse the aggregates' partitioning.
#[test]
fn smj_join_key_ordering() -> Result<()> {
    // Left: aggregate on (a -> a1, b -> b1), then re-alias to (a3, b3).
    let left = aggregate_exec_with_alias(
        parquet_exec(),
        vec![
            ("a".to_string(), "a1".to_string()),
            ("b".to_string(), "b1".to_string()),
        ],
    );
    let alias_pairs: Vec<(String, String)> = vec![
        ("a1".to_string(), "a3".to_string()),
        ("b1".to_string(), "b3".to_string()),
    ];
    let left = projection_exec_with_alias(left, alias_pairs);
    // Right: aggregate grouped as (b, a) — note the swapped key order —
    // then aliased to (a2, b2).
    let right = aggregate_exec_with_alias(
        parquet_exec(),
        vec![
            ("b".to_string(), "b".to_string()),
            ("a".to_string(), "a".to_string()),
        ],
    );
    let alias_pairs: Vec<(String, String)> = vec![
        ("a".to_string(), "a2".to_string()),
        ("b".to_string(), "b2".to_string()),
    ];
    let right = projection_exec_with_alias(right, alias_pairs);
    // Join condition given as [(b3, b2), (a3, a2)].
    let join_on = vec![
        (
            Arc::new(Column::new_with_schema("b3", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
        ),
        (
            Arc::new(Column::new_with_schema("a3", &left.schema()).unwrap()) as _,
            Arc::new(Column::new_with_schema("a2", &right.schema()).unwrap()) as _,
        ),
    ];
    let join = sort_merge_join_exec(left, right.clone(), &join_on, &JoinType::Inner);
    // Distribution-first: per-partition sorts on (b, a) feed the SMJ and
    // the aggregates' hash partitioning is reused as-is.
    let expected = &[
        "SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]",
        "SortExec: expr=[b3@1 ASC,a3@0 ASC], preserve_partitioning=[true]",
        "ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]",
        "ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
        "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]",
        "RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "SortExec: expr=[b2@1 ASC,a2@0 ASC], preserve_partitioning=[true]",
        "ProjectionExec: expr=[a@1 as a2, b@0 as b2]",
        "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([b@0, a@1], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, join.clone(), true, true);
    // Sort-first: each side is coalesced, globally sorted, then hash
    // repartitioned with order preservation.
    let expected_first_sort_enforcement = &[
        "SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]",
        "RepartitionExec: partitioning=Hash([b3@1, a3@0], 10), input_partitions=10, preserve_order=true, sort_exprs=b3@1 ASC,a3@0 ASC",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "SortExec: expr=[b3@1 ASC,a3@0 ASC], preserve_partitioning=[false]",
        "CoalescePartitionsExec",
        "ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]",
        "ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
        "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]",
        "RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "RepartitionExec: partitioning=Hash([b2@1, a2@0], 10), input_partitions=10, preserve_order=true, sort_exprs=b2@1 ASC,a2@0 ASC",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "SortExec: expr=[b2@1 ASC,a2@0 ASC], preserve_partitioning=[false]",
        "CoalescePartitionsExec",
        "ProjectionExec: expr=[a@1 as a2, b@0 as b2]",
        "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([b@0, a@1], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected_first_sort_enforcement, join, false, true);
    Ok(())
}
// When every partition of the scan is already sorted on `a`, the
// SortPreservingMergeExec on top needs no extra SortExec. If the sort
// pass runs last instead, the merge is rewritten into a global
// CoalescePartitions + single-partition Sort.
#[test]
fn merge_does_not_need_sort() -> Result<()> {
    let schema = schema();
    let ordering = vec![PhysicalSortExpr {
        options: SortOptions::default(),
        expr: col("a", &schema).unwrap(),
    }];
    let scan = parquet_exec_multiple_sorted(vec![ordering.clone()]);
    let coalesced = Arc::new(CoalesceBatchesExec::new(scan, 4096));
    let plan: Arc<dyn ExecutionPlan> =
        Arc::new(SortPreservingMergeExec::new(ordering, coalesced));
    // Distribution enforcement first: the merge survives untouched.
    let expected = &[
        "SortPreservingMergeExec: [a@0 ASC]",
        "CoalesceBatchesExec: target_batch_size=4096",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
    ];
    assert_optimized!(expected, plan, true);
    // Sort enforcement first: merge becomes coalesce + one global sort.
    let expected = &[
        "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
        "CoalescePartitionsExec",
        "CoalesceBatchesExec: target_batch_size=4096",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
    ];
    assert_optimized!(expected, plan, false);
    Ok(())
}
// Both union children are final aggregates hash-partitioned on the same
// key, so the union can be turned into an InterleaveExec and the outer
// aggregate needs no extra Hash repartition.
#[test]
fn union_to_interleave() -> Result<()> {
    // Two identical aggregates: group parquet column `a` as `a1`.
    let left = aggregate_exec_with_alias(
        parquet_exec(),
        vec![("a".to_string(), "a1".to_string())],
    );
    let right = aggregate_exec_with_alias(
        parquet_exec(),
        vec![("a".to_string(), "a1".to_string())],
    );
    // Union the identically-partitioned inputs, then aggregate again.
    let plan = Arc::new(UnionExec::new(vec![left, right]));
    let plan =
        aggregate_exec_with_alias(plan, vec![("a1".to_string(), "a2".to_string())]);
    let expected = &[
        "AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
        "AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]",
        "InterleaveExec",
        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
        "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
        "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    // Result must be the same regardless of which enforcement runs first.
    assert_optimized!(expected, plan.clone(), true);
    // Last use: pass `plan` by value (the trailing `.clone()` was
    // redundant, and sibling tests move the plan on the final call).
    assert_optimized!(expected, plan, false);
    Ok(())
}
// Counterpart of `union_to_interleave`: with `prefer_existing_union`
// enabled, the UnionExec is kept and the outer aggregate adds its own
// Hash repartition over the union's 20 combined partitions.
#[test]
fn union_not_to_interleave() -> Result<()> {
    let left = aggregate_exec_with_alias(
        parquet_exec(),
        vec![("a".to_string(), "a1".to_string())],
    );
    let right = aggregate_exec_with_alias(
        parquet_exec(),
        vec![("a".to_string(), "a1".to_string())],
    );
    let plan = Arc::new(UnionExec::new(vec![left, right]));
    let plan =
        aggregate_exec_with_alias(plan, vec![("a1".to_string(), "a2".to_string())]);
    let expected = &[
        "AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
        // Union of two 10-partition children => 20 input partitions here.
        "RepartitionExec: partitioning=Hash([a2@0], 10), input_partitions=20",
        "AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]",
        "UnionExec",
        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
        "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
        "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    // Named flags document the assert_optimized! argument positions.
    let prefer_existing_sort = false;
    let first_enforce_distribution = true;
    let prefer_existing_union = true;
    assert_optimized!(
        expected,
        plan.clone(),
        first_enforce_distribution,
        prefer_existing_sort,
        prefer_existing_union
    );
    assert_optimized!(
        expected,
        plan,
        !first_enforce_distribution,
        prefer_existing_sort,
        prefer_existing_union
    );
    Ok(())
}
// A hash aggregate over a single-partition scan must gain two
// repartitions: RoundRobin under the partial aggregate (parallelism) and
// Hash on the group key for the final aggregate (distribution).
#[test]
fn added_repartition_to_single_partition() -> Result<()> {
    let aliases = vec![(String::from("a"), String::from("a"))];
    let plan = aggregate_exec_with_alias(parquet_exec(), aliases);
    let expected = [
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    // Same outcome whichever enforcement pass runs first.
    assert_optimized!(expected, plan.clone(), true);
    assert_optimized!(expected, plan, false);
    Ok(())
}
// The RoundRobin repartition is pushed to the deepest possible point —
// directly above the scan, below the filter — so the filter itself runs
// in parallel.
#[test]
fn repartition_deepest_node() -> Result<()> {
    let aliases = vec![(String::from("a"), String::from("a"))];
    let filtered = filter_exec(parquet_exec());
    let plan = aggregate_exec_with_alias(filtered, aliases);
    let expected = &[
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    // Same outcome whichever enforcement pass runs first.
    assert_optimized!(expected, plan.clone(), true);
    assert_optimized!(expected, plan, false);
    Ok(())
}
// A limit over an unsorted filter: the scan is still repartitioned below
// the local limit, and partitions are coalesced before the global limit.
#[test]
fn repartition_unsorted_limit() -> Result<()> {
    let filtered = filter_exec(parquet_exec());
    let plan = limit_exec(filtered);
    let expected = &[
        "GlobalLimitExec: skip=0, fetch=100",
        "CoalescePartitionsExec",
        "LocalLimitExec: fetch=100",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    // Same outcome whichever enforcement pass runs first.
    assert_optimized!(expected, plan.clone(), true);
    assert_optimized!(expected, plan, false);
    Ok(())
}
// No repartition may be inserted below a limit that sits on a
// single-partition sort: parallelizing would break the order the limit
// depends on.
#[test]
fn repartition_sorted_limit() -> Result<()> {
    let schema = schema();
    let ordering = vec![PhysicalSortExpr {
        options: SortOptions::default(),
        expr: col("c", &schema).unwrap(),
    }];
    let sorted = sort_exec(ordering, parquet_exec(), false);
    let plan = limit_exec(sorted);
    let expected = &[
        "GlobalLimitExec: skip=0, fetch=100",
        "LocalLimitExec: fetch=100",
        "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    // Same outcome whichever enforcement pass runs first.
    assert_optimized!(expected, plan.clone(), true);
    assert_optimized!(expected, plan, false);
    Ok(())
}
// An operator requiring `c ASC` above a filter over a sorted input: the
// test pins that a RoundRobin repartition is still inserted between the
// filter and the sort (the expected plan keeps the requirement satisfied).
#[test]
fn repartition_sorted_limit_with_filter() -> Result<()> {
    let schema = schema();
    let ordering = vec![PhysicalSortExpr {
        options: SortOptions::default(),
        expr: col("c", &schema).unwrap(),
    }];
    let sorted = sort_exec(ordering.clone(), parquet_exec(), false);
    let plan = sort_required_exec_with_req(filter_exec(sorted), ordering);
    let expected = &[
        "SortRequiredExec: [c@2 ASC]",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    // Same outcome whichever enforcement pass runs first.
    assert_optimized!(expected, plan.clone(), true);
    assert_optimized!(expected, plan, false);
    Ok(())
}
// Limits are only local barriers: the aggregate above the outer limit and
// the filter between the two limits each get their own repartitions, but
// nothing is pushed through a limit.
#[test]
fn repartition_ignores_limit() -> Result<()> {
    let aliases = vec![(String::from("a"), String::from("a"))];
    let inner_limit = limit_exec(parquet_exec());
    let outer_limit = limit_exec(filter_exec(inner_limit));
    let plan = aggregate_exec_with_alias(outer_limit, aliases);
    let expected = &[
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "GlobalLimitExec: skip=0, fetch=100",
        "CoalescePartitionsExec",
        "LocalLimitExec: fetch=100",
        "FilterExec: c@2 = 0",
        // Parallelism resumes below the filter but above the inner limit.
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "GlobalLimitExec: skip=0, fetch=100",
        "LocalLimitExec: fetch=100",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    // Same outcome whichever enforcement pass runs first.
    assert_optimized!(expected, plan.clone(), true);
    assert_optimized!(expected, plan, false);
    Ok(())
}
// A bare union of single-partition scans needs no repartitioning at all:
// the union itself already yields one partition per child.
#[test]
fn repartition_ignores_union() -> Result<()> {
    // Five identical single-partition parquet scans under one union.
    let plan = union_exec(vec![parquet_exec(); 5]);
    let expected = &[
        "UnionExec",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    // Same outcome whichever enforcement pass runs first.
    assert_optimized!(expected, plan.clone(), true);
    assert_optimized!(expected, plan, false);
    Ok(())
}
// A SortPreservingMergeExec over a single unsorted partition degenerates
// into a plain SortExec: with one partition there is nothing to merge.
#[test]
fn repartition_through_sort_preserving_merge() -> Result<()> {
    let schema = schema();
    let ordering = vec![PhysicalSortExpr {
        options: SortOptions::default(),
        expr: col("c", &schema).unwrap(),
    }];
    let plan = sort_preserving_merge_exec(ordering, parquet_exec());
    let expected = &[
        "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    // Same outcome whichever enforcement pass runs first.
    assert_optimized!(expected, plan.clone(), true);
    assert_optimized!(expected, plan, false);
    Ok(())
}
// An SPM whose multi-partition input is already sorted is kept intact by
// the distribution-first ordering; the sort-first ordering instead
// rewrites it into CoalescePartitions + one global Sort.
#[test]
fn repartition_ignores_sort_preserving_merge() -> Result<()> {
    let schema = schema();
    let ordering = vec![PhysicalSortExpr {
        options: SortOptions::default(),
        expr: col("c", &schema).unwrap(),
    }];
    let scan = parquet_exec_multiple_sorted(vec![ordering.clone()]);
    let plan = sort_preserving_merge_exec(ordering, scan);
    let expected = &[
        "SortPreservingMergeExec: [c@2 ASC]",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    assert_optimized!(expected, plan.clone(), true);
    let expected = &[
        "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
        "CoalescePartitionsExec",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    assert_optimized!(expected, plan, false);
    Ok(())
}
// Like `repartition_ignores_sort_preserving_merge`, but the sorted
// partitions come from a union of two sorted scans; neither union input
// is repartitioned.
#[test]
fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> {
    let schema = schema();
    let ordering = vec![PhysicalSortExpr {
        options: SortOptions::default(),
        expr: col("c", &schema).unwrap(),
    }];
    let unioned = union_exec(vec![parquet_exec_with_sort(vec![ordering.clone()]); 2]);
    let plan = sort_preserving_merge_exec(ordering, unioned);
    // Distribution first: SPM over the union is preserved.
    let expected = &[
        "SortPreservingMergeExec: [c@2 ASC]",
        "UnionExec",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    assert_optimized!(expected, plan.clone(), true);
    // Sort first: SPM becomes coalesce + one global sort.
    let expected = &[
        "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
        "CoalescePartitionsExec",
        "UnionExec",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    assert_optimized!(expected, plan, false);
    Ok(())
}
// With `prefer_existing_sort` enabled, a RoundRobin repartition is still
// allowed under the sort-requiring operator: the single-partition scan's
// `d ASC` ordering survives it in the expected plan.
#[test]
fn repartition_does_not_destroy_sort() -> Result<()> {
    let schema = schema();
    let ordering = vec![PhysicalSortExpr {
        options: SortOptions::default(),
        expr: col("d", &schema).unwrap(),
    }];
    let filtered = filter_exec(parquet_exec_with_sort(vec![ordering.clone()]));
    let plan = sort_required_exec_with_req(filtered, ordering);
    let expected = &[
        "SortRequiredExec: [d@3 ASC]",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC]",
    ];
    // Same outcome for either pass order (prefer_existing_sort = true).
    assert_optimized!(expected, plan.clone(), true, true);
    assert_optimized!(expected, plan, false, true);
    Ok(())
}
// Under a union, only the branch without a sort requirement receives the
// RoundRobin repartition; the SortRequired branch is left alone so its
// scan order is preserved.
#[test]
fn repartition_does_not_destroy_sort_more_complex() -> Result<()> {
    let schema = schema();
    let ordering = vec![PhysicalSortExpr {
        options: SortOptions::default(),
        expr: col("c", &schema).unwrap(),
    }];
    // Branch 1: sorted scan with an explicit ordering requirement.
    let ordered_branch = sort_required_exec_with_req(
        parquet_exec_with_sort(vec![ordering.clone()]),
        ordering,
    );
    // Branch 2: plain filtered scan with no ordering requirement.
    let filtered_branch = filter_exec(parquet_exec());
    let plan = union_exec(vec![ordered_branch, filtered_branch]);
    let expected = &[
        "UnionExec",
        "SortRequiredExec: [c@2 ASC]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    // Same outcome whichever enforcement pass runs first.
    assert_optimized!(expected, plan.clone(), true);
    assert_optimized!(expected, plan, false);
    Ok(())
}
// A projection computing `a + b AS sum` does not provide the `sum`
// ordering required above it, so a SortExec is inserted while the
// RoundRobin repartition still slides below the projection.
#[test]
fn repartition_transitively_with_projection() -> Result<()> {
    let schema = schema();
    // expr: a + b, aliased as "sum".
    let sum_expr = Arc::new(BinaryExpr::new(
        col("a", &schema).unwrap(),
        Operator::Plus,
        col("b", &schema).unwrap(),
    )) as Arc<dyn PhysicalExpr>;
    let proj = Arc::new(ProjectionExec::try_new(
        vec![(sum_expr, "sum".to_string())],
        parquet_exec(),
    )?);
    let ordering = vec![PhysicalSortExpr {
        options: SortOptions::default(),
        expr: col("sum", &proj.schema()).unwrap(),
    }];
    let plan = sort_preserving_merge_exec(ordering, proj);
    // Distribution first: per-partition sort feeds the SPM.
    let expected = &[
        "SortPreservingMergeExec: [sum@0 ASC]",
        "SortExec: expr=[sum@0 ASC], preserve_partitioning=[true]",
        "ProjectionExec: expr=[a@0 + b@1 as sum]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, plan.clone(), true);
    // Sort first: one global sort over coalesced partitions.
    let expected_first_sort_enforcement = &[
        "SortExec: expr=[sum@0 ASC], preserve_partitioning=[false]",
        "CoalescePartitionsExec",
        "ProjectionExec: expr=[a@0 + b@1 as sum]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected_first_sort_enforcement, plan, false);
    Ok(())
}
// A trivial (identity-alias) projection forwards its input's `c ASC`
// ordering, so the sort-requiring parent is satisfied with no added sort
// or repartition.
#[test]
fn repartition_ignores_transitively_with_projection() -> Result<()> {
    let schema = schema();
    let ordering = vec![PhysicalSortExpr {
        options: SortOptions::default(),
        expr: col("c", &schema).unwrap(),
    }];
    let aliases = vec![
        (String::from("a"), String::from("a")),
        (String::from("b"), String::from("b")),
        (String::from("c"), String::from("c")),
    ];
    let projected = projection_exec_with_alias(
        parquet_exec_multiple_sorted(vec![ordering.clone()]),
        aliases,
    );
    let plan = sort_required_exec_with_req(projected, ordering);
    let expected = &[
        "SortRequiredExec: [c@2 ASC]",
        "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    // Same outcome whichever enforcement pass runs first.
    assert_optimized!(expected, plan.clone(), true);
    assert_optimized!(expected, plan, false);
    Ok(())
}
// An SPM over a partition-preserving sort over a trivial projection of a
// single-partition scan collapses to a single per-partition SortExec.
#[test]
fn repartition_transitively_past_sort_with_projection() -> Result<()> {
    let schema = schema();
    let ordering = vec![PhysicalSortExpr {
        options: SortOptions::default(),
        expr: col("c", &schema).unwrap(),
    }];
    let aliases = vec![
        (String::from("a"), String::from("a")),
        (String::from("b"), String::from("b")),
        (String::from("c"), String::from("c")),
    ];
    let projected = projection_exec_with_alias(parquet_exec(), aliases);
    let plan = sort_preserving_merge_exec(
        ordering.clone(),
        sort_exec(ordering, projected, true),
    );
    let expected = &[
        "SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
        "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    // Same outcome whichever enforcement pass runs first.
    assert_optimized!(expected, plan.clone(), true);
    assert_optimized!(expected, plan, false);
    Ok(())
}
// A sort on `a` over a filter: the repartition slides below the filter,
// and the sort becomes a per-partition sort topped by an SPM (or a global
// coalesce + sort when the sort pass runs last).
#[test]
fn repartition_transitively_past_sort_with_filter() -> Result<()> {
    let schema = schema();
    let ordering = vec![PhysicalSortExpr {
        options: SortOptions::default(),
        expr: col("a", &schema).unwrap(),
    }];
    let plan = sort_exec(ordering, filter_exec(parquet_exec()), false);
    // Distribution first: parallel sort + merge.
    let expected = &[
        "SortPreservingMergeExec: [a@0 ASC]",
        "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, plan.clone(), true);
    // Sort first: single global sort over coalesced partitions.
    let expected_first_sort_enforcement = &[
        "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
        "CoalescePartitionsExec",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected_first_sort_enforcement, plan, false);
    Ok(())
}
// Same shape as `repartition_transitively_past_sort_with_filter` but with
// a trivial projection between the sort and the filter; the repartition
// still reaches the scan.
#[test]
#[cfg(feature = "parquet")]
fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> {
    let schema = schema();
    let ordering = vec![PhysicalSortExpr {
        options: SortOptions::default(),
        expr: col("a", &schema).unwrap(),
    }];
    let aliases = vec![
        (String::from("a"), String::from("a")),
        (String::from("b"), String::from("b")),
        (String::from("c"), String::from("c")),
    ];
    let projected = projection_exec_with_alias(filter_exec(parquet_exec()), aliases);
    let plan = sort_exec(ordering, projected, false);
    // Distribution first: parallel sort + merge above the projection.
    let expected = &[
        "SortPreservingMergeExec: [a@0 ASC]",
        "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
        "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, plan.clone(), true);
    // Sort first: one global sort over coalesced partitions.
    let expected_first_sort_enforcement = &[
        "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
        "CoalescePartitionsExec",
        "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected_first_sort_enforcement, plan, false);
    Ok(())
}
// With target_partitions = 2 and file repartitioning enabled, a single
// 100-byte file is split into two byte ranges instead of inserting a
// RoundRobin repartition; Parquet and CSV behave the same way.
#[test]
fn parallelization_single_partition() -> Result<()> {
    let aliases = vec![(String::from("a"), String::from("a"))];
    let plan_parquet = aggregate_exec_with_alias(parquet_exec(), aliases.clone());
    let plan_csv = aggregate_exec_with_alias(csv_exec(), aliases);
    let expected_parquet = [
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "ParquetExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e]",
    ];
    let expected_csv = [
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "CsvExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e], has_header=false",
    ];
    assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true, 10);
    assert_optimized!(expected_csv, plan_csv, true, false, 2, true, 10);
    Ok(())
}
/// File-scan repartitioning of multiple sorted files must keep the output
/// ordering required by `SortRequiredExec`: the files are re-grouped into
/// range-based groups (3 and then 8 target partitions) without breaking the
/// `a@0 ASC` ordering, so no sort is inserted above the filter.
#[test]
fn parallelization_multiple_files() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("a", &schema).unwrap(),
options: SortOptions::default(),
}];
// Filter over two sorted parquet files, with a sort requirement on top.
let plan = filter_exec(parquet_exec_multiple_sorted(vec![sort_key.clone()]));
let plan = sort_required_exec_with_req(plan, sort_key);
let expected = [
"SortRequiredExec: [a@0 ASC]",
"FilterExec: c@2 = 0",
"ParquetExec: file_groups={3 groups: [[x:0..50], [y:0..100], [x:50..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]", ];
let target_partitions = 3;
let repartition_size = 1;
assert_optimized!(
expected,
plan,
true,
true,
target_partitions,
true,
repartition_size,
false
);
// With 8 target partitions each file is split into four 25-byte ranges.
let expected = [
"SortRequiredExec: [a@0 ASC]",
"FilterExec: c@2 = 0",
"ParquetExec: file_groups={8 groups: [[x:0..25], [y:0..25], [x:25..50], [y:25..50], [x:50..75], [y:50..75], [x:75..100], [y:75..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
];
let target_partitions = 8;
let repartition_size = 1;
assert_optimized!(
expected,
plan,
true,
true,
target_partitions,
true,
repartition_size,
false
);
Ok(())
}
/// Compressed CSV files cannot be split at arbitrary byte offsets, so
/// file-range parallelization must only apply to the uncompressed case;
/// compressed inputs fall back to a RoundRobin repartition instead.
#[test]
fn parallelization_compressed_csv() -> Result<()> {
let compression_types = [
FileCompressionType::GZIP,
FileCompressionType::BZIP2,
FileCompressionType::XZ,
FileCompressionType::ZSTD,
FileCompressionType::UNCOMPRESSED,
];
// Compressed: the single file stays one group; RoundRobin adds parallelism.
let expected_not_partitioned = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
"RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
"CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
];
// Uncompressed: the file itself is split into two byte ranges.
let expected_partitioned = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
"RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
"CsvExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e], has_header=false",
];
for compression_type in compression_types {
let expected = if compression_type.is_compressed() {
&expected_not_partitioned[..]
} else {
&expected_partitioned[..]
};
let plan = aggregate_exec_with_alias(
Arc::new(
CsvExec::builder(
FileScanConfig::new(
ObjectStoreUrl::parse("test:///").unwrap(),
schema(),
)
.with_file(PartitionedFile::new("x".to_string(), 100)),
)
.with_has_header(false)
// NOTE(review): "delimeter" is the builder method's actual spelling
// as called here — confirm against the `CsvExec` builder API before
// renaming.
.with_delimeter(b',')
.with_quote(b'"')
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
.with_file_compression_type(compression_type)
.build(),
),
vec![("a".to_string(), "a".to_string())],
);
assert_optimized!(expected, plan, true, false, 2, true, 10, false);
}
Ok(())
}
/// With two input files and a target of 2 partitions, no further file
/// splitting or RoundRobin repartition is needed: the existing two groups
/// feed the partial aggregate directly.
#[test]
fn parallelization_two_partitions() -> Result<()> {
let alias = vec![("a".to_string(), "a".to_string())];
let plan_parquet =
aggregate_exec_with_alias(parquet_exec_multiple(), alias.clone());
let plan_csv = aggregate_exec_with_alias(csv_exec_multiple(), alias.clone());
let expected_parquet = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
"RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
// Two files, two groups: already matches target_partitions = 2.
"ParquetExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e]",
];
let expected_csv = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
"RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
"CsvExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true, 10);
assert_optimized!(expected_csv, plan_csv, true, false, 2, true, 10);
Ok(())
}
/// With two input files but a target of 4 partitions, each file is split in
/// half so the scan produces four byte-range groups.
#[test]
fn parallelization_two_partitions_into_four() -> Result<()> {
let alias = vec![("a".to_string(), "a".to_string())];
let plan_parquet =
aggregate_exec_with_alias(parquet_exec_multiple(), alias.clone());
let plan_csv = aggregate_exec_with_alias(csv_exec_multiple(), alias.clone());
let expected_parquet = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
"RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
// Each 100-byte file becomes two 50-byte ranges: 4 groups total.
"ParquetExec: file_groups={4 groups: [[x:0..50], [x:50..100], [y:0..50], [y:50..100]]}, projection=[a, b, c, d, e]",
];
let expected_csv = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
"RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
"CsvExec: file_groups={4 groups: [[x:0..50], [x:50..100], [y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, true, false, 4, true, 10);
assert_optimized!(expected_csv, plan_csv, true, false, 4, true, 10);
Ok(())
}
/// A limit above a sort must not be parallelized: the plan is left with a
/// single partition (no RepartitionExec appears below the sort/limit).
#[test]
fn parallelization_sorted_limit() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
options: SortOptions::default(),
}];
// Plan under test: GlobalLimit <- LocalLimit <- Sort <- scan.
let plan_parquet = limit_exec(sort_exec(sort_key.clone(), parquet_exec(), false));
let plan_csv = limit_exec(sort_exec(sort_key.clone(), csv_exec(), false));
let expected_parquet = &[
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
// Data is unsorted on input; a single-partition sort is kept as-is.
"SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
let expected_csv = &[
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
"SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
"CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, true);
assert_optimized!(expected_csv, plan_csv, true);
Ok(())
}
/// A filter between a limit and a sort may still be parallelized: the filter
/// gets a RoundRobin repartition, and partitions are coalesced back before
/// the global limit. The sort below stays single-partition.
#[test]
fn parallelization_limit_with_filter() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
options: SortOptions::default(),
}];
// Plan under test: Limit <- Filter <- Sort <- scan.
let plan_parquet = limit_exec(filter_exec(sort_exec(
sort_key.clone(),
parquet_exec(),
false,
)));
let plan_csv =
limit_exec(filter_exec(sort_exec(sort_key.clone(), csv_exec(), false)));
let expected_parquet = &[
"GlobalLimitExec: skip=0, fetch=100",
"CoalescePartitionsExec",
"LocalLimitExec: fetch=100",
"FilterExec: c@2 = 0",
// The filter is parallelized across 10 partitions.
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
let expected_csv = &[
"GlobalLimitExec: skip=0, fetch=100",
"CoalescePartitionsExec",
"LocalLimitExec: fetch=100",
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
"CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, true);
assert_optimized!(expected_csv, plan_csv, true);
Ok(())
}
/// Parallelization stops at limit boundaries: repartitions are added above
/// each limit (for the filter and the aggregate) but never pushed below a
/// limit into the scan.
#[test]
fn parallelization_ignores_limit() -> Result<()> {
let alias = vec![("a".to_string(), "a".to_string())];
// Plan under test: Aggregate <- Limit <- Filter <- Limit <- scan.
let plan_parquet = aggregate_exec_with_alias(
limit_exec(filter_exec(limit_exec(parquet_exec()))),
alias.clone(),
);
let plan_csv = aggregate_exec_with_alias(
limit_exec(filter_exec(limit_exec(csv_exec()))),
alias.clone(),
);
let expected_parquet = &[
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
// Repartition above the outer limit parallelizes the aggregate...
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"GlobalLimitExec: skip=0, fetch=100",
"CoalescePartitionsExec",
"LocalLimitExec: fetch=100",
"FilterExec: c@2 = 0",
// ...and above the inner limit parallelizes the filter.
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
// The scan below the innermost limit is left single-partition.
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
let expected_csv = &[
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"GlobalLimitExec: skip=0, fetch=100",
"CoalescePartitionsExec",
"LocalLimitExec: fetch=100",
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
"CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, true);
assert_optimized!(expected_csv, plan_csv, true);
Ok(())
}
/// Union inputs are already a source of parallelism (one partition per
/// child), so no repartition is inserted under a plain `UnionExec`.
#[test]
fn parallelization_union_inputs() -> Result<()> {
let plan_parquet = union_exec(vec![parquet_exec(); 5]);
let plan_csv = union_exec(vec![csv_exec(); 5]);
let expected_parquet = &[
"UnionExec",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
let expected_csv = &[
"UnionExec",
"CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
"CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
"CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
"CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
"CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, true);
assert_optimized!(expected_csv, plan_csv, true);
Ok(())
}
/// A `SortPreservingMergeExec` over a single already-sorted partition is
/// redundant: the optimized plan is just the sorted scan itself.
#[test]
fn parallelization_prior_to_sort_preserving_merge() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
options: SortOptions::default(),
}];
// SPM over a single-partition scan that already has the required ordering.
let plan_parquet = sort_preserving_merge_exec(
sort_key.clone(),
parquet_exec_with_sort(vec![sort_key.clone()]),
);
let plan_csv = sort_preserving_merge_exec(
sort_key.clone(),
csv_exec_with_sort(vec![sort_key.clone()]),
);
// Only the scan remains after optimization.
let expected_parquet = &[
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];
let expected_csv = &[
"CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, true);
assert_optimized!(expected_csv, plan_csv, true);
Ok(())
}
/// A `SortPreservingMergeExec` over a union of sorted inputs is kept: the
/// union contributes multiple partitions, so the merge is still required and
/// no repartition is added beneath it.
#[test]
fn parallelization_sort_preserving_merge_with_union() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
options: SortOptions::default(),
}];
// Union of two sorted scans, merged back under the same ordering.
let input_parquet =
union_exec(vec![parquet_exec_with_sort(vec![sort_key.clone()]); 2]);
let input_csv = union_exec(vec![csv_exec_with_sort(vec![sort_key.clone()]); 2]);
let plan_parquet = sort_preserving_merge_exec(sort_key.clone(), input_parquet);
let plan_csv = sort_preserving_merge_exec(sort_key.clone(), input_csv);
let expected_parquet = &[
"SortPreservingMergeExec: [c@2 ASC]",
"UnionExec",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];
let expected_csv = &[
"SortPreservingMergeExec: [c@2 ASC]",
"UnionExec",
"CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], has_header=false",
"CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, true);
assert_optimized!(expected_csv, plan_csv, true);
Ok(())
}
/// When an operator requires its input's existing sort order
/// (`SortRequiredExec`), parallelizing the sorted single-partition scan would
/// destroy that order for no benefit, so the plan is left untouched.
#[test]
fn parallelization_does_not_benefit() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
options: SortOptions::default(),
}];
let plan_parquet = sort_required_exec_with_req(
parquet_exec_with_sort(vec![sort_key.clone()]),
sort_key.clone(),
);
let plan_csv = sort_required_exec_with_req(
csv_exec_with_sort(vec![sort_key.clone()]),
sort_key,
);
// No repartition inserted; the sorted scan feeds the requirement directly.
let expected_parquet = &[
"SortRequiredExec: [c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];
let expected_csv = &[
"SortRequiredExec: [c@2 ASC]",
"CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, true);
assert_optimized!(expected_csv, plan_csv, true);
Ok(())
}
/// An ordering that survives a projection under an alias (`c` -> `c2`) is
/// recognized transitively: the `SortPreservingMergeExec` over the projected
/// single-partition scan is removed and no repartition is inserted (Parquet).
#[test]
fn parallelization_ignores_transitively_with_projection_parquet() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
options: SortOptions::default(),
}];
// Project the sorted scan, renaming a -> a2 and c -> c2.
let alias_pairs: Vec<(String, String)> = vec![
("a".to_string(), "a2".to_string()),
("c".to_string(), "c2".to_string()),
];
let proj_parquet = projection_exec_with_alias(
parquet_exec_with_sort(vec![sort_key.clone()]),
alias_pairs.clone(),
);
// The same ordering expressed on the projected column c2.
let sort_key_after_projection = vec![PhysicalSortExpr {
expr: col("c2", &proj_parquet.schema()).unwrap(),
options: SortOptions::default(),
}];
let plan_parquet =
sort_preserving_merge_exec(sort_key_after_projection, proj_parquet);
// Sanity check of the unoptimized plan shape (indented tree format).
let expected = &[
"SortPreservingMergeExec: [c2@1 ASC]",
" ProjectionExec: expr=[a@0 as a2, c@2 as c2]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];
plans_matches_expected!(expected, &plan_parquet);
// After optimization the redundant SPM is gone.
let expected_parquet = &[
"ProjectionExec: expr=[a@0 as a2, c@2 as c2]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];
assert_optimized!(expected_parquet, plan_parquet, true);
Ok(())
}
/// Same as the parquet variant above, for CSV: the alias-projected ordering
/// (`c` -> `c2`) is tracked through the projection, so the
/// `SortPreservingMergeExec` is removed and no repartition is inserted.
#[test]
fn parallelization_ignores_transitively_with_projection_csv() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
options: SortOptions::default(),
}];
// Project the sorted scan, renaming a -> a2 and c -> c2.
let alias_pairs: Vec<(String, String)> = vec![
("a".to_string(), "a2".to_string()),
("c".to_string(), "c2".to_string()),
];
let proj_csv =
projection_exec_with_alias(csv_exec_with_sort(vec![sort_key]), alias_pairs);
// The same ordering expressed on the projected column c2.
let sort_key_after_projection = vec![PhysicalSortExpr {
expr: col("c2", &proj_csv.schema()).unwrap(),
options: SortOptions::default(),
}];
let plan_csv = sort_preserving_merge_exec(sort_key_after_projection, proj_csv);
// Sanity check of the unoptimized plan shape (indented tree format).
let expected = &[
"SortPreservingMergeExec: [c2@1 ASC]",
" ProjectionExec: expr=[a@0 as a2, c@2 as c2]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], has_header=false",
];
plans_matches_expected!(expected, &plan_csv);
// After optimization the redundant SPM is gone.
let expected_csv = &[
"ProjectionExec: expr=[a@0 as a2, c@2 as c2]",
"CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], has_header=false",
];
assert_optimized!(expected_csv, plan_csv, true);
Ok(())
}
/// Stacked RoundRobin repartitions add no parallelism beyond the first one,
/// so the optimizer collapses them: three RoundRobin repartitions become one.
#[test]
fn remove_redundant_roundrobins() -> Result<()> {
let input = parquet_exec();
// Two back-to-back repartitions, plus one more above the filter.
let repartition = repartition_exec(repartition_exec(input));
let physical_plan = repartition_exec(filter_exec(repartition));
// Unoptimized shape: three RoundRobin repartitions (indented tree format).
let expected = &[
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
" FilterExec: c@2 = 0",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
plans_matches_expected!(expected, &physical_plan);
// Optimized: a single repartition below the filter remains.
let expected = &[
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, physical_plan.clone(), true);
assert_optimized!(expected, physical_plan, false);
Ok(())
}
/// With `prefer_existing_sort` enabled (the extra `true` argument), an
/// order-preserving repartition keeps the `c@2 ASC` ordering through the
/// filter, and since the final output ordering is already satisfied the
/// `SortPreservingMergeExec` degrades to a plain `CoalescePartitionsExec`.
#[test]
fn remove_unnecessary_spm_after_filter() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
options: SortOptions::default(),
}];
let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input));
let expected = &[
"CoalescePartitionsExec",
"FilterExec: c@2 = 0",
// preserve_order=true keeps the scan's ordering through the repartition.
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true, sort_exprs=c@2 ASC",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];
assert_optimized!(expected, physical_plan.clone(), true, true);
assert_optimized!(expected, physical_plan, false, true);
Ok(())
}
/// With `prefer_existing_sort` enabled, the `d@3 ASC` input ordering is kept
/// through the RoundRobin repartition, so the `SortPreservingMergeExec` is
/// retained and no extra sort is needed.
#[test]
fn preserve_ordering_through_repartition() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("d", &schema).unwrap(),
options: SortOptions::default(),
}];
let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input));
let expected = &[
"SortPreservingMergeExec: [d@3 ASC]",
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true, sort_exprs=d@3 ASC",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC]",
];
assert_optimized!(expected, physical_plan.clone(), true, true);
assert_optimized!(expected, physical_plan, false, true);
Ok(())
}
/// Without `prefer_existing_sort`, the repartition does not preserve the
/// `a@0 ASC` input ordering; a new sort is inserted instead. The sort's
/// placement differs by pass order: per-partition sort + SPM when
/// distribution runs first, coalesce + single sort otherwise.
#[test]
fn do_not_preserve_ordering_through_repartition() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("a", &schema).unwrap(),
options: SortOptions::default(),
}];
let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input));
let expected = &[
"SortPreservingMergeExec: [a@0 ASC]",
"SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
"FilterExec: c@2 = 0",
// Plain repartition: the scan's ordering is lost here.
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
];
assert_optimized!(expected, physical_plan.clone(), true);
let expected = &[
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
"CoalescePartitionsExec",
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
];
assert_optimized!(expected, physical_plan, false);
Ok(())
}
/// The filter predicate fixes `c` to a single value (`c@2 = 0`), so after
/// filtering the `c@2 ASC` ordering is trivially satisfied: the SPM becomes a
/// plain `CoalescePartitionsExec` and no sort is added.
#[test]
fn no_need_for_sort_after_filter() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
options: SortOptions::default(),
}];
let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input));
let expected = &[
"CoalescePartitionsExec",
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];
assert_optimized!(expected, physical_plan.clone(), true);
assert_optimized!(expected, physical_plan, false);
Ok(())
}
/// The required output ordering (`a@0 ASC`) differs from the scan's ordering
/// (`c@2 ASC`), so a new sort must be inserted regardless of pass order.
/// Note the second pass ordering produces a nested double-sort plan.
#[test]
fn do_not_preserve_ordering_through_repartition2() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
options: SortOptions::default(),
}];
let input = parquet_exec_multiple_sorted(vec![sort_key]);
// The merge requires ordering on "a", which the input does not provide.
let sort_req = vec![PhysicalSortExpr {
expr: col("a", &schema).unwrap(),
options: SortOptions::default(),
}];
let physical_plan = sort_preserving_merge_exec(sort_req, filter_exec(input));
let expected = &[
"SortPreservingMergeExec: [a@0 ASC]",
"SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];
assert_optimized!(expected, physical_plan.clone(), true);
let expected = &[
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
"CoalescePartitionsExec",
"SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];
assert_optimized!(expected, physical_plan, false);
Ok(())
}
/// When nothing above the filter requires an ordering, a plain (non
/// order-preserving) repartition is used and no sort or merge is added.
#[test]
fn do_not_preserve_ordering_through_repartition3() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
options: SortOptions::default(),
}];
let input = parquet_exec_multiple_sorted(vec![sort_key]);
// No ordering requirement at the top of the plan.
let physical_plan = filter_exec(input);
let expected = &[
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];
assert_optimized!(expected, physical_plan.clone(), true);
assert_optimized!(expected, physical_plan, false);
Ok(())
}
/// `EnforceDistribution` alone must not insert a sort to fix an unsatisfied
/// ordering requirement: the input lacks `a@0 ASC`, and after the rule runs
/// only a repartition has been added — the (invalid) ordering is left for the
/// sort-enforcement rule to handle.
#[test]
fn do_not_put_sort_when_input_is_invalid() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("a", &schema).unwrap(),
options: SortOptions::default(),
}];
// Unsorted scan under a node that *requires* a@0 ASC.
let input = parquet_exec();
let physical_plan = sort_required_exec_with_req(filter_exec(input), sort_key);
let expected = &[
"SortRequiredExec: [a@0 ASC]",
"FilterExec: c@2 = 0",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_plan_txt!(expected, physical_plan);
// After EnforceDistribution only: repartition added, no sort inserted.
let expected = &[
"SortRequiredExec: [a@0 ASC]",
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
// Run the rule directly with an explicit config instead of the test macro.
let mut config = ConfigOptions::new();
config.execution.target_partitions = 10;
config.optimizer.enable_round_robin_repartition = true;
config.optimizer.prefer_existing_sort = false;
let dist_plan = EnforceDistribution::new().optimize(physical_plan, &config)?;
assert_plan_txt!(expected, dist_plan);
Ok(())
}
/// When the input already satisfies the required `a@0 ASC` ordering,
/// `EnforceDistribution` parallelizes by splitting the sorted files into
/// byte-range groups (10 partitions) while keeping the ordering — no
/// repartition or sort operator is inserted.
#[test]
fn put_sort_when_input_is_valid() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("a", &schema).unwrap(),
options: SortOptions::default(),
}];
// Two sorted files under a node that requires a@0 ASC.
let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
let physical_plan = sort_required_exec_with_req(filter_exec(input), sort_key);
let expected = &[
"SortRequiredExec: [a@0 ASC]",
"FilterExec: c@2 = 0",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
];
assert_plan_txt!(expected, physical_plan);
// After EnforceDistribution: each file split into five 20-byte ranges.
let expected = &[
"SortRequiredExec: [a@0 ASC]",
"FilterExec: c@2 = 0",
"ParquetExec: file_groups={10 groups: [[x:0..20], [y:0..20], [x:20..40], [y:20..40], [x:40..60], [y:40..60], [x:60..80], [y:60..80], [x:80..100], [y:80..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
];
// Run the rule directly with an explicit config instead of the test macro.
let mut config = ConfigOptions::new();
config.execution.target_partitions = 10;
config.optimizer.enable_round_robin_repartition = true;
config.optimizer.prefer_existing_sort = false;
let dist_plan = EnforceDistribution::new().optimize(physical_plan, &config)?;
assert_plan_txt!(expected, dist_plan);
Ok(())
}
/// With a single target partition (the `1` argument), hash repartitioning for
/// the grouped aggregate is pointless and must not be inserted.
#[test]
fn do_not_add_unnecessary_hash() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
options: SortOptions::default(),
}];
let alias = vec![("a".to_string(), "a".to_string())];
let input = parquet_exec_with_sort(vec![sort_key]);
let physical_plan = aggregate_exec_with_alias(input, alias.clone());
// No Hash repartition between partial and final aggregates.
let expected = &[
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];
assert_optimized!(expected, physical_plan.clone(), true, false, 1, false, 1024);
assert_optimized!(expected, physical_plan, false, false, 1, false, 1024);
Ok(())
}
/// For stacked aggregates on the same group key, only the inner aggregate
/// needs a hash repartition: once the data is hash-partitioned on `a`, the
/// outer aggregate reuses that distribution without another repartition.
#[test]
fn do_not_add_unnecessary_hash2() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
options: SortOptions::default(),
}];
let alias = vec![("a".to_string(), "a".to_string())];
let input = parquet_exec_multiple_sorted(vec![sort_key]);
// Aggregate on "a" twice, back to back.
let aggregate = aggregate_exec_with_alias(input, alias.clone());
let physical_plan = aggregate_exec_with_alias(aggregate, alias.clone());
let expected = &[
// Outer aggregate: no repartition needed, input is already hashed on a.
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
// Inner aggregate: a single Hash repartition establishes distribution.
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
"RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];
assert_optimized!(expected, physical_plan.clone(), true, false, 4, false, 1024);
assert_optimized!(expected, physical_plan, false, false, 4, false, 1024);
Ok(())
}
/// A repartition immediately undone by a `CoalescePartitionsExec` is pure
/// overhead: both operators are removed, leaving just the scan.
#[test]
fn optimize_away_unnecessary_repartition() -> Result<()> {
let physical_plan = coalesce_partitions_exec(repartition_exec(parquet_exec()));
// Unoptimized shape (indented tree format).
let expected = &[
"CoalescePartitionsExec",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
plans_matches_expected!(expected, physical_plan.clone());
// Optimized: the repartition/coalesce pair cancels out entirely.
let expected =
&["ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]"];
assert_optimized!(expected, physical_plan.clone(), true);
assert_optimized!(expected, physical_plan, false);
Ok(())
}
/// Redundant repartition/coalesce pairs are removed even when interleaved
/// with other operators: of the two repartitions and one coalesce in the
/// input, only the single repartition actually adding parallelism survives.
#[test]
fn optimize_away_unnecessary_repartition2() -> Result<()> {
let physical_plan = filter_exec(repartition_exec(coalesce_partitions_exec(
filter_exec(repartition_exec(parquet_exec())),
)));
// Unoptimized shape (indented tree format).
let expected = &[
"FilterExec: c@2 = 0",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CoalescePartitionsExec",
" FilterExec: c@2 = 0",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
plans_matches_expected!(expected, physical_plan.clone());
// Optimized: both filters share one repartition; coalesce is gone.
let expected = &[
"FilterExec: c@2 = 0",
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, physical_plan.clone(), true);
assert_optimized!(expected, physical_plan, false);
Ok(())
}
}