Skip to main content

datafusion_physical_optimizer/
pushdown_sort.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Sort Pushdown Optimization
19//!
20//! This optimizer attempts to push sort requirements down through the execution plan
21//! tree to data sources that can natively handle them (e.g., by scanning files in
22//! reverse order).
23//!
24//! ## How it works
25//!
26//! 1. Detects `SortExec` nodes in the plan
27//! 2. Calls `try_pushdown_sort()` on the input to recursively push the sort requirement
28//! 3. Each node type defines its own pushdown behavior:
29//!    - **Transparent nodes** (CoalesceBatchesExec, RepartitionExec, etc.) delegate to
30//!      their children and wrap the result
31//!    - **Data sources** (DataSourceExec) check if they can optimize for the ordering
32//!    - **Blocking nodes** return `Unsupported` to stop pushdown
33//! 4. Based on the result:
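//!    - `Exact`: Remove the Sort operator (data source guarantees perfect ordering).
//!      When the removed `SortExec` was the partition-preserving child of a
//!      `SortPreservingMergeExec`, a `BufferExec` is inserted in its place.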
35//!    - `Inexact`: Keep Sort but use optimized input (enables early termination for TopK)
36//!    - `Unsupported`: No change
37//!
38//! ## Capabilities
39//!
40//! - **Sort elimination**: when a data source's natural ordering satisfies the
41//!   request, return `Exact` and remove the `SortExec` entirely. Preserves
42//!   `fetch` (LIMIT) from the eliminated `SortExec` for early termination.
43//! - **Statistics-based file sorting**: sort files within each partition by
44//!   min/max statistics. When files are non-overlapping but listed in wrong
45//!   order (e.g., alphabetical order ≠ sort key order), this fixes the ordering
46//!   and enables sort elimination. Works for both single-partition and
47//!   multi-partition plans with multi-file groups.
48//! - **Reverse scan optimization**: when required sort is the reverse of the data source's
49//!   natural ordering, enable reverse scanning (reading row groups in reverse order)
50//! - **Prefix matching**: if data has ordering [A DESC, B ASC] and query needs
51//!   [A DESC], the existing ordering satisfies the requirement (`Exact`).
52//!   If the query needs [A ASC] (reverse of the prefix), a reverse scan is
//!   used (`Inexact`, `SortExec` retained).
54//!
55//! Related issue: <https://github.com/apache/datafusion/issues/17348>
56
57use crate::PhysicalOptimizerRule;
58use datafusion_common::Result;
59use datafusion_common::config::ConfigOptions;
60use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
61use datafusion_physical_plan::ExecutionPlan;
62use datafusion_physical_plan::SortOrderPushdownResult;
63use datafusion_physical_plan::buffer::BufferExec;
64use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
65use datafusion_physical_plan::sorts::sort::SortExec;
66use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
67use std::sync::Arc;
68
/// Physical optimizer rule that tries to hand sort requirements over to the
/// data sources underneath a `SortExec`.
///
/// The rule carries no state of its own; everything it needs is read from the
/// `ConfigOptions` passed to `optimize`. See the module-level documentation
/// for the overall strategy.
#[derive(Clone, Debug, Default)]
pub struct PushdownSort;

impl PushdownSort {
    /// Creates the rule. Equivalent to `PushdownSort::default()`.
    pub fn new() -> Self {
        Self
    }
}
80
81impl PhysicalOptimizerRule for PushdownSort {
82    fn optimize(
83        &self,
84        plan: Arc<dyn ExecutionPlan>,
85        config: &ConfigOptions,
86    ) -> Result<Arc<dyn ExecutionPlan>> {
87        // Check if sort pushdown optimization is enabled
88        if !config.optimizer.enable_sort_pushdown {
89            return Ok(plan);
90        }
91
92        let buffer_capacity = config.execution.sort_pushdown_buffer_capacity;
93
94        // Use transform_down to find and optimize all SortExec nodes (including nested ones)
95        // Also handles SPM → SortExec pattern to insert BufferExec when sort is eliminated
96        plan.transform_down(|plan: Arc<dyn ExecutionPlan>| {
97            // Pattern 1: SPM → SortExec(preserve_partitioning)
98            // When we eliminate the SortExec, SPM loses its memory buffer and reads
99            // directly from I/O-bound sources. Insert a BufferExec to compensate.
100            if let Some(spm) = plan.downcast_ref::<SortPreservingMergeExec>()
101                && let Some(sort_child) = spm.input().downcast_ref::<SortExec>()
102                && sort_child.preserve_partitioning()
103            {
104                let sort_input = Arc::clone(sort_child.input());
105                let required_ordering = sort_child.expr();
106                match sort_input.try_pushdown_sort(required_ordering)? {
107                    SortOrderPushdownResult::Exact { inner } => {
108                        // Preserve fetch (LIMIT) from the eliminated SortExec.
109                        // Use LocalLimitExec (not Global) since input is multi-partition.
110                        let inner = if let Some(fetch) = sort_child.fetch() {
111                            inner.with_fetch(Some(fetch)).unwrap_or_else(|| {
112                                Arc::new(LocalLimitExec::new(inner, fetch))
113                            })
114                        } else {
115                            inner
116                        };
117                        // Insert BufferExec to replace SortExec's buffering role.
118                        // SortExec buffered all data in memory; BufferExec provides
119                        // bounded buffering so SPM doesn't stall on I/O.
120                        let buffered: Arc<dyn ExecutionPlan> =
121                            Arc::new(BufferExec::new(inner, buffer_capacity));
122                        let new_spm =
123                            SortPreservingMergeExec::new(spm.expr().clone(), buffered)
124                                .with_fetch(spm.fetch());
125                        return Ok(Transformed::yes(Arc::new(new_spm)));
126                    }
127                    SortOrderPushdownResult::Inexact { inner } => {
128                        let new_sort = SortExec::new(required_ordering.clone(), inner)
129                            .with_fetch(sort_child.fetch())
130                            .with_preserve_partitioning(true);
131                        let new_spm = SortPreservingMergeExec::new(
132                            spm.expr().clone(),
133                            Arc::new(new_sort),
134                        )
135                        .with_fetch(spm.fetch());
136                        return Ok(Transformed::yes(Arc::new(new_spm)));
137                    }
138                    SortOrderPushdownResult::Unsupported => {
139                        return Ok(Transformed::no(plan));
140                    }
141                }
142            }
143
144            // Pattern 2: Standalone SortExec (no SPM parent)
145            let Some(sort_exec) = plan.downcast_ref::<SortExec>() else {
146                return Ok(Transformed::no(plan));
147            };
148
149            let sort_input = Arc::clone(sort_exec.input());
150            let required_ordering = sort_exec.expr();
151
152            // Try to push the sort requirement down through the plan tree
153            // Each node type defines its own pushdown behavior via try_pushdown_sort()
154            match sort_input.try_pushdown_sort(required_ordering)? {
155                SortOrderPushdownResult::Exact { inner } => {
156                    // Data source guarantees perfect ordering - remove the Sort operator.
157                    //
158                    // If the SortExec carried a fetch (LIMIT), we must preserve it.
159                    // First try pushing the limit into the source via `with_fetch()`.
160                    // If the source doesn't support `with_fetch`, fall back to
161                    // wrapping with GlobalLimitExec.
162                    if let Some(fetch) = sort_exec.fetch() {
163                        let inner = inner.with_fetch(Some(fetch)).unwrap_or_else(|| {
164                            Arc::new(GlobalLimitExec::new(inner, 0, Some(fetch)))
165                        });
166                        Ok(Transformed::yes(inner))
167                    } else {
168                        Ok(Transformed::yes(inner))
169                    }
170                }
171                SortOrderPushdownResult::Inexact { inner } => {
172                    // Data source is optimized for the ordering but not perfectly sorted
173                    // Keep the Sort operator but use the optimized input
174                    // Benefits: TopK queries can terminate early, better cache locality
175                    Ok(Transformed::yes(Arc::new(
176                        SortExec::new(required_ordering.clone(), inner)
177                            .with_fetch(sort_exec.fetch())
178                            .with_preserve_partitioning(
179                                sort_exec.preserve_partitioning(),
180                            ),
181                    )))
182                }
183                SortOrderPushdownResult::Unsupported => {
184                    // Cannot optimize for this ordering - no change
185                    Ok(Transformed::no(plan))
186                }
187            }
188        })
189        .data()
190    }
191
192    fn name(&self) -> &str {
193        "PushdownSort"
194    }
195
196    fn schema_check(&self) -> bool {
197        true
198    }
199}