use std::sync::Arc;
use crate::error::Result;
use crate::physical_optimizer::utils::{is_coalesce_partitions, is_sort, ExecTree};
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use super::utils::is_repartition;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_physical_plan::unbounded_output;
/// Wraps an execution plan together with, for each of its children, the tree
/// of operators below that child which keep (or could be rewritten to keep)
/// an existing ordering.
#[derive(Debug, Clone)]
pub(crate) struct OrderPreservationContext {
    /// The plan node this context describes.
    pub(crate) plan: Arc<dyn ExecutionPlan>,
    /// One slot per child of `plan`. `Some(tree)` records the operators below
    /// that child which maintain ordering or can be replaced with an
    /// order-preserving variant; `None` means nothing is tracked for it.
    ordering_onwards: Vec<Option<ExecTree>>,
}
impl OrderPreservationContext {
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
let length = plan.children().len();
OrderPreservationContext {
plan,
ordering_onwards: vec![None; length],
}
}
pub fn new_from_children_nodes(
children_nodes: Vec<OrderPreservationContext>,
parent_plan: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
let children_plans = children_nodes
.iter()
.map(|item| item.plan.clone())
.collect();
let ordering_onwards = children_nodes
.into_iter()
.enumerate()
.map(|(idx, item)| {
let plan = item.plan;
let children = plan.children();
let ordering_onwards = item.ordering_onwards;
if children.is_empty() {
None
} else if ordering_onwards[0].is_none()
&& ((is_repartition(&plan) && !plan.maintains_input_order()[0])
|| (is_coalesce_partitions(&plan)
&& children[0].output_ordering().is_some()))
{
Some(ExecTree::new(plan, idx, vec![]))
} else {
let children = ordering_onwards
.into_iter()
.flatten()
.filter(|item| {
plan.maintains_input_order()[item.idx]
|| is_coalesce_partitions(&plan)
|| is_repartition(&plan)
})
.collect::<Vec<_>>();
if children.is_empty() {
None
} else {
Some(ExecTree::new(plan, idx, children))
}
}
})
.collect();
let plan = with_new_children_if_necessary(parent_plan, children_plans)?.into();
Ok(OrderPreservationContext {
plan,
ordering_onwards,
})
}
pub fn children(&self) -> Vec<OrderPreservationContext> {
self.plan
.children()
.into_iter()
.map(OrderPreservationContext::new)
.collect()
}
}
impl TreeNode for OrderPreservationContext {
    /// Visits a fresh context for each child in order. A `Stop` from `op` is
    /// propagated immediately; a `Skip` ends the sibling loop early and is
    /// reported as `Continue`.
    fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
    where
        F: FnMut(&Self) -> Result<VisitRecursion>,
    {
        for child in self.children() {
            let outcome = op(&child)?;
            match outcome {
                VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
                VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
                VisitRecursion::Continue => continue,
            }
        }
        Ok(VisitRecursion::Continue)
    }

