datafusion_physical_optimizer/pushdown_sort.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Sort Pushdown Optimization
19//!
20//! This optimizer attempts to push sort requirements down through the execution plan
21//! tree to data sources that can natively handle them (e.g., by scanning files in
22//! reverse order).
23//!
24//! ## How it works
25//!
26//! 1. Detects `SortExec` nodes in the plan
27//! 2. Calls `try_pushdown_sort()` on the input to recursively push the sort requirement
28//! 3. Each node type defines its own pushdown behavior:
29//! - **Transparent nodes** (CoalesceBatchesExec, RepartitionExec, etc.) delegate to
30//! their children and wrap the result
31//! - **Data sources** (DataSourceExec) check if they can optimize for the ordering
32//! - **Blocking nodes** return `Unsupported` to stop pushdown
33//! 4. Based on the result:
34//! - `Exact`: Remove the Sort operator (data source guarantees perfect ordering)
35//! - `Inexact`: Keep Sort but use optimized input (enables early termination for TopK)
36//! - `Unsupported`: No change
37//!
38//! ## Capabilities
39//!
40//! - **Sort elimination**: when a data source's natural ordering satisfies the
41//! request, return `Exact` and remove the `SortExec` entirely. Preserves
42//! `fetch` (LIMIT) from the eliminated `SortExec` for early termination.
43//! - **Statistics-based file sorting**: sort files within each partition by
44//! min/max statistics. When files are non-overlapping but listed in wrong
45//! order (e.g., alphabetical order ≠ sort key order), this fixes the ordering
46//! and enables sort elimination. Works for both single-partition and
47//! multi-partition plans with multi-file groups.
48//! - **Reverse scan optimization**: when required sort is the reverse of the data source's
49//! natural ordering, enable reverse scanning (reading row groups in reverse order)
50//! - **Prefix matching**: if data has ordering [A DESC, B ASC] and query needs
51//! [A DESC], the existing ordering satisfies the requirement (`Exact`).
52//! If the query needs [A ASC] (reverse of the prefix), a reverse scan is
53//! used (`Inexact`, `SortExec` retained)
54//!
55//! Related issue: <https://github.com/apache/datafusion/issues/17348>
56
57use crate::PhysicalOptimizerRule;
58use datafusion_common::Result;
59use datafusion_common::config::ConfigOptions;
60use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
61use datafusion_physical_plan::ExecutionPlan;
62use datafusion_physical_plan::SortOrderPushdownResult;
63use datafusion_physical_plan::buffer::BufferExec;
64use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
65use datafusion_physical_plan::sorts::sort::SortExec;
66use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
67use std::sync::Arc;
68
/// Physical optimizer rule that tries to hand sort requirements over to the
/// data sources underneath `SortExec` nodes.
///
/// The rule is stateless; the rewrite logic lives in its
/// `PhysicalOptimizerRule` implementation, and the overall strategy is
/// described in the module-level documentation.
#[derive(Clone, Debug, Default)]
pub struct PushdownSort;
74
75impl PushdownSort {
76 pub fn new() -> Self {
77 Self {}
78 }
79}
80
81impl PhysicalOptimizerRule for PushdownSort {
82 fn optimize(
83 &self,
84 plan: Arc<dyn ExecutionPlan>,
85 config: &ConfigOptions,
86 ) -> Result<Arc<dyn ExecutionPlan>> {
87 // Check if sort pushdown optimization is enabled
88 if !config.optimizer.enable_sort_pushdown {
89 return Ok(plan);
90 }
91
92 let buffer_capacity = config.execution.sort_pushdown_buffer_capacity;
93
94 // Use transform_down to find and optimize all SortExec nodes (including nested ones)
95 // Also handles SPM → SortExec pattern to insert BufferExec when sort is eliminated
96 plan.transform_down(|plan: Arc<dyn ExecutionPlan>| {
97 // Pattern 1: SPM → SortExec(preserve_partitioning)
98 // When we eliminate the SortExec, SPM loses its memory buffer and reads
99 // directly from I/O-bound sources. Insert a BufferExec to compensate.
100 if let Some(spm) = plan.downcast_ref::<SortPreservingMergeExec>()
101 && let Some(sort_child) = spm.input().downcast_ref::<SortExec>()
102 && sort_child.preserve_partitioning()
103 {
104 let sort_input = Arc::clone(sort_child.input());
105 let required_ordering = sort_child.expr();
106 match sort_input.try_pushdown_sort(required_ordering)? {
107 SortOrderPushdownResult::Exact { inner } => {
108 // Preserve fetch (LIMIT) from the eliminated SortExec.
109 // Use LocalLimitExec (not Global) since input is multi-partition.
110 let inner = if let Some(fetch) = sort_child.fetch() {
111 inner.with_fetch(Some(fetch)).unwrap_or_else(|| {
112 Arc::new(LocalLimitExec::new(inner, fetch))
113 })
114 } else {
115 inner
116 };
117 // Insert BufferExec to replace SortExec's buffering role.
118 // SortExec buffered all data in memory; BufferExec provides
119 // bounded buffering so SPM doesn't stall on I/O.
120 let buffered: Arc<dyn ExecutionPlan> =
121 Arc::new(BufferExec::new(inner, buffer_capacity));
122 let new_spm =
123 SortPreservingMergeExec::new(spm.expr().clone(), buffered)
124 .with_fetch(spm.fetch());
125 return Ok(Transformed::yes(Arc::new(new_spm)));
126 }
127 SortOrderPushdownResult::Inexact { inner } => {
128 let new_sort = SortExec::new(required_ordering.clone(), inner)
129 .with_fetch(sort_child.fetch())
130 .with_preserve_partitioning(true);
131 let new_spm = SortPreservingMergeExec::new(
132 spm.expr().clone(),
133 Arc::new(new_sort),
134 )
135 .with_fetch(spm.fetch());
136 return Ok(Transformed::yes(Arc::new(new_spm)));
137 }
138 SortOrderPushdownResult::Unsupported => {
139 return Ok(Transformed::no(plan));
140 }
141 }
142 }
143
144 // Pattern 2: Standalone SortExec (no SPM parent)
145 let Some(sort_exec) = plan.downcast_ref::<SortExec>() else {
146 return Ok(Transformed::no(plan));
147 };
148
149 let sort_input = Arc::clone(sort_exec.input());
150 let required_ordering = sort_exec.expr();
151
152 // Try to push the sort requirement down through the plan tree
153 // Each node type defines its own pushdown behavior via try_pushdown_sort()
154 match sort_input.try_pushdown_sort(required_ordering)? {
155 SortOrderPushdownResult::Exact { inner } => {
156 // Data source guarantees perfect ordering - remove the Sort operator.
157 //
158 // If the SortExec carried a fetch (LIMIT), we must preserve it.
159 // First try pushing the limit into the source via `with_fetch()`.
160 // If the source doesn't support `with_fetch`, fall back to
161 // wrapping with GlobalLimitExec.
162 if let Some(fetch) = sort_exec.fetch() {
163 let inner = inner.with_fetch(Some(fetch)).unwrap_or_else(|| {
164 Arc::new(GlobalLimitExec::new(inner, 0, Some(fetch)))
165 });
166 Ok(Transformed::yes(inner))
167 } else {
168 Ok(Transformed::yes(inner))
169 }
170 }
171 SortOrderPushdownResult::Inexact { inner } => {
172 // Data source is optimized for the ordering but not perfectly sorted
173 // Keep the Sort operator but use the optimized input
174 // Benefits: TopK queries can terminate early, better cache locality
175 Ok(Transformed::yes(Arc::new(
176 SortExec::new(required_ordering.clone(), inner)
177 .with_fetch(sort_exec.fetch())
178 .with_preserve_partitioning(
179 sort_exec.preserve_partitioning(),
180 ),
181 )))
182 }
183 SortOrderPushdownResult::Unsupported => {
184 // Cannot optimize for this ordering - no change
185 Ok(Transformed::no(plan))
186 }
187 }
188 })
189 .data()
190 }
191
192 fn name(&self) -> &str {
193 "PushdownSort"
194 }
195
196 fn schema_check(&self) -> bool {
197 true
198 }
199}