use std::fmt;
use std::fmt::Formatter;
use std::sync::Arc;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::utils::{
add_sort_above, get_children_exectrees, get_plan_string, is_coalesce_partitions,
is_repartition, is_sort_preserving_merge, ExecTree,
};
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::joins::{
CrossJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec,
};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::union::{can_interleave, InterleaveExec, UnionExec};
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::{
with_new_children_if_necessary, Distribution, ExecutionPlan, Partitioning,
};
use arrow::compute::SortOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::expressions::{Column, NoOp};
use datafusion_physical_expr::utils::map_columns_before_projection;
use datafusion_physical_expr::{
physical_exprs_equal, EquivalenceProperties, PhysicalExpr,
};
use datafusion_physical_plan::unbounded_output;
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
use itertools::izip;
/// Stateless marker type for the `EnforceDistribution` physical optimizer
/// rule; all of its behaviour lives in its trait implementations.
#[derive(Default)]
pub struct EnforceDistribution {}

impl EnforceDistribution {
    /// Creates a new instance of the rule. Equivalent to `Default::default()`.
    #[allow(missing_docs)]
    pub fn new() -> Self {
        EnforceDistribution {}
    }
}
impl PhysicalOptimizerRule for EnforceDistribution {
    /// Runs the rule in two passes:
    ///
    /// 1. Join-key reordering, either top-down (`adjust_input_keys_ordering`)
    ///    or bottom-up (`reorder_join_keys_to_inputs`), selected by
    ///    `config.optimizer.top_down_join_key_reordering`.
    /// 2. A bottom-up pass applying `ensure_distribution` to every node.
    ///
    /// Returns the rewritten plan, or the first error raised by either pass.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let keys_adjusted = if config.optimizer.top_down_join_key_reordering {
            PlanWithKeyRequirements::new(plan)
                .transform_down(&adjust_input_keys_ordering)?
                .plan
        } else {
            plan.transform_up(&|node| {
                reorder_join_keys_to_inputs(node).map(Transformed::Yes)
            })?
        };

        let enforced = DistributionContext::new(keys_adjusted)
            .transform_up(&|context| ensure_distribution(context, config))?;
        Ok(enforced.plan)
    }

    /// Name under which this rule is reported.
    fn name(&self) -> &str {
        "EnforceDistribution"
    }

    /// Always `true`.
    // NOTE(review): presumably tells the optimizer driver to verify that the
    // rule leaves the plan schema unchanged -- confirm against the trait docs.
    fn schema_check(&self) -> bool {
        true
    }
}
fn adjust_input_keys_ordering(
requirements: PlanWithKeyRequirements,
) -> Result<Transformed<PlanWithKeyRequirements>> {
let parent_required = requirements.required_key_ordering.clone();
let plan_any = requirements.plan.as_any();
let transformed = if let Some(HashJoinExec {
left,
right,
on,
filter,
join_type,
mode,
null_equals_null,
..
}) = plan_any.downcast_ref::<HashJoinExec>()
{
match mode {
PartitionMode::Partitioned => {
let join_constructor =
|new_conditions: (Vec<(Column, Column)>, Vec<SortOptions>)| {
Ok(Arc::new(HashJoinExec::try_new(
left.clone(),
right.clone(),
new_conditions.0,
filter.clone(),
join_type,
PartitionMode::Partitioned,
*null_equals_null,
)?) as Arc<dyn ExecutionPlan>)
};
Some(reorder_partitioned_join_keys(
requirements.plan.clone(),
&parent_required,
on,
vec![],
&join_constructor,
)?)
}
PartitionMode::CollectLeft => {
let new_right_request = match join_type {
JoinType::Inner | JoinType::Right => shift_right_required(
&parent_required,
left.schema().fields().len(),
),
JoinType::RightSemi | JoinType::RightAnti => {
Some(parent_required.clone())
}
JoinType::Left
| JoinType::LeftSemi
| JoinType::LeftAnti
| JoinType::Full => None,
};
Some(PlanWithKeyRequirements {
plan: requirements.plan.clone(),
required_key_ordering: vec![],
request_key_ordering: vec![None, new_right_request],
})
}
PartitionMode::Auto => {
Some(PlanWithKeyRequirements::new(requirements.plan.clone()))
}
}
} else if let Some(CrossJoinExec { left, .. }) =
plan_any.downcast_ref::<CrossJoinExec>()
{
let left_columns_len = left.schema().fields().len();
Some(PlanWithKeyRequirements {
plan: requirements.plan.clone(),
required_key_ordering: vec![],
request_key_ordering: vec![
None,
shift_right_required(&parent_required, left_columns_len),
],
})
} else if let Some(SortMergeJoinExec {
left,
right,
on,
join_type,
sort_options,
null_equals_null,
..
}) = plan_any.downcast_ref::<SortMergeJoinExec>()
{
let join_constructor =
|new_conditions: (Vec<(Column, Column)>, Vec<SortOptions>)| {
Ok(Arc::new(SortMergeJoinExec::try_new(
left.clone(),
right.clone(),
new_conditions.0,
*join_type,
new_conditions.1,
*null_equals_null,
)?) as Arc<dyn ExecutionPlan>)
};
Some(reorder_partitioned_join_keys(
requirements.plan.clone(),
&parent_required,
on,
sort_options.clone(),
&join_constructor,
)?)
} else if let Some(aggregate_exec) = plan_any.downcast_ref::<AggregateExec>() {
if !parent_required.is_empty() {
match aggregate_exec.mode() {
AggregateMode::FinalPartitioned => Some(reorder_aggregate_keys(
requirements.plan.clone(),
&parent_required,
aggregate_exec,
)?),
_ => Some(PlanWithKeyRequirements::new(requirements.plan.clone())),
}
} else {
None
}
} else if let Some(proj) = plan_any.downcast_ref::<ProjectionExec>() {
let expr = proj.expr();
let new_required = map_columns_before_projection(&parent_required, expr);
if new_required.len() == parent_required.len() {
Some(PlanWithKeyRequirements {
plan: requirements.plan.clone(),
required_key_ordering: vec![],
request_key_ordering: vec![Some(new_required.clone())],
})
} else {
Some(PlanWithKeyRequirements::new(requirements.plan.clone()))
}
} else if plan_any.downcast_ref::<RepartitionExec>().is_some()
|| plan_any.downcast_ref::<CoalescePartitionsExec>().is_some()
|| plan_any.downcast_ref::<WindowAggExec>().is_some()
{
Some(PlanWithKeyRequirements::new(requirements.plan.clone()))
} else {
let children_len = requirements.plan.children().len();
Some(PlanWithKeyRequirements {
plan: requirements.plan.clone(),
required_key_ordering: vec![],
request_key_ordering: vec![Some(parent_required.clone()); children_len],
})
};
Ok(if let Some(transformed) = transformed {
Transformed::Yes(transformed)
} else {
Transformed::No(requirements)
})
}
/// Tries to reorder the `on` keys of a partitioned join so that they follow
/// `parent_required`, and returns the (possibly rebuilt) join together with
/// the key orderings to request from its left and right children.
///
/// * `join_plan` - the existing join node, returned unchanged when no
///   reordering is needed or possible.
/// * `sort_options` - per-key sort options; permuted alongside the keys.
///   Callers without sort options pass an empty vector.
/// * `join_constructor` - builds the new join from `(new_on, new_sort_options)`.
///
/// Errors only if `join_constructor` fails.
fn reorder_partitioned_join_keys<F>(
    join_plan: Arc<dyn ExecutionPlan>,
    parent_required: &[Arc<dyn PhysicalExpr>],
    on: &[(Column, Column)],
    sort_options: Vec<SortOptions>,
    join_constructor: &F,
) -> Result<PlanWithKeyRequirements>
where
    F: Fn((Vec<(Column, Column)>, Vec<SortOptions>)) -> Result<Arc<dyn ExecutionPlan>>,
{
    let original_keys = extract_join_keys(on);
    let reordered = try_reorder(
        original_keys.clone(),
        parent_required,
        &join_plan.equivalence_properties(),
    );

    let (plan, keys) = match reordered {
        // A real permutation was found: rebuild the join with the new order.
        Some((keys, positions)) if !positions.is_empty() => {
            let new_on = new_join_conditions(&keys.left_keys, &keys.right_keys);
            // Driven by `sort_options` so that an empty list stays empty.
            let new_sort_options = (0..sort_options.len())
                .map(|idx| sort_options[positions[idx]])
                .collect::<Vec<_>>();
            (join_constructor((new_on, new_sort_options))?, keys)
        }
        // Keys already match the parent requirement.
        Some((keys, _)) => (join_plan, keys),
        // No compatible ordering: keep the join and request its own keys.
        None => (join_plan, original_keys),
    };

    Ok(PlanWithKeyRequirements {
        plan,
        required_key_ordering: vec![],
        request_key_ordering: vec![Some(keys.left_keys), Some(keys.right_keys)],
    })
}
fn reorder_aggregate_keys(
agg_plan: Arc<dyn ExecutionPlan>,
parent_required: &[Arc<dyn PhysicalExpr>],
agg_exec: &AggregateExec,
) -> Result<PlanWithKeyRequirements> {
let output_columns = agg_exec
.group_by()
.expr()
.iter()
.enumerate()
.map(|(index, (_col, name))| Column::new(name, index))
.collect::<Vec<_>>();
let output_exprs = output_columns
.iter()
.map(|c| Arc::new(c.clone()) as _)
.collect::<Vec<_>>();
if parent_required.len() != output_exprs.len()
|| !agg_exec.group_by().null_expr().is_empty()
|| physical_exprs_equal(&output_exprs, parent_required)
{
Ok(PlanWithKeyRequirements::new(agg_plan))
} else {
let new_positions = expected_expr_positions(&output_exprs, parent_required);
match new_positions {
None => Ok(PlanWithKeyRequirements::new(agg_plan)),
Some(positions) => {
let new_partial_agg = if let Some(agg_exec) =
agg_exec.input().as_any().downcast_ref::<AggregateExec>()
{
if matches!(agg_exec.mode(), &AggregateMode::Partial) {
let group_exprs = agg_exec.group_by().expr();
let new_group_exprs = positions
.into_iter()
.map(|idx| group_exprs[idx].clone())
.collect();
let new_partial_group_by =
PhysicalGroupBy::new_single(new_group_exprs);
Some(Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
new_partial_group_by,
agg_exec.aggr_expr().to_vec(),
agg_exec.filter_expr().to_vec(),
agg_exec.order_by_expr().to_vec(),
agg_exec.input().clone(),
agg_exec.input_schema.clone(),
)?))
} else {
None
}
} else {
None
};
if let Some(partial_agg) = new_partial_agg {
let group_exprs = partial_agg.group_expr().expr();
let new_final_group = partial_agg.output_group_expr();
let new_group_by = PhysicalGroupBy::new_single(
new_final_group
.iter()
.enumerate()
.map(|(idx, expr)| (expr.clone(), group_exprs[idx].1.clone()))
.collect(),
);
let new_final_agg = Arc::new(AggregateExec::try_new(
AggregateMode::FinalPartitioned,
new_group_by,
agg_exec.aggr_expr().to_vec(),
agg_exec.filter_expr().to_vec(),
agg_exec.order_by_expr().to_vec(),
partial_agg,
agg_exec.input_schema(),
)?);
let agg_schema = new_final_agg.schema();
let mut proj_exprs = output_columns
.iter()
.map(|col| {
let name = col.name();
(
Arc::new(Column::new(
name,
agg_schema.index_of(name).unwrap(),
)) as _,
name.to_owned(),
)
})
.collect::<Vec<_>>();
let agg_fields = agg_schema.fields();
for (idx, field) in
agg_fields.iter().enumerate().skip(output_columns.len())
{
let name = field.name();
proj_exprs
.push((Arc::new(Column::new(name, idx)) as _, name.clone()))
}
Ok(PlanWithKeyRequirements::new(Arc::new(
ProjectionExec::try_new(proj_exprs, new_final_agg)?,
)))
} else {
Ok(PlanWithKeyRequirements::new(agg_plan))
}
}
}
}
}
/// Re-expresses `parent_required` relative to the right input of a join whose
/// left input contributes `left_columns_len` columns.
///
/// Returns `Some` only when every required expression is a `Column` whose
/// index is at least `left_columns_len`; each index is then reduced by
/// `left_columns_len`. An empty requirement yields `Some(vec![])`.
fn shift_right_required(
    parent_required: &[Arc<dyn PhysicalExpr>],
    left_columns_len: usize,
) -> Option<Vec<Arc<dyn PhysicalExpr>>> {
    parent_required
        .iter()
        .map(|required| {
            let column = required.as_any().downcast_ref::<Column>()?;
            // `None` when the column belongs to the left side.
            let right_index = column.index().checked_sub(left_columns_len)?;
            Some(Arc::new(Column::new(column.name(), right_index)) as Arc<dyn PhysicalExpr>)
        })
        .collect()
}
/// Reorders the keys of a partitioned `HashJoinExec` or a `SortMergeJoinExec`
/// so that they line up with the hash partitioning already produced by one of
/// its inputs (left preferred, then right).
///
/// Returns a rebuilt join when a non-trivial permutation is found; otherwise
/// returns `plan` unchanged. Other operators are always returned unchanged.
pub(crate) fn reorder_join_keys_to_inputs(
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let plan_any = plan.as_any();
    if let Some(HashJoinExec {
        left,
        right,
        on,
        filter,
        join_type,
        mode,
        null_equals_null,
        ..
    }) = plan_any.downcast_ref::<HashJoinExec>()
    {
        if matches!(mode, PartitionMode::Partitioned) {
            let reordered = reorder_current_join_keys(
                extract_join_keys(on),
                Some(left.output_partitioning()),
                Some(right.output_partitioning()),
                &left.equivalence_properties(),
                &right.equivalence_properties(),
            );
            if let Some((keys, positions)) = reordered {
                // Empty positions mean the keys are already aligned.
                if !positions.is_empty() {
                    let new_on = new_join_conditions(&keys.left_keys, &keys.right_keys);
                    let rebuilt = HashJoinExec::try_new(
                        left.clone(),
                        right.clone(),
                        new_on,
                        filter.clone(),
                        join_type,
                        PartitionMode::Partitioned,
                        *null_equals_null,
                    )?;
                    return Ok(Arc::new(rebuilt));
                }
            }
        }
    } else if let Some(SortMergeJoinExec {
        left,
        right,
        on,
        join_type,
        sort_options,
        null_equals_null,
        ..
    }) = plan_any.downcast_ref::<SortMergeJoinExec>()
    {
        let reordered = reorder_current_join_keys(
            extract_join_keys(on),
            Some(left.output_partitioning()),
            Some(right.output_partitioning()),
            &left.equivalence_properties(),
            &right.equivalence_properties(),
        );
        if let Some((keys, positions)) = reordered {
            if !positions.is_empty() {
                let new_on = new_join_conditions(&keys.left_keys, &keys.right_keys);
                // Sort options travel with their key.
                let mut new_sort_options = Vec::with_capacity(sort_options.len());
                for idx in 0..sort_options.len() {
                    new_sort_options.push(sort_options[positions[idx]]);
                }
                let rebuilt = SortMergeJoinExec::try_new(
                    left.clone(),
                    right.clone(),
                    new_on,
                    *join_type,
                    new_sort_options,
                    *null_equals_null,
                )?;
                return Ok(Arc::new(rebuilt));
            }
        }
    }
    Ok(plan)
}
/// Tries to align `join_keys` with the hash partitioning of the join inputs.
///
/// The left partitioning is tried first (when it is `Partitioning::Hash`); if
/// that is absent or does not yield a reordering, the right partitioning is
/// tried. Returns `None` when neither side is hash partitioned or no
/// reordering matches.
fn reorder_current_join_keys(
    join_keys: JoinKeyPairs,
    left_partition: Option<Partitioning>,
    right_partition: Option<Partitioning>,
    left_equivalence_properties: &EquivalenceProperties,
    right_equivalence_properties: &EquivalenceProperties,
) -> Option<(JoinKeyPairs, Vec<usize>)> {
    if let Some(Partitioning::Hash(left_exprs, _)) = left_partition {
        let from_left =
            try_reorder(join_keys.clone(), &left_exprs, left_equivalence_properties);
        if from_left.is_some() {
            return from_left;
        }
    }
    // Fall back to the right input.
    match right_partition {
        Some(Partitioning::Hash(right_exprs, _)) => {
            try_reorder(join_keys, &right_exprs, right_equivalence_properties)
        }
        _ => None,
    }
}
/// Tries to permute `join_keys` so that one side matches `expected`.
///
/// Returns:
/// - `None` if the key count differs from `expected` or no permutation of
///   either side (raw, or normalised through the equivalence groups) matches;
/// - `Some((join_keys, vec![]))` if either side already equals `expected`,
///   directly or after normalisation -- the empty position list signals "no
///   change needed";
/// - `Some((reordered_keys, positions))` otherwise, where `positions[i]` is
///   the index in the original key list of the pair now at position `i`.
fn try_reorder(
    join_keys: JoinKeyPairs,
    expected: &[Arc<dyn PhysicalExpr>],
    equivalence_properties: &EquivalenceProperties,
) -> Option<(JoinKeyPairs, Vec<usize>)> {
    let eq_groups = equivalence_properties.eq_group();
    if join_keys.left_keys.len() != expected.len() {
        return None;
    }
    if physical_exprs_equal(expected, &join_keys.left_keys)
        || physical_exprs_equal(expected, &join_keys.right_keys)
    {
        return Some((join_keys, vec![]));
    }

    let normalize = |exprs: &[Arc<dyn PhysicalExpr>]| -> Vec<Arc<dyn PhysicalExpr>> {
        exprs
            .iter()
            .map(|e| eq_groups.normalize_expr(e.clone()))
            .collect()
    };

    // The normalised forms stay empty when there are no equivalence groups, so
    // the normalised lookups below cannot match in that case.
    let (normalized_expected, normalized_left_keys, normalized_right_keys) =
        if equivalence_properties.eq_group().is_empty() {
            (vec![], vec![], vec![])
        } else {
            let expected_norm = normalize(expected);
            assert_eq!(expected_norm.len(), expected.len());
            let left_norm = normalize(join_keys.left_keys.as_slice());
            assert_eq!(join_keys.left_keys.len(), left_norm.len());
            let right_norm = normalize(join_keys.right_keys.as_slice());
            assert_eq!(join_keys.right_keys.len(), right_norm.len());
            if physical_exprs_equal(&expected_norm, &left_norm)
                || physical_exprs_equal(&expected_norm, &right_norm)
            {
                return Some((join_keys, vec![]));
            }
            (expected_norm, left_norm, right_norm)
        };

    // Search order: raw left, raw right, normalised left, normalised right.
    let positions = expected_expr_positions(&join_keys.left_keys, expected)
        .or_else(|| expected_expr_positions(&join_keys.right_keys, expected))
        .or_else(|| expected_expr_positions(&normalized_left_keys, &normalized_expected))
        .or_else(|| {
            expected_expr_positions(&normalized_right_keys, &normalized_expected)
        })?;

    let (left_keys, right_keys): (Vec<_>, Vec<_>) = positions
        .iter()
        .map(|&pos| {
            (
                join_keys.left_keys[pos].clone(),
                join_keys.right_keys[pos].clone(),
            )
        })
        .unzip();
    Some((
        JoinKeyPairs {
            left_keys,
            right_keys,
        },
        positions,
    ))
}
/// For each expression in `expected`, finds the index of a matching, not yet
/// consumed expression in `current`.
///
/// Returns `None` if either slice is empty or if some expected expression has
/// no remaining match. Each entry of `current` is matched at most once, so
/// duplicates in `expected` need duplicates in `current`.
fn expected_expr_positions(
    current: &[Arc<dyn PhysicalExpr>],
    expected: &[Arc<dyn PhysicalExpr>],
) -> Option<Vec<usize>> {
    if current.is_empty() || expected.is_empty() {
        return None;
    }
    let mut remaining = current.to_vec();
    expected
        .iter()
        .map(|wanted| {
            let found = remaining.iter().position(|candidate| candidate.eq(wanted))?;
            // Overwrite the slot with a placeholder so it cannot match again.
            remaining[found] = Arc::new(NoOp::new());
            Some(found)
        })
        .collect()
}
fn extract_join_keys(on: &[(Column, Column)]) -> JoinKeyPairs {
let (left_keys, right_keys) = on
.iter()
.map(|(l, r)| (Arc::new(l.clone()) as _, Arc::new(r.clone()) as _))
.unzip();
JoinKeyPairs {
left_keys,
right_keys,
}
}
/// Pairs up left and right key expressions into `(Column, Column)` join
/// conditions, stopping at the shorter of the two slices.
///
/// # Panics
/// Panics if any key expression is not a `Column`.
fn new_join_conditions(
    new_left_keys: &[Arc<dyn PhysicalExpr>],
    new_right_keys: &[Arc<dyn PhysicalExpr>],
) -> Vec<(Column, Column)> {
    let as_column = |key: &Arc<dyn PhysicalExpr>| -> Column {
        key.as_any().downcast_ref::<Column>().unwrap().clone()
    };
    new_left_keys
        .iter()
        .zip(new_right_keys)
        .map(|(left_key, right_key)| (as_column(left_key), as_column(right_key)))
        .collect()
}
/// Records `input` as the new root of the tracked operator tree in
/// `dist_onward`.
///
/// If a tree is already being tracked it becomes the only child of the new
/// root, with its `idx` reset to 0; otherwise the new root has no children.
/// The new root is registered under `input_idx`.
fn update_distribution_onward(
    input: Arc<dyn ExecutionPlan>,
    dist_onward: &mut Option<ExecTree>,
    input_idx: usize,
) {
    let tracked_children = match dist_onward.take() {
        Some(mut previous_root) => {
            previous_root.idx = 0;
            vec![previous_root]
        }
        None => vec![],
    };
    *dist_onward = Some(ExecTree::new(input, input_idx, tracked_children));
}
/// Wraps `input` in a round-robin `RepartitionExec` with `n_target`
/// partitions when `input` currently has fewer partitions than that.
///
/// The repartition is created through `with_preserve_order()` and registered
/// in `dist_onward` under `input_idx`. When `input` already has at least
/// `n_target` partitions it is returned unchanged and `dist_onward` is not
/// touched.
fn add_roundrobin_on_top(
    input: Arc<dyn ExecutionPlan>,
    n_target: usize,
    dist_onward: &mut Option<ExecTree>,
    input_idx: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
    if input.output_partitioning().partition_count() >= n_target {
        return Ok(input);
    }
    let repartition =
        RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(n_target))?
            .with_preserve_order();
    let new_plan: Arc<dyn ExecutionPlan> = Arc::new(repartition);
    update_distribution_onward(Arc::clone(&new_plan), dist_onward, input_idx);
    Ok(new_plan)
}
/// Adds a hash `RepartitionExec` on `hash_exprs` with `n_target` partitions on
/// top of `input` when it is needed or increases parallelism.
///
/// `input` is returned unchanged when:
/// - it has exactly one partition and `n_target` is 1, or
/// - its partitioning already satisfies the hash requirement and it has at
///   least `n_target` partitions.
///
/// When `repartition_beneficial_stats` is set, a round-robin repartition is
/// attempted underneath the hash repartition first. Every operator added is
/// registered in `dist_onward`.
fn add_hash_on_top(
    input: Arc<dyn ExecutionPlan>,
    hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
    n_target: usize,
    dist_onward: &mut Option<ExecTree>,
    input_idx: usize,
    repartition_beneficial_stats: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
    let current_partitions = input.output_partitioning().partition_count();
    if n_target == current_partitions && n_target == 1 {
        return Ok(input);
    }

    let satisfied = input
        .output_partitioning()
        .satisfy(Distribution::HashPartitioned(hash_exprs.clone()), || {
            input.equivalence_properties()
        });
    if satisfied && n_target <= current_partitions {
        return Ok(input);
    }

    let source = if repartition_beneficial_stats {
        add_roundrobin_on_top(input, n_target, dist_onward, 0)?
    } else {
        input
    };
    let repartition =
        RepartitionExec::try_new(source, Partitioning::Hash(hash_exprs, n_target))?
            .with_preserve_order();
    let new_plan: Arc<dyn ExecutionPlan> = Arc::new(repartition);
    update_distribution_onward(Arc::clone(&new_plan), dist_onward, input_idx);
    Ok(new_plan)
}
/// Reduces `input` to a single partition when it has more than one.
///
/// A `SortPreservingMergeExec` over the existing output ordering is used when
/// `input` reports an ordering, a `CoalescePartitionsExec` otherwise. The new
/// operator is registered in `dist_onward` under `input_idx`. Inputs with at
/// most one partition are returned unchanged.
fn add_spm_on_top(
    input: Arc<dyn ExecutionPlan>,
    dist_onward: &mut Option<ExecTree>,
    input_idx: usize,
) -> Arc<dyn ExecutionPlan> {
    if input.output_partitioning().partition_count() <= 1 {
        return input;
    }
    // Copy the ordering out first so `input` can be moved into the operator.
    let existing_ordering = input.output_ordering().map(|ordering| ordering.to_vec());
    let merged: Arc<dyn ExecutionPlan> = match existing_ordering {
        Some(ordering) => Arc::new(SortPreservingMergeExec::new(ordering, input)) as _,
        None => Arc::new(CoalescePartitionsExec::new(input)) as _,
    };
    update_distribution_onward(Arc::clone(&merged), dist_onward, input_idx);
    merged
}
fn remove_dist_changing_operators(
distribution_context: DistributionContext,
) -> Result<DistributionContext> {
let DistributionContext {
mut plan,
mut distribution_onwards,
} = distribution_context;
while is_repartition(&plan)
|| is_coalesce_partitions(&plan)
|| is_sort_preserving_merge(&plan)
{
plan = plan.children().swap_remove(0);
distribution_onwards =
get_children_exectrees(plan.children().len(), &distribution_onwards[0]);
}
Ok(DistributionContext {
plan,
distribution_onwards,
})
}
/// Rebuilds `input` from the tracked tree in `dist_onward` (if any) with
/// order-preserving operators replaced by their plain counterparts, then
/// stops tracking by clearing `dist_onward`.
///
/// On error `dist_onward` is left as it was.
fn replace_order_preserving_variants(
    input: &mut Arc<dyn ExecutionPlan>,
    dist_onward: &mut Option<ExecTree>,
) -> Result<()> {
    match dist_onward.as_ref() {
        Some(tracked) => {
            *input = replace_order_preserving_variants_helper(tracked)?;
        }
        None => {}
    }
    dist_onward.take();
    Ok(())
}
/// Recursively rebuilds the plan described by `exec_tree`, replacing
/// - a sort-preserving merge with a `CoalescePartitionsExec`, and
/// - a `RepartitionExec` whose `preserve_order()` is set with a freshly
///   constructed `RepartitionExec` using the same partitioning.
///
/// Any other node is re-created with its (possibly updated) children.
fn replace_order_preserving_variants_helper(
    exec_tree: &ExecTree,
) -> Result<Arc<dyn ExecutionPlan>> {
    let mut new_children = exec_tree.plan.children();
    // Only tracked subtrees are rewritten; other children are kept as-is.
    for subtree in exec_tree.children.iter() {
        new_children[subtree.idx] = replace_order_preserving_variants_helper(subtree)?;
    }

    if is_sort_preserving_merge(&exec_tree.plan) {
        let only_child = new_children.swap_remove(0);
        return Ok(Arc::new(CoalescePartitionsExec::new(only_child)));
    }

    match exec_tree.plan.as_any().downcast_ref::<RepartitionExec>() {
        Some(repartition) if repartition.preserve_order() => {
            let plain = RepartitionExec::try_new(
                new_children.swap_remove(0),
                repartition.partitioning().clone(),
            )?;
            Ok(Arc::new(plain))
        }
        _ => exec_tree.plan.clone().with_new_children(new_children),
    }
}
/// Bottom-up rewrite step that makes the children of `dist_context.plan`
/// satisfy the distribution the plan requires of them, adding repartitioning
/// where the configuration and statistics say it is worthwhile.
///
/// Leaf plans are returned as `Transformed::No`. For every other plan the
/// leading distribution-changing operators are removed first and then
/// re-derived per child; the result is always `Transformed::Yes`.
///
/// Returns an error if statistics, repartitioning or plan reconstruction fail.
fn ensure_distribution(
dist_context: DistributionContext,
config: &ConfigOptions,
) -> Result<Transformed<DistributionContext>> {
let target_partitions = config.execution.target_partitions;
// Gate for adding round-robin repartitioning in the per-child step below.
let enable_round_robin = config.optimizer.enable_round_robin_repartition;
let repartition_file_scans = config.optimizer.repartition_file_scans;
let batch_size = config.execution.batch_size;
let is_unbounded = unbounded_output(&dist_context.plan);
// Order-preserving operator variants are kept when the plan is unbounded or
// when the configuration prefers the existing sort.
let order_preserving_variants_desirable =
is_unbounded || config.optimizer.prefer_existing_sort;
// Leaves have no children whose distribution could be adjusted.
if dist_context.plan.children().is_empty() {
return Ok(Transformed::No(dist_context));
}
// Drop any repartition / coalesce / sort-preserving-merge operators sitting
// on top of the plan; the per-child loop below re-adds what is needed.
let DistributionContext {
mut plan,
mut distribution_onwards,
} = remove_dist_changing_operators(dist_context)?;
// For window operators, use the replacement suggested by
// `get_best_fitting_window`, if it returns one.
if let Some(exec) = plan.as_any().downcast_ref::<WindowAggExec>() {
if let Some(updated_window) = get_best_fitting_window(
exec.window_expr(),
exec.input(),
&exec.partition_keys,
)? {
plan = updated_window;
}
} else if let Some(exec) = plan.as_any().downcast_ref::<BoundedWindowAggExec>() {
if let Some(updated_window) = get_best_fitting_window(
exec.window_expr(),
exec.input(),
&exec.partition_keys,
)? {
plan = updated_window;
}
};
let n_children = plan.children().len();
// Walk all children in lock-step with the plan's per-child properties and
// the per-child tracking entries, producing the rewritten children.
let new_children = izip!(
plan.children().into_iter(),
plan.required_input_distribution().iter(),
plan.required_input_ordering().iter(),
distribution_onwards.iter_mut(),
plan.benefits_from_input_partitioning(),
plan.maintains_input_order(),
0..n_children
)
.map(
|(
mut child,
requirement,
required_input_ordering,
dist_onward,
would_benefit,
maintains,
child_idx,
)| {
// Repartitioning is considered beneficial unless the child's row count
// is exact and not greater than `batch_size`.
let num_rows = child.statistics()?.num_rows;
let repartition_beneficial_stats = if num_rows.is_exact().unwrap_or(false) {
num_rows
.get_value()
.map(|value| value > &batch_size)
.unwrap_or(true)
} else {
true
};
// Step 1: try to increase parallelism, but only when enabled, the parent
// benefits from it, statistics allow it, and the child has fewer
// partitions than the target.
if enable_round_robin
&& (would_benefit && repartition_beneficial_stats)
&& child.output_partitioning().partition_count() < target_partitions
{
// When configured, first ask the child to repartition itself.
if repartition_file_scans {
if let Some(new_child) =
child.repartitioned(target_partitions, config)?
{
child = new_child;
}
}
// No-op if the child already reached the target partition count.
child = add_roundrobin_on_top(
child,
target_partitions,
dist_onward,
child_idx,
)?;
}
// Step 2: satisfy the distribution the parent requires of this child.
match requirement {
Distribution::SinglePartition => {
child = add_spm_on_top(child, dist_onward, child_idx);
}
Distribution::HashPartitioned(exprs) => {
child = add_hash_on_top(
child,
exprs.to_vec(),
target_partitions,
dist_onward,
child_idx,
repartition_beneficial_stats,
)?;
}
Distribution::UnspecifiedDistribution => {}
};
// Step 3: decide whether the order-preserving variants added above stay.
if let Some(required_input_ordering) = required_input_ordering {
let ordering_satisfied = child
.equivalence_properties()
.ordering_satisfy_requirement(required_input_ordering);
// Replace the order-preserving variants when they do not satisfy the
// required ordering, or when they are not desirable at all.
if !ordering_satisfied || !order_preserving_variants_desirable {
replace_order_preserving_variants(&mut child, dist_onward)?;
// The ordering held only thanks to the variants just removed, so a
// sort is added above the child to re-establish it.
if ordering_satisfied {
add_sort_above(&mut child, required_input_ordering, None);
}
}
// Tracking stops at an operator with an ordering requirement.
*dist_onward = None;
} else {
// The parent has no ordering requirement on this child.
match requirement {
Distribution::SinglePartition | Distribution::HashPartitioned(_) => {
replace_order_preserving_variants(&mut child, dist_onward)?;
}
Distribution::UnspecifiedDistribution => {
// Keep the variants only if the parent maintains the input order.
if !maintains {
replace_order_preserving_variants(&mut child, dist_onward)?;
}
}
}
}
Ok(child)
},
)
.collect::<Result<Vec<_>>>()?;
let new_distribution_context = DistributionContext {
// A `UnionExec` whose new children can be interleaved is replaced by an
// `InterleaveExec`; every other plan is rebuilt with the new children.
plan: if plan.as_any().is::<UnionExec>() && can_interleave(&new_children) {
Arc::new(InterleaveExec::try_new(new_children)?)
} else {
plan.with_new_children(new_children)?
},
distribution_onwards,
};
Ok(Transformed::Yes(new_distribution_context))
}
/// A plan node paired with bookkeeping about the distribution-changing
/// operators found below each of its children.
#[derive(Debug, Clone)]
struct DistributionContext {
// The plan node this context describes.
plan: Arc<dyn ExecutionPlan>,
// One entry per child of `plan`. `Some(tree)` holds the tracked operator
// tree for that child; `None` means nothing is tracked for it.
distribution_onwards: Vec<Option<ExecTree>>,
}
impl DistributionContext {
fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
let length = plan.children().len();
DistributionContext {
plan,
distribution_onwards: vec![None; length],
}
}
fn new_from_children_nodes(
children_nodes: Vec<DistributionContext>,
parent_plan: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
let children_plans = children_nodes
.iter()
.map(|item| item.plan.clone())
.collect();
let distribution_onwards = children_nodes
.into_iter()
.enumerate()
.map(|(idx, context)| {
let DistributionContext {
plan,
distribution_onwards,
} = context;
if plan.children().is_empty() {
None
} else if distribution_onwards[0].is_none() {
if let Some(repartition) =
plan.as_any().downcast_ref::<RepartitionExec>()
{
match repartition.partitioning() {
Partitioning::RoundRobinBatch(_)
| Partitioning::Hash(_, _) => {
return Some(ExecTree::new(plan, idx, vec![]));
}
_ => {}
}
} else if plan.as_any().is::<SortPreservingMergeExec>()
|| plan.as_any().is::<CoalescePartitionsExec>()
{
return Some(ExecTree::new(plan, idx, vec![]));
}
None
} else {
let new_distribution_onwards = izip!(
plan.required_input_distribution().iter(),
distribution_onwards.into_iter()
)
.flat_map(|(required_dist, distribution_onwards)| {
if let Some(distribution_onwards) = distribution_onwards {
if let Distribution::UnspecifiedDistribution = required_dist {
return Some(distribution_onwards);
}
}
None
})
.collect::<Vec<_>>();
if new_distribution_onwards.is_empty() {
None
} else {
Some(ExecTree::new(plan, idx, new_distribution_onwards))
}
}
})
.collect();
Ok(DistributionContext {
plan: with_new_children_if_necessary(parent_plan, children_plans)?.into(),
distribution_onwards,
})
}
fn children(&self) -> Vec<DistributionContext> {
self.plan
.children()
.into_iter()
.map(DistributionContext::new)
.collect()
}
}
impl TreeNode for DistributionContext {
    /// Applies `op` to each child context in order. Stops early on `Skip`
    /// (reported to the caller as `Continue`) and on `Stop`.
    fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
    where
        F: FnMut(&Self) -> Result<VisitRecursion>,
    {
        for child in self.children() {
            match op(&child)? {
                VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
                VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
                VisitRecursion::Continue => continue,
            }
        }
        Ok(VisitRecursion::Continue)
    }