    /// Applies `transform` to every child context and rebuilds this node from
    /// the results; a node without children is returned as-is.
    fn map_children<F>(self, transform: F) -> Result<Self>
    where
        F: FnMut(Self) -> Result<Self>,
    {
        let children = self.children();
        if children.is_empty() {
            return Ok(self);
        }
        let transformed_children: Vec<Self> =
            children.into_iter().map(transform).collect::<Result<_>>()?;
        Self::new_from_children_nodes(transformed_children, self.plan)
    }
}
/// Rebuilds the operator chain recorded in `exec_tree` bottom-up, optionally
/// swapping operators for their order-preserving variants.
///
/// * `is_spr_better`: when `true`, a `RepartitionExec` that does not maintain
///   its input ordering is rebuilt with `with_preserve_order()`.
/// * `is_spm_better`: when `true`, a `CoalescePartitionsExec` whose input has
///   an output ordering is replaced by a `SortPreservingMergeExec` using that
///   ordering.
///
/// # Errors
/// Propagates failures from `with_new_children` and `RepartitionExec::try_new`.
fn get_updated_plan(
    exec_tree: &ExecTree,
    is_spr_better: bool,
    is_spm_better: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Rebuild tracked sub-trees first, writing each one back into the child
    // slot it was recorded under.
    let mut new_inputs = exec_tree.plan.children();
    for subtree in exec_tree.children.iter() {
        let rebuilt = get_updated_plan(subtree, is_spr_better, is_spm_better)?;
        new_inputs[subtree.idx] = rebuilt;
    }
    let node = exec_tree.plan.clone().with_new_children(new_inputs)?;

    // An order-losing repartition becomes an order-preserving one.
    let node: Arc<dyn ExecutionPlan> =
        if is_repartition(&node) && !node.maintains_input_order()[0] && is_spr_better {
            let input = node.children().swap_remove(0);
            let preserving = RepartitionExec::try_new(input, node.output_partitioning())?
                .with_preserve_order();
            Arc::new(preserving) as _
        } else {
            node
        };

    // A coalesce over an ordered input becomes a sort-preserving merge.
    let mut inputs = node.children();
    let use_merge = is_coalesce_partitions(&node)
        && inputs[0].output_ordering().is_some()
        && is_spm_better;
    if !use_merge {
        return Ok(node);
    }
    let input = inputs.swap_remove(0);
    let ordering = input.output_ordering().unwrap_or(&[]).to_vec();
    let merged: Arc<dyn ExecutionPlan> =
        Arc::new(SortPreservingMergeExec::new(ordering, input));
    Ok(merged)
}
/// Tries to remove a `SortExec` by rewriting the operators below it into
/// their order-preserving variants.
///
/// * `is_spr_better`: when `true`, replacing an order-losing `RepartitionExec`
///   with its order-preserving variant is considered desirable whenever it
///   lets the sort be removed.
/// * `is_spm_better`: likewise for replacing a `CoalescePartitionsExec` over an
///   ordered input with a `SortPreservingMergeExec`.
///
/// Independently of the flags, the replacements are attempted when
/// `config.optimizer.prefer_existing_sort` is set or the sort's output is
/// unbounded.
///
/// Returns `Transformed::Yes` holding the rewritten sort input (the sort
/// itself dropped) only if that input already satisfies the sort's output
/// ordering; otherwise the context is returned unchanged as `Transformed::No`.
///
/// # Errors
/// Propagates errors raised while rebuilding plan nodes.
pub(crate) fn replace_with_order_preserving_variants(
    requirements: OrderPreservationContext,
    is_spr_better: bool,
    is_spm_better: bool,
    config: &ConfigOptions,
) -> Result<Transformed<OrderPreservationContext>> {
    let plan = &requirements.plan;
    if !is_sort(plan) {
        return Ok(Transformed::No(requirements));
    }
    // The sort's single child slot. `first()` avoids an index panic if the
    // context was ever built without child slots.
    let exec_tree = if let Some(exec_tree) = requirements
        .ordering_onwards
        .first()
        .and_then(Option::as_ref)
    {
        exec_tree
    } else {
        return Ok(Transformed::No(requirements));
    };

    // Unbounded inputs always get the order-preserving variants, since that
    // is what lets the pipeline stream; the config flag opts bounded inputs in.
    let use_order_preserving_variant =
        config.optimizer.prefer_existing_sort || unbounded_output(plan);
    let updated_sort_input = get_updated_plan(
        exec_tree,
        is_spr_better || use_order_preserving_variant,
        is_spm_better || use_order_preserving_variant,
    )?;

    if updated_sort_input
        .equivalence_properties()
        .ordering_satisfy(plan.output_ordering().unwrap_or(&[]))
    {
        // The sort is redundant. Keep one empty slot per child of the new
        // root, the same invariant `OrderPreservationContext::new` upholds.
        let ordering_onwards = vec![None; updated_sort_input.children().len()];
        return Ok(Transformed::Yes(OrderPreservationContext {
            plan: updated_sort_input,
            ordering_onwards,
        }));
    }
    Ok(Transformed::No(requirements))
}
#[cfg(test)]
mod tests {
    use super::*;

