datafusion_physical_optimizer/pushdown_sort.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Sort Pushdown Optimization
19//!
20//! This optimizer attempts to push sort requirements down through the execution plan
21//! tree to data sources that can natively handle them (e.g., by scanning files in
22//! reverse order).
23//!
24//! ## How it works
25//!
26//! 1. Detects `SortExec` nodes in the plan
27//! 2. Calls `try_pushdown_sort()` on the input to recursively push the sort requirement
//! 3. Each node type defines its own pushdown behavior:
//!    - **Transparent nodes** (CoalesceBatchesExec, RepartitionExec, etc.) delegate to
//!      their children and wrap the result
//!    - **Data sources** (DataSourceExec) check if they can optimize for the ordering
//!    - **Blocking nodes** return `Unsupported` to stop pushdown
//! 4. Based on the result:
//!    - `Exact`: Remove the Sort operator (data source guarantees perfect ordering)
//!    - `Inexact`: Keep Sort but use optimized input (enables early termination for TopK)
//!    - `Unsupported`: No change
37//!
38//! ## Current capabilities (Phase 1)
39//!
//! - Reverse scan optimization: when the required sort is the reverse of the data source's
//!   natural ordering, enable reverse scanning (reading row groups in reverse order)
//! - Supports prefix matching: if data has ordering `[A DESC, B ASC]` and the query needs
//!   `[A ASC]`, reversing gives `[A ASC, B DESC]`, which satisfies the requirement
44//!
//! ## Future enhancements (Phase 2)
//!
//! TODO: tracked in <https://github.com/apache/datafusion/issues/19329>
47//!
48//! - File reordering based on statistics
49//! - Return `Exact` when files are known to be perfectly sorted
50//! - Complete Sort elimination when ordering is guaranteed
51
52use crate::PhysicalOptimizerRule;
53use datafusion_common::Result;
54use datafusion_common::config::ConfigOptions;
55use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
56use datafusion_physical_plan::ExecutionPlan;
57use datafusion_physical_plan::SortOrderPushdownResult;
58use datafusion_physical_plan::sorts::sort::SortExec;
59use std::sync::Arc;
60
/// Physical optimizer rule that tries to hand sort requirements over to the
/// data sources beneath a sort.
///
/// The rule is a stateless unit type; see the module-level documentation for
/// the pushdown algorithm and its current capabilities.
#[derive(Clone, Debug, Default)]
pub struct PushdownSort;

impl PushdownSort {
    /// Creates the rule. Equivalent to [`PushdownSort::default`].
    pub fn new() -> Self {
        Self
    }
}
72
73impl PhysicalOptimizerRule for PushdownSort {
74 fn optimize(
75 &self,
76 plan: Arc<dyn ExecutionPlan>,
77 config: &ConfigOptions,
78 ) -> Result<Arc<dyn ExecutionPlan>> {
79 // Check if sort pushdown optimization is enabled
80 if !config.optimizer.enable_sort_pushdown {
81 return Ok(plan);
82 }
83
84 // Use transform_down to find and optimize all SortExec nodes (including nested ones)
85 plan.transform_down(|plan: Arc<dyn ExecutionPlan>| {
86 // Check if this is a SortExec
87 let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() else {
88 return Ok(Transformed::no(plan));
89 };
90
91 let sort_input = Arc::clone(sort_exec.input());
92 let required_ordering = sort_exec.expr();
93
94 // Try to push the sort requirement down through the plan tree
95 // Each node type defines its own pushdown behavior via try_pushdown_sort()
96 match sort_input.try_pushdown_sort(required_ordering)? {
97 SortOrderPushdownResult::Exact { inner } => {
98 // Data source guarantees perfect ordering - remove the Sort operator
99 Ok(Transformed::yes(inner))
100 }
101 SortOrderPushdownResult::Inexact { inner } => {
102 // Data source is optimized for the ordering but not perfectly sorted
103 // Keep the Sort operator but use the optimized input
104 // Benefits: TopK queries can terminate early, better cache locality
105 Ok(Transformed::yes(Arc::new(
106 SortExec::new(required_ordering.clone(), inner)
107 .with_fetch(sort_exec.fetch())
108 .with_preserve_partitioning(
109 sort_exec.preserve_partitioning(),
110 ),
111 )))
112 }
113 SortOrderPushdownResult::Unsupported => {
114 // Cannot optimize for this ordering - no change
115 Ok(Transformed::no(plan))
116 }
117 }
118 })
119 .data()
120 }
121
122 fn name(&self) -> &str {
123 "PushdownSort"
124 }
125
126 fn schema_check(&self) -> bool {
127 true
128 }
129}