    /// Transforms every child context and rebuilds this node from the results.
    /// Nodes without children are returned unchanged.
    fn map_children<F>(self, transform: F) -> Result<Self>
    where
        F: FnMut(Self) -> Result<Self>,
    {
        let children = self.children();
        if children.is_empty() {
            return Ok(self);
        }
        let transformed_children = children
            .into_iter()
            .map(transform)
            .collect::<Result<Vec<_>>>()?;
        DistributionContext::new_from_children_nodes(transformed_children, self.plan)
    }
}
impl fmt::Display for DistributionContext {
    /// Writes the plan (in debug form of its string representation) followed
    /// by every tracked child tree together with its child index.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "plan: {:?}", get_plan_string(&self.plan))?;
        let tracked = self
            .distribution_onwards
            .iter()
            .enumerate()
            .filter_map(|(idx, entry)| entry.as_ref().map(|tree| (idx, tree)));
        for (idx, tree) in tracked {
            write!(f, "idx:{:?}, exec_tree:{}", idx, tree)?;
        }
        write!(f, "")
    }
}
/// Join keys split by side. The two vectors are parallel: entry `i` of
/// `left_keys` is joined with entry `i` of `right_keys`.
#[derive(Debug, Clone)]
struct JoinKeyPairs {
// Key expressions evaluated on the left input.
left_keys: Vec<Arc<dyn PhysicalExpr>>,
// Key expressions evaluated on the right input.
right_keys: Vec<Arc<dyn PhysicalExpr>>,
}
/// A plan node annotated with key-ordering information used by the top-down
/// join key reordering pass.
#[derive(Debug, Clone)]
struct PlanWithKeyRequirements {
// The plan node being annotated.
plan: Arc<dyn ExecutionPlan>,
// Key ordering the parent asks of this node; empty when there is none.
required_key_ordering: Vec<Arc<dyn PhysicalExpr>>,
// Key ordering this node asks of each child, one entry per child;
// `None` means nothing is requested from that child.
request_key_ordering: Vec<Option<Vec<Arc<dyn PhysicalExpr>>>>,
}
impl PlanWithKeyRequirements {
    /// Wraps `plan` with no requirement from its parent and no request to any
    /// of its children.
    fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
        let request_key_ordering = vec![None; plan.children().len()];
        Self {
            plan,
            required_key_ordering: vec![],
            request_key_ordering,
        }
    }

    /// Builds the annotated children of this node: each child receives the
    /// request recorded for it as its own requirement (empty when `None`) and
    /// starts without requests to its own children.
    ///
    /// # Panics
    /// Panics if the number of recorded requests differs from the number of
    /// children of the plan.
    fn children(&self) -> Vec<PlanWithKeyRequirements> {
        let plan_children = self.plan.children();
        assert_eq!(plan_children.len(), self.request_key_ordering.len());
        plan_children
            .into_iter()
            .zip(self.request_key_ordering.iter().cloned())
            .map(|(child, request)| {
                let grandchildren_len = child.children().len();
                PlanWithKeyRequirements {
                    plan: child,
                    required_key_ordering: request.unwrap_or_default(),
                    request_key_ordering: vec![None; grandchildren_len],
                }
            })
            .collect()
    }
}
impl TreeNode for PlanWithKeyRequirements {
    /// Applies `op` to each annotated child in order. Stops early on `Skip`
    /// (reported to the caller as `Continue`) and on `Stop`.
    fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
    where
        F: FnMut(&Self) -> Result<VisitRecursion>,
    {
        for child in self.children() {
            match op(&child)? {
                VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
                VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
                VisitRecursion::Continue => continue,
            }
        }
        Ok(VisitRecursion::Continue)
    }