    use crate::datasource::file_format::file_compression_type::FileCompressionType;
    use crate::datasource::listing::PartitionedFile;
    use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
    use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
    use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
    use crate::physical_plan::filter::FilterExec;
    use crate::physical_plan::joins::{HashJoinExec, PartitionMode};
    use crate::physical_plan::repartition::RepartitionExec;
    use crate::physical_plan::sorts::sort::SortExec;
    use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
    use crate::physical_plan::{displayable, Partitioning};
    use crate::prelude::SessionConfig;

    use arrow::compute::SortOptions;
    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use datafusion_common::tree_node::TreeNode;
    use datafusion_common::{Result, Statistics};
    use datafusion_execution::object_store::ObjectStoreUrl;
    use datafusion_expr::{JoinType, Operator};
    use datafusion_physical_expr::expressions::{self, col, Column};
    use datafusion_physical_expr::PhysicalSortExpr;

    /// Checks that `$PLAN` renders as `$EXPECTED_PLAN_LINES`, then runs the rule
    /// bottom-up (with `is_spr_better = is_spm_better = false`) and checks the
    /// result renders as `$EXPECTED_OPTIMIZED_PLAN_LINES`. The optional fourth
    /// argument sets `prefer_existing_sort` (defaults to `false`).
    ///
    /// Expected lines must use the `indent(true)` layout: two spaces per level.
    macro_rules! assert_optimized {
        ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr) => {
            assert_optimized!(
                $EXPECTED_PLAN_LINES,
                $EXPECTED_OPTIMIZED_PLAN_LINES,
                $PLAN,
                false
            );
        };
        ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, $PREFER_EXISTING_SORT: expr) => {
            let physical_plan = $PLAN;
            let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
            let actual: Vec<&str> = formatted.trim().lines().collect();

            let expected_plan_lines: Vec<&str> =
                $EXPECTED_PLAN_LINES.iter().copied().collect();
            assert_eq!(
                expected_plan_lines, actual,
                "\n**Original Plan Mismatch\n\nexpected:\n\n{expected_plan_lines:#?}\nactual:\n\n{actual:#?}\n\n"
            );

            let expected_optimized_lines: Vec<&str> =
                $EXPECTED_OPTIMIZED_PLAN_LINES.iter().copied().collect();

            let config = SessionConfig::new().with_prefer_existing_sort($PREFER_EXISTING_SORT);
            let plan_with_pipeline_fixer = OrderPreservationContext::new(physical_plan);
            let parallel = plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer| {
                replace_with_order_preserving_variants(
                    plan_with_pipeline_fixer,
                    false,
                    false,
                    config.options(),
                )
            })?;
            let optimized_physical_plan = parallel.plan;

            let actual = get_plan_string(&optimized_physical_plan);
            assert_eq!(
                expected_optimized_lines, actual,
                "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n"
            );
        };
    }

    #[tokio::test]
    async fn test_replace_multiple_input_repartition_1() -> Result<()> {
        let schema = create_test_schema()?;
        let sort_exprs = vec![sort_expr("a", &schema)];
        let source = csv_exec_sorted(&schema, sort_exprs, true);
        let repartition = repartition_exec_hash(repartition_exec_round_robin(source));
        let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true);
        let physical_plan =
            sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);

        let expected_input = [
            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
            "  SortExec: expr=[a@0 ASC NULLS LAST]",
            "    RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        let expected_optimized = [
            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
            "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "      CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        assert_optimized!(expected_input, expected_optimized, physical_plan);
        Ok(())
    }

    #[tokio::test]
    async fn test_with_inter_children_change_only() -> Result<()> {
        let schema = create_test_schema()?;
        let sort_exprs = vec![sort_expr_default("a", &schema)];
        let source = csv_exec_sorted(&schema, sort_exprs, true);
        let repartition_rr = repartition_exec_round_robin(source);
        let repartition_hash = repartition_exec_hash(repartition_rr);
        let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
        let sort = sort_exec(
            vec![sort_expr_default("a", &schema)],
            coalesce_partitions,
            false,
        );
        let repartition_rr2 = repartition_exec_round_robin(sort);
        let repartition_hash2 = repartition_exec_hash(repartition_rr2);
        let filter = filter_exec(repartition_hash2, &schema);
        let sort2 = sort_exec(vec![sort_expr_default("a", &schema)], filter, true);
        let physical_plan =
            sort_preserving_merge_exec(vec![sort_expr_default("a", &schema)], sort2);

        let expected_input = [
            "SortPreservingMergeExec: [a@0 ASC]",
            "  SortExec: expr=[a@0 ASC]",
            "    FilterExec: c@2 > 3",
            "      RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "          SortExec: expr=[a@0 ASC]",
            "            CoalescePartitionsExec",
            "              RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "                RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "                  CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC], has_header=true",
        ];
        let expected_optimized = [
            "SortPreservingMergeExec: [a@0 ASC]",
            "  FilterExec: c@2 > 3",
            "    SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC",
            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "        SortPreservingMergeExec: [a@0 ASC]",
            "          SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC",
            "            RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "              CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC], has_header=true",
        ];
        assert_optimized!(expected_input, expected_optimized, physical_plan);
        Ok(())
    }

    #[tokio::test]
    async fn test_replace_multiple_input_repartition_2() -> Result<()> {
        let schema = create_test_schema()?;
        let sort_exprs = vec![sort_expr("a", &schema)];
        let source = csv_exec_sorted(&schema, sort_exprs, true);
        let repartition_rr = repartition_exec_round_robin(source);
        let filter = filter_exec(repartition_rr, &schema);
        let repartition_hash = repartition_exec_hash(filter);
        let sort = sort_exec(vec![sort_expr("a", &schema)], repartition_hash, true);
        let physical_plan =
            sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);

        let expected_input = [
            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
            "  SortExec: expr=[a@0 ASC NULLS LAST]",
            "    RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "      FilterExec: c@2 > 3",
            "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "          CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        let expected_optimized = [
            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
            "    FilterExec: c@2 > 3",
            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        assert_optimized!(expected_input, expected_optimized, physical_plan);
        Ok(())
    }

    #[tokio::test]
    async fn test_replace_multiple_input_repartition_with_extra_steps() -> Result<()> {
        let schema = create_test_schema()?;
        let sort_exprs = vec![sort_expr("a", &schema)];
        let source = csv_exec_sorted(&schema, sort_exprs, true);
        let repartition_rr = repartition_exec_round_robin(source);
        let repartition_hash = repartition_exec_hash(repartition_rr);
        let filter = filter_exec(repartition_hash, &schema);
        let coalesce_batches_exec: Arc<dyn ExecutionPlan> = coalesce_batches_exec(filter);
        let sort = sort_exec(vec![sort_expr("a", &schema)], coalesce_batches_exec, true);
        let physical_plan =
            sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);

        let expected_input = [
            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
            "  SortExec: expr=[a@0 ASC NULLS LAST]",
            "    CoalesceBatchesExec: target_batch_size=8192",
            "      FilterExec: c@2 > 3",
            "        RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        let expected_optimized = [
            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
            "  CoalesceBatchesExec: target_batch_size=8192",
            "    FilterExec: c@2 > 3",
            "      SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
            "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "          CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        assert_optimized!(expected_input, expected_optimized, physical_plan);
        Ok(())
    }

    #[tokio::test]
    async fn test_replace_multiple_input_repartition_with_extra_steps_2() -> Result<()> {
        let schema = create_test_schema()?;
        let sort_exprs = vec![sort_expr("a", &schema)];
        let source = csv_exec_sorted(&schema, sort_exprs, true);
        let repartition_rr = repartition_exec_round_robin(source);
        let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr);
        let repartition_hash = repartition_exec_hash(coalesce_batches_exec_1);
        let filter = filter_exec(repartition_hash, &schema);
        let coalesce_batches_exec_2 = coalesce_batches_exec(filter);
        let sort =
            sort_exec(vec![sort_expr("a", &schema)], coalesce_batches_exec_2, true);
        let physical_plan =
            sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);

        let expected_input = [
            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
            "  SortExec: expr=[a@0 ASC NULLS LAST]",
            "    CoalesceBatchesExec: target_batch_size=8192",
            "      FilterExec: c@2 > 3",
            "        RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "          CoalesceBatchesExec: target_batch_size=8192",
            "            RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "              CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        let expected_optimized = [
            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
            "  CoalesceBatchesExec: target_batch_size=8192",
            "    FilterExec: c@2 > 3",
            "      SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
            "        CoalesceBatchesExec: target_batch_size=8192",
            "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        assert_optimized!(expected_input, expected_optimized, physical_plan);
        Ok(())
    }

    #[tokio::test]
    async fn test_not_replacing_when_no_need_to_preserve_sorting() -> Result<()> {
        let schema = create_test_schema()?;
        let sort_exprs = vec![sort_expr("a", &schema)];
        let source = csv_exec_sorted(&schema, sort_exprs, true);
        let repartition_rr = repartition_exec_round_robin(source);
        let repartition_hash = repartition_exec_hash(repartition_rr);
        let filter = filter_exec(repartition_hash, &schema);
        let coalesce_batches_exec: Arc<dyn ExecutionPlan> = coalesce_batches_exec(filter);
        let physical_plan: Arc<dyn ExecutionPlan> =
            coalesce_partitions_exec(coalesce_batches_exec);

        let expected_input = [
            "CoalescePartitionsExec",
            "  CoalesceBatchesExec: target_batch_size=8192",
            "    FilterExec: c@2 > 3",
            "      RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "          CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        let expected_optimized = [
            "CoalescePartitionsExec",
            "  CoalesceBatchesExec: target_batch_size=8192",
            "    FilterExec: c@2 > 3",
            "      RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "          CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        assert_optimized!(expected_input, expected_optimized, physical_plan);
        Ok(())
    }

    #[tokio::test]
    async fn test_with_multiple_replacable_repartitions() -> Result<()> {
        let schema = create_test_schema()?;
        let sort_exprs = vec![sort_expr("a", &schema)];
        let source = csv_exec_sorted(&schema, sort_exprs, true);
        let repartition_rr = repartition_exec_round_robin(source);
        let repartition_hash = repartition_exec_hash(repartition_rr);
        let filter = filter_exec(repartition_hash, &schema);
        let coalesce_batches = coalesce_batches_exec(filter);
        let repartition_hash_2 = repartition_exec_hash(coalesce_batches);
        let sort = sort_exec(vec![sort_expr("a", &schema)], repartition_hash_2, true);
        let physical_plan =
            sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);

        let expected_input = [
            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
            "  SortExec: expr=[a@0 ASC NULLS LAST]",
            "    RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "      CoalesceBatchesExec: target_batch_size=8192",
            "        FilterExec: c@2 > 3",
            "          RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "            RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "              CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        let expected_optimized = [
            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
            "    CoalesceBatchesExec: target_batch_size=8192",
            "      FilterExec: c@2 > 3",
            "        SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
            "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        assert_optimized!(expected_input, expected_optimized, physical_plan);
        Ok(())
    }

    #[tokio::test]
    async fn test_not_replace_with_different_orderings() -> Result<()> {
        let schema = create_test_schema()?;
        let sort_exprs = vec![sort_expr("a", &schema)];
        let source = csv_exec_sorted(&schema, sort_exprs, true);
        let repartition_rr = repartition_exec_round_robin(source);
        let repartition_hash = repartition_exec_hash(repartition_rr);
        let sort = sort_exec(
            vec![sort_expr_default("c", &schema)],
            repartition_hash,
            true,
        );
        let physical_plan =
            sort_preserving_merge_exec(vec![sort_expr_default("c", &schema)], sort);

        let expected_input = [
            "SortPreservingMergeExec: [c@2 ASC]",
            "  SortExec: expr=[c@2 ASC]",
            "    RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        let expected_optimized = [
            "SortPreservingMergeExec: [c@2 ASC]",
            "  SortExec: expr=[c@2 ASC]",
            "    RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        assert_optimized!(expected_input, expected_optimized, physical_plan);
        Ok(())
    }

    #[tokio::test]
    async fn test_with_lost_ordering() -> Result<()> {
        let schema = create_test_schema()?;
        let sort_exprs = vec![sort_expr("a", &schema)];
        let source = csv_exec_sorted(&schema, sort_exprs, true);
        let repartition_rr = repartition_exec_round_robin(source);
        let repartition_hash = repartition_exec_hash(repartition_rr);
        let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
        let physical_plan =
            sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions, false);

        let expected_input = [
            "SortExec: expr=[a@0 ASC NULLS LAST]",
            "  CoalescePartitionsExec",
            "    RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        let expected_optimized = [
            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
            "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "      CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        assert_optimized!(expected_input, expected_optimized, physical_plan);
        Ok(())
    }

    #[tokio::test]
    async fn test_with_lost_and_kept_ordering() -> Result<()> {
        let schema = create_test_schema()?;
        let sort_exprs = vec![sort_expr("a", &schema)];
        let source = csv_exec_sorted(&schema, sort_exprs, true);
        let repartition_rr = repartition_exec_round_robin(source);
        let repartition_hash = repartition_exec_hash(repartition_rr);
        let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
        let sort = sort_exec(
            vec![sort_expr_default("c", &schema)],
            coalesce_partitions,
            false,
        );
        let repartition_rr2 = repartition_exec_round_robin(sort);
        let repartition_hash2 = repartition_exec_hash(repartition_rr2);
        let filter = filter_exec(repartition_hash2, &schema);
        let sort2 = sort_exec(vec![sort_expr_default("c", &schema)], filter, true);
        let physical_plan =
            sort_preserving_merge_exec(vec![sort_expr_default("c", &schema)], sort2);

        let expected_input = [
            "SortPreservingMergeExec: [c@2 ASC]",
            "  SortExec: expr=[c@2 ASC]",
            "    FilterExec: c@2 > 3",
            "      RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "          SortExec: expr=[c@2 ASC]",
            "            CoalescePartitionsExec",
            "              RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "                RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "                  CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        let expected_optimized = [
            "SortPreservingMergeExec: [c@2 ASC]",
            "  FilterExec: c@2 > 3",
            "    SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=c@2 ASC",
            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "        SortExec: expr=[c@2 ASC]",
            "          CoalescePartitionsExec",
            "            RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "              RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "                CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        assert_optimized!(expected_input, expected_optimized, physical_plan);
        Ok(())
    }

    #[tokio::test]
    async fn test_with_multiple_child_trees() -> Result<()> {
        let schema = create_test_schema()?;

        let left_sort_exprs = vec![sort_expr("a", &schema)];
        let left_source = csv_exec_sorted(&schema, left_sort_exprs, true);
        let left_repartition_rr = repartition_exec_round_robin(left_source);
        let left_repartition_hash = repartition_exec_hash(left_repartition_rr);
        let left_coalesce_batches =
            Arc::new(CoalesceBatchesExec::new(left_repartition_hash, 4096));

        let right_sort_exprs = vec![sort_expr("a", &schema)];
        let right_source = csv_exec_sorted(&schema, right_sort_exprs, true);
        let right_repartition_rr = repartition_exec_round_robin(right_source);
        let right_repartition_hash = repartition_exec_hash(right_repartition_rr);
        let right_coalesce_batches =
            Arc::new(CoalesceBatchesExec::new(right_repartition_hash, 4096));

        let hash_join_exec =
            hash_join_exec(left_coalesce_batches, right_coalesce_batches);
        let sort = sort_exec(vec![sort_expr_default("a", &schema)], hash_join_exec, true);
        let physical_plan =
            sort_preserving_merge_exec(vec![sort_expr_default("a", &schema)], sort);

        let expected_input = [
            "SortPreservingMergeExec: [a@0 ASC]",
            "  SortExec: expr=[a@0 ASC]",
            "    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]",
            "      CoalesceBatchesExec: target_batch_size=4096",
            "        RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
            "      CoalesceBatchesExec: target_batch_size=4096",
            "        RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        let expected_optimized = [
            "SortPreservingMergeExec: [a@0 ASC]",
            "  SortExec: expr=[a@0 ASC]",
            "    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, c@1)]",
            "      CoalesceBatchesExec: target_batch_size=4096",
            "        RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
            "      CoalesceBatchesExec: target_batch_size=4096",
            "        RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        assert_optimized!(expected_input, expected_optimized, physical_plan);
        Ok(())
    }

    #[tokio::test]
    async fn test_with_bounded_input() -> Result<()> {
        let schema = create_test_schema()?;
        let sort_exprs = vec![sort_expr("a", &schema)];
        let source = csv_exec_sorted(&schema, sort_exprs, false);
        let repartition = repartition_exec_hash(repartition_exec_round_robin(source));
        let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true);
        let physical_plan =
            sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);

        let expected_input = [
            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
            "  SortExec: expr=[a@0 ASC NULLS LAST]",
            "    RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        let expected_optimized = [
            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
            "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "      CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        assert_optimized!(expected_input, expected_optimized, physical_plan, true);
        Ok(())
    }

    /// With a bounded source and `prefer_existing_sort` disabled, no
    /// order-preserving replacement is wanted, so the plan must stay as-is.
    #[tokio::test]
    async fn test_with_bounded_input_without_prefer_existing_sort() -> Result<()> {
        let schema = create_test_schema()?;
        let sort_exprs = vec![sort_expr("a", &schema)];
        let source = csv_exec_sorted(&schema, sort_exprs, false);
        let repartition = repartition_exec_hash(repartition_exec_round_robin(source));
        let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true);
        let physical_plan =
            sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);

        let expected_input = [
            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
            "  SortExec: expr=[a@0 ASC NULLS LAST]",
            "    RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        let expected_optimized = [
            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
            "  SortExec: expr=[a@0 ASC NULLS LAST]",
            "    RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8",
            "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
        ];
        assert_optimized!(expected_input, expected_optimized, physical_plan, false);
        Ok(())
    }

    // ---- test helpers ----

    /// Ascending, nulls-last sort on column `name`.
    fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
        let sort_opts = SortOptions {
            nulls_first: false,
            descending: false,
        };
        sort_expr_options(name, schema, sort_opts)
    }

    /// Sort on column `name` with `SortOptions::default()`.
    fn sort_expr_default(name: &str, schema: &Schema) -> PhysicalSortExpr {
        let sort_opts = SortOptions::default();
        sort_expr_options(name, schema, sort_opts)
    }

    fn sort_expr_options(
        name: &str,
        schema: &Schema,
        options: SortOptions,
    ) -> PhysicalSortExpr {
        PhysicalSortExpr {
            expr: col(name, schema).unwrap(),
            options,
        }
    }

    fn sort_exec(
        sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
        input: Arc<dyn ExecutionPlan>,
        preserve_partitioning: bool,
    ) -> Arc<dyn ExecutionPlan> {
        let sort_exprs = sort_exprs.into_iter().collect();
        Arc::new(
            SortExec::new(sort_exprs, input)
                .with_preserve_partitioning(preserve_partitioning),
        )
    }

    fn sort_preserving_merge_exec(
        sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
        input: Arc<dyn ExecutionPlan>,
    ) -> Arc<dyn ExecutionPlan> {
        let sort_exprs = sort_exprs.into_iter().collect();
        Arc::new(SortPreservingMergeExec::new(sort_exprs, input))
    }

    fn repartition_exec_round_robin(
        input: Arc<dyn ExecutionPlan>,
    ) -> Arc<dyn ExecutionPlan> {
        Arc::new(
            RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(8)).unwrap(),
        )
    }

    fn repartition_exec_hash(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        Arc::new(
            RepartitionExec::try_new(
                input,
                Partitioning::Hash(vec![Arc::new(Column::new("c1", 0))], 8),
            )
            .unwrap(),
        )
    }

    fn filter_exec(
        input: Arc<dyn ExecutionPlan>,
        schema: &SchemaRef,
    ) -> Arc<dyn ExecutionPlan> {
        let predicate = expressions::binary(
            col("c", schema).unwrap(),
            Operator::Gt,
            expressions::lit(3i32),
            schema,
        )
        .unwrap();
        Arc::new(FilterExec::try_new(predicate, input).unwrap())
    }

    fn coalesce_batches_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        Arc::new(CoalesceBatchesExec::new(input, 8192))
    }

    fn coalesce_partitions_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        Arc::new(CoalescePartitionsExec::new(input))
    }

    fn hash_join_exec(
        left: Arc<dyn ExecutionPlan>,
        right: Arc<dyn ExecutionPlan>,
    ) -> Arc<dyn ExecutionPlan> {
        Arc::new(
            HashJoinExec::try_new(
                left,
                right,
                vec![(Column::new("c", 1), Column::new("c", 1))],
                None,
                &JoinType::Inner,
                PartitionMode::Partitioned,
                false,
            )
            .unwrap(),
        )
    }

    fn create_test_schema() -> Result<SchemaRef> {
        let column_a = Field::new("a", DataType::Int32, false);
        let column_b = Field::new("b", DataType::Int32, false);
        let column_c = Field::new("c", DataType::Int32, false);
        let column_d = Field::new("d", DataType::Int32, false);
        let schema = Arc::new(Schema::new(vec![column_a, column_b, column_c, column_d]));
        Ok(schema)
    }

    /// A single-file CSV scan projecting `[a, c, d]` that declares
    /// `sort_exprs` as its output ordering.
    fn csv_exec_sorted(
        schema: &SchemaRef,
        sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
        infinite_source: bool,
    ) -> Arc<dyn ExecutionPlan> {
        let sort_exprs = sort_exprs.into_iter().collect();
        let projection: Vec<usize> = vec![0, 2, 3];
        Arc::new(CsvExec::new(
            FileScanConfig {
                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
                file_schema: schema.clone(),
                file_groups: vec![vec![PartitionedFile::new(
                    "file_path".to_string(),
                    100,
                )]],
                statistics: Statistics::new_unknown(schema),
                projection: Some(projection),
                limit: None,
                table_partition_cols: vec![],
                output_ordering: vec![sort_exprs],
                infinite_source,
            },
            true,
            0,
            b'"',
            None,
            FileCompressionType::UNCOMPRESSED,
        ))
    }

    /// Renders `plan` with `indent(true)` and returns one `String` per line.
    fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
        let formatted = displayable(plan.as_ref()).indent(true).to_string();
        let actual: Vec<&str> = formatted.trim().lines().collect();
        actual.iter().map(|elem| elem.to_string()).collect()
    }
}