use crate::PhysicalOptimizerRule;
use datafusion_common::Result;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::SortOrderPushdownResult;
use datafusion_physical_plan::buffer::BufferExec;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use std::sync::Arc;
/// Stateless physical optimizer rule.
///
/// The type carries no data; all of its rewrite logic lives in its
/// `PhysicalOptimizerRule` implementation. It can be created with
/// [`PushdownSort::new`] or via `Default`.
#[derive(Debug, Clone, Default)]
pub struct PushdownSort;

impl PushdownSort {
    /// Creates a new `PushdownSort` rule instance.
    pub fn new() -> Self {
        Self
    }
}
impl PhysicalOptimizerRule for PushdownSort {
    /// Rewrites `plan` top-down, asking the input of every `SortExec` whether it
    /// can absorb the sort's ordering via `try_pushdown_sort`.
    ///
    /// Two shapes are handled at each visited node:
    ///
    /// 1. `SortPreservingMergeExec` directly over a `SortExec` that preserves
    ///    partitioning. On an `Exact` answer the sort is dropped, its `fetch` is
    ///    re-applied per partition, and the result is wrapped in a `BufferExec`
    ///    (capacity taken from `execution.sort_pushdown_buffer_capacity`) under
    ///    a rebuilt merge node.
    /// 2. Any other `SortExec`. On an `Exact` answer the sort is replaced by the
    ///    rewritten input with the sort's `fetch` re-applied.
    ///
    /// In both shapes an `Inexact` answer keeps a `SortExec` (same ordering,
    /// fetch and partitioning mode) on top of the rewritten input, and
    /// `Unsupported` leaves the node unchanged.
    ///
    /// Returns the input plan untouched when `optimizer.enable_sort_pushdown`
    /// is `false`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `try_pushdown_sort` or by the tree
    /// traversal.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Re-applies the `fetch` of a removed sort to `input`.
        //
        // The limit is first offered to `input` itself through `with_fetch`;
        // only if that is declined is an explicit limit operator added.
        // `per_partition` selects `LocalLimitExec` (one limit per partition)
        // over `GlobalLimitExec`, which is only valid over a single input
        // partition.
        fn apply_fetch(
            input: Arc<dyn ExecutionPlan>,
            fetch: Option<usize>,
            per_partition: bool,
        ) -> Arc<dyn ExecutionPlan> {
            let Some(fetch) = fetch else {
                return input;
            };
            if let Some(limited) = input.with_fetch(Some(fetch)) {
                return limited;
            }
            if per_partition {
                Arc::new(LocalLimitExec::new(input, fetch))
            } else {
                Arc::new(GlobalLimitExec::new(input, 0, Some(fetch)))
            }
        }

        if !config.optimizer.enable_sort_pushdown {
            return Ok(plan);
        }

        let buffer_capacity = config.execution.sort_pushdown_buffer_capacity;

        plan.transform_down(|plan: Arc<dyn ExecutionPlan>| {
            // Shape 1: SortPreservingMergeExec -> SortExec(preserve_partitioning).
            if let Some(spm) = plan.downcast_ref::<SortPreservingMergeExec>()
                && let Some(sort_child) = spm.input().downcast_ref::<SortExec>()
                && sort_child.preserve_partitioning()
            {
                let sort_input = Arc::clone(sort_child.input());
                let required_ordering = sort_child.expr();
                match sort_input.try_pushdown_sort(required_ordering)? {
                    SortOrderPushdownResult::Exact { inner } => {
                        // The per-partition sort disappears, so its fetch has
                        // to be re-applied per partition below the merge.
                        let inner = apply_fetch(inner, sort_child.fetch(), true);
                        let buffered: Arc<dyn ExecutionPlan> =
                            Arc::new(BufferExec::new(inner, buffer_capacity));
                        let new_spm =
                            SortPreservingMergeExec::new(spm.expr().clone(), buffered)
                                .with_fetch(spm.fetch());
                        return Ok(Transformed::yes(Arc::new(new_spm)));
                    }
                    SortOrderPushdownResult::Inexact { inner } => {
                        // Keep the sort, but let it read the rewritten input.
                        let new_sort = SortExec::new(required_ordering.clone(), inner)
                            .with_fetch(sort_child.fetch())
                            .with_preserve_partitioning(true);
                        let new_spm = SortPreservingMergeExec::new(
                            spm.expr().clone(),
                            Arc::new(new_sort),
                        )
                        .with_fetch(spm.fetch());
                        return Ok(Transformed::yes(Arc::new(new_spm)));
                    }
                    SortOrderPushdownResult::Unsupported => {
                        return Ok(Transformed::no(plan));
                    }
                }
            }

            // Shape 2: any other SortExec.
            let Some(sort_exec) = plan.downcast_ref::<SortExec>() else {
                return Ok(Transformed::no(plan));
            };
            let sort_input = Arc::clone(sort_exec.input());
            let required_ordering = sort_exec.expr();
            match sort_input.try_pushdown_sort(required_ordering)? {
                SortOrderPushdownResult::Exact { inner } => {
                    // A partition-preserving sort limited each partition
                    // independently and may sit on a multi-partition input, so
                    // its replacement must be a per-partition limit; a
                    // `GlobalLimitExec` is only appropriate for the
                    // single-partition (non-preserving) sort.
                    Ok(Transformed::yes(apply_fetch(
                        inner,
                        sort_exec.fetch(),
                        sort_exec.preserve_partitioning(),
                    )))
                }
                SortOrderPushdownResult::Inexact { inner } => {
                    // Keep the sort, but let it read the rewritten input.
                    Ok(Transformed::yes(Arc::new(
                        SortExec::new(required_ordering.clone(), inner)
                            .with_fetch(sort_exec.fetch())
                            .with_preserve_partitioning(
                                sort_exec.preserve_partitioning(),
                            ),
                    )))
                }
                SortOrderPushdownResult::Unsupported => Ok(Transformed::no(plan)),
            }
        })
        .data()
    }

    /// Returns the name of this rule, `"PushdownSort"`.
    fn name(&self) -> &str {
        "PushdownSort"
    }

    /// Always returns `true`; per the `PhysicalOptimizerRule` contract this asks
    /// the optimizer to check that the rule did not change the plan's schema.
    fn schema_check(&self) -> bool {
        true
    }
}