    /// Transforms every annotated child and rebuilds this node's plan from the
    /// transformed child plans. The node's own requirement and requests are
    /// kept as they were. Nodes without children are returned unchanged.
    fn map_children<F>(self, transform: F) -> Result<Self>
    where
        F: FnMut(Self) -> Result<Self>,
    {
        let children = self.children();
        if children.is_empty() {
            return Ok(self);
        }
        let children_plans = children
            .into_iter()
            .map(transform)
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .map(|child| child.plan)
            .collect::<Vec<_>>();
        let rebuilt = with_new_children_if_necessary(self.plan, children_plans)?;
        Ok(PlanWithKeyRequirements {
            plan: rebuilt.into(),
            required_key_ordering: self.required_key_ordering,
            request_key_ordering: self.request_key_ordering,
        })
    }
}
#[cfg(feature = "parquet")]
#[cfg(test)]
pub(crate) mod tests {
use std::ops::Deref;
use super::*;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::ParquetExec;
use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
use crate::physical_optimizer::enforce_sorting::EnforceSorting;
use crate::physical_optimizer::output_requirements::OutputRequirements;
use crate::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
use crate::physical_plan::expressions::col;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::joins::{
utils::JoinOn, HashJoinExec, PartitionMode, SortMergeJoinExec,
};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::{displayable, DisplayAs, DisplayFormatType, Statistics};
use crate::physical_optimizer::test_utils::{
coalesce_partitions_exec, repartition_exec,
};
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::sorts::sort::SortExec;
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::ScalarValue;
use datafusion_expr::logical_plan::JoinType;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
use datafusion_physical_expr::{
expressions, expressions::binary, expressions::lit, expressions::Column,
LexOrdering, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
};
/// Test-only pass-through operator with a single input, used to place an
/// input-ordering requirement in a plan.
#[derive(Debug)]
struct SortRequiredExec {
// The wrapped input plan.
input: Arc<dyn ExecutionPlan>,
// Sort expressions stored with the node and shown by its display output.
expr: LexOrdering,
}
impl SortRequiredExec {
    /// Wraps `input`, storing a copy of its current output ordering (empty
    /// when the input reports none).
    fn new(input: Arc<dyn ExecutionPlan>) -> Self {
        let expr = match input.output_ordering() {
            Some(ordering) => ordering.to_vec(),
            None => Vec::new(),
        };
        Self { input, expr }
    }

    /// Wraps `input`, storing the explicitly supplied sort expressions.
    fn new_with_requirement(
        input: Arc<dyn ExecutionPlan>,
        requirement: Vec<PhysicalSortExpr>,
    ) -> Self {
        let expr = requirement;
        Self { input, expr }
    }
}
impl DisplayAs for SortRequiredExec {
    /// Renders the node as `SortRequiredExec: [<sort exprs>]`, regardless of
    /// the requested display format.
    fn fmt_as(
        &self,
        _t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        let sort_exprs = PhysicalSortExpr::format_list(&self.expr);
        write!(f, "SortRequiredExec: [{}]", sort_exprs)
    }
}
// Pass-through plan implementation: schema, partitioning, ordering and
// statistics are all delegated to the single input.
impl ExecutionPlan for SortRequiredExec {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema(&self) -> SchemaRef {
self.input.schema()
}
fn output_partitioning(&self) -> crate::physical_plan::Partitioning {
self.input.output_partitioning()
}
// Reports `false` for its only input.
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
vec![false]
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.input.output_ordering()
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
}
// The requirement is derived from `output_ordering()` (i.e. the input's
// ordering), not from the stored `expr`; it is `None` when the input reports
// no ordering.
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
vec![self
.output_ordering()
.map(PhysicalSortRequirement::from_sort_exprs)]
}
// Rebuilds the node around the new child while keeping the stored `expr`.
// Panics unless exactly one child is supplied.
fn with_new_children(
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
assert_eq!(children.len(), 1);
let child = children.pop().unwrap();
Ok(Arc::new(Self::new_with_requirement(
child,
self.expr.clone(),
)))
}
// Not executable: calling this panics via `unreachable!`.
fn execute(
&self,
_partition: usize,
_context: Arc<crate::execution::context::TaskContext>,
) -> Result<crate::physical_plan::SendableRecordBatchStream> {
unreachable!();
}
fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
}
}
pub(crate) fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("a", DataType::Int64, true),
Field::new("b", DataType::Int64, true),
Field::new("c", DataType::Int64, true),
Field::new("d", DataType::Int32, true),
Field::new("e", DataType::Boolean, true),
]))
}
/// Single-file-group parquet scan with no declared output ordering.
fn parquet_exec() -> Arc<ParquetExec> {
    let no_ordering = Vec::new();
    parquet_exec_with_sort(no_ordering)
}
/// Parquet scan over one file group containing the file `x`, using the
/// shared test schema and declaring the given `output_ordering`.
pub(crate) fn parquet_exec_with_sort(
    output_ordering: Vec<Vec<PhysicalSortExpr>>,
) -> Arc<ParquetExec> {
    let file_schema = schema();
    let statistics = Statistics::new_unknown(&file_schema);
    let only_group = vec![PartitionedFile::new("x".to_string(), 100)];
    let scan_config = FileScanConfig {
        object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
        file_schema,
        file_groups: vec![only_group],
        statistics,
        projection: None,
        limit: None,
        table_partition_cols: vec![],
        output_ordering,
        infinite_source: false,
    };
    Arc::new(ParquetExec::new(scan_config, None, None))
}
/// Two-file-group parquet scan with no declared output ordering.
fn parquet_exec_multiple() -> Arc<ParquetExec> {
    let no_ordering = Vec::new();
    parquet_exec_multiple_sorted(no_ordering)
}
/// Parquet scan over two file groups (files `x` and `y`, one per group),
/// using the shared test schema and declaring the given `output_ordering`.
fn parquet_exec_multiple_sorted(
    output_ordering: Vec<Vec<PhysicalSortExpr>>,
) -> Arc<ParquetExec> {
    let file_schema = schema();
    let statistics = Statistics::new_unknown(&file_schema);
    let first_group = vec![PartitionedFile::new("x".to_string(), 100)];
    let second_group = vec![PartitionedFile::new("y".to_string(), 100)];
    let scan_config = FileScanConfig {
        object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
        file_schema,
        file_groups: vec![first_group, second_group],
        statistics,
        projection: None,
        limit: None,
        table_partition_cols: vec![],
        output_ordering,
        infinite_source: false,
    };
    Arc::new(ParquetExec::new(scan_config, None, None))
}
/// Single-file-group CSV scan with no declared output ordering.
fn csv_exec() -> Arc<CsvExec> {
    let no_ordering = Vec::new();
    csv_exec_with_sort(no_ordering)
}
/// CSV scan over one file group containing the file `x`, using the shared
/// test schema and declaring the given `output_ordering`. The reader is
/// configured with `,` and `"` as its byte arguments and no compression.
fn csv_exec_with_sort(output_ordering: Vec<Vec<PhysicalSortExpr>>) -> Arc<CsvExec> {
    let file_schema = schema();
    let statistics = Statistics::new_unknown(&file_schema);
    let only_group = vec![PartitionedFile::new("x".to_string(), 100)];
    let scan_config = FileScanConfig {
        object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
        file_schema,
        file_groups: vec![only_group],
        statistics,
        projection: None,
        limit: None,
        table_partition_cols: vec![],
        output_ordering,
        infinite_source: false,
    };
    let csv = CsvExec::new(
        scan_config,
        false,
        b',',
        b'"',
        None,
        FileCompressionType::UNCOMPRESSED,
    );
    Arc::new(csv)
}
/// Two-file-group CSV scan with no declared output ordering.
fn csv_exec_multiple() -> Arc<CsvExec> {
    let no_ordering = Vec::new();
    csv_exec_multiple_sorted(no_ordering)
}
/// CSV scan over two file groups (files `x` and `y`, one per group), using
/// the shared test schema and declaring the given `output_ordering`.
fn csv_exec_multiple_sorted(
    output_ordering: Vec<Vec<PhysicalSortExpr>>,
) -> Arc<CsvExec> {
    let file_schema = schema();
    let statistics = Statistics::new_unknown(&file_schema);
    let first_group = vec![PartitionedFile::new("x".to_string(), 100)];
    let second_group = vec![PartitionedFile::new("y".to_string(), 100)];
    let scan_config = FileScanConfig {
        object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
        file_schema,
        file_groups: vec![first_group, second_group],
        statistics,
        projection: None,
        limit: None,
        table_partition_cols: vec![],
        output_ordering,
        infinite_source: false,
    };
    let csv = CsvExec::new(
        scan_config,
        false,
        b',',
        b'"',
        None,
        FileCompressionType::UNCOMPRESSED,
    );
    Arc::new(csv)
}
/// Wraps `input` in a `ProjectionExec` that emits, for every
/// `(column, alias)` pair, the input column `column` renamed to `alias`.
///
/// Panics if a column is missing from the input schema or the projection
/// cannot be built.
fn projection_exec_with_alias(
    input: Arc<dyn ExecutionPlan>,
    alias_pairs: Vec<(String, String)>,
) -> Arc<dyn ExecutionPlan> {
    let input_schema = input.schema();
    let exprs: Vec<_> = alias_pairs
        .into_iter()
        .map(|(column, alias)| (col(&column, &input_schema).unwrap(), alias))
        .collect();
    let projection = ProjectionExec::try_new(exprs, input).unwrap();
    Arc::new(projection)
}
/// Builds a two-stage aggregation over `input` with no aggregate
/// expressions: a `Partial` stage grouping by each `(column, alias)` pair,
/// followed by a `FinalPartitioned` stage grouping by the aliases, which are
/// addressed by their position in `alias_pairs`.
///
/// Both stages are constructed with the shared test schema. Panics if a
/// column is missing from the input schema or either stage cannot be built.
fn aggregate_exec_with_alias(
    input: Arc<dyn ExecutionPlan>,
    alias_pairs: Vec<(String, String)>,
) -> Arc<dyn ExecutionPlan> {
    let agg_schema = schema();
    let input_schema = input.schema();

    // Grouping keys of the partial stage: input columns under their aliases.
    let partial_keys: Vec<(Arc<dyn PhysicalExpr>, String)> = alias_pairs
        .iter()
        .map(|(column, alias)| (col(column, &input_schema).unwrap(), alias.to_string()))
        .collect();

    // Grouping keys of the final stage: the aliases, referenced by index.
    let final_keys: Vec<(Arc<dyn PhysicalExpr>, String)> = partial_keys
        .iter()
        .enumerate()
        .map(|(index, (_expr, name))| {
            let column: Arc<dyn PhysicalExpr> =
                Arc::new(expressions::Column::new(name, index));
            (column, name.clone())
        })
        .collect();

    let partial_stage = AggregateExec::try_new(
        AggregateMode::Partial,
        PhysicalGroupBy::new_single(partial_keys),
        vec![],
        vec![],
        vec![],
        input,
        agg_schema.clone(),
    )
    .unwrap();

    let final_stage = AggregateExec::try_new(
        AggregateMode::FinalPartitioned,
        PhysicalGroupBy::new_single(final_keys),
        vec![],
        vec![],
        vec![],
        Arc::new(partial_stage),
        agg_schema,
    )
    .unwrap();

    Arc::new(final_stage)
}
/// Builds a `HashJoinExec` in `Partitioned` mode joining `left` and `right`
/// on `join_on` with the given `join_type` and no join filter.
///
/// Panics if the join cannot be constructed.
fn hash_join_exec(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
    join_on: &JoinOn,
    join_type: &JoinType,
) -> Arc<dyn ExecutionPlan> {
    let on = join_on.clone();
    let join = HashJoinExec::try_new(
        left,
        right,
        on,
        None,
        join_type,
        PartitionMode::Partitioned,
        false,
    )
    .unwrap();
    Arc::new(join)
}
/// Builds a `SortMergeJoinExec` joining `left` and `right` on `join_on` with
/// the given `join_type`, using default sort options for every join key.
///
/// Panics if the join cannot be constructed.
fn sort_merge_join_exec(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
    join_on: &JoinOn,
    join_type: &JoinType,
) -> Arc<dyn ExecutionPlan> {
    // One default `SortOptions` per join key pair.
    let sort_options = vec![SortOptions::default(); join_on.len()];
    let join = SortMergeJoinExec::try_new(
        left,
        right,
        join_on.clone(),
        *join_type,
        sort_options,
        false,
    )
    .unwrap();
    Arc::new(join)
}
/// Wraps `input` in a `FilterExec` with the predicate `c = 0`, where `c` is
/// resolved against the shared test schema.
fn filter_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    let column_c = col("c", &schema()).unwrap();
    let zero = Arc::new(Literal::new(ScalarValue::Int64(Some(0))));
    let predicate = Arc::new(BinaryExpr::new(column_c, Operator::Eq, zero));
    let filter = FilterExec::try_new(predicate, input).unwrap();
    Arc::new(filter)
}
/// Wraps `input` in a `SortExec` on `sort_exprs`, with its
/// preserve-partitioning flag set to `preserve_partitioning`.
fn sort_exec(
    sort_exprs: Vec<PhysicalSortExpr>,
    input: Arc<dyn ExecutionPlan>,
    preserve_partitioning: bool,
) -> Arc<dyn ExecutionPlan> {
    Arc::new(
        SortExec::new(sort_exprs, input)
            .with_preserve_partitioning(preserve_partitioning),
    )
}
/// Wraps `input` in a `SortPreservingMergeExec` on `sort_exprs`.
fn sort_preserving_merge_exec(
    sort_exprs: Vec<PhysicalSortExpr>,
    input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
    let merge = SortPreservingMergeExec::new(sort_exprs, input);
    Arc::new(merge)
}
/// Stacks a `LocalLimitExec` (argument `100`) and, on top of it, a
/// `GlobalLimitExec` (arguments `0` and `Some(100)`) over `input`.
fn limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    let local_limit = Arc::new(LocalLimitExec::new(input, 100));
    let global_limit = GlobalLimitExec::new(local_limit, 0, Some(100));
    Arc::new(global_limit)
}
/// Combines the given plans under a single `UnionExec`.
fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
    let union = UnionExec::new(input);
    Arc::new(union)
}
/// Wraps `input` in a `SortRequiredExec` built with `SortRequiredExec::new`.
fn sort_required_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    let node = SortRequiredExec::new(input);
    Arc::new(node)
}
/// Wraps `input` in a `SortRequiredExec` that stores `sort_exprs` explicitly.
fn sort_required_exec_with_req(
    input: Arc<dyn ExecutionPlan>,
    sort_exprs: LexOrdering,
) -> Arc<dyn ExecutionPlan> {
    let node = SortRequiredExec::new_with_requirement(input, sort_exprs);
    Arc::new(node)
}
/// Splits `plan` on `'\n'`, trims surrounding whitespace from every line and
/// returns the lines that are not empty after trimming, in order.
pub(crate) fn trim_plan_display(plan: &str) -> Vec<&str> {
    let mut lines = Vec::new();
    for raw_line in plan.split('\n') {
        let line = raw_line.trim();
        if !line.is_empty() {
            lines.push(line);
        }
    }
    lines
}
/// Runs `ensure_distribution` once on `plan` and returns the resulting plan.
///
/// The configuration used has round-robin repartitioning and file-scan
/// repartitioning switched off, `repartition_file_min_size` set to 1024,
/// and takes `target_partitions` and `prefer_existing_sort` from the
/// arguments.
fn ensure_distribution_helper(
    plan: Arc<dyn ExecutionPlan>,
    target_partitions: usize,
    bounded_order_preserving_variants: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
    let mut config = ConfigOptions::new();
    config.execution.target_partitions = target_partitions;
    config.optimizer.enable_round_robin_repartition = false;
    config.optimizer.repartition_file_scans = false;
    config.optimizer.repartition_file_min_size = 1024;
    config.optimizer.prefer_existing_sort = bounded_order_preserving_variants;

    let context = DistributionContext::new(plan);
    let transformed = ensure_distribution(context, &config)?;
    Ok(transformed.into().plan)
}
// Asserts that the indented display of `$PLAN` matches `$EXPECTED_LINES`
// line by line.
//
// Unlike `assert_plan_txt!`, the individual lines are NOT trimmed: only the
// rendered text as a whole is trimmed before it is split into lines.
macro_rules! plans_matches_expected {
($EXPECTED_LINES: expr, $PLAN: expr) => {
let physical_plan = $PLAN;
let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
// Copy the expected `&str` items into a `Vec` so both sides have the same type.
let expected_plan_lines: Vec<&str> = $EXPECTED_LINES
.iter().map(|s| *s).collect();
assert_eq!(
expected_plan_lines, actual,
"\n**Original Plan Mismatch\n\nexpected:\n\n{expected_plan_lines:#?}\nactual:\n\n{actual:#?}\n\n"
);
}
}
// Optimizes `$PLAN` with a fixed sequence of physical optimizer rules and
// asserts that the trimmed, indented display of the result equals
// `$EXPECTED_LINES`.
//
// Arguments:
// - `$FIRST_ENFORCE_DIST`: when true, `EnforceDistribution` runs (twice)
//   before `EnforceSorting`; when false, `EnforceSorting` runs first and
//   `EnforceDistribution` runs (twice) afterwards.
// - `$BOUNDED_ORDER_PRESERVING_VARIANTS`: value for
//   `optimizer.prefer_existing_sort` (defaults to `false`).
// - `$TARGET_PARTITIONS`: value for `execution.target_partitions`
//   (defaults to `10`).
// - `$REPARTITION_FILE_SCANS`: value for `optimizer.repartition_file_scans`
//   (defaults to `false`).
// - `$REPARTITION_FILE_MIN_SIZE`: value for
//   `optimizer.repartition_file_min_size` (defaults to `1024`).
//
// The expansion uses `?`, so it must be invoked inside a function that
// returns a compatible `Result`.
macro_rules! assert_optimized {
// Three-argument form: all tunables take their defaults.
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 10, false, 1024);
};
// Four-argument form: only `prefer_existing_sort` is overridden.
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $BOUNDED_ORDER_PRESERVING_VARIANTS: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $BOUNDED_ORDER_PRESERVING_VARIANTS, 10, false, 1024);
};
// Full form: every tunable is given explicitly.
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $BOUNDED_ORDER_PRESERVING_VARIANTS: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
let mut config = ConfigOptions::new();
config.execution.target_partitions = $TARGET_PARTITIONS;
config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE;
config.optimizer.prefer_existing_sort = $BOUNDED_ORDER_PRESERVING_VARIANTS;
// Run `OutputRequirements` in add mode before the rules under test; it is
// run again in remove mode after them.
let optimizer = OutputRequirements::new_add_mode();
let optimized = optimizer.optimize($PLAN.clone(), &config)?;
let optimized = if $FIRST_ENFORCE_DIST {
// Distribution first. The rule is applied twice, so the expected plan is
// the one obtained after a second application.
let optimizer = EnforceDistribution::new();
let optimized = optimizer.optimize(optimized, &config)?;
let optimizer = EnforceDistribution::new();
let optimized = optimizer.optimize(optimized, &config)?;
// Then sorting.
let optimizer = EnforceSorting::new();
let optimized = optimizer.optimize(optimized, &config)?;
optimized
} else {
// Sorting first.
let optimizer = EnforceSorting::new();
let optimized = optimizer.optimize(optimized, &config)?;
// Then distribution, again applied twice.
let optimizer = EnforceDistribution::new();
let optimized = optimizer.optimize(optimized, &config)?;
let optimizer = EnforceDistribution::new();
let optimized = optimizer.optimize(optimized, &config)?;
optimized
};
// Second `OutputRequirements` pass, this time in remove mode.
let optimizer = OutputRequirements::new_remove_mode();
let optimized = optimizer.optimize(optimized, &config)?;
// Compare the trimmed, non-empty display lines against the expectation.
let plan = displayable(optimized.as_ref()).indent(true).to_string();
let actual_lines = trim_plan_display(&plan);
assert_eq!(
&expected_lines, &actual_lines,
"\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
expected_lines, actual_lines
);
};
}
// Asserts that the indented display of `$PLAN`, with every line trimmed and
// empty lines dropped (see `trim_plan_display`), equals `$EXPECTED_LINES`.
// No optimizer rule is run by this macro.
macro_rules! assert_plan_txt {
($EXPECTED_LINES: expr, $PLAN: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
let plan = displayable($PLAN.as_ref()).indent(true).to_string();
let actual_lines = trim_plan_display(&plan);
assert_eq!(
&expected_lines, &actual_lines,
"\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
expected_lines, actual_lines
);
};
}
// For every join type, builds a bottom hash join (`a = b1`) between a scan
// and an aliased projection of a scan, stacks a second hash join of the same
// type on top, and checks the optimized plan under both rule orders.
//
// Two shapes of top join are exercised:
// 1. keyed on the bottom join's `a` column (skipped for RightSemi/RightAnti);
// 2. keyed on the bottom join's `b1` column (skipped for LeftSemi/LeftAnti).
#[test]
fn multi_hash_joins() -> Result<()> {
let left = parquet_exec();
// Right side: the same scan with every column renamed (`a` -> `a1`, ...).
let alias_pairs: Vec<(String, String)> = vec![
("a".to_string(), "a1".to_string()),
("b".to_string(), "b1".to_string()),
("c".to_string(), "c1".to_string()),
("d".to_string(), "d1".to_string()),
("e".to_string(), "e1".to_string()),
];
let right = projection_exec_with_alias(parquet_exec(), alias_pairs);
let join_types = vec![
JoinType::Inner,
JoinType::Left,
JoinType::Right,
JoinType::Full,
JoinType::LeftSemi,
JoinType::LeftAnti,
JoinType::RightSemi,
JoinType::RightAnti,
];
// Bottom join condition: left.a = right.b1
let join_on = vec![(
Column::new_with_schema("a", &schema()).unwrap(),
Column::new_with_schema("b1", &right.schema()).unwrap(),
)];
for join_type in join_types {
let join = hash_join_exec(left.clone(), right.clone(), &join_on, &join_type);
let join_plan = format!(
"HashJoinExec: mode=Partitioned, join_type={join_type}, on=[(a@0, b1@1)]"
);
// Shape 1: top join condition is bottom_join.a = scan.c
match join_type {
JoinType::Inner
| JoinType::Left
| JoinType::Right
| JoinType::Full
| JoinType::LeftSemi
| JoinType::LeftAnti => {
let top_join_on = vec![(
Column::new_with_schema("a", &join.schema()).unwrap(),
Column::new_with_schema("c", &schema()).unwrap(),
)];
let top_join = hash_join_exec(
join.clone(),
parquet_exec(),
&top_join_on,
&join_type,
);
let top_join_plan =
format!("HashJoinExec: mode=Partitioned, join_type={join_type}, on=[(a@0, c@2)]");
let expected = match join_type {
// For these join types the expected plan has no repartition between
// the bottom join and the top join.
JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => vec![
top_join_plan.as_str(),
join_plan.as_str(),
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
// Remaining types reaching this arm (Right, Full): the expected plan has
// an extra hash repartition on `a@0` above the bottom join.
_ => vec![
top_join_plan.as_str(),
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
join_plan.as_str(),
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
};
// Same expectation regardless of which rule runs first.
assert_optimized!(expected, top_join.clone(), true);
assert_optimized!(expected, top_join, false);
}
// Shape 1 is not exercised for right semi/anti joins.
JoinType::RightSemi | JoinType::RightAnti => {}
}
// Shape 2: top join condition is bottom_join.b1 = scan.c
match join_type {
JoinType::Inner
| JoinType::Left
| JoinType::Right
| JoinType::Full
| JoinType::RightSemi
| JoinType::RightAnti => {
let top_join_on = vec![(
Column::new_with_schema("b1", &join.schema()).unwrap(),
Column::new_with_schema("c", &schema()).unwrap(),
)];
let top_join =
hash_join_exec(join, parquet_exec(), &top_join_on, &join_type);
// `b1` is displayed at index 1 for right semi/anti joins and at index 6
// for the other join types.
let top_join_plan = match join_type {
JoinType::RightSemi | JoinType::RightAnti =>
format!("HashJoinExec: mode=Partitioned, join_type={join_type}, on=[(b1@1, c@2)]"),
_ =>
format!("HashJoinExec: mode=Partitioned, join_type={join_type}, on=[(b1@6, c@2)]"),
};
let expected = match join_type {
// For these join types the expected plan has no repartition between
// the bottom join and the top join.
JoinType::Inner | JoinType::Right | JoinType::RightSemi | JoinType::RightAnti =>
vec![
top_join_plan.as_str(),
join_plan.as_str(),
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
// Remaining types reaching this arm (Left, Full): the expected plan has
// an extra hash repartition on `b1@6` above the bottom join.
_ =>
vec![
top_join_plan.as_str(),
"RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10",
join_plan.as_str(),
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
};
// Same expectation regardless of which rule runs first.
assert_optimized!(expected, top_join.clone(), true);
assert_optimized!(expected, top_join, false);
}
// Shape 2 is not exercised for left semi/anti joins.
JoinType::LeftSemi | JoinType::LeftAnti => {}
}
}
Ok(())
}
// Joins two scans on `a = b`, projects the join's `a` column twice (as `a1`
// and `a2`), then joins the projection against a scan using first `a1` and
// then `a2` as the left key. In both cases the expected plan has no
// repartition between the top join and the projection.
#[test]
fn multi_joins_after_alias() -> Result<()> {
    let scan_left = parquet_exec();
    let scan_right = parquet_exec();

    // Bottom join condition: left.a = right.b
    let bottom_on = vec![(
        Column::new_with_schema("a", &schema()).unwrap(),
        Column::new_with_schema("b", &schema()).unwrap(),
    )];
    let bottom_join =
        hash_join_exec(scan_left, scan_right.clone(), &bottom_on, &JoinType::Inner);

    // Project `a` under two different aliases.
    let aliased = projection_exec_with_alias(
        bottom_join,
        vec![
            (String::from("a"), String::from("a1")),
            (String::from("a"), String::from("a2")),
        ],
    );

    // Top join keyed on the first alias: projection.a1 = scan.c
    let on_first_alias = vec![(
        Column::new_with_schema("a1", &aliased.schema()).unwrap(),
        Column::new_with_schema("c", &schema()).unwrap(),
    )];
    let top_join = hash_join_exec(
        aliased.clone(),
        scan_right.clone(),
        &on_first_alias,
        &JoinType::Inner,
    );
    let expected = &[
        "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a1@0, c@2)]",
        "ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
        "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]",
        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, top_join.clone(), true);
    assert_optimized!(expected, top_join, false);

    // Top join keyed on the second alias: projection.a2 = scan.c
    let on_second_alias = vec![(
        Column::new_with_schema("a2", &aliased.schema()).unwrap(),
        Column::new_with_schema("c", &schema()).unwrap(),
    )];
    let top_join = hash_join_exec(aliased, scan_right, &on_second_alias, &JoinType::Inner);
    let expected = &[
        "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a2@1, c@2)]",
        "ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
        "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]",
        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, top_join.clone(), true);
    assert_optimized!(expected, top_join, false);

    Ok(())
}
// Joins two scans on `a = b`, then applies two projections (`c` -> `c1`,
// then `c1` -> `a`) so that the name `a` now refers to the original `c`
// column. The top join keyed on this new `a` is expected to get a hash
// repartition on `a@0` above the projections.
#[test]
fn multi_joins_after_multi_alias() -> Result<()> {
    let scan_left = parquet_exec();
    let scan_right = parquet_exec();

    // Bottom join condition: left.a = right.b
    let bottom_on = vec![(
        Column::new_with_schema("a", &schema()).unwrap(),
        Column::new_with_schema("b", &schema()).unwrap(),
    )];
    let bottom_join =
        hash_join_exec(scan_left, scan_right.clone(), &bottom_on, &JoinType::Inner);

    // First projection renames `c` to `c1`; the second renames `c1` to `a`.
    let first_projection = projection_exec_with_alias(
        bottom_join,
        vec![(String::from("c"), String::from("c1"))],
    );
    let second_projection = projection_exec_with_alias(
        first_projection,
        vec![(String::from("c1"), String::from("a"))],
    );

    // Top join condition: second_projection.a = scan.c
    let top_on = vec![(
        Column::new_with_schema("a", &second_projection.schema()).unwrap(),
        Column::new_with_schema("c", &schema()).unwrap(),
    )];
    let top_join =
        hash_join_exec(second_projection, scan_right, &top_on, &JoinType::Inner);

    let expected = &[
        "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, c@2)]",
        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
        "ProjectionExec: expr=[c1@0 as a]",
        "ProjectionExec: expr=[c@2 as c1]",
        "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]",
        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, top_join.clone(), true);
    assert_optimized!(expected, top_join, false);

    Ok(())
}
// Joins two aggregations that both group the scan by `a`, one under the
// alias `a1` and the other under `a2`. The expected plan has no repartition
// between the join and either final aggregation.
#[test]
fn join_after_agg_alias() -> Result<()> {
    let agg_left = aggregate_exec_with_alias(
        parquet_exec(),
        vec![(String::from("a"), String::from("a1"))],
    );
    let agg_right = aggregate_exec_with_alias(
        parquet_exec(),
        vec![(String::from("a"), String::from("a2"))],
    );

    // Join condition: agg_left.a1 = agg_right.a2
    let on = vec![(
        Column::new_with_schema("a1", &agg_left.schema()).unwrap(),
        Column::new_with_schema("a2", &agg_right.schema()).unwrap(),
    )];
    let join = hash_join_exec(agg_left, agg_right, &on, &JoinType::Inner);

    let expected = &[
        "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a1@0, a2@0)]",
        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
        "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
        "RepartitionExec: partitioning=Hash([a2@0], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[a@0 as a2], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, join.clone(), true);
    assert_optimized!(expected, join, false);

    Ok(())
}
// Joins an aggregation grouped by `(a1, b1)` with one grouped by `(b, a)`
// on the keys `(b1 = b, a1 = a)`. In the expected plan the left aggregation
// is regrouped as `(b1, a1)` and a projection restores the `(a1, b1)`
// column order, with no repartition directly below the join.
#[test]
fn hash_join_key_ordering() -> Result<()> {
    // Left side groups by `a` then `b`.
    let agg_left = aggregate_exec_with_alias(
        parquet_exec(),
        vec![
            (String::from("a"), String::from("a1")),
            (String::from("b"), String::from("b1")),
        ],
    );
    // Right side groups by `b` then `a`.
    let agg_right = aggregate_exec_with_alias(
        parquet_exec(),
        vec![
            (String::from("b"), String::from("b")),
            (String::from("a"), String::from("a")),
        ],
    );

    // Join keys listed in the right side's grouping order.
    let b_keys = (
        Column::new_with_schema("b1", &agg_left.schema()).unwrap(),
        Column::new_with_schema("b", &agg_right.schema()).unwrap(),
    );
    let a_keys = (
        Column::new_with_schema("a1", &agg_left.schema()).unwrap(),
        Column::new_with_schema("a", &agg_right.schema()).unwrap(),
    );
    let on = vec![b_keys, a_keys];
    let join = hash_join_exec(agg_left, agg_right, &on, &JoinType::Inner);

    let expected = &[
        "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b1@1, b@0), (a1@0, a@1)]",
        "ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
        "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]",
        "RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([b@0, a@1], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, join.clone(), true);
    assert_optimized!(expected, join, false);

    Ok(())
}
// Builds two bottom joins over the same inputs whose join keys are listed in
// different orders, joins them with a third key order on top, and wraps the
// result in a filter. In the expected plan both bottom joins show the key
// order `(b, c, a)`, matching the `B, C, AA` order of the top join's keys.
#[test]
fn multi_hash_join_key_ordering() -> Result<()> {
let left = parquet_exec();
let alias_pairs: Vec<(String, String)> = vec![
("a".to_string(), "a1".to_string()),
("b".to_string(), "b1".to_string()),
("c".to_string(), "c1".to_string()),
];
let right = projection_exec_with_alias(parquet_exec(), alias_pairs);
// Bottom-left join keys in the order a, b, c.
let join_on = vec![
(
Column::new_with_schema("a", &schema()).unwrap(),
Column::new_with_schema("a1", &right.schema()).unwrap(),
),
(
Column::new_with_schema("b", &schema()).unwrap(),
Column::new_with_schema("b1", &right.schema()).unwrap(),
),
(
Column::new_with_schema("c", &schema()).unwrap(),
Column::new_with_schema("c1", &right.schema()).unwrap(),
),
];
let bottom_left_join =
hash_join_exec(left.clone(), right.clone(), &join_on, &JoinType::Inner);
// Project the bottom-left join; `a` is exposed twice, as `A` and `AA`.
let alias_pairs: Vec<(String, String)> = vec![
("a".to_string(), "A".to_string()),
("a".to_string(), "AA".to_string()),
("b".to_string(), "B".to_string()),
("c".to_string(), "C".to_string()),
];
let bottom_left_projection =
projection_exec_with_alias(bottom_left_join, alias_pairs);
// Bottom-right join keys in the reverse order c, b, a.
let join_on = vec![
(
Column::new_with_schema("c", &schema()).unwrap(),
Column::new_with_schema("c1", &right.schema()).unwrap(),
),
(
Column::new_with_schema("b", &schema()).unwrap(),
Column::new_with_schema("b1", &right.schema()).unwrap(),
),
(
Column::new_with_schema("a", &schema()).unwrap(),
Column::new_with_schema("a1", &right.schema()).unwrap(),
),
];
let bottom_right_join =
hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner);
// Top join keys: (B = b1), (C = c), (AA = a1).
let top_join_on = vec![
(
Column::new_with_schema("B", &bottom_left_projection.schema()).unwrap(),
Column::new_with_schema("b1", &bottom_right_join.schema()).unwrap(),
),
(
Column::new_with_schema("C", &bottom_left_projection.schema()).unwrap(),
Column::new_with_schema("c", &bottom_right_join.schema()).unwrap(),
),
(
Column::new_with_schema("AA", &bottom_left_projection.schema()).unwrap(),
Column::new_with_schema("a1", &bottom_right_join.schema()).unwrap(),
),
];
let top_join = hash_join_exec(
bottom_left_projection.clone(),
bottom_right_join,
&top_join_on,
&JoinType::Inner,
);
// Filter `c > 1` on top of the join, with `c` resolved against the top
// join's output schema.
let predicate: Arc<dyn PhysicalExpr> = binary(
col("c", top_join.schema().deref())?,
Operator::Gt,
lit(1i64),
top_join.schema().deref(),
)?;
let filter_top_join: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate, top_join)?);
// Expected: both bottom joins and their hash repartitions use the key order
// (b, c, a), and there is no repartition directly below the top join.
let expected = &[
"FilterExec: c@6 > 1",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(B@2, b1@6), (C@3, c@2), (AA@1, a1@5)]",
"ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), (c@2, c1@2), (a@0, a1@0)]",
"RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), (c@2, c1@2), (a@0, a1@0)]",
"RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
assert_optimized!(expected, filter_top_join.clone(), true);
assert_optimized!(expected, filter_top_join, false);
Ok(())
}
// Checks `reorder_join_keys_to_inputs` on a top join whose two inputs were
// each already passed through `ensure_distribution_helper`. For every join
// type, the expected top-join keys are listed as `(AA, B, C)`, which is the
// `(a, b, c)` key order shown for the left-hand bottom join; both bottom
// joins keep the key order they were built with.
#[test]
fn reorder_join_keys_to_left_input() -> Result<()> {
let left = parquet_exec();
let alias_pairs: Vec<(String, String)> = vec![
("a".to_string(), "a1".to_string()),
("b".to_string(), "b1".to_string()),
("c".to_string(), "c1".to_string()),
];
let right = projection_exec_with_alias(parquet_exec(), alias_pairs);
// Bottom-left join keys in the order a, b, c.
let join_on = vec![
(
Column::new_with_schema("a", &schema()).unwrap(),
Column::new_with_schema("a1", &right.schema()).unwrap(),
),
(
Column::new_with_schema("b", &schema()).unwrap(),
Column::new_with_schema("b1", &right.schema()).unwrap(),
),
(
Column::new_with_schema("c", &schema()).unwrap(),
Column::new_with_schema("c1", &right.schema()).unwrap(),
),
];
// Run `ensure_distribution` on the bottom-left join up front
// (target_partitions = 10, prefer_existing_sort = true).
let bottom_left_join = ensure_distribution_helper(
hash_join_exec(left.clone(), right.clone(), &join_on, &JoinType::Inner),
10,
true,
)?;
// Project the bottom-left join; `a` is exposed twice, as `A` and `AA`.
let alias_pairs: Vec<(String, String)> = vec![
("a".to_string(), "A".to_string()),
("a".to_string(), "AA".to_string()),
("b".to_string(), "B".to_string()),
("c".to_string(), "C".to_string()),
];
let bottom_left_projection =
projection_exec_with_alias(bottom_left_join, alias_pairs);
// Bottom-right join keys in the reverse order c, b, a.
let join_on = vec![
(
Column::new_with_schema("c", &schema()).unwrap(),
Column::new_with_schema("c1", &right.schema()).unwrap(),
),
(
Column::new_with_schema("b", &schema()).unwrap(),
Column::new_with_schema("b1", &right.schema()).unwrap(),
),
(
Column::new_with_schema("a", &schema()).unwrap(),
Column::new_with_schema("a1", &right.schema()).unwrap(),
),
];
// Same up-front `ensure_distribution` pass for the bottom-right join.
let bottom_right_join = ensure_distribution_helper(
hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner),
10,
true,
)?;
// Top join keys as written: (B = b1), (C = c), (AA = a1).
let top_join_on = vec![
(
Column::new_with_schema("B", &bottom_left_projection.schema()).unwrap(),
Column::new_with_schema("b1", &bottom_right_join.schema()).unwrap(),
),
(
Column::new_with_schema("C", &bottom_left_projection.schema()).unwrap(),
Column::new_with_schema("c", &bottom_right_join.schema()).unwrap(),
),
(
Column::new_with_schema("AA", &bottom_left_projection.schema()).unwrap(),
Column::new_with_schema("a1", &bottom_right_join.schema()).unwrap(),
),
];
let join_types = vec![
JoinType::Inner,
JoinType::Left,
JoinType::Right,
JoinType::Full,
JoinType::LeftSemi,
JoinType::LeftAnti,
JoinType::RightSemi,
JoinType::RightAnti,
];
for join_type in join_types {
let top_join = hash_join_exec(
bottom_left_projection.clone(),
bottom_right_join.clone(),
&top_join_on,
&join_type,
);
// Expected key order after reordering: (AA, B, C). The join type is
// rendered with `{:?}` here.
let top_join_plan =
format!("HashJoinExec: mode=Partitioned, join_type={:?}, on=[(AA@1, a1@5), (B@2, b1@6), (C@3, c@2)]", &join_type);
let reordered = reorder_join_keys_to_inputs(top_join)?;
// Everything below the top join is expected to be identical for every
// join type.
let expected = &[
top_join_plan.as_str(),
"ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), (b@1, b1@1), (c@2, c1@2)]",
"RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([a1@0, b1@1, c1@2], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), (b@1, b1@1), (a@0, a1@0)]",
"RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
];
// Only the plan text is compared here; no optimizer rule is run by this macro.
assert_plan_txt!(expected, reordered);
}
Ok(())
}
/// Checks `reorder_join_keys_to_inputs` on a top-level hash join whose right
/// child is itself a three-key hash join.
///
/// The top join is declared with its keys in the order
/// `(B, b1), (C, c), (AA, a1)`. For every join type the expected plan lists
/// them as `(C, c), (B, b1), (AA, a1)`, while both child joins are expected
/// to keep the key order they were built with.
#[test]
fn reorder_join_keys_to_right_input() -> Result<()> {
    // Builds one `(from, to)` alias pair.
    let alias = |from: &str, to: &str| (from.to_string(), to.to_string());

    let scan = parquet_exec();
    let renamed_scan = projection_exec_with_alias(
        parquet_exec(),
        vec![alias("a", "a1"), alias("b", "b1"), alias("c", "c1")],
    );

    // Builds one join key: left column from the plain scan schema, right
    // column from the renamed projection.
    let scan_key = |left_col: &str, right_col: &str| {
        (
            Column::new_with_schema(left_col, &schema()).unwrap(),
            Column::new_with_schema(right_col, &renamed_scan.schema()).unwrap(),
        )
    };

    // Bottom-left branch: two-key join, then a projection to upper-case names.
    let two_key_join_on = vec![scan_key("a", "a1"), scan_key("b", "b1")];
    let bottom_left_join = ensure_distribution_helper(
        hash_join_exec(
            scan.clone(),
            renamed_scan.clone(),
            &two_key_join_on,
            &JoinType::Inner,
        ),
        10,
        true,
    )?;
    let bottom_left_projection = projection_exec_with_alias(
        bottom_left_join,
        vec![
            alias("a", "A"),
            alias("a", "AA"),
            alias("b", "B"),
            alias("c", "C"),
        ],
    );

    // Bottom-right branch: three-key join declared in (c, b, a) order.
    let three_key_join_on = vec![
        scan_key("c", "c1"),
        scan_key("b", "b1"),
        scan_key("a", "a1"),
    ];
    let bottom_right_join = ensure_distribution_helper(
        hash_join_exec(
            scan,
            renamed_scan.clone(),
            &three_key_join_on,
            &JoinType::Inner,
        ),
        10,
        true,
    )?;

    // Top join keys, resolved against the two bottom branches.
    let top_key = |left_col: &str, right_col: &str| {
        (
            Column::new_with_schema(left_col, &bottom_left_projection.schema()).unwrap(),
            Column::new_with_schema(right_col, &bottom_right_join.schema()).unwrap(),
        )
    };
    let top_join_on = vec![top_key("B", "b1"), top_key("C", "c"), top_key("AA", "a1")];

    for join_type in [
        JoinType::Inner,
        JoinType::Left,
        JoinType::Right,
        JoinType::Full,
        JoinType::LeftSemi,
        JoinType::LeftAnti,
        JoinType::RightSemi,
        JoinType::RightAnti,
    ] {
        let top_join = hash_join_exec(
            bottom_left_projection.clone(),
            bottom_right_join.clone(),
            &top_join_on,
            &join_type,
        );
        // Only the first plan line depends on the join type.
        let top_join_line =
            format!("HashJoinExec: mode=Partitioned, join_type={:?}, on=[(C@3, c@2), (B@2, b1@6), (AA@1, a1@5)]", join_type);
        let reordered = reorder_join_keys_to_inputs(top_join)?;
        let expected = &[
            top_join_line.as_str(),
            "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
            "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), (b@1, b1@1)]",
            "RepartitionExec: partitioning=Hash([a@0, b@1], 10), input_partitions=10",
            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
            "RepartitionExec: partitioning=Hash([a1@0, b1@1], 10), input_partitions=10",
            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
            "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
            "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), (b@1, b1@1), (a@0, a1@0)]",
            "RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=10",
            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
            "RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), input_partitions=10",
            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
            "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        ];
        assert_plan_txt!(expected, reordered);
    }
    Ok(())
}
/// Checks the optimized plans for two stacked sort-merge joins, for each of
/// the join types Inner, Left, Right, Full, LeftSemi and LeftAnti.
///
/// Two scenarios are covered per join type:
/// 1. the top join is keyed on the bottom join's `a` column;
/// 2. (Inner/Left/Right/Full only) the top join is keyed on the bottom join's
///    `b1` column.
///
/// Each scenario is asserted twice, once with `true` and once with `false` as
/// the third `assert_optimized!` argument.
/// NOTE(review): that flag presumably selects whether distribution or sort
/// enforcement runs first (the `false` expectations are named
/// `expected_first_sort_enforcement`) — confirm against the macro definition.
#[test]
fn multi_smj_joins() -> Result<()> {
let left = parquet_exec();
let alias_pairs: Vec<(String, String)> = vec![
("a".to_string(), "a1".to_string()),
("b".to_string(), "b1".to_string()),
("c".to_string(), "c1".to_string()),
("d".to_string(), "d1".to_string()),
("e".to_string(), "e1".to_string()),
];
let right = projection_exec_with_alias(parquet_exec(), alias_pairs);
let join_types = vec![
JoinType::Inner,
JoinType::Left,
JoinType::Right,
JoinType::Full,
JoinType::LeftSemi,
JoinType::LeftAnti,
];
// Bottom join key: left `a` against right `b1`.
let join_on = vec![(
Column::new_with_schema("a", &schema()).unwrap(),
Column::new_with_schema("b1", &right.schema()).unwrap(),
)];
for join_type in join_types {
let join =
sort_merge_join_exec(left.clone(), right.clone(), &join_on, &join_type);
let join_plan =
format!("SortMergeJoin: join_type={join_type}, on=[(a@0, b1@1)]");
// Scenario 1: top join keyed on the bottom join's `a` against a fresh scan's `c`.
let top_join_on = vec![(
Column::new_with_schema("a", &join.schema()).unwrap(),
Column::new_with_schema("c", &schema()).unwrap(),
)];
let top_join = sort_merge_join_exec(
join.clone(),
parquet_exec(),
&top_join_on,
&join_type,
);
let top_join_plan =
format!("SortMergeJoin: join_type={join_type}, on=[(a@0, c@2)]");
let expected = match join_type {
// For these join types the bottom join feeds the top join directly.
JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti =>
vec![
top_join_plan.as_str(),
join_plan.as_str(),
"SortExec: expr=[a@0 ASC]",
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: expr=[b1@1 ASC]",
"RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: expr=[c@2 ASC]",
"RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
// Remaining join types in the list (Right, Full): the expected plan
// re-hashes and re-sorts the bottom join's output on `a@0`.
_ => vec![
top_join_plan.as_str(),
"SortExec: expr=[a@0 ASC]",
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
join_plan.as_str(),
"SortExec: expr=[a@0 ASC]",
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: expr=[b1@1 ASC]",
"RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: expr=[c@2 ASC]",
"RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
};
assert_optimized!(expected, top_join.clone(), true, true);
// Same plan, asserted with the third macro argument set to `false`: the
// expected plans place each SortExec below the repartitioning and use
// SortPreservingRepartitionExec for the hash repartition.
let expected_first_sort_enforcement = match join_type {
JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti =>
vec![
top_join_plan.as_str(),
join_plan.as_str(),
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[a@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, sort_exprs=b1@1 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b1@1 ASC]",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, sort_exprs=c@2 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
// Right / Full: the bottom join's output is expected to be coalesced,
// sorted, and repartitioned again before the top join.
_ => vec![
top_join_plan.as_str(),
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[a@0 ASC]",
"CoalescePartitionsExec",
join_plan.as_str(),
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[a@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, sort_exprs=b1@1 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b1@1 ASC]",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, sort_exprs=c@2 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
};
assert_optimized!(expected_first_sort_enforcement, top_join, false, true);
// Scenario 2: top join keyed on the bottom join's `b1` column. Only run for
// Inner/Left/Right/Full.
// NOTE(review): presumably skipped for LeftSemi/LeftAnti because their
// output has no `b1` column — confirm.
match join_type {
JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
let top_join_on = vec![(
Column::new_with_schema("b1", &join.schema()).unwrap(),
Column::new_with_schema("c", &schema()).unwrap(),
)];
let top_join = sort_merge_join_exec(
join,
parquet_exec(),
&top_join_on,
&join_type,
);
let top_join_plan =
format!("SortMergeJoin: join_type={join_type}, on=[(b1@6, c@2)]");
let expected = match join_type {
// Inner / Right: bottom join feeds the top join directly.
JoinType::Inner | JoinType::Right => vec![
top_join_plan.as_str(),
join_plan.as_str(),
"SortExec: expr=[a@0 ASC]",
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: expr=[b1@1 ASC]",
"RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: expr=[c@2 ASC]",
"RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
// Left / Full: the bottom join's output is re-hashed and re-sorted on `b1@6`.
JoinType::Left | JoinType::Full => vec![
top_join_plan.as_str(),
"SortExec: expr=[b1@6 ASC]",
"RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10",
join_plan.as_str(),
"SortExec: expr=[a@0 ASC]",
"RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: expr=[b1@1 ASC]",
"RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortExec: expr=[c@2 ASC]",
"RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
// The enclosing arm only admits the four join types handled above.
_ => unreachable!()
};
assert_optimized!(expected, top_join.clone(), true, true);
let expected_first_sort_enforcement = match join_type {
JoinType::Inner | JoinType::Right => vec![
top_join_plan.as_str(),
join_plan.as_str(),
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[a@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, sort_exprs=b1@1 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b1@1 ASC]",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, sort_exprs=c@2 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
JoinType::Left | JoinType::Full => vec![
top_join_plan.as_str(),
"SortPreservingRepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10, sort_exprs=b1@6 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b1@6 ASC]",
"CoalescePartitionsExec",
join_plan.as_str(),
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[a@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, sort_exprs=b1@1 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b1@1 ASC]",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, sort_exprs=c@2 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
// The enclosing arm only admits the four join types handled above.
_ => unreachable!()
};
assert_optimized!(
expected_first_sort_enforcement,
top_join,
false,
true
);
}
// LeftSemi / LeftAnti: scenario 2 is not exercised.
_ => {}
}
}
Ok(())
}
/// Sort-merge join on `(b3, b2), (a3, a2)` over two aggregated and projected
/// scans.
///
/// The expected plans list the aggregates' group-by columns in `b`-then-`a`
/// order on both sides, with an extra `ProjectionExec` above the left
/// aggregate. The plan is asserted with `true` and then `false` as the third
/// `assert_optimized!` argument; the `false` expectation additionally contains
/// `CoalescePartitionsExec` and `SortPreservingRepartitionExec` nodes.
#[test]
fn smj_join_key_ordering() -> Result<()> {
    // Builds one `(from, to)` alias pair.
    let alias = |from: &str, to: &str| (from.to_string(), to.to_string());

    // Left input: aggregate with aliases a -> a1, b -> b1, projected to a3 / b3.
    let left = projection_exec_with_alias(
        aggregate_exec_with_alias(
            parquet_exec(),
            vec![alias("a", "a1"), alias("b", "b1")],
        ),
        vec![alias("a1", "a3"), alias("b1", "b3")],
    );

    // Right input: aggregate listed as (b, a), projected to a2 / b2.
    let right = projection_exec_with_alias(
        aggregate_exec_with_alias(
            parquet_exec(),
            vec![alias("b", "b"), alias("a", "a")],
        ),
        vec![alias("a", "a2"), alias("b", "b2")],
    );

    // Join keys are given `b` first, then `a`. The closure is scoped so its
    // borrow of `left` ends before `left` is moved into the join.
    let join_on = {
        let key = |left_col: &str, right_col: &str| {
            (
                Column::new_with_schema(left_col, &left.schema()).unwrap(),
                Column::new_with_schema(right_col, &right.schema()).unwrap(),
            )
        };
        vec![key("b3", "b2"), key("a3", "a2")]
    };
    let join = sort_merge_join_exec(left, right.clone(), &join_on, &JoinType::Inner);

    let expected = &[
        "SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]",
        "SortExec: expr=[b3@1 ASC,a3@0 ASC]",
        "ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]",
        "ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
        "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]",
        "RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "SortExec: expr=[b2@1 ASC,a2@0 ASC]",
        "ProjectionExec: expr=[a@1 as a2, b@0 as b2]",
        "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([b@0, a@1], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, join.clone(), true, true);

    let expected_first_sort_enforcement = &[
        "SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]",
        "SortPreservingRepartitionExec: partitioning=Hash([b3@1, a3@0], 10), input_partitions=10, sort_exprs=b3@1 ASC,a3@0 ASC",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "SortExec: expr=[b3@1 ASC,a3@0 ASC]",
        "CoalescePartitionsExec",
        "ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]",
        "ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
        "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1], aggr=[]",
        "RepartitionExec: partitioning=Hash([b1@0, a1@1], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "SortPreservingRepartitionExec: partitioning=Hash([b2@1, a2@0], 10), input_partitions=10, sort_exprs=b2@1 ASC,a2@0 ASC",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "SortExec: expr=[b2@1 ASC,a2@0 ASC]",
        "CoalescePartitionsExec",
        "ProjectionExec: expr=[a@1 as a2, b@0 as b2]",
        "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([b@0, a@1], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected_first_sort_enforcement, join, false, true);
    Ok(())
}
/// `SortPreservingMergeExec` over `CoalesceBatchesExec` over a two-group
/// Parquet scan that already declares `a@0 ASC` ordering.
///
/// With `true` as the third `assert_optimized!` argument the plan is expected
/// unchanged; with `false` the merge is expected to be replaced by
/// `SortExec` + `CoalescePartitionsExec`.
#[test]
fn merge_does_not_need_sort() -> Result<()> {
    let table_schema = schema();
    let ordering = vec![PhysicalSortExpr {
        expr: col("a", &table_schema).unwrap(),
        options: SortOptions::default(),
    }];

    // Build the plan bottom-up: sorted scan -> batch coalescing -> merge.
    let sorted_scan = parquet_exec_multiple_sorted(vec![ordering.clone()]);
    let coalesced = Arc::new(CoalesceBatchesExec::new(sorted_scan, 4096));
    let merge = Arc::new(SortPreservingMergeExec::new(ordering, coalesced));

    let expected = &[
        "SortPreservingMergeExec: [a@0 ASC]",
        "CoalesceBatchesExec: target_batch_size=4096",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
    ];
    assert_optimized!(expected, merge, true);

    let expected_first_sort_enforcement = &[
        "SortExec: expr=[a@0 ASC]",
        "CoalescePartitionsExec",
        "CoalesceBatchesExec: target_batch_size=4096",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
    ];
    assert_optimized!(expected_first_sort_enforcement, merge, false);
    Ok(())
}
/// A `UnionExec` of two identical aggregates, itself aggregated again.
///
/// The expected plan shows `InterleaveExec` in place of the union, for both
/// values of the third `assert_optimized!` argument.
#[test]
fn union_to_interleave() -> Result<()> {
    // Each call builds a fresh aggregate (alias a -> a1) over a fresh scan.
    let aggregated_scan = || {
        aggregate_exec_with_alias(
            parquet_exec(),
            vec![("a".to_string(), "a1".to_string())],
        )
    };

    let union = Arc::new(UnionExec::new(vec![aggregated_scan(), aggregated_scan()]));
    let plan =
        aggregate_exec_with_alias(union, vec![("a1".to_string(), "a2".to_string())]);

    let expected = &[
        "AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
        "AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]",
        "InterleaveExec",
        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
        "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
        "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, plan.clone(), true);
    assert_optimized!(expected, plan, false);
    Ok(())
}
/// Aggregate directly over a single-group Parquet scan.
///
/// The expected plan has a round-robin repartition above the scan and a hash
/// repartition between the partial and final aggregates, for both values of
/// the third `assert_optimized!` argument.
#[test]
fn added_repartition_to_single_partition() -> Result<()> {
    let group_by_a = vec![(String::from("a"), String::from("a"))];
    let aggregate = aggregate_exec_with_alias(parquet_exec(), group_by_a);

    let expected = [
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, aggregate.clone(), true);
    assert_optimized!(expected, aggregate, false);
    Ok(())
}
/// Aggregate over a filter over a single-group Parquet scan.
///
/// The expected plan places the round-robin repartition directly above the
/// scan, i.e. below the filter, for both values of the third
/// `assert_optimized!` argument.
#[test]
fn repartition_deepest_node() -> Result<()> {
    let filtered_scan = filter_exec(parquet_exec());
    let aggregate = aggregate_exec_with_alias(
        filtered_scan,
        vec![(String::from("a"), String::from("a"))],
    );

    let expected = &[
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, aggregate.clone(), true);
    assert_optimized!(expected, aggregate, false);
    Ok(())
}
/// Limit over a filter over an unsorted single-group Parquet scan.
///
/// The expected plan repartitions below the filter and coalesces the
/// partitions again between the local and global limits, for both values of
/// the third `assert_optimized!` argument.
#[test]
fn repartition_unsorted_limit() -> Result<()> {
    let filtered_scan = filter_exec(parquet_exec());
    let limited = limit_exec(filtered_scan);

    let expected = &[
        "GlobalLimitExec: skip=0, fetch=100",
        "CoalescePartitionsExec",
        "LocalLimitExec: fetch=100",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, limited.clone(), true);
    assert_optimized!(expected, limited, false);
    Ok(())
}
/// Limit over a sort (on `c`) over a single-group Parquet scan.
///
/// The expected plan contains no repartition or coalesce nodes at all, for
/// both values of the third `assert_optimized!` argument.
#[test]
fn repartition_sorted_limit() -> Result<()> {
    let ordering = vec![PhysicalSortExpr {
        expr: col("c", &schema()).unwrap(),
        options: SortOptions::default(),
    }];
    let sorted_scan = sort_exec(ordering, parquet_exec(), false);
    let limited = limit_exec(sorted_scan);

    let expected = &[
        "GlobalLimitExec: skip=0, fetch=100",
        "LocalLimitExec: fetch=100",
        "SortExec: expr=[c@2 ASC]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, limited.clone(), true);
    assert_optimized!(expected, limited, false);
    Ok(())
}
/// Sort requirement over a filter over a sort (on `c`) over a Parquet scan.
///
/// The expected plan inserts a round-robin repartition between the filter and
/// the sort, for both values of the third `assert_optimized!` argument.
#[test]
fn repartition_sorted_limit_with_filter() -> Result<()> {
    let ordering = vec![PhysicalSortExpr {
        expr: col("c", &schema()).unwrap(),
        options: SortOptions::default(),
    }];
    let sorted_scan = sort_exec(ordering, parquet_exec(), false);
    let filtered = filter_exec(sorted_scan);
    let plan = sort_required_exec(filtered);

    let expected = &[
        "SortRequiredExec: [c@2 ASC]",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "SortExec: expr=[c@2 ASC]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, plan.clone(), true);
    assert_optimized!(expected, plan, false);
    Ok(())
}
/// Aggregate over limit -> filter -> limit -> Parquet scan.
///
/// The expected plan repartitions above the outer limit and above the inner
/// limit, but never between a limit and its own input scan, for both values
/// of the third `assert_optimized!` argument.
#[test]
fn repartition_ignores_limit() -> Result<()> {
    // Build the input chain bottom-up.
    let inner_limit = limit_exec(parquet_exec());
    let filtered = filter_exec(inner_limit);
    let outer_limit = limit_exec(filtered);
    let aggregate = aggregate_exec_with_alias(
        outer_limit,
        vec![(String::from("a"), String::from("a"))],
    );

    let expected = &[
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "GlobalLimitExec: skip=0, fetch=100",
        "CoalescePartitionsExec",
        "LocalLimitExec: fetch=100",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "GlobalLimitExec: skip=0, fetch=100",
        "LocalLimitExec: fetch=100",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, aggregate.clone(), true);
    assert_optimized!(expected, aggregate, false);
    Ok(())
}
/// Union of five single-group Parquet scans.
///
/// The expected plan is the input plan with no repartitioning added, for both
/// values of the third `assert_optimized!` argument.
#[test]
fn repartition_ignores_union() -> Result<()> {
    // The `vec![..; 5]` is kept inline in the call so the element type is
    // inferred from `union_exec`'s parameter.
    let union = union_exec(vec![parquet_exec(); 5]);

    let expected = &[
        "UnionExec",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, union.clone(), true);
    assert_optimized!(expected, union, false);
    Ok(())
}
/// `SortPreservingMergeExec` (on `c`) directly over an unsorted single-group
/// Parquet scan.
///
/// The expected plan is a plain `SortExec` over the scan with no repartition,
/// for both values of the third `assert_optimized!` argument.
#[test]
fn repartition_through_sort_preserving_merge() -> Result<()> {
    let ordering = vec![PhysicalSortExpr {
        expr: col("c", &schema()).unwrap(),
        options: SortOptions::default(),
    }];
    let merge = sort_preserving_merge_exec(ordering, parquet_exec());

    let expected = &[
        "SortExec: expr=[c@2 ASC]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, merge.clone(), true);
    assert_optimized!(expected, merge, false);
    Ok(())
}
/// `SortPreservingMergeExec` (on `c`) over a two-group Parquet scan that
/// already declares `c@2 ASC` ordering.
///
/// With `true` as the third `assert_optimized!` argument the plan is expected
/// unchanged; with `false` the merge is expected to become
/// `SortExec` + `CoalescePartitionsExec`.
#[test]
fn repartition_ignores_sort_preserving_merge() -> Result<()> {
    let ordering = vec![PhysicalSortExpr {
        expr: col("c", &schema()).unwrap(),
        options: SortOptions::default(),
    }];
    let sorted_scan = parquet_exec_multiple_sorted(vec![ordering.clone()]);
    let merge = sort_preserving_merge_exec(ordering, sorted_scan);

    let expected = &[
        "SortPreservingMergeExec: [c@2 ASC]",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    assert_optimized!(expected, merge.clone(), true);

    let expected_first_sort_enforcement = &[
        "SortExec: expr=[c@2 ASC]",
        "CoalescePartitionsExec",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    assert_optimized!(expected_first_sort_enforcement, merge, false);
    Ok(())
}
/// `SortPreservingMergeExec` (on `c`) over a union of two Parquet scans that
/// each declare `c@2 ASC` ordering.
///
/// With `true` as the third `assert_optimized!` argument the plan is expected
/// unchanged; with `false` the merge is expected to become
/// `SortExec` + `CoalescePartitionsExec` above the union.
#[test]
fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> {
    let ordering = vec![PhysicalSortExpr {
        expr: col("c", &schema()).unwrap(),
        options: SortOptions::default(),
    }];
    // The `vec![..; 2]` is kept inline in the call so the element type is
    // inferred from `union_exec`'s parameter.
    let sorted_union =
        union_exec(vec![parquet_exec_with_sort(vec![ordering.clone()]); 2]);
    let merge = sort_preserving_merge_exec(ordering, sorted_union);

    let expected = &[
        "SortPreservingMergeExec: [c@2 ASC]",
        "UnionExec",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    assert_optimized!(expected, merge.clone(), true);

    let expected_first_sort_enforcement = &[
        "SortExec: expr=[c@2 ASC]",
        "CoalescePartitionsExec",
        "UnionExec",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    assert_optimized!(expected_first_sort_enforcement, merge, false);
    Ok(())
}
/// Sort requirement over a filter over a Parquet scan that already declares
/// `c@2 ASC` ordering.
///
/// The expected plan adds only a round-robin repartition above the scan and no
/// `SortExec`, for both values of the third `assert_optimized!` argument (the
/// fourth argument is `true` in both calls).
#[test]
fn repartition_does_not_destroy_sort() -> Result<()> {
    let ordering = vec![PhysicalSortExpr {
        expr: col("c", &schema()).unwrap(),
        options: SortOptions::default(),
    }];
    let sorted_scan = parquet_exec_with_sort(vec![ordering]);
    let plan = sort_required_exec(filter_exec(sorted_scan));

    let expected = &[
        "SortRequiredExec: [c@2 ASC]",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    assert_optimized!(expected, plan.clone(), true, true);
    assert_optimized!(expected, plan, false, true);
    Ok(())
}
/// Union of (a) a sort requirement over a sorted Parquet scan and (b) a filter
/// over an unsorted Parquet scan.
///
/// The expected plan repartitions only the unsorted branch (below its filter)
/// and leaves the sorted branch untouched, for both values of the third
/// `assert_optimized!` argument.
#[test]
fn repartition_does_not_destroy_sort_more_complex() -> Result<()> {
    let ordering = vec![PhysicalSortExpr {
        expr: col("c", &schema()).unwrap(),
        options: SortOptions::default(),
    }];
    let sorted_branch = sort_required_exec(parquet_exec_with_sort(vec![ordering]));
    let unsorted_branch = filter_exec(parquet_exec());
    let union = union_exec(vec![sorted_branch, unsorted_branch]);

    let expected = &[
        "UnionExec",
        "SortRequiredExec: [c@2 ASC]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, union.clone(), true);
    assert_optimized!(expected, union, false);
    Ok(())
}
/// `SortPreservingMergeExec` on a computed column (`a + b as sum`) produced by
/// a projection over a Parquet scan.
///
/// With `true` as the third `assert_optimized!` argument the expected plan
/// keeps the merge above a `SortExec`, with a round-robin repartition below
/// the projection. With `false` the expected plan is `SortExec` over
/// `CoalescePartitionsExec` instead.
#[test]
fn repartition_transitively_with_projection() -> Result<()> {
    let table_schema = schema();
    // Single projected expression: `a + b`, exposed as "sum".
    let sum_expr = Arc::new(BinaryExpr::new(
        col("a", &table_schema).unwrap(),
        Operator::Plus,
        col("b", &table_schema).unwrap(),
    )) as Arc<dyn PhysicalExpr>;
    let projection = Arc::new(ProjectionExec::try_new(
        vec![(sum_expr, "sum".to_string())],
        parquet_exec(),
    )?);

    // Sort on the projected column, resolved against the projection's schema.
    let ordering = vec![PhysicalSortExpr {
        expr: col("sum", &projection.schema()).unwrap(),
        options: SortOptions::default(),
    }];
    let merge = sort_preserving_merge_exec(ordering, projection);

    let expected = &[
        "SortPreservingMergeExec: [sum@0 ASC]",
        "SortExec: expr=[sum@0 ASC]",
        "ProjectionExec: expr=[a@0 + b@1 as sum]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, merge.clone(), true);

    let expected_first_sort_enforcement = &[
        "SortExec: expr=[sum@0 ASC]",
        "CoalescePartitionsExec",
        "ProjectionExec: expr=[a@0 + b@1 as sum]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected_first_sort_enforcement, merge, false);
    Ok(())
}
/// Sort requirement over an identity-style projection (`a`, `b`, `c`) over a
/// two-group Parquet scan that already declares `c@2 ASC` ordering.
///
/// The expected plan is the input plan unchanged, for both values of the
/// third `assert_optimized!` argument.
#[test]
fn repartition_ignores_transitively_with_projection() -> Result<()> {
    let ordering = vec![PhysicalSortExpr {
        expr: col("c", &schema()).unwrap(),
        options: SortOptions::default(),
    }];
    // Each column is projected under its own name.
    let same_name = |name: &str| (name.to_string(), name.to_string());
    let projection = projection_exec_with_alias(
        parquet_exec_multiple_sorted(vec![ordering]),
        vec![same_name("a"), same_name("b"), same_name("c")],
    );
    let plan = sort_required_exec(projection);

    let expected = &[
        "SortRequiredExec: [c@2 ASC]",
        "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    assert_optimized!(expected, plan.clone(), true);
    assert_optimized!(expected, plan, false);
    Ok(())
}
/// `SortPreservingMergeExec` over a sort (built with `true` as `sort_exec`'s
/// last argument) over a projection over a Parquet scan.
///
/// The expected plan drops the merge and contains no repartition: just
/// `SortExec` -> `ProjectionExec` -> scan, for both values of the third
/// `assert_optimized!` argument.
#[test]
fn repartition_transitively_past_sort_with_projection() -> Result<()> {
    let ordering = vec![PhysicalSortExpr {
        expr: col("c", &schema()).unwrap(),
        options: SortOptions::default(),
    }];
    let projection = projection_exec_with_alias(
        parquet_exec(),
        vec![(String::from("a"), String::from("a"))],
    );
    let sorted = sort_exec(ordering.clone(), projection, true);
    let merge = sort_preserving_merge_exec(ordering, sorted);

    let expected = &[
        "SortExec: expr=[c@2 ASC]",
        "ProjectionExec: expr=[a@0 as a]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, merge.clone(), true);
    assert_optimized!(expected, merge, false);
    Ok(())
}
/// Sort (on `a`) over a filter over a Parquet scan.
///
/// With `true` as the third `assert_optimized!` argument the expected plan
/// adds a `SortPreservingMergeExec` above the sort and a round-robin
/// repartition below the filter. With `false` the expected plan is `SortExec`
/// over `CoalescePartitionsExec` instead.
#[test]
fn repartition_transitively_past_sort_with_filter() -> Result<()> {
    let ordering = vec![PhysicalSortExpr {
        expr: col("a", &schema()).unwrap(),
        options: SortOptions::default(),
    }];
    let filtered_scan = filter_exec(parquet_exec());
    let sorted = sort_exec(ordering, filtered_scan, false);

    let expected = &[
        "SortPreservingMergeExec: [a@0 ASC]",
        "SortExec: expr=[a@0 ASC]",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, sorted.clone(), true);

    let expected_first_sort_enforcement = &[
        "SortExec: expr=[a@0 ASC]",
        "CoalescePartitionsExec",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected_first_sort_enforcement, sorted, false);
    Ok(())
}
/// Sort (on `c`) over a projection over a filter over a Parquet scan.
///
/// With `true` as the third `assert_optimized!` argument the expected plan
/// adds a `SortPreservingMergeExec` above the sort and a round-robin
/// repartition directly above the scan. With `false` the expected plan is
/// `SortExec` over `CoalescePartitionsExec` instead.
#[test]
#[cfg(feature = "parquet")]
fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> {
    let ordering = vec![PhysicalSortExpr {
        expr: col("c", &schema()).unwrap(),
        options: SortOptions::default(),
    }];
    // Build the input chain bottom-up: scan -> filter -> projection -> sort.
    let filtered_scan = filter_exec(parquet_exec());
    let projection = projection_exec_with_alias(
        filtered_scan,
        vec![(String::from("a"), String::from("a"))],
    );
    let sorted = sort_exec(ordering, projection, false);

    let expected = &[
        "SortPreservingMergeExec: [c@2 ASC]",
        "SortExec: expr=[c@2 ASC]",
        "ProjectionExec: expr=[a@0 as a]",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected, sorted.clone(), true);

    let expected_first_sort_enforcement = &[
        "SortExec: expr=[c@2 ASC]",
        "CoalescePartitionsExec",
        "ProjectionExec: expr=[a@0 as a]",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(expected_first_sort_enforcement, sorted, false);
    Ok(())
}
/// Aggregation grouped by `a` over a single-file parquet scan and a single-file
/// CSV scan.
///
/// With the extended macro arguments used here, both expected plans show the
/// scan split into two byte-range file groups (`x:0..50`, `x:50..100`) feeding
/// a partial/final aggregate pair.
#[test]
fn parallelization_single_partition() -> Result<()> {
    let group_by_alias = vec![("a".to_string(), "a".to_string())];

    // NOTE(review): the trailing macro arguments `false, 2, true, 10` are
    // presumably prefer-existing-sort, target partitions, file-scan
    // repartitioning and its minimum file size — confirm against the macro.
    let parquet_plan = aggregate_exec_with_alias(parquet_exec(), group_by_alias.clone());
    let csv_plan = aggregate_exec_with_alias(csv_exec(), group_by_alias);

    let parquet_lines = [
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "ParquetExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(parquet_lines, parquet_plan, true, false, 2, true, 10);

    let csv_lines = [
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "CsvExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e], has_header=false",
    ];
    assert_optimized!(csv_lines, csv_plan, true, false, 2, true, 10);

    Ok(())
}
/// Aggregation grouped by `a` over a one-file CSV scan, repeated for every
/// compression type listed below.
///
/// When `is_compressed()` is true the expected plan keeps the single file group
/// and adds a round-robin `RepartitionExec`; otherwise the expected plan shows
/// the file split into two byte-range groups.
#[test]
fn parallelization_compressed_csv() -> Result<()> {
    // Expected plan for compression types reporting `is_compressed() == true`.
    let expected_not_partitioned = [
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
        "CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
    ];
    // Expected plan for the remaining compression types.
    let expected_partitioned = [
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "CsvExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e], has_header=false",
    ];

    let compression_types = [
        FileCompressionType::GZIP,
        FileCompressionType::BZIP2,
        FileCompressionType::XZ,
        FileCompressionType::ZSTD,
        FileCompressionType::UNCOMPRESSED,
    ];

    for compression_type in compression_types {
        let expected = if compression_type.is_compressed() {
            &expected_not_partitioned[..]
        } else {
            &expected_partitioned[..]
        };

        // One file named "x" (second argument 100) in a single file group.
        let scan_config = FileScanConfig {
            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
            file_schema: schema(),
            file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
            statistics: Statistics::new_unknown(&schema()),
            projection: None,
            limit: None,
            table_partition_cols: vec![],
            output_ordering: vec![],
            infinite_source: false,
        };
        let csv_scan = Arc::new(CsvExec::new(
            scan_config,
            false,
            b',',
            b'"',
            None,
            compression_type,
        ));
        let plan = aggregate_exec_with_alias(
            csv_scan,
            vec![("a".to_string(), "a".to_string())],
        );

        assert_optimized!(expected, plan, true, false, 2, true, 10);
    }

    Ok(())
}
/// Aggregation grouped by `a` over two-file parquet and CSV scans.
///
/// The expected plans keep the two existing file groups (`[x]`, `[y]`) as they
/// are and hash-repartition on `a` between the partial and final aggregates.
#[test]
fn parallelization_two_partitions() -> Result<()> {
    let alias = vec![("a".to_string(), "a".to_string())];
    let plan_parquet =
        aggregate_exec_with_alias(parquet_exec_multiple(), alias.clone());
    // Last use of `alias`: move it rather than cloning.
    let plan_csv = aggregate_exec_with_alias(csv_exec_multiple(), alias);

    let expected_parquet = [
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e]",
    ];
    let expected_csv = [
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "CsvExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], has_header=false",
    ];

    // NOTE(review): the trailing macro arguments `false, 2, true, 10` are
    // presumably prefer-existing-sort, target partitions, file-scan
    // repartitioning and its minimum file size — confirm against the macro.
    assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true, 10);
    assert_optimized!(expected_csv, plan_csv, true, false, 2, true, 10);
    Ok(())
}
/// Aggregation grouped by `a` over two-file parquet and CSV scans, optimized
/// with `4` passed where the sibling two-partition test passes `2`.
///
/// The expected plans show each of the two files split into two byte ranges,
/// giving four file groups and a four-way hash repartition on `a`.
#[test]
fn parallelization_two_partitions_into_four() -> Result<()> {
    let alias = vec![("a".to_string(), "a".to_string())];
    let plan_parquet =
        aggregate_exec_with_alias(parquet_exec_multiple(), alias.clone());
    // Last use of `alias`: move it rather than cloning.
    let plan_csv = aggregate_exec_with_alias(csv_exec_multiple(), alias);

    let expected_parquet = [
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "ParquetExec: file_groups={4 groups: [[x:0..50], [x:50..100], [y:0..50], [y:50..100]]}, projection=[a, b, c, d, e]",
    ];
    let expected_csv = [
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "CsvExec: file_groups={4 groups: [[x:0..50], [x:50..100], [y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], has_header=false",
    ];

    // NOTE(review): the trailing macro arguments `false, 4, true, 10` are
    // presumably prefer-existing-sort, target partitions, file-scan
    // repartitioning and its minimum file size — confirm against the macro.
    assert_optimized!(expected_parquet, plan_parquet, true, false, 4, true, 10);
    assert_optimized!(expected_csv, plan_csv, true, false, 4, true, 10);
    Ok(())
}
/// Limit over a sort on `c` over single-file parquet and CSV scans.
///
/// The expected plans contain no `RepartitionExec` at all: the limit, sort and
/// single-group scan are the only operators.
#[test]
fn parallelization_sorted_limit() -> Result<()> {
    let schema = schema();
    let sort_key = vec![PhysicalSortExpr {
        expr: col("c", &schema).unwrap(),
        options: SortOptions::default(),
    }];
    let plan_parquet = limit_exec(sort_exec(sort_key.clone(), parquet_exec(), false));
    // Last use of `sort_key`: move it rather than cloning.
    let plan_csv = limit_exec(sort_exec(sort_key, csv_exec(), false));

    let expected_parquet = &[
        "GlobalLimitExec: skip=0, fetch=100",
        "LocalLimitExec: fetch=100",
        "SortExec: expr=[c@2 ASC]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    let expected_csv = &[
        "GlobalLimitExec: skip=0, fetch=100",
        "LocalLimitExec: fetch=100",
        "SortExec: expr=[c@2 ASC]",
        "CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
    ];

    assert_optimized!(expected_parquet, plan_parquet, true);
    assert_optimized!(expected_csv, plan_csv, true);
    Ok(())
}
/// Limit over a filter over a sort on `c` over single-file parquet and CSV
/// scans.
///
/// The expected plans place a round-robin `RepartitionExec` between the sort
/// and the filter, and a `CoalescePartitionsExec` between the local and global
/// limits.
#[test]
fn parallelization_limit_with_filter() -> Result<()> {
    let schema = schema();
    let sort_key = vec![PhysicalSortExpr {
        expr: col("c", &schema).unwrap(),
        options: SortOptions::default(),
    }];
    let plan_parquet = limit_exec(filter_exec(sort_exec(
        sort_key.clone(),
        parquet_exec(),
        false,
    )));
    // Last use of `sort_key`: move it rather than cloning.
    let plan_csv = limit_exec(filter_exec(sort_exec(sort_key, csv_exec(), false)));

    let expected_parquet = &[
        "GlobalLimitExec: skip=0, fetch=100",
        "CoalescePartitionsExec",
        "LocalLimitExec: fetch=100",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "SortExec: expr=[c@2 ASC]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    let expected_csv = &[
        "GlobalLimitExec: skip=0, fetch=100",
        "CoalescePartitionsExec",
        "LocalLimitExec: fetch=100",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "SortExec: expr=[c@2 ASC]",
        "CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
    ];

    assert_optimized!(expected_parquet, plan_parquet, true);
    assert_optimized!(expected_csv, plan_csv, true);
    Ok(())
}
/// Aggregation grouped by `a` over limit → filter → limit → single-file scan,
/// for both parquet and CSV.
///
/// In the expected plans the scan itself stays a single file group; round-robin
/// `RepartitionExec` nodes appear above each `GlobalLimitExec` instead.
#[test]
fn parallelization_ignores_limit() -> Result<()> {
    let alias = vec![("a".to_string(), "a".to_string())];
    let plan_parquet = aggregate_exec_with_alias(
        limit_exec(filter_exec(limit_exec(parquet_exec()))),
        alias.clone(),
    );
    // Last use of `alias`: move it rather than cloning.
    let plan_csv = aggregate_exec_with_alias(
        limit_exec(filter_exec(limit_exec(csv_exec()))),
        alias,
    );

    let expected_parquet = &[
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "GlobalLimitExec: skip=0, fetch=100",
        "CoalescePartitionsExec",
        "LocalLimitExec: fetch=100",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "GlobalLimitExec: skip=0, fetch=100",
        "LocalLimitExec: fetch=100",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    let expected_csv = &[
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "GlobalLimitExec: skip=0, fetch=100",
        "CoalescePartitionsExec",
        "LocalLimitExec: fetch=100",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "GlobalLimitExec: skip=0, fetch=100",
        "LocalLimitExec: fetch=100",
        "CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
    ];

    assert_optimized!(expected_parquet, plan_parquet, true);
    assert_optimized!(expected_csv, plan_csv, true);
    Ok(())
}
/// Union of five identical single-file scans, once for parquet and once for
/// CSV.
///
/// The expected plans are the union with its five single-group children and
/// nothing else — no repartitioning operator is listed.
#[test]
fn parallelization_union_inputs() -> Result<()> {
    let parquet_children = vec![parquet_exec(); 5];
    let parquet_union = union_exec(parquet_children);
    let csv_children = vec![csv_exec(); 5];
    let csv_union = union_exec(csv_children);

    let parquet_lines = &[
        "UnionExec",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(parquet_lines, parquet_union, true);

    let csv_lines = &[
        "UnionExec",
        "CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
        "CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
        "CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
        "CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
        "CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
    ];
    assert_optimized!(csv_lines, csv_union, true);

    Ok(())
}
/// `SortPreservingMergeExec` on `c` directly over a single-file scan that
/// already declares `c ASC` as its output ordering (parquet and CSV).
///
/// The expected plans consist of the scan alone: the merge operator is gone
/// and no repartitioning is added.
#[test]
fn parallelization_prior_to_sort_preserving_merge() -> Result<()> {
    let schema = schema();
    let sort_key = vec![PhysicalSortExpr {
        expr: col("c", &schema).unwrap(),
        options: SortOptions::default(),
    }];
    let plan_parquet = sort_preserving_merge_exec(
        sort_key.clone(),
        parquet_exec_with_sort(vec![sort_key.clone()]),
    );
    // The scan's ordering is the last use of `sort_key`, so it is moved there;
    // the merge's copy is cloned first because arguments evaluate left to right.
    let plan_csv = sort_preserving_merge_exec(
        sort_key.clone(),
        csv_exec_with_sort(vec![sort_key]),
    );

    let expected_parquet = &[
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    let expected_csv = &[
        "CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], has_header=false",
    ];

    assert_optimized!(expected_parquet, plan_parquet, true);
    assert_optimized!(expected_csv, plan_csv, true);
    Ok(())
}
/// `SortPreservingMergeExec` on `c` over a union of two single-file scans that
/// each declare `c ASC` as their output ordering (parquet and CSV).
///
/// The expected plans are identical in shape to the input: merge, union, and
/// the two ordered scans, with no repartitioning operator.
#[test]
fn parallelization_sort_preserving_merge_with_union() -> Result<()> {
    let schema = schema();
    let sort_key = vec![PhysicalSortExpr {
        expr: col("c", &schema).unwrap(),
        options: SortOptions::default(),
    }];
    let input_parquet =
        union_exec(vec![parquet_exec_with_sort(vec![sort_key.clone()]); 2]);
    let input_csv = union_exec(vec![csv_exec_with_sort(vec![sort_key.clone()]); 2]);
    let plan_parquet = sort_preserving_merge_exec(sort_key.clone(), input_parquet);
    // Last use of `sort_key`: move it rather than cloning.
    let plan_csv = sort_preserving_merge_exec(sort_key, input_csv);

    let expected_parquet = &[
        "SortPreservingMergeExec: [c@2 ASC]",
        "UnionExec",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    let expected_csv = &[
        "SortPreservingMergeExec: [c@2 ASC]",
        "UnionExec",
        "CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], has_header=false",
        "CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], has_header=false",
    ];

    assert_optimized!(expected_parquet, plan_parquet, true);
    assert_optimized!(expected_csv, plan_csv, true);
    Ok(())
}
/// `SortRequiredExec` over a single-file scan that already declares `c ASC` as
/// its output ordering (parquet and CSV).
///
/// The expected plans are the requirement node directly over the single-group
/// scan, with no repartitioning operator.
#[test]
fn parallelization_does_not_benefit() -> Result<()> {
    let test_schema = schema();
    let sort_on_c = PhysicalSortExpr {
        expr: col("c", &test_schema).unwrap(),
        options: SortOptions::default(),
    };
    let ordering = vec![sort_on_c];

    let parquet_scan = parquet_exec_with_sort(vec![ordering.clone()]);
    let parquet_plan = sort_required_exec(parquet_scan);
    let csv_scan = csv_exec_with_sort(vec![ordering]);
    let csv_plan = sort_required_exec(csv_scan);

    let parquet_lines = &[
        "SortRequiredExec: [c@2 ASC]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    assert_optimized!(parquet_lines, parquet_plan, true);

    let csv_lines = &[
        "SortRequiredExec: [c@2 ASC]",
        "CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], has_header=false",
    ];
    assert_optimized!(csv_lines, csv_plan, true);

    Ok(())
}
/// `SortPreservingMergeExec` on the aliased column `c2` over a projection
/// (`a` as `a2`, `c` as `c2`) over a single-file parquet scan ordered by `c`.
///
/// The input plan is first checked as built; the optimized plan is then
/// expected to be just the projection over the ordered scan, without the merge
/// operator and without any repartitioning.
#[test]
fn parallelization_ignores_transitively_with_projection_parquet() -> Result<()> {
    let schema = schema();
    let sort_key = vec![PhysicalSortExpr {
        expr: col("c", &schema).unwrap(),
        options: SortOptions::default(),
    }];
    let alias_pairs: Vec<(String, String)> = vec![
        ("a".to_string(), "a2".to_string()),
        ("c".to_string(), "c2".to_string()),
    ];
    // `sort_key` and `alias_pairs` are each used exactly once, so both are
    // moved into the plan (same as the CSV variant of this test).
    let proj_parquet =
        projection_exec_with_alias(parquet_exec_with_sort(vec![sort_key]), alias_pairs);
    // The merge sorts on the projected name, resolved against the projection's
    // own schema.
    let sort_key_after_projection = vec![PhysicalSortExpr {
        expr: col("c2", &proj_parquet.schema()).unwrap(),
        options: SortOptions::default(),
    }];
    let plan_parquet =
        sort_preserving_merge_exec(sort_key_after_projection, proj_parquet);

    // Sanity-check the plan before optimization.
    let expected = &[
        "SortPreservingMergeExec: [c2@1 ASC]",
        " ProjectionExec: expr=[a@0 as a2, c@2 as c2]",
        " ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    plans_matches_expected!(expected, &plan_parquet);

    let expected_parquet = &[
        "ProjectionExec: expr=[a@0 as a2, c@2 as c2]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    assert_optimized!(expected_parquet, plan_parquet, true);
    Ok(())
}
/// `SortPreservingMergeExec` on the aliased column `c2` over a projection
/// (`a` as `a2`, `c` as `c2`) over a single-file CSV scan ordered by `c`.
///
/// The input plan is first checked as built; the optimized plan is then
/// expected to be just the projection over the ordered scan.
#[test]
fn parallelization_ignores_transitively_with_projection_csv() -> Result<()> {
    let test_schema = schema();
    let scan_ordering = vec![PhysicalSortExpr {
        expr: col("c", &test_schema).unwrap(),
        options: SortOptions::default(),
    }];
    let renames: Vec<(String, String)> = vec![
        ("a".to_string(), "a2".to_string()),
        ("c".to_string(), "c2".to_string()),
    ];
    let ordered_scan = csv_exec_with_sort(vec![scan_ordering]);
    let projection = projection_exec_with_alias(ordered_scan, renames);

    // The merge sorts on the projected name, resolved against the projection's
    // own schema.
    let merge_ordering = vec![PhysicalSortExpr {
        expr: col("c2", &projection.schema()).unwrap(),
        options: SortOptions::default(),
    }];
    let csv_plan = sort_preserving_merge_exec(merge_ordering, projection);

    // Sanity-check the plan before optimization.
    let input_lines = &[
        "SortPreservingMergeExec: [c2@1 ASC]",
        " ProjectionExec: expr=[a@0 as a2, c@2 as c2]",
        " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], has_header=false",
    ];
    plans_matches_expected!(input_lines, &csv_plan);

    let optimized_lines = &[
        "ProjectionExec: expr=[a@0 as a2, c@2 as c2]",
        "CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], has_header=false",
    ];
    assert_optimized!(optimized_lines, csv_plan, true);

    Ok(())
}
/// Input plan with three hand-placed round-robin repartitions (two stacked
/// below a filter, one above it).
///
/// After optimization only a single `RepartitionExec` is expected, directly
/// above the scan; the result is the same for both values of the macro's
/// third argument.
#[test]
fn remove_redundant_roundrobins() -> Result<()> {
    let stacked_repartitions = repartition_exec(repartition_exec(parquet_exec()));
    let physical_plan = repartition_exec(filter_exec(stacked_repartitions));

    // Sanity-check the plan before optimization.
    let input_lines = &[
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
        " FilterExec: c@2 = 0",
        " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
        " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        " ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    plans_matches_expected!(input_lines, &physical_plan);

    let optimized_lines = &[
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_optimized!(optimized_lines, physical_plan.clone(), true);
    assert_optimized!(optimized_lines, physical_plan, false);

    Ok(())
}
/// `SortPreservingMergeExec` on `c` over a filter over a two-file parquet scan
/// ordered by `c`, optimized with `true` as the macro's fourth argument.
///
/// The expected plan uses a `SortPreservingRepartitionExec` below the filter
/// and contains no `SortExec`.
#[test]
fn preserve_ordering_through_repartition() -> Result<()> {
    let test_schema = schema();
    let sort_on_c = PhysicalSortExpr {
        expr: col("c", &test_schema).unwrap(),
        options: SortOptions::default(),
    };
    let ordering = vec![sort_on_c];
    let ordered_scan = parquet_exec_multiple_sorted(vec![ordering.clone()]);
    let physical_plan = sort_preserving_merge_exec(ordering, filter_exec(ordered_scan));

    let optimized_lines = &[
        "SortPreservingMergeExec: [c@2 ASC]",
        "FilterExec: c@2 = 0",
        "SortPreservingRepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, sort_exprs=c@2 ASC",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    // NOTE(review): the fourth macro argument is presumably the
    // prefer-existing-sort switch — confirm against `assert_optimized!`.
    assert_optimized!(optimized_lines, physical_plan.clone(), true, true);
    assert_optimized!(optimized_lines, physical_plan, false, true);

    Ok(())
}
/// `SortPreservingMergeExec` on `a` over a filter over a two-file parquet scan
/// ordered by `a`, optimized with the macro's default (three-argument) form.
///
/// Both expected plans use a plain `RepartitionExec` below the filter and add
/// an explicit `SortExec` above it.
#[test]
fn do_not_preserve_ordering_through_repartition() -> Result<()> {
    let test_schema = schema();
    let sort_on_a = PhysicalSortExpr {
        expr: col("a", &test_schema).unwrap(),
        options: SortOptions::default(),
    };
    let ordering = vec![sort_on_a];
    let ordered_scan = parquet_exec_multiple_sorted(vec![ordering.clone()]);
    let physical_plan = sort_preserving_merge_exec(ordering, filter_exec(ordered_scan));

    // Third macro argument `true`: per-partition sort, then an ordered merge.
    let lines_when_true = &[
        "SortPreservingMergeExec: [a@0 ASC]",
        "SortExec: expr=[a@0 ASC]",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
    ];
    assert_optimized!(lines_when_true, physical_plan.clone(), true);

    // Third macro argument `false`: coalesce first, then a single sort.
    let lines_when_false = &[
        "SortExec: expr=[a@0 ASC]",
        "CoalescePartitionsExec",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
    ];
    assert_optimized!(lines_when_false, physical_plan, false);

    Ok(())
}
/// `SortPreservingMergeExec` on `c` over a filter `c = 0` over a two-file
/// parquet scan ordered by `c`.
///
/// The expected plan has no sort or ordered merge at all — only a
/// `CoalescePartitionsExec` on top of the repartitioned filter.
/// NOTE(review): presumably because the filter pins `c` to a constant, which
/// makes the ordering on `c` trivially satisfied — confirm.
#[test]
fn no_need_for_sort_after_filter() -> Result<()> {
    let test_schema = schema();
    let sort_on_c = PhysicalSortExpr {
        expr: col("c", &test_schema).unwrap(),
        options: SortOptions::default(),
    };
    let ordering = vec![sort_on_c];
    let ordered_scan = parquet_exec_multiple_sorted(vec![ordering.clone()]);
    let physical_plan = sort_preserving_merge_exec(ordering, filter_exec(ordered_scan));

    let optimized_lines = &[
        "CoalescePartitionsExec",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    // The same plan is expected for both values of the third macro argument.
    for third_arg in [true, false] {
        assert_optimized!(optimized_lines, physical_plan.clone(), third_arg);
    }

    Ok(())
}
/// `SortPreservingMergeExec` on `a` over a filter over a two-file parquet scan
/// that is ordered by a different column (`c`).
///
/// Both expected plans use a plain `RepartitionExec` and sort on `a` above the
/// filter; the `false` variant lists a `SortExec` both above and below the
/// `CoalescePartitionsExec`.
#[test]
fn do_not_preserve_ordering_through_repartition2() -> Result<()> {
    let test_schema = schema();

    // Ordering declared by the scan.
    let scan_ordering = vec![PhysicalSortExpr {
        expr: col("c", &test_schema).unwrap(),
        options: SortOptions::default(),
    }];
    let ordered_scan = parquet_exec_multiple_sorted(vec![scan_ordering]);

    // Ordering requested by the merge on top.
    let merge_ordering = vec![PhysicalSortExpr {
        expr: col("a", &test_schema).unwrap(),
        options: SortOptions::default(),
    }];
    let physical_plan =
        sort_preserving_merge_exec(merge_ordering, filter_exec(ordered_scan));

    let lines_when_true = &[
        "SortPreservingMergeExec: [a@0 ASC]",
        "SortExec: expr=[a@0 ASC]",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    assert_optimized!(lines_when_true, physical_plan.clone(), true);

    let lines_when_false = &[
        "SortExec: expr=[a@0 ASC]",
        "CoalescePartitionsExec",
        "SortExec: expr=[a@0 ASC]",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    assert_optimized!(lines_when_false, physical_plan, false);

    Ok(())
}
/// Filter over a two-file parquet scan ordered by `c`, with nothing above the
/// filter that asks for an ordering.
///
/// The expected plan uses a plain (non sort-preserving) `RepartitionExec`
/// below the filter, for both values of the macro's third argument.
#[test]
fn do_not_preserve_ordering_through_repartition3() -> Result<()> {
    let test_schema = schema();
    let sort_on_c = PhysicalSortExpr {
        expr: col("c", &test_schema).unwrap(),
        options: SortOptions::default(),
    };
    let ordered_scan = parquet_exec_multiple_sorted(vec![vec![sort_on_c]]);
    let physical_plan = filter_exec(ordered_scan);

    let optimized_lines = &[
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    for third_arg in [true, false] {
        assert_optimized!(optimized_lines, physical_plan.clone(), third_arg);
    }

    Ok(())
}
/// `SortRequiredExec` on `a` over a filter over an unordered single-file
/// parquet scan, run through `EnforceDistribution` directly (not via the
/// `assert_optimized!` macro).
///
/// The expected output only gains a round-robin `RepartitionExec` above the
/// scan; no `SortExec` is added even though the requirement is not met.
#[test]
fn do_not_put_sort_when_input_is_invalid() -> Result<()> {
    let test_schema = schema();
    let required_ordering = vec![PhysicalSortExpr {
        expr: col("a", &test_schema).unwrap(),
        options: SortOptions::default(),
    }];
    let physical_plan =
        sort_required_exec_with_req(filter_exec(parquet_exec()), required_ordering);

    // Sanity-check the plan before optimization.
    let input_lines = &[
        "SortRequiredExec: [a@0 ASC]",
        "FilterExec: c@2 = 0",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_plan_txt!(input_lines, physical_plan);

    // Run only the distribution rule, with round-robin repartitioning enabled
    // and existing sorts not preferred.
    let mut config = ConfigOptions::new();
    config.optimizer.prefer_existing_sort = false;
    config.optimizer.enable_round_robin_repartition = true;
    config.execution.target_partitions = 10;
    let distribution_plan = EnforceDistribution::new().optimize(physical_plan, &config)?;

    let optimized_lines = &[
        "SortRequiredExec: [a@0 ASC]",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    assert_plan_txt!(optimized_lines, distribution_plan);

    Ok(())
}
/// `SortRequiredExec` on `a` over a filter over a two-file parquet scan that
/// is already ordered by `a`, run through `EnforceDistribution` directly.
///
/// The expected output gains a round-robin `RepartitionExec` above the scan
/// and a `SortExec` on `a` above the filter.
#[test]
fn put_sort_when_input_is_valid() -> Result<()> {
    let test_schema = schema();
    let ordering = vec![PhysicalSortExpr {
        expr: col("a", &test_schema).unwrap(),
        options: SortOptions::default(),
    }];
    let ordered_scan = parquet_exec_multiple_sorted(vec![ordering.clone()]);
    let physical_plan = sort_required_exec_with_req(filter_exec(ordered_scan), ordering);

    // Sanity-check the plan before optimization.
    let input_lines = &[
        "SortRequiredExec: [a@0 ASC]",
        "FilterExec: c@2 = 0",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
    ];
    assert_plan_txt!(input_lines, physical_plan);

    // Run only the distribution rule, with round-robin repartitioning enabled
    // and existing sorts not preferred.
    let mut config = ConfigOptions::new();
    config.optimizer.prefer_existing_sort = false;
    config.optimizer.enable_round_robin_repartition = true;
    config.execution.target_partitions = 10;
    let distribution_plan = EnforceDistribution::new().optimize(physical_plan, &config)?;

    let optimized_lines = &[
        "SortRequiredExec: [a@0 ASC]",
        "SortExec: expr=[a@0 ASC]",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
    ];
    assert_plan_txt!(optimized_lines, distribution_plan);

    Ok(())
}
/// Aggregation grouped by `a` over a single-file parquet scan ordered by `c`,
/// optimized with `1` in the macro's fifth position.
///
/// The expected plan has the partial and final aggregates directly stacked on
/// the scan, with no hash `RepartitionExec` between them.
#[test]
fn do_not_add_unnecessary_hash() -> Result<()> {
    let schema = schema();
    let sort_key = vec![PhysicalSortExpr {
        expr: col("c", &schema).unwrap(),
        options: SortOptions::default(),
    }];
    let alias = vec![("a".to_string(), "a".to_string())];
    let input = parquet_exec_with_sort(vec![sort_key]);
    // `alias` is used exactly once, so it is moved rather than cloned.
    let physical_plan = aggregate_exec_with_alias(input, alias);

    let expected = &[
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    // NOTE(review): the trailing macro arguments `false, 1, false, 1024` are
    // presumably prefer-existing-sort, target partitions, file-scan
    // repartitioning and its minimum file size — confirm against the macro.
    assert_optimized!(expected, physical_plan.clone(), true, false, 1, false, 1024);
    assert_optimized!(expected, physical_plan, false, false, 1, false, 1024);
    Ok(())
}
/// Two stacked aggregations, both grouped by `a`, over a two-file parquet scan
/// ordered by `c`, optimized with `4` in the macro's fifth position.
///
/// The expected plan contains exactly one hash `RepartitionExec` (inside the
/// lower aggregate pair); the upper aggregate pair has none.
#[test]
fn do_not_add_unnecessary_hash2() -> Result<()> {
    let schema = schema();
    let sort_key = vec![PhysicalSortExpr {
        expr: col("c", &schema).unwrap(),
        options: SortOptions::default(),
    }];
    let alias = vec![("a".to_string(), "a".to_string())];
    let input = parquet_exec_multiple_sorted(vec![sort_key]);
    let aggregate = aggregate_exec_with_alias(input, alias.clone());
    // Last use of `alias`: move it rather than cloning.
    let physical_plan = aggregate_exec_with_alias(aggregate, alias);

    let expected = &[
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
        "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
        "RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2",
        "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
    ];
    // NOTE(review): the trailing macro arguments `false, 4, false, 1024` are
    // presumably prefer-existing-sort, target partitions, file-scan
    // repartitioning and its minimum file size — confirm against the macro.
    assert_optimized!(expected, physical_plan.clone(), true, false, 4, false, 1024);
    assert_optimized!(expected, physical_plan, false, false, 4, false, 1024);
    Ok(())
}
/// `CoalescePartitionsExec` over a round-robin `RepartitionExec` over a
/// single-file parquet scan.
///
/// After optimization only the bare scan is expected: both the repartition and
/// the coalesce are gone, for either value of the macro's third argument.
#[test]
fn optimize_away_unnecessary_repartition() -> Result<()> {
    let repartitioned_scan = repartition_exec(parquet_exec());
    let physical_plan = coalesce_partitions_exec(repartitioned_scan);

    // Sanity-check the plan before optimization.
    let input_lines = &[
        "CoalescePartitionsExec",
        " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        " ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    plans_matches_expected!(input_lines, physical_plan.clone());

    let optimized_lines =
        &["ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]"];
    assert_optimized!(optimized_lines, physical_plan.clone(), true);
    assert_optimized!(optimized_lines, physical_plan, false);

    Ok(())
}
/// Filter → repartition → coalesce → filter → repartition → single-file
/// parquet scan.
///
/// After optimization the coalesce and the upper repartition are expected to
/// be gone, leaving the two filters stacked on a single round-robin
/// `RepartitionExec` above the scan.
#[test]
fn optimize_away_unnecessary_repartition2() -> Result<()> {
    let lower_filter = filter_exec(repartition_exec(parquet_exec()));
    let coalesced = coalesce_partitions_exec(lower_filter);
    let physical_plan = filter_exec(repartition_exec(coalesced));

    // Sanity-check the plan before optimization.
    let input_lines = &[
        "FilterExec: c@2 = 0",
        " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        " CoalescePartitionsExec",
        " FilterExec: c@2 = 0",
        " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        " ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    plans_matches_expected!(input_lines, physical_plan.clone());

    let optimized_lines = &[
        "FilterExec: c@2 = 0",
        "FilterExec: c@2 = 0",
        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
    ];
    // The same plan is expected for both values of the third macro argument.
    for third_arg in [true, false] {
        assert_optimized!(optimized_lines, physical_plan.clone(), third_arg);
    }

    Ok(())
}
}