datafusion_physical_optimizer/
pushdown_sort.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Sort Pushdown Optimization
19//!
20//! This optimizer attempts to push sort requirements down through the execution plan
21//! tree to data sources that can natively handle them (e.g., by scanning files in
22//! reverse order).
23//!
24//! ## How it works
25//!
26//! 1. Detects `SortExec` nodes in the plan
27//! 2. Calls `try_pushdown_sort()` on the input to recursively push the sort requirement
28//! 3. Each node type defines its own pushdown behavior:
29//!    - **Transparent nodes** (CoalesceBatchesExec, RepartitionExec, etc.) delegate to
30//!      their children and wrap the result
31//!    - **Data sources** (DataSourceExec) check if they can optimize for the ordering
32//!    - **Blocking nodes** return `Unsupported` to stop pushdown
33//! 4. Based on the result:
34//!    - `Exact`: Remove the Sort operator (data source guarantees perfect ordering)
35//!    - `Inexact`: Keep Sort but use optimized input (enables early termination for TopK)
36//!    - `Unsupported`: No change
37//!
38//! ## Current capabilities (Phase 1)
39//!
40//! - Reverse scan optimization: when required sort is the reverse of the data source's
41//!   natural ordering, enable reverse scanning (reading row groups in reverse order)
42//! - Supports prefix matching: if data has ordering [A DESC, B ASC] and query needs
43//!   [A ASC], reversing gives [A ASC, B DESC] which satisfies the requirement
44//!
//! ## Future enhancements (Phase 2)
//!
//! Tracking issue: <https://github.com/apache/datafusion/issues/19329>
47//!
48//! - File reordering based on statistics
49//! - Return `Exact` when files are known to be perfectly sorted
50//! - Complete Sort elimination when ordering is guaranteed
51
use crate::PhysicalOptimizerRule;
use datafusion_common::Result;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::SortOrderPushdownResult;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use std::sync::Arc;
60
/// Physical optimizer rule that tries to hand the ordering required by a
/// `SortExec` down to the data sources underneath it, so they can produce
/// that ordering (or something close to it) natively.
///
/// The module-level documentation describes how the pushdown proceeds.
#[derive(Clone, Debug, Default)]
pub struct PushdownSort;
66
67impl PushdownSort {
68    pub fn new() -> Self {
69        Self {}
70    }
71}
72
73impl PhysicalOptimizerRule for PushdownSort {
74    fn optimize(
75        &self,
76        plan: Arc<dyn ExecutionPlan>,
77        config: &ConfigOptions,
78    ) -> Result<Arc<dyn ExecutionPlan>> {
79        // Check if sort pushdown optimization is enabled
80        if !config.optimizer.enable_sort_pushdown {
81            return Ok(plan);
82        }
83
84        // Use transform_down to find and optimize all SortExec nodes (including nested ones)
85        plan.transform_down(|plan: Arc<dyn ExecutionPlan>| {
86            // Check if this is a SortExec
87            let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() else {
88                return Ok(Transformed::no(plan));
89            };
90
91            let sort_input = Arc::clone(sort_exec.input());
92            let required_ordering = sort_exec.expr();
93
94            // Try to push the sort requirement down through the plan tree
95            // Each node type defines its own pushdown behavior via try_pushdown_sort()
96            match sort_input.try_pushdown_sort(required_ordering)? {
97                SortOrderPushdownResult::Exact { inner } => {
98                    // Data source guarantees perfect ordering - remove the Sort operator
99                    Ok(Transformed::yes(inner))
100                }
101                SortOrderPushdownResult::Inexact { inner } => {
102                    // Data source is optimized for the ordering but not perfectly sorted
103                    // Keep the Sort operator but use the optimized input
104                    // Benefits: TopK queries can terminate early, better cache locality
105                    Ok(Transformed::yes(Arc::new(
106                        SortExec::new(required_ordering.clone(), inner)
107                            .with_fetch(sort_exec.fetch())
108                            .with_preserve_partitioning(
109                                sort_exec.preserve_partitioning(),
110                            ),
111                    )))
112                }
113                SortOrderPushdownResult::Unsupported => {
114                    // Cannot optimize for this ordering - no change
115                    Ok(Transformed::no(plan))
116                }
117            }
118        })
119        .data()
120    }
121
122    fn name(&self) -> &str {
123        "PushdownSort"
124    }
125
126    fn schema_check(&self) -> bool {
127        true
128    }
129}