//! Optimizer sub-rule that replaces operators losing their input ordering
//! (plain `RepartitionExec`s and `CoalescePartitionsExec`s) with
//! order-preserving variants whenever doing so lets us remove a `SortExec`
//! from the plan.

use std::sync::Arc;

use super::utils::{is_repartition, is_sort_preserving_merge};
use crate::error::Result;
use crate::physical_optimizer::utils::{is_coalesce_partitions, is_sort};
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::Transformed;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::tree_node::PlanContext;
use datafusion_physical_plan::ExecutionPlanProperties;
use itertools::izip;
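
/// Keeps track, per plan node, of whether the node is "connected" (the
/// `bool` payload) to an order-losing `RepartitionExec` or
/// `CoalescePartitionsExec` through operators that maintain ordering, i.e.
/// whether replacing such operators could make a `SortExec` above removable.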
pub type OrderPreservationContext = PlanContext<bool>;
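
/// Recomputes the connection flags of `opc`'s children. A child initiates a
/// connection when its plan is an order-losing `RepartitionExec` or a
/// `CoalescePartitionsExec` with ordered input, and it propagates an existing
/// connection only while the operators in between maintain (or let us
/// restore) the input ordering.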
pub fn update_children(opc: &mut OrderPreservationContext) {
for PlanContext {
plan,
children,
data,
} in opc.children.iter_mut()
{
let maintains_input_order = plan.maintains_input_order();
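// A child's ordering can survive this operator if the operator maintains
// input order at that index, or if it is a repartition/coalesce that we
// may later replace with an order-preserving variant: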
let inspect_child = |idx| {
maintains_input_order[idx]
|| is_coalesce_partitions(plan)
|| is_repartition(plan)
};
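// Drop connections this operator cannot sustain: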
for (idx, c) in children.iter_mut().enumerate() {
c.data &= inspect_child(idx);
}
let plan_children = plan.children();
*data = if plan_children.is_empty() {
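// Leaf nodes are never connected: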
false
} else if !children[0].data
&& ((is_repartition(plan) && !maintains_input_order[0])
|| (is_coalesce_partitions(plan)
&& plan_children[0].output_ordering().is_some()))
{
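// An order-losing repartition, or a coalesce over an ordered input,
// initiates a new connection: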
true
} else {
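// Otherwise, stay connected only if some child is connected and this
// operator can carry that connection upward: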
children
.iter()
.enumerate()
.any(|(idx, c)| c.data && inspect_child(idx))
}
}
opc.data = false;
}
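
/// Computes an alternative subtree for `sort_input` by replacing order-losing
/// operators on connected paths with their order-preserving variants: a
/// `RepartitionExec` gains `preserve_order` when `is_spr_better` holds, and a
/// `CoalescePartitionsExec` over an ordered input becomes a
/// `SortPreservingMergeExec` when `is_spm_better` holds. The caller accepts
/// or rejects the alternative depending on whether it removes a `SortExec`.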
fn plan_with_order_preserving_variants(
mut sort_input: OrderPreservationContext,
is_spr_better: bool,
is_spm_better: bool,
) -> Result<OrderPreservationContext> {
sort_input.children = sort_input
.children
.into_iter()
.map(|node| {
if node.data {
plan_with_order_preserving_variants(node, is_spr_better, is_spm_better)
} else {
Ok(node)
}
})
.collect::<Result<_>>()?;
sort_input.data = false;
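// An order-losing `RepartitionExec` is replaced with its
// order-preserving (`preserve_order`) variant: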
if is_repartition(&sort_input.plan)
&& !sort_input.plan.maintains_input_order()[0]
&& is_spr_better
{
let child = sort_input.children[0].plan.clone();
let partitioning = sort_input.plan.output_partitioning().clone();
sort_input.plan = Arc::new(
RepartitionExec::try_new(child, partitioning)?.with_preserve_order(),
) as _;
sort_input.children[0].data = true;
return Ok(sort_input);
} else if is_coalesce_partitions(&sort_input.plan) && is_spm_better {
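// A `CoalescePartitionsExec` whose input has an ordering can merge
// partitions with a `SortPreservingMergeExec` instead: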
let child = &sort_input.children[0].plan;
if let Some(ordering) = child.output_ordering().map(Vec::from) {
let spm = SortPreservingMergeExec::new(ordering, child.clone());
sort_input.plan = Arc::new(spm) as _;
sort_input.children[0].data = true;
return Ok(sort_input);
}
}
sort_input.update_plan_from_children()
}
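
/// Reverts the changes made by [`plan_with_order_preserving_variants`],
/// replacing order-preserving operators with their order-breaking
/// counterparts where preserving the ordering is unnecessary.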
fn plan_with_order_breaking_variants(
mut sort_input: OrderPreservationContext,
) -> Result<OrderPreservationContext> {
let plan = &sort_input.plan;
sort_input.children = izip!(
sort_input.children,
plan.maintains_input_order(),
plan.required_input_ordering()
)
.map(|(node, maintains, required_ordering)| {
if maintains
&& (is_sort_preserving_merge(plan)
|| !required_ordering.map_or(false, |required_ordering| {
node.plan
.equivalence_properties()
.ordering_satisfy_requirement(&required_ordering)
}))
{
plan_with_order_breaking_variants(node)
} else {
Ok(node)
}
})
.collect::<Result<_>>()?;
sort_input.data = false;
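// An order-preserving repartition reverts to a plain `RepartitionExec`: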
if is_repartition(plan) && plan.maintains_input_order()[0] {
let child = sort_input.children[0].plan.clone();
let partitioning = plan.output_partitioning().clone();
sort_input.plan = Arc::new(RepartitionExec::try_new(child, partitioning)?) as _;
} else if is_sort_preserving_merge(plan) {
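// A `SortPreservingMergeExec` reverts to a `CoalescePartitionsExec`: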
let child = sort_input.children[0].plan.clone();
let coalesce = CoalescePartitionsExec::new(child);
sort_input.plan = Arc::new(coalesce) as _;
} else {
return sort_input.update_plan_from_children();
}
sort_input.children[0].data = false;
Ok(sort_input)
}
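
/// The entry point of this sub-rule: tries to remove a `SortExec` by
/// replacing the order-losing operators below it with order-preserving
/// variants. If the resulting plan satisfies the sort's output ordering, the
/// alternative plan is adopted and the sort disappears; otherwise the
/// replacements are reverted and the original plan is kept.
///
/// For example (see `test_replace_multiple_input_repartition_1` below), a
/// plan such as
///
/// ```text
/// SortPreservingMergeExec: [a@0 ASC NULLS LAST]
///   SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
///     RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8
///       RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
///         StreamingTableExec: ..., output_ordering=[a@0 ASC NULLS LAST]
/// ```
///
/// becomes
///
/// ```text
/// SortPreservingMergeExec: [a@0 ASC NULLS LAST]
///   RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST
///     RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
///       StreamingTableExec: ..., output_ordering=[a@0 ASC NULLS LAST]
/// ```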
pub(crate) fn replace_with_order_preserving_variants(
mut requirements: OrderPreservationContext,
is_spr_better: bool,
is_spm_better: bool,
config: &ConfigOptions,
) -> Result<Transformed<OrderPreservationContext>> {
update_children(&mut requirements);
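// The rule only fires on a `SortExec` whose child is connected to an
// order-losing operator: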
if !(is_sort(&requirements.plan) && requirements.children[0].data) {
return Ok(Transformed::no(requirements));
}
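// Always try the order-preserving variant when the input is not
// pipeline-friendly (e.g. unbounded), since breaking the pipeline to sort
// is not an option there; otherwise only when the config prefers
// existing sorts: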
let use_order_preserving_variant = config.optimizer.prefer_existing_sort
|| !requirements.plan.execution_mode().pipeline_friendly();
let mut alternate_plan = plan_with_order_preserving_variants(
requirements.children.swap_remove(0),
is_spr_better || use_order_preserving_variant,
is_spm_better || use_order_preserving_variant,
)?;
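// If the alternative plan makes the sort unnecessary, adopt it: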
if alternate_plan
.plan
.equivalence_properties()
.ordering_satisfy(requirements.plan.output_ordering().unwrap_or(&[]))
{
for child in alternate_plan.children.iter_mut() {
child.data = false;
}
Ok(Transformed::yes(alternate_plan))
} else {
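// The alternative plan did not help; revert the order-preserving
// variants and keep the sort: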
alternate_plan = plan_with_order_breaking_variants(alternate_plan)?;
alternate_plan.data = false;
requirements.children = vec![alternate_plan];
Ok(Transformed::yes(requirements))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
use crate::physical_optimizer::test_utils::check_integrity;
use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::joins::{HashJoinExec, PartitionMode};
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{
displayable, get_plan_string, ExecutionPlan, Partitioning,
};
use crate::prelude::SessionConfig;
use crate::test::TestStreamPartition;
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::tree_node::{TransformedResult, TreeNode};
use datafusion_common::Result;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::{JoinType, Operator};
use datafusion_physical_expr::expressions::{self, col, Column};
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_plan::streaming::StreamingTableExec;
use rstest::rstest;
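
/// Dispatches to `assert_optimized_prefer_sort_on_off` with the expectations
/// matching the source's boundedness: an unbounded source must yield the
/// order-preserving plan regardless of `prefer_existing_sort`, while a
/// bounded source has separate expectations for each setting.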
macro_rules! assert_optimized_in_all_boundedness_situations {
($EXPECTED_UNBOUNDED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PLAN_LINES: expr, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $SOURCE_UNBOUNDED: expr) => {
if $SOURCE_UNBOUNDED {
assert_optimized_prefer_sort_on_off!(
$EXPECTED_UNBOUNDED_PLAN_LINES,
$EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES,
$EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES,
$PLAN
);
} else {
assert_optimized_prefer_sort_on_off!(
$EXPECTED_BOUNDED_PLAN_LINES,
$EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES,
$EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES,
$PLAN
);
}
};
}
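
/// Runs `assert_optimized` twice: with `prefer_existing_sort` disabled, then
/// enabled.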
macro_rules! assert_optimized_prefer_sort_on_off {
($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr) => {
assert_optimized!(
$EXPECTED_PLAN_LINES,
$EXPECTED_OPTIMIZED_PLAN_LINES,
$PLAN.clone(),
false
);
assert_optimized!(
$EXPECTED_PLAN_LINES,
$EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES,
$PLAN,
true
);
};
}
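
/// Checks the unoptimized plan against `$EXPECTED_PLAN_LINES`, applies
/// `replace_with_order_preserving_variants` bottom-up, and checks the result
/// against `$EXPECTED_OPTIMIZED_PLAN_LINES`.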
macro_rules! assert_optimized {
($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $PREFER_EXISTING_SORT: expr) => {
let physical_plan = $PLAN;
let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
let expected_plan_lines: Vec<&str> =
    $EXPECTED_PLAN_LINES.iter().copied().collect();
assert_eq!(
expected_plan_lines, actual,
"\n**Original Plan Mismatch\n\nexpected:\n\n{expected_plan_lines:#?}\nactual:\n\n{actual:#?}\n\n"
);
let expected_optimized_lines: Vec<&str> =
    $EXPECTED_OPTIMIZED_PLAN_LINES.iter().copied().collect();
let config = SessionConfig::new().with_prefer_existing_sort($PREFER_EXISTING_SORT);
let plan_with_pipeline_fixer = OrderPreservationContext::new_default(physical_plan);
let parallel = plan_with_pipeline_fixer
    .transform_up(|plan_with_pipeline_fixer| {
        replace_with_order_preserving_variants(
            plan_with_pipeline_fixer,
            false,
            false,
            config.options(),
        )
    })
    .data()
    .and_then(check_integrity)?;
let optimized_physical_plan = parallel.plan;
let actual = get_plan_string(&optimized_physical_plan);
assert_eq!(
expected_optimized_lines, actual,
"\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n"
);
};
}
#[rstest]
#[tokio::test]
async fn test_replace_multiple_input_repartition_1(
#[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
csv_exec_sorted(&schema, sort_exprs)
};
let repartition = repartition_exec_hash(repartition_exec_round_robin(source));
let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true);
let physical_plan =
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
let expected_input_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_input_bounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_optimized_bounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized_bounded_sort_preserve = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
assert_optimized_in_all_boundedness_situations!(
expected_input_unbounded,
expected_input_bounded,
expected_optimized_unbounded,
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
source_unbounded
);
Ok(())
}
#[rstest]
#[tokio::test]
async fn test_with_inter_children_change_only(
#[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr_default("a", &schema)];
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
csv_exec_sorted(&schema, sort_exprs)
};
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
let sort = sort_exec(
vec![sort_expr_default("a", &coalesce_partitions.schema())],
coalesce_partitions,
false,
);
let repartition_rr2 = repartition_exec_round_robin(sort);
let repartition_hash2 = repartition_exec_hash(repartition_rr2);
let filter = filter_exec(repartition_hash2);
let sort2 =
sort_exec(vec![sort_expr_default("a", &filter.schema())], filter, true);
let physical_plan = sort_preserving_merge_exec(
vec![sort_expr_default("a", &sort2.schema())],
sort2,
);
let expected_input_unbounded = [
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC]",
];
let expected_input_bounded = [
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
];
let expected_optimized_unbounded = [
"SortPreservingMergeExec: [a@0 ASC]",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" SortPreservingMergeExec: [a@0 ASC]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC]",
];
let expected_optimized_bounded = [
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
];
let expected_optimized_bounded_sort_preserve = [
"SortPreservingMergeExec: [a@0 ASC]",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" SortPreservingMergeExec: [a@0 ASC]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
];
assert_optimized_in_all_boundedness_situations!(
expected_input_unbounded,
expected_input_bounded,
expected_optimized_unbounded,
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
source_unbounded
);
Ok(())
}
#[rstest]
#[tokio::test]
async fn test_replace_multiple_input_repartition_2(
#[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
csv_exec_sorted(&schema, sort_exprs)
};
let repartition_rr = repartition_exec_round_robin(source);
let filter = filter_exec(repartition_rr);
let repartition_hash = repartition_exec_hash(filter);
let sort = sort_exec(vec![sort_expr("a", &schema)], repartition_hash, true);
let physical_plan =
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
let expected_input_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_input_bounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_optimized_bounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized_bounded_sort_preserve = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
assert_optimized_in_all_boundedness_situations!(
expected_input_unbounded,
expected_input_bounded,
expected_optimized_unbounded,
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
source_unbounded
);
Ok(())
}
#[rstest]
#[tokio::test]
async fn test_replace_multiple_input_repartition_with_extra_steps(
#[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
csv_exec_sorted(&schema, sort_exprs)
};
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let filter = filter_exec(repartition_hash);
let coalesce_batches_exec: Arc<dyn ExecutionPlan> = coalesce_batches_exec(filter);
let sort = sort_exec(vec![sort_expr("a", &schema)], coalesce_batches_exec, true);
let physical_plan =
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
let expected_input_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_input_bounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_optimized_bounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized_bounded_sort_preserve = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
assert_optimized_in_all_boundedness_situations!(
expected_input_unbounded,
expected_input_bounded,
expected_optimized_unbounded,
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
source_unbounded
);
Ok(())
}
#[rstest]
#[tokio::test]
async fn test_replace_multiple_input_repartition_with_extra_steps_2(
#[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
csv_exec_sorted(&schema, sort_exprs)
};
let repartition_rr = repartition_exec_round_robin(source);
let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr);
let repartition_hash = repartition_exec_hash(coalesce_batches_exec_1);
let filter = filter_exec(repartition_hash);
let coalesce_batches_exec_2 = coalesce_batches_exec(filter);
let sort =
sort_exec(vec![sort_expr("a", &schema)], coalesce_batches_exec_2, true);
let physical_plan =
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
let expected_input_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_input_bounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_optimized_bounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized_bounded_sort_preserve = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
assert_optimized_in_all_boundedness_situations!(
expected_input_unbounded,
expected_input_bounded,
expected_optimized_unbounded,
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
source_unbounded
);
Ok(())
}
#[rstest]
#[tokio::test]
async fn test_not_replacing_when_no_need_to_preserve_sorting(
#[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
csv_exec_sorted(&schema, sort_exprs)
};
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let filter = filter_exec(repartition_hash);
let coalesce_batches_exec: Arc<dyn ExecutionPlan> = coalesce_batches_exec(filter);
let physical_plan: Arc<dyn ExecutionPlan> =
coalesce_partitions_exec(coalesce_batches_exec);
let expected_input_unbounded = [
"CoalescePartitionsExec",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_input_bounded = [
"CoalescePartitionsExec",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized_unbounded = [
"CoalescePartitionsExec",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_optimized_bounded = [
"CoalescePartitionsExec",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized_bounded_sort_preserve = expected_optimized_bounded;
assert_optimized_in_all_boundedness_situations!(
expected_input_unbounded,
expected_input_bounded,
expected_optimized_unbounded,
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
source_unbounded
);
Ok(())
}
#[rstest]
#[tokio::test]
async fn test_with_multiple_replaceable_repartitions(
#[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
csv_exec_sorted(&schema, sort_exprs)
};
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let filter = filter_exec(repartition_hash);
let coalesce_batches = coalesce_batches_exec(filter);
let repartition_hash_2 = repartition_exec_hash(coalesce_batches);
let sort = sort_exec(vec![sort_expr("a", &schema)], repartition_hash_2, true);
let physical_plan =
sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
let expected_input_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_input_bounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_optimized_bounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized_bounded_sort_preserve = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
assert_optimized_in_all_boundedness_situations!(
expected_input_unbounded,
expected_input_bounded,
expected_optimized_unbounded,
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
source_unbounded
);
Ok(())
}
#[rstest]
#[tokio::test]
async fn test_not_replace_with_different_orderings(
#[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
csv_exec_sorted(&schema, sort_exprs)
};
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let sort = sort_exec(
vec![sort_expr_default("c", &repartition_hash.schema())],
repartition_hash,
true,
);
let physical_plan = sort_preserving_merge_exec(
vec![sort_expr_default("c", &sort.schema())],
sort,
);
let expected_input_unbounded = [
"SortPreservingMergeExec: [c@1 ASC]",
" SortExec: expr=[c@1 ASC], preserve_partitioning=[true]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_input_bounded = [
"SortPreservingMergeExec: [c@1 ASC]",
" SortExec: expr=[c@1 ASC], preserve_partitioning=[true]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized_unbounded = [
"SortPreservingMergeExec: [c@1 ASC]",
" SortExec: expr=[c@1 ASC], preserve_partitioning=[true]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_optimized_bounded = [
"SortPreservingMergeExec: [c@1 ASC]",
" SortExec: expr=[c@1 ASC], preserve_partitioning=[true]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized_bounded_sort_preserve = expected_optimized_bounded;
assert_optimized_in_all_boundedness_situations!(
expected_input_unbounded,
expected_input_bounded,
expected_optimized_unbounded,
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
source_unbounded
);
Ok(())
}
#[rstest]
#[tokio::test]
async fn test_with_lost_ordering(
#[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
csv_exec_sorted(&schema, sort_exprs)
};
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
let physical_plan =
sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions, false);
let expected_input_unbounded = [
"SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_input_bounded = [
"SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized_unbounded = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_optimized_bounded = [
"SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized_bounded_sort_preserve = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
assert_optimized_in_all_boundedness_situations!(
expected_input_unbounded,
expected_input_bounded,
expected_optimized_unbounded,
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
source_unbounded
);
Ok(())
}
#[rstest]
#[tokio::test]
async fn test_with_lost_and_kept_ordering(
#[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
csv_exec_sorted(&schema, sort_exprs)
};
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
let sort = sort_exec(
vec![sort_expr_default("c", &coalesce_partitions.schema())],
coalesce_partitions,
false,
);
let repartition_rr2 = repartition_exec_round_robin(sort);
let repartition_hash2 = repartition_exec_hash(repartition_rr2);
let filter = filter_exec(repartition_hash2);
let sort2 =
sort_exec(vec![sort_expr_default("c", &filter.schema())], filter, true);
let physical_plan = sort_preserving_merge_exec(
vec![sort_expr_default("c", &sort2.schema())],
sort2,
);
let expected_input_unbounded = [
"SortPreservingMergeExec: [c@1 ASC]",
" SortExec: expr=[c@1 ASC], preserve_partitioning=[true]",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" SortExec: expr=[c@1 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_input_bounded = [
"SortPreservingMergeExec: [c@1 ASC]",
" SortExec: expr=[c@1 ASC], preserve_partitioning=[true]",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" SortExec: expr=[c@1 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized_unbounded = [
"SortPreservingMergeExec: [c@1 ASC]",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@1 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" SortExec: expr=[c@1 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_optimized_bounded = [
"SortPreservingMergeExec: [c@1 ASC]",
" SortExec: expr=[c@1 ASC], preserve_partitioning=[true]",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" SortExec: expr=[c@1 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized_bounded_sort_preserve = [
"SortPreservingMergeExec: [c@1 ASC]",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=c@1 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" SortExec: expr=[c@1 ASC], preserve_partitioning=[false]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
assert_optimized_in_all_boundedness_situations!(
expected_input_unbounded,
expected_input_bounded,
expected_optimized_unbounded,
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
source_unbounded
);
Ok(())
}
#[rstest]
#[tokio::test]
async fn test_with_multiple_child_trees(
#[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema()?;
let left_sort_exprs = vec![sort_expr("a", &schema)];
let left_source = if source_unbounded {
stream_exec_ordered(&schema, left_sort_exprs)
} else {
csv_exec_sorted(&schema, left_sort_exprs)
};
let left_repartition_rr = repartition_exec_round_robin(left_source);
let left_repartition_hash = repartition_exec_hash(left_repartition_rr);
let left_coalesce_partitions =
Arc::new(CoalesceBatchesExec::new(left_repartition_hash, 4096));
let right_sort_exprs = vec![sort_expr("a", &schema)];
let right_source = if source_unbounded {
stream_exec_ordered(&schema, right_sort_exprs)
} else {
csv_exec_sorted(&schema, right_sort_exprs)
};
let right_repartition_rr = repartition_exec_round_robin(right_source);
let right_repartition_hash = repartition_exec_hash(right_repartition_rr);
let right_coalesce_partitions =
Arc::new(CoalesceBatchesExec::new(right_repartition_hash, 4096));
let hash_join_exec =
hash_join_exec(left_coalesce_partitions, right_coalesce_partitions);
let sort = sort_exec(
vec![sort_expr_default("a", &hash_join_exec.schema())],
hash_join_exec,
true,
);
let physical_plan = sort_preserving_merge_exec(
vec![sort_expr_default("a", &sort.schema())],
sort,
);
let expected_input_unbounded = [
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_input_bounded = [
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized_unbounded = [
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]",
];
let expected_optimized_bounded = [
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized_bounded_sort_preserve = expected_optimized_bounded;
assert_optimized_in_all_boundedness_situations!(
expected_input_unbounded,
expected_input_bounded,
expected_optimized_unbounded,
expected_optimized_bounded,
expected_optimized_bounded_sort_preserve,
physical_plan,
source_unbounded
);
Ok(())
}
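
/// Creates an ascending, `NULLS LAST` sort expression on column `name`.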
fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
let sort_opts = SortOptions {
nulls_first: false,
descending: false,
};
sort_expr_options(name, schema, sort_opts)
}
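
/// Creates a sort expression on column `name` with default `SortOptions`.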
fn sort_expr_default(name: &str, schema: &Schema) -> PhysicalSortExpr {
let sort_opts = SortOptions::default();
sort_expr_options(name, schema, sort_opts)
}
fn sort_expr_options(
name: &str,
schema: &Schema,
options: SortOptions,
) -> PhysicalSortExpr {
PhysicalSortExpr {
expr: col(name, schema).unwrap(),
options,
}
}
fn sort_exec(
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
preserve_partitioning: bool,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();
Arc::new(
SortExec::new(sort_exprs, input)
.with_preserve_partitioning(preserve_partitioning),
)
}
fn sort_preserving_merge_exec(
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();
Arc::new(SortPreservingMergeExec::new(sort_exprs, input))
}
fn repartition_exec_round_robin(
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
Arc::new(
RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(8)).unwrap(),
)
}
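
/// Hash-repartitions `input` on column `c` into 8 partitions.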
fn repartition_exec_hash(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
let input_schema = input.schema();
Arc::new(
RepartitionExec::try_new(
input,
Partitioning::Hash(vec![col("c", &input_schema).unwrap()], 8),
)
.unwrap(),
)
}
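
/// Filters rows with the predicate `c > 3`.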
fn filter_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
let input_schema = input.schema();
let predicate = expressions::binary(
col("c", &input_schema).unwrap(),
Operator::Gt,
expressions::lit(3i32),
&input_schema,
)
.unwrap();
Arc::new(FilterExec::try_new(predicate, input).unwrap())
}
fn coalesce_batches_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(CoalesceBatchesExec::new(input, 8192))
}
fn coalesce_partitions_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(CoalescePartitionsExec::new(input))
}
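
/// Builds a partitioned inner `HashJoinExec` on `left.c = right.c`.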
fn hash_join_exec(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
let left_on = col("c", &left.schema()).unwrap();
let right_on = col("c", &right.schema()).unwrap();
let left_col = left_on.as_any().downcast_ref::<Column>().unwrap();
let right_col = right_on.as_any().downcast_ref::<Column>().unwrap();
Arc::new(
HashJoinExec::try_new(
left,
right,
vec![(Arc::new(left_col.clone()), Arc::new(right_col.clone()))],
None,
&JoinType::Inner,
None,
PartitionMode::Partitioned,
false,
)
.unwrap(),
)
}
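
/// Creates a test schema with four non-nullable `Int32` columns: `a`, `b`,
/// `c` and `d`.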
fn create_test_schema() -> Result<SchemaRef> {
let column_a = Field::new("a", DataType::Int32, false);
let column_b = Field::new("b", DataType::Int32, false);
let column_c = Field::new("c", DataType::Int32, false);
let column_d = Field::new("d", DataType::Int32, false);
let schema = Arc::new(Schema::new(vec![column_a, column_b, column_c, column_d]));
Ok(schema)
}
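
/// Creates an unbounded `StreamingTableExec` source projecting `[a, c, d]`
/// with the given output ordering.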
fn stream_exec_ordered(
schema: &SchemaRef,
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();
let projection: Vec<usize> = vec![0, 2, 3];
Arc::new(
StreamingTableExec::try_new(
schema.clone(),
vec![Arc::new(TestStreamPartition {
schema: schema.clone(),
}) as _],
Some(&projection),
vec![sort_exprs],
true,
None,
)
.unwrap(),
)
}
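
/// Creates a bounded `CsvExec` source projecting `[a, c, d]` with the given
/// output ordering.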
fn csv_exec_sorted(
schema: &SchemaRef,
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();
let projection: Vec<usize> = vec![0, 2, 3];
Arc::new(
CsvExec::builder(
FileScanConfig::new(
ObjectStoreUrl::parse("test:///").unwrap(),
schema.clone(),
)
.with_file(PartitionedFile::new("file_path".to_string(), 100))
.with_projection(Some(projection))
.with_output_ordering(vec![sort_exprs]),
)
.with_has_header(true)
.with_delimeter(0)
.with_quote(b'"')
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
.build(),
)
}
}