use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::utils::add_sort_above_child;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use arrow::datatypes::SchemaRef;
use datafusion_common::{reverse_sort_options, DataFusionError};
use datafusion_physical_expr::utils::{ordering_satisfy, ordering_satisfy_concrete};
use datafusion_physical_expr::window::WindowExpr;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use itertools::izip;
use std::iter::zip;
use std::sync::Arc;
/// Physical optimizer rule registered under the name `"EnforceSorting"`.
///
/// Its `optimize` implementation walks the plan bottom-up with `ensure_sorting`,
/// which adds a sort above any child whose output ordering does not satisfy the
/// parent's required input ordering and removes sorts it finds to be unnecessary.
/// The rule itself holds no state.
#[derive(Default)]
pub struct EnforceSorting {}
impl EnforceSorting {
    /// Creates a new [`EnforceSorting`] optimizer rule.
    ///
    /// The rule has no fields, so this is equivalent to `EnforceSorting::default()`.
    pub fn new() -> Self {
        Self {}
    }
}
/// A plan node bundled with, for each of its children, the chain of operators
/// ("trace") that links a sort in that child's subtree to the node.
#[derive(Debug, Clone)]
struct PlanWithCorrespondingSort {
// The plan node this bookkeeping belongs to.
plan: Arc<dyn ExecutionPlan>,
// One entry per child of `plan`. Each entry is a list of `(child index, plan)`
// pairs: the first pair is expected to hold a `SortExec`, and each following
// pair holds an operator above it together with the index of the child through
// which the previous layer is reached. An empty list means no sort is tracked.
sort_onwards: Vec<Vec<(usize, Arc<dyn ExecutionPlan>)>>,
}
impl PlanWithCorrespondingSort {
    /// Wraps `plan` with one empty sort trace per child of the plan.
    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
        let child_count = plan.children().len();
        Self {
            plan,
            sort_onwards: vec![Vec::new(); child_count],
        }
    }

    /// Returns every child of the wrapped plan, each wrapped in a fresh
    /// [`PlanWithCorrespondingSort`] whose traces are all empty.
    pub fn children(&self) -> Vec<PlanWithCorrespondingSort> {
        let mut wrapped = Vec::new();
        for child in self.plan.children() {
            wrapped.push(Self::new(child));
        }
        wrapped
    }
}
impl TreeNodeRewritable for PlanWithCorrespondingSort {
fn map_children<F>(self, transform: F) -> Result<Self>
where
F: FnMut(Self) -> Result<Self>,
{
let children = self.children();
if children.is_empty() {
Ok(self)
} else {
let children_requirements = children
.into_iter()
.map(transform)
.collect::<Result<Vec<_>>>()?;
let children_plans = children_requirements
.iter()
.map(|elem| elem.plan.clone())
.collect::<Vec<_>>();
let sort_onwards = children_requirements
.iter()
.map(|item| {
let onwards = &item.sort_onwards;
if !onwards.is_empty() {
let flags = item.plan.maintains_input_order();
for (maintains, element) in flags.into_iter().zip(onwards.iter())
{
if (maintains || is_sort(&item.plan)) && !element.is_empty() {
return element.clone();
}
}
}
vec![]
})
.collect::<Vec<_>>();
let plan = with_new_children_if_necessary(self.plan, children_plans)?;
Ok(PlanWithCorrespondingSort { plan, sort_onwards })
}
}
}
impl PhysicalOptimizerRule for EnforceSorting {
    /// Rewrites `plan` bottom-up with `ensure_sorting` and returns the adjusted
    /// plan. The configuration options are not consulted.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        PlanWithCorrespondingSort::new(plan)
            .transform_up(&ensure_sorting)
            .map(|adjusted| adjusted.plan)
    }

    /// Name under which this rule is reported.
    fn name(&self) -> &str {
        "EnforceSorting"
    }

    /// This rule asks for the schema check to be performed.
    fn schema_check(&self) -> bool {
        true
    }
}
/// Returns `true` when `plan` is a [`SortExec`].
fn is_sort(plan: &Arc<dyn ExecutionPlan>) -> bool {
    plan.as_any().downcast_ref::<SortExec>().is_some()
}
fn ensure_sorting(
requirements: PlanWithCorrespondingSort,
) -> Result<Option<PlanWithCorrespondingSort>> {
if let Some(result) = analyze_immediate_sort_removal(&requirements)? {
return Ok(Some(result));
}
let plan = &requirements.plan;
let mut new_children = plan.children().clone();
let mut new_onwards = requirements.sort_onwards.clone();
for (idx, (child, sort_onwards, required_ordering)) in izip!(
new_children.iter_mut(),
new_onwards.iter_mut(),
plan.required_input_ordering()
)
.enumerate()
{
let physical_ordering = child.output_ordering();
match (required_ordering, physical_ordering) {
(Some(required_ordering), Some(physical_ordering)) => {
let is_ordering_satisfied = ordering_satisfy_concrete(
physical_ordering,
required_ordering,
|| child.equivalence_properties(),
);
if !is_ordering_satisfied {
update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
let sort_expr = required_ordering.to_vec();
*child = add_sort_above_child(child, sort_expr)?;
sort_onwards.push((idx, child.clone()))
}
if let [first, ..] = sort_onwards.as_slice() {
let sort_any = first.1.clone();
let sort_exec = convert_to_sort_exec(&sort_any)?;
let sort_output_ordering = sort_exec.output_ordering();
let sort_input_ordering = sort_exec.input().output_ordering();
if ordering_satisfy(sort_input_ordering, sort_output_ordering, || {
sort_exec.input().equivalence_properties()
}) {
update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
}
else if let Some(exec) =
requirements.plan.as_any().downcast_ref::<WindowAggExec>()
{
if let Some(result) = analyze_window_sort_removal(
exec.window_expr(),
&exec.partition_keys,
sort_exec,
sort_onwards,
)? {
return Ok(Some(result));
}
} else if let Some(exec) = requirements
.plan
.as_any()
.downcast_ref::<BoundedWindowAggExec>()
{
if let Some(result) = analyze_window_sort_removal(
exec.window_expr(),
&exec.partition_keys,
sort_exec,
sort_onwards,
)? {
return Ok(Some(result));
}
}
}
}
(Some(required), None) => {
let sort_expr = required.to_vec();
*child = add_sort_above_child(child, sort_expr)?;
*sort_onwards = vec![(idx, child.clone())];
}
(None, Some(_)) => {
if !requirements.plan.maintains_input_order()[idx] {
update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
}
}
(None, None) => {}
}
}
if plan.children().is_empty() {
Ok(Some(requirements))
} else {
let new_plan = requirements.plan.with_new_children(new_children)?;
for (idx, (trace, required_ordering)) in new_onwards
.iter_mut()
.zip(new_plan.required_input_ordering())
.enumerate()
.take(new_plan.children().len())
{
if new_plan.maintains_input_order()[idx]
&& required_ordering.is_none()
&& !trace.is_empty()
{
trace.push((idx, new_plan.clone()));
} else {
trace.clear();
if is_sort(&new_plan) {
trace.push((idx, new_plan.clone()));
}
}
}
Ok(Some(PlanWithCorrespondingSort {
plan: new_plan,
sort_onwards: new_onwards,
}))
}
}
/// Removes the node itself when it is a [`SortExec`] whose input already
/// satisfies the ordering the sort would produce.
///
/// Returns `Ok(Some(..))` holding the sort's input (with the last layer of the
/// sort's trace dropped) when the sort is removed, and `Ok(None)` when the node
/// is not a `SortExec` or the sort is still needed.
fn analyze_immediate_sort_removal(
    requirements: &PlanWithCorrespondingSort,
) -> Result<Option<PlanWithCorrespondingSort>> {
    let sort_exec = match requirements.plan.as_any().downcast_ref::<SortExec>() {
        Some(sort_exec) => sort_exec,
        None => return Ok(None),
    };

    let input = sort_exec.input();
    let already_sorted = ordering_satisfy(
        input.output_ordering(),
        sort_exec.output_ordering(),
        || input.equivalence_properties(),
    );
    if !already_sorted {
        return Ok(None);
    }

    // Index zero is the trace of the sort's input.
    let mut remaining_trace = requirements.sort_onwards[0].clone();
    // Drop the last layer of the trace, if there is one.
    remaining_trace.truncate(remaining_trace.len().saturating_sub(1));
    Ok(Some(PlanWithCorrespondingSort {
        plan: input.clone(),
        sort_onwards: vec![remaining_trace],
    }))
}
/// Tries to remove the sort feeding a window operator, rebuilding the window
/// operator on top of the sort's input when that is possible.
///
/// * `window_expr` - window expressions of the window operator.
/// * `partition_keys` - partition keys of the window operator.
/// * `sort_exec` - the sort that is a candidate for removal.
/// * `sort_onward` - trace from that sort up to the window operator's child; it
///   is emptied when the sort is removed.
///
/// The sort can be skipped when the ordering of its input lines up with the
/// ordering it produces, either directly or fully reversed (see `can_skip_sort`).
/// In the reversed case every window expression must provide a reverse
/// expression; otherwise nothing is changed.
///
/// Returns `Ok(Some(..))` with the new window plan when the sort was removed and
/// `Ok(None)` otherwise.
///
/// # Errors
///
/// Returns a `Plan` error if `sort_exec` reports no output ordering or if
/// `window_expr` is empty, and propagates errors from the helpers it calls.
fn analyze_window_sort_removal(
    window_expr: &[Arc<dyn WindowExpr>],
    partition_keys: &[Arc<dyn PhysicalExpr>],
    sort_exec: &SortExec,
    sort_onward: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
) -> Result<Option<PlanWithCorrespondingSort>> {
    let required_ordering = sort_exec.output_ordering().ok_or_else(|| {
        DataFusionError::Plan("A SortExec should have output ordering".to_string())
    })?;
    // Without an existing ordering below the sort there is nothing to exploit.
    let physical_ordering = match sort_exec.input().output_ordering() {
        Some(ordering) => ordering,
        None => return Ok(None),
    };
    // The first window expression supplies the PARTITION BY columns. Fail with
    // an error instead of panicking if the slice is unexpectedly empty.
    let first_window_expr = window_expr.first().ok_or_else(|| {
        DataFusionError::Plan(
            "A window operator should have at least one window expression"
                .to_string(),
        )
    })?;
    let (can_skip_sorting, should_reverse) = can_skip_sort(
        first_window_expr.partition_by(),
        required_ordering,
        &sort_exec.input().schema(),
        physical_ordering,
    )?;
    if !can_skip_sorting {
        return Ok(None);
    }

    let new_window_expr = if should_reverse {
        // Yields `None` as soon as one expression cannot be reversed.
        window_expr
            .iter()
            .map(|e| e.get_reverse_expr())
            .collect::<Option<Vec<_>>>()
    } else {
        Some(window_expr.to_vec())
    };
    let window_expr = match new_window_expr {
        Some(window_expr) => window_expr,
        None => return Ok(None),
    };

    let new_child = remove_corresponding_sort_from_sub_plan(sort_onward)?;
    let new_schema = new_child.schema();
    let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
    let new_plan = if uses_bounded_memory {
        Arc::new(BoundedWindowAggExec::try_new(
            window_expr,
            new_child,
            new_schema,
            partition_keys.to_vec(),
            Some(physical_ordering.to_vec()),
        )?) as _
    } else {
        Arc::new(WindowAggExec::try_new(
            window_expr,
            new_child,
            new_schema,
            partition_keys.to_vec(),
            Some(physical_ordering.to_vec()),
        )?) as _
    };
    Ok(Some(PlanWithCorrespondingSort::new(new_plan)))
}
/// Replaces `child` with the same sub-plan minus the sort tracked in
/// `sort_onwards`. Does nothing when no sort is tracked (the trace is empty).
fn update_child_to_remove_unnecessary_sort(
    child: &mut Arc<dyn ExecutionPlan>,
    sort_onwards: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
) -> Result<()> {
    if sort_onwards.is_empty() {
        return Ok(());
    }
    let without_sort = remove_corresponding_sort_from_sub_plan(sort_onwards)?;
    *child = without_sort;
    Ok(())
}
/// Downcasts `sort_any` to a [`SortExec`].
///
/// # Errors
///
/// Returns a `Plan` error when the plan is not a `SortExec`.
fn convert_to_sort_exec(sort_any: &Arc<dyn ExecutionPlan>) -> Result<&SortExec> {
    match sort_any.as_any().downcast_ref::<SortExec>() {
        Some(sort_exec) => Ok(sort_exec),
        None => Err(DataFusionError::Plan(
            "Given ExecutionPlan is not a SortExec".to_string(),
        )),
    }
}
fn remove_corresponding_sort_from_sub_plan(
sort_onwards: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
) -> Result<Arc<dyn ExecutionPlan>> {
let (_, sort_any) = sort_onwards[0].clone();
let sort_exec = convert_to_sort_exec(&sort_any)?;
let mut prev_layer = sort_exec.input().clone();
for (child_idx, layer) in sort_onwards.iter().skip(1) {
let mut children = layer.children();
children[*child_idx] = prev_layer;
prev_layer = layer.clone().with_new_children(children)?;
}
sort_onwards.clear();
Ok(prev_layer)
}
/// Outcome of comparing one required sort expression with the existing sort
/// expression at the same position; built by `can_skip_sort`.
#[derive(Debug)]
pub struct ColumnInfo {
// First value returned by `check_alignment`: whether the existing ordering of
// this column can serve the required one (directly or reversed).
is_aligned: bool,
// Second value returned by `check_alignment`: whether the existing ordering is
// the reverse of the required one.
reverse: bool,
// Whether the required expression is one of the partition keys.
is_partition: bool,
}
pub fn can_skip_sort(
partition_keys: &[Arc<dyn PhysicalExpr>],
required: &[PhysicalSortExpr],
input_schema: &SchemaRef,
physical_ordering: &[PhysicalSortExpr],
) -> Result<(bool, bool)> {
if required.len() > physical_ordering.len() {
return Ok((false, false));
}
let mut col_infos = vec![];
for (sort_expr, physical_expr) in zip(required, physical_ordering) {
let column = sort_expr.expr.clone();
let is_partition = partition_keys.iter().any(|e| e.eq(&column));
let (is_aligned, reverse) =
check_alignment(input_schema, physical_expr, sort_expr);
col_infos.push(ColumnInfo {
is_aligned,
reverse,
is_partition,
});
}
let partition_by_sections = col_infos
.iter()
.filter(|elem| elem.is_partition)
.collect::<Vec<_>>();
let can_skip_partition_bys = if partition_by_sections.is_empty() {
true
} else {
let first_reverse = partition_by_sections[0].reverse;
let can_skip_partition_bys = partition_by_sections
.iter()
.all(|c| c.is_aligned && c.reverse == first_reverse);
can_skip_partition_bys
};
let order_by_sections = col_infos
.iter()
.filter(|elem| !elem.is_partition)
.collect::<Vec<_>>();
let (can_skip_order_bys, should_reverse_order_bys) = if order_by_sections.is_empty() {
(true, false)
} else {
let first_reverse = order_by_sections[0].reverse;
let can_skip_order_bys = order_by_sections
.iter()
.all(|c| c.is_aligned && c.reverse == first_reverse);
(can_skip_order_bys, first_reverse)
};
let can_skip = can_skip_order_bys && can_skip_partition_bys;
Ok((can_skip, should_reverse_order_bys))
}
/// Compares a required sort expression with the existing one at the same
/// position and returns `(is_aligned, is_reversed)`.
///
/// Different expressions yield `(false, false)`. For the same expression:
///
/// * nullable column: the orderings are aligned when the sort options are equal
///   or exact reverses of each other (direction and null placement both
///   flipped); `is_reversed` is set in the latter case.
/// * non-nullable column: null placement is ignored, so the orderings are always
///   aligned and `is_reversed` tells whether the directions differ.
///
/// # Panics
///
/// Panics if the nullability of the required expression cannot be determined
/// from `input_schema`.
fn check_alignment(
    input_schema: &SchemaRef,
    physical_ordering: &PhysicalSortExpr,
    required: &PhysicalSortExpr,
) -> (bool, bool) {
    if !required.expr.eq(&physical_ordering.expr) {
        return (false, false);
    }

    let nullable = required.expr.nullable(input_schema).unwrap();
    let physical_opts = physical_ordering.options;
    let required_opts = required.options;
    if nullable {
        let is_reversed = physical_opts == reverse_sort_options(required_opts);
        (is_reversed || physical_opts == required_opts, is_reversed)
    } else {
        (true, physical_opts.descending != required_opts.descending)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_plan::displayable;
use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::create_window_expr;
use crate::prelude::SessionContext;
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{Result, Statistics};
use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction};
use datafusion_physical_expr::expressions::{col, NotExpr};
use datafusion_physical_expr::PhysicalSortExpr;
use std::sync::Arc;
/// Builds the two-column test schema: a nullable `Int32` column named
/// `nullable_col` followed by a non-nullable `Int32` column named
/// `non_nullable_col`.
fn create_test_schema() -> Result<SchemaRef> {
    let fields = vec![
        Field::new("nullable_col", DataType::Int32, true),
        Field::new("non_nullable_col", DataType::Int32, false),
    ];
    Ok(Arc::new(Schema::new(fields)))
}
#[tokio::test]
async fn test_is_column_aligned_nullable() -> Result<()> {
let schema = create_test_schema()?;
let params = vec![
((true, true), (false, false), (true, true)),
((true, true), (false, true), (false, false)),
((true, true), (true, false), (false, false)),
((true, false), (false, true), (true, true)),
((true, false), (false, false), (false, false)),
((true, false), (true, true), (false, false)),
];
for (
(physical_desc, physical_nulls_first),
(req_desc, req_nulls_first),
(is_aligned_expected, reverse_expected),
) in params
{
let physical_ordering = PhysicalSortExpr {
expr: col("nullable_col", &schema)?,
options: SortOptions {
descending: physical_desc,
nulls_first: physical_nulls_first,
},
};
let required_ordering = PhysicalSortExpr {
expr: col("nullable_col", &schema)?,
options: SortOptions {
descending: req_desc,
nulls_first: req_nulls_first,
},
};
let (is_aligned, reverse) =
check_alignment(&schema, &physical_ordering, &required_ordering);
assert_eq!(is_aligned, is_aligned_expected);
assert_eq!(reverse, reverse_expected);
}
Ok(())
}
#[tokio::test]
async fn test_is_column_aligned_non_nullable() -> Result<()> {
let schema = create_test_schema()?;
let params = vec![
((true, true), (false, false), (true, true)),
((true, true), (false, true), (true, true)),
((true, true), (true, false), (true, false)),
((true, false), (false, true), (true, true)),
((true, false), (false, false), (true, true)),
((true, false), (true, true), (true, false)),
];
for (
(physical_desc, physical_nulls_first),
(req_desc, req_nulls_first),
(is_aligned_expected, reverse_expected),
) in params
{
let physical_ordering = PhysicalSortExpr {
expr: col("non_nullable_col", &schema)?,
options: SortOptions {
descending: physical_desc,
nulls_first: physical_nulls_first,
},
};
let required_ordering = PhysicalSortExpr {
expr: col("non_nullable_col", &schema)?,
options: SortOptions {
descending: req_desc,
nulls_first: req_nulls_first,
},
};
let (is_aligned, reverse) =
check_alignment(&schema, &physical_ordering, &required_ordering);
assert_eq!(is_aligned, is_aligned_expected);
assert_eq!(reverse, reverse_expected);
}
Ok(())
}
/// Asserts that `$PLAN` is displayed as `$EXPECTED_PLAN_LINES` and, after running
/// `EnforceSorting` on it, as `$EXPECTED_OPTIMIZED_PLAN_LINES`.
///
/// Both expectations are collections of `&str`, one entry per line of the
/// indented plan display. The macro uses `?`, so it must be expanded inside a
/// function that returns a `Result`.
macro_rules! assert_optimized {
    ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr) => {
        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let physical_plan = $PLAN;

        // The plan as given must match the first expectation.
        let formatted_input = displayable(physical_plan.as_ref()).indent().to_string();
        let actual: Vec<&str> = formatted_input.trim().lines().collect();
        let expected_plan_lines: Vec<&str> =
            $EXPECTED_PLAN_LINES.iter().copied().collect();
        assert_eq!(
            expected_plan_lines, actual,
            "\n**Original Plan Mismatch\n\nexpected:\n\n{expected_plan_lines:#?}\nactual:\n\n{actual:#?}\n\n"
        );

        // The plan produced by the rule must match the second expectation.
        let expected_optimized_lines: Vec<&str> =
            $EXPECTED_OPTIMIZED_PLAN_LINES.iter().copied().collect();
        let optimized_physical_plan =
            EnforceSorting::new().optimize(physical_plan, state.config_options())?;
        let formatted_optimized = displayable(optimized_physical_plan.as_ref())
            .indent()
            .to_string();
        let actual: Vec<&str> = formatted_optimized.trim().lines().collect();
        assert_eq!(
            expected_optimized_lines, actual,
            "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n"
        );
    };
}
/// A sort sitting directly below another sort on a different column has no
/// effect on the result and must be removed.
#[tokio::test]
async fn test_remove_unnecessary_sort() -> Result<()> {
    let schema = create_test_schema()?;
    let source = memory_exec(&schema);
    let input = sort_exec(vec![sort_expr("non_nullable_col", &schema)], source);
    let physical_plan = sort_exec(vec![sort_expr("nullable_col", &schema)], input);

    // The indented plan display prefixes each line with two spaces per tree
    // level, so the expected lines must carry that indentation.
    let expected_input = vec![
        "SortExec: [nullable_col@0 ASC]",
        "  SortExec: [non_nullable_col@1 ASC]",
        "    MemoryExec: partitions=0, partition_sizes=[]",
    ];
    let expected_optimized = vec![
        "SortExec: [nullable_col@0 ASC]",
        "  MemoryExec: partitions=0, partition_sizes=[]",
    ];
    assert_optimized!(expected_input, expected_optimized, physical_plan);
    Ok(())
}
/// A sort separated from a window operator by a filter is removed when the
/// window can instead run over the reversed ordering produced further below;
/// the top window's frame is reversed accordingly.
#[tokio::test]
async fn test_remove_unnecessary_sort_window_multilayer() -> Result<()> {
    let schema = create_test_schema()?;
    let source = memory_exec(&schema);

    // Bottom layer: sort descending, then a window over that ordering.
    let sort_exprs = vec![sort_expr_options(
        "non_nullable_col",
        &source.schema(),
        SortOptions {
            descending: true,
            nulls_first: true,
        },
    )];
    let sort = sort_exec(sort_exprs.clone(), source);
    let window_agg = window_exec("non_nullable_col", sort_exprs, sort);

    // Top layer: sort ascending, filter, then a window over the ascending order.
    let sort_exprs = vec![sort_expr_options(
        "non_nullable_col",
        &window_agg.schema(),
        SortOptions {
            descending: false,
            nulls_first: false,
        },
    )];
    let sort = sort_exec(sort_exprs.clone(), window_agg);
    let filter = filter_exec(
        Arc::new(NotExpr::new(
            col("non_nullable_col", schema.as_ref()).unwrap(),
        )),
        sort,
    );
    let physical_plan = window_exec("non_nullable_col", sort_exprs, filter);

    // The indented plan display prefixes each line with two spaces per tree
    // level, so the expected lines must carry that indentation.
    let expected_input = vec![
        "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
        "  FilterExec: NOT non_nullable_col@1",
        "    SortExec: [non_nullable_col@1 ASC NULLS LAST]",
        "      WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
        "        SortExec: [non_nullable_col@1 DESC]",
        "          MemoryExec: partitions=0, partition_sizes=[]",
    ];
    let expected_optimized = vec![
        "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: Following(NULL) }]",
        "  FilterExec: NOT non_nullable_col@1",
        "    WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }]",
        "      SortExec: [non_nullable_col@1 DESC]",
        "        MemoryExec: partitions=0, partition_sizes=[]",
    ];
    assert_optimized!(expected_input, expected_optimized, physical_plan);
    Ok(())
}
/// A sort-preserving merge over an unsorted source requires sorted input, so
/// the rule must insert a sort below it.
#[tokio::test]
async fn test_add_required_sort() -> Result<()> {
    let schema = create_test_schema()?;
    let source = memory_exec(&schema);
    let sort_exprs = vec![sort_expr("nullable_col", &schema)];
    let physical_plan = sort_preserving_merge_exec(sort_exprs, source);

    // The indented plan display prefixes each line with two spaces per tree
    // level, so the expected lines must carry that indentation.
    let expected_input = vec![
        "SortPreservingMergeExec: [nullable_col@0 ASC]",
        "  MemoryExec: partitions=0, partition_sizes=[]",
    ];
    let expected_optimized = vec![
        "SortPreservingMergeExec: [nullable_col@0 ASC]",
        "  SortExec: [nullable_col@0 ASC]",
        "    MemoryExec: partitions=0, partition_sizes=[]",
    ];
    assert_optimized!(expected_input, expected_optimized, physical_plan);
    Ok(())
}
/// A sort whose input (a sort-preserving merge) is already ordered on the same
/// column is redundant and must be removed.
#[tokio::test]
async fn test_remove_unnecessary_sort1() -> Result<()> {
    let schema = create_test_schema()?;
    let source = memory_exec(&schema);
    let sort_exprs = vec![sort_expr("nullable_col", &schema)];
    let sort = sort_exec(sort_exprs.clone(), source);
    let spm = sort_preserving_merge_exec(sort_exprs, sort);

    let sort_exprs = vec![sort_expr("nullable_col", &schema)];
    let sort = sort_exec(sort_exprs.clone(), spm);
    let physical_plan = sort_preserving_merge_exec(sort_exprs, sort);

    // The indented plan display prefixes each line with two spaces per tree
    // level, so the expected lines must carry that indentation.
    let expected_input = vec![
        "SortPreservingMergeExec: [nullable_col@0 ASC]",
        "  SortExec: [nullable_col@0 ASC]",
        "    SortPreservingMergeExec: [nullable_col@0 ASC]",
        "      SortExec: [nullable_col@0 ASC]",
        "        MemoryExec: partitions=0, partition_sizes=[]",
    ];
    let expected_optimized = vec![
        "SortPreservingMergeExec: [nullable_col@0 ASC]",
        "  SortPreservingMergeExec: [nullable_col@0 ASC]",
        "    SortExec: [nullable_col@0 ASC]",
        "      MemoryExec: partitions=0, partition_sizes=[]",
    ];
    assert_optimized!(expected_input, expected_optimized, physical_plan);
    Ok(())
}
/// A sort on one column below a merge that requires two columns does not meet
/// the requirement and must be replaced by a sort on both columns.
#[tokio::test]
async fn test_change_wrong_sorting() -> Result<()> {
    let schema = create_test_schema()?;
    let source = memory_exec(&schema);
    let sort_exprs = vec![
        sort_expr("nullable_col", &schema),
        sort_expr("non_nullable_col", &schema),
    ];
    // The existing sort covers only the first of the two required columns.
    let sort = sort_exec(vec![sort_exprs[0].clone()], source);
    let physical_plan = sort_preserving_merge_exec(sort_exprs, sort);

    // The indented plan display prefixes each line with two spaces per tree
    // level, so the expected lines must carry that indentation.
    let expected_input = vec![
        "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
        "  SortExec: [nullable_col@0 ASC]",
        "    MemoryExec: partitions=0, partition_sizes=[]",
    ];
    let expected_optimized = vec![
        "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
        "  SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
        "    MemoryExec: partitions=0, partition_sizes=[]",
    ];
    assert_optimized!(expected_input, expected_optimized, physical_plan);
    Ok(())
}
/// Both union inputs already provide the ordering required by the merge above,
/// so the plan must be left unchanged.
#[tokio::test]
async fn test_union_inputs_sorted() -> Result<()> {
    let schema = create_test_schema()?;
    let source1 = parquet_exec(&schema);
    let sort_exprs = vec![sort_expr("nullable_col", &schema)];
    let sort = sort_exec(sort_exprs.clone(), source1);
    let source2 = parquet_exec_sorted(&schema, sort_exprs.clone());
    let union = union_exec(vec![source2, sort]);
    let physical_plan = sort_preserving_merge_exec(sort_exprs, union);

    // The indented plan display prefixes each line with two spaces per tree
    // level, so the expected lines must carry that indentation.
    let expected_input = vec![
        "SortPreservingMergeExec: [nullable_col@0 ASC]",
        "  UnionExec",
        "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
        "    SortExec: [nullable_col@0 ASC]",
        "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
    ];
    let expected_optimized = expected_input.clone();
    assert_optimized!(expected_input, expected_optimized, physical_plan);
    Ok(())
}
/// One union input is sorted on more columns than required and the other on
/// exactly the required column; the requirement is met, so the plan must be
/// left unchanged.
#[tokio::test]
async fn test_union_inputs_different_sorted() -> Result<()> {
    let schema = create_test_schema()?;
    let source1 = parquet_exec(&schema);
    let sort_exprs = vec![sort_expr("nullable_col", &schema)];
    let sort = sort_exec(sort_exprs.clone(), source1);
    let parquet_sort_exprs = vec![
        sort_expr("nullable_col", &schema),
        sort_expr("non_nullable_col", &schema),
    ];
    let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);
    let union = union_exec(vec![source2, sort]);
    let physical_plan = sort_preserving_merge_exec(sort_exprs, union);

    // The indented plan display prefixes each line with two spaces per tree
    // level, so the expected lines must carry that indentation.
    let expected_input = vec![
        "SortPreservingMergeExec: [nullable_col@0 ASC]",
        "  UnionExec",
        "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]",
        "    SortExec: [nullable_col@0 ASC]",
        "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
    ];
    let expected_optimized = expected_input.clone();
    assert_optimized!(expected_input, expected_optimized, physical_plan);
    Ok(())
}
/// Only one union input satisfies the two-column ordering required by the
/// merge, so the union output does not satisfy it: the sort below the union is
/// removed and a single sort is placed above the union.
#[tokio::test]
async fn test_union_inputs_different_sorted2() -> Result<()> {
    let schema = create_test_schema()?;
    let source1 = parquet_exec(&schema);
    let sort_exprs = vec![
        sort_expr("nullable_col", &schema),
        sort_expr("non_nullable_col", &schema),
    ];
    let sort = sort_exec(sort_exprs.clone(), source1);
    let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)];
    let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);
    let union = union_exec(vec![source2, sort]);
    let physical_plan = sort_preserving_merge_exec(sort_exprs, union);

    // The indented plan display prefixes each line with two spaces per tree
    // level, so the expected lines must carry that indentation.
    let expected_input = vec![
        "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
        "  UnionExec",
        "    ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
        "    SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
        "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
    ];
    let expected_optimized = vec![
        "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
        "  SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
        "    UnionExec",
        "      ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
        "      ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
    ];
    assert_optimized!(expected_input, expected_optimized, physical_plan);
    Ok(())
}
/// Builds a sort expression on column `name` using the default sort options.
fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
    let default_options = SortOptions::default();
    sort_expr_options(name, schema, default_options)
}
/// Builds a sort expression on column `name` with the given sort options.
///
/// Panics if the column cannot be resolved against `schema`.
fn sort_expr_options(
    name: &str,
    schema: &Schema,
    options: SortOptions,
) -> PhysicalSortExpr {
    let expr = col(name, schema).unwrap();
    PhysicalSortExpr { expr, options }
}
/// Creates a `MemoryExec` with the given schema, no partitions and no projection.
fn memory_exec(schema: &SchemaRef) -> Arc<dyn ExecutionPlan> {
    let source = MemoryExec::try_new(&[], Arc::clone(schema), None).unwrap();
    Arc::new(source)
}
/// Wraps `input` in a `SortExec` over `sort_exprs`, passing `None` as the last
/// constructor argument.
fn sort_exec(
    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
    input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
    let exprs: Vec<PhysicalSortExpr> = sort_exprs.into_iter().collect();
    let sort = SortExec::try_new(exprs, input, None).unwrap();
    Arc::new(sort)
}
/// Wraps `input` in a `SortPreservingMergeExec` over `sort_exprs`.
fn sort_preserving_merge_exec(
    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
    input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
    let exprs: Vec<PhysicalSortExpr> = sort_exprs.into_iter().collect();
    let merge = SortPreservingMergeExec::new(exprs, input);
    Arc::new(merge)
}
/// Wraps `input` in a `FilterExec` applying `predicate`.
///
/// Panics if the filter cannot be constructed.
fn filter_exec(
    predicate: Arc<dyn PhysicalExpr>,
    input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
    let filter = FilterExec::try_new(predicate, input).unwrap();
    Arc::new(filter)
}
/// Wraps `input` in a `WindowAggExec` computing a single `count` window
/// expression over column `col_name`, with no partition keys and with
/// `sort_exprs` as both the window's ORDER BY and the operator's sort keys.
///
/// Panics if the window expression or the operator cannot be constructed.
fn window_exec(
    col_name: &str,
    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
    input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
    let sort_exprs: Vec<_> = sort_exprs.into_iter().collect();
    let schema = input.schema();
    let count_expr = create_window_expr(
        &WindowFunction::AggregateFunction(AggregateFunction::Count),
        "count".to_owned(),
        &[col(col_name, &schema).unwrap()],
        &[],
        &sort_exprs,
        Arc::new(WindowFrame::new(true)),
        schema.as_ref(),
    )
    .unwrap();
    let window = WindowAggExec::try_new(
        vec![count_expr],
        input.clone(),
        input.schema(),
        vec![],
        Some(sort_exprs),
    )
    .unwrap();
    Arc::new(window)
}
/// Creates a `ParquetExec` over a single file group containing the file `x`,
/// declaring no output ordering.
fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
    let scan_config = FileScanConfig {
        object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
        file_schema: schema.clone(),
        file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
        statistics: Statistics::default(),
        projection: None,
        limit: None,
        table_partition_cols: vec![],
        output_ordering: None,
        infinite_source: false,
    };
    Arc::new(ParquetExec::new(scan_config, None, None))
}
/// Creates a `ParquetExec` over a single file group containing the file `x`,
/// declaring `sort_exprs` as its output ordering.
fn parquet_exec_sorted(
    schema: &SchemaRef,
    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
) -> Arc<ParquetExec> {
    let scan_config = FileScanConfig {
        object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
        file_schema: schema.clone(),
        file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
        statistics: Statistics::default(),
        projection: None,
        limit: None,
        table_partition_cols: vec![],
        output_ordering: Some(sort_exprs.into_iter().collect()),
        infinite_source: false,
    };
    Arc::new(ParquetExec::new(scan_config, None, None))
}
/// Combines the given plans under a `UnionExec`.
fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
    let union = UnionExec::new(input);
    Arc::new(union)
}
}