datafusion_physical_optimizer/optimizer.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Physical optimizer traits
19
20use std::fmt::Debug;
21use std::sync::Arc;
22
23use crate::aggregate_statistics::AggregateStatistics;
24use crate::combine_partial_final_agg::CombinePartialFinalAggregate;
25use crate::enforce_distribution::EnforceDistribution;
26use crate::enforce_sorting::EnforceSorting;
27use crate::ensure_coop::EnsureCooperative;
28use crate::filter_pushdown::FilterPushdown;
29use crate::join_selection::JoinSelection;
30use crate::limit_pushdown::LimitPushdown;
31use crate::limited_distinct_aggregation::LimitedDistinctAggregation;
32use crate::output_requirements::OutputRequirements;
33use crate::projection_pushdown::ProjectionPushdown;
34use crate::sanity_checker::SanityCheckPlan;
35use crate::topk_aggregation::TopKAggregation;
36use crate::topk_repartition::TopKRepartition;
37use crate::update_aggr_exprs::OptimizeAggregateOrder;
38
39use crate::hash_join_buffering::HashJoinBuffering;
40use crate::limit_pushdown_past_window::LimitPushPastWindows;
41use crate::pushdown_sort::PushdownSort;
42use crate::window_topn::WindowTopN;
43use datafusion_common::Result;
44use datafusion_common::config::ConfigOptions;
45use datafusion_physical_plan::ExecutionPlan;
46use datafusion_physical_plan::operator_statistics::StatisticsRegistry;
47
48/// Context available to physical optimizer rules.
49///
50/// This trait provides access to configuration options and optional statistics
51/// registry for enhanced statistics lookup. It allows optimizer rules to access
52/// extended context without changing the core [`PhysicalOptimizerRule::optimize`]
53/// signature.
54pub trait PhysicalOptimizerContext: Send + Sync {
55 /// Returns the configuration options.
56 fn config_options(&self) -> &ConfigOptions;
57
58 /// Returns the statistics registry for enhanced statistics lookup.
59 ///
60 /// Returns `None` if no registry is configured, in which case rules
61 /// should fall back to using `ExecutionPlan::partition_statistics()`.
62 fn statistics_registry(&self) -> Option<&StatisticsRegistry> {
63 None
64 }
65}
66
67/// Simple context wrapping [`ConfigOptions`] for backward compatibility.
68///
69/// This struct provides a minimal implementation of [`PhysicalOptimizerContext`]
70/// that only supplies configuration options. Used when no statistics registry
71/// is available or needed.
72pub struct ConfigOnlyContext<'a> {
73 config: &'a ConfigOptions,
74}
75
76impl<'a> ConfigOnlyContext<'a> {
77 /// Create a new context wrapping the given config options.
78 pub fn new(config: &'a ConfigOptions) -> Self {
79 Self { config }
80 }
81}
82
83impl PhysicalOptimizerContext for ConfigOnlyContext<'_> {
84 fn config_options(&self) -> &ConfigOptions {
85 self.config
86 }
87}
88
89/// `PhysicalOptimizerRule` transforms one ['ExecutionPlan'] into another which
90/// computes the same results, but in a potentially more efficient way.
91///
92/// Use [`SessionState::add_physical_optimizer_rule`] to register additional
93/// `PhysicalOptimizerRule`s.
94///
95/// [`SessionState::add_physical_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_physical_optimizer_rule
96pub trait PhysicalOptimizerRule: Debug + std::any::Any {
97 /// Rewrite `plan` to an optimized form.
98 ///
99 /// This is the primary optimization method. For rules that need access to
100 /// the statistics registry, override [`optimize_with_context`](Self::optimize_with_context) instead.
101 fn optimize(
102 &self,
103 plan: Arc<dyn ExecutionPlan>,
104 config: &ConfigOptions,
105 ) -> Result<Arc<dyn ExecutionPlan>>;
106
107 /// Rewrite `plan` with access to extended context (statistics registry, etc.).
108 ///
109 /// Override this method if you need access to the statistics registry for
110 /// enhanced statistics lookup. The default implementation simply calls
111 /// [`optimize`](Self::optimize) with the config options from the context.
112 fn optimize_with_context(
113 &self,
114 plan: Arc<dyn ExecutionPlan>,
115 context: &dyn PhysicalOptimizerContext,
116 ) -> Result<Arc<dyn ExecutionPlan>> {
117 self.optimize(plan, context.config_options())
118 }
119
120 /// A human readable name for this optimizer rule
121 fn name(&self) -> &str;
122
123 /// A flag to indicate whether the physical planner should validate that the rule will not
124 /// change the schema of the plan after the rewriting.
125 /// Some of the optimization rules might change the nullable properties of the schema
126 /// and should disable the schema check.
127 fn schema_check(&self) -> bool;
128}
129
130/// A rule-based physical optimizer.
131#[derive(Clone, Debug)]
132pub struct PhysicalOptimizer {
133 /// All rules to apply
134 pub rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
135}
136
137impl Default for PhysicalOptimizer {
138 fn default() -> Self {
139 Self::new()
140 }
141}
142
impl PhysicalOptimizer {
    /// Create a new optimizer using the recommended list of rules.
    ///
    /// The list is built below and handed to [`Self::with_rules`]; see the
    /// per-rule comments for why each rule sits where it does.
    pub fn new() -> Self {
        // NOTEs:
        // - The order of rules in this list is important, as it determines the
        //   order in which they are applied.
        // - Adding a new rule here is expensive as it will be applied to all
        //   queries, and will likely increase the optimization time. Please extend
        //   existing rules when possible, rather than adding a new rule.
        let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
            // If there is an output requirement of the query, make sure that
            // this information is not lost across different rules during optimization.
            Arc::new(OutputRequirements::new_add_mode()),
            Arc::new(AggregateStatistics::new()),
            // Statistics-based join selection will change the Auto mode to a real join implementation,
            // like collect left, or hash join, or future sort merge join, which will influence the
            // EnforceDistribution and EnforceSorting rules as they decide whether to add additional
            // repartitioning and local sorting steps to meet distribution and ordering requirements.
            // Therefore, it should run before EnforceDistribution and EnforceSorting.
            Arc::new(JoinSelection::new()),
            // The LimitedDistinctAggregation rule should be applied before the EnforceDistribution rule,
            // as that rule may inject other operations in between the different AggregateExecs.
            // Applying the rule early means only directly-connected AggregateExecs must be examined.
            Arc::new(LimitedDistinctAggregation::new()),
            // The FilterPushdown rule tries to push down filters as far as it can.
            // For example, it will push down filtering from a `FilterExec` to `DataSourceExec`.
            // Note that this does not push down dynamic filters (such as those created by a
            // `SortExec` operator in TopK mode); those are handled by the later
            // `FilterPushdown::new_post_optimization()` rule near the end of this list.
            // See `FilterPushdownPhase` for more details.
            Arc::new(FilterPushdown::new()),
            // The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution
            // requirements. Please make sure that the whole plan tree is determined before this rule.
            // This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at
            // least one of the operators in the plan benefits from increased parallelism.
            Arc::new(EnforceDistribution::new()),
            // The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution rule.
            Arc::new(CombinePartialFinalAggregate::new()),
            // The EnforceSorting rule is for adding essential local sorting to satisfy the required
            // ordering. Please make sure that the whole plan tree is determined before this rule.
            // Note that one should always run this rule after running the EnforceDistribution rule
            // as the latter may break local sorting requirements.
            Arc::new(EnforceSorting::new()),
            // Run once after the local sorting requirements have been changed by EnforceSorting.
            Arc::new(OptimizeAggregateOrder::new()),
            // WindowTopN: replaces Filter(rn<=K) → Window(ROW_NUMBER) → Sort
            // with Window(ROW_NUMBER) → PartitionedTopKExec(fetch=K).
            // Must run after EnforceSorting (which inserts SortExec) and before
            // ProjectionPushdown (which embeds projections into FilterExec).
            Arc::new(WindowTopN::new()),
            // TODO: `try_embed_to_hash_join` in the ProjectionPushdown rule would be blocked by
            // CoalesceBatches, so this ProjectionPushdown runs before CoalesceBatches. Maybe
            // optimize this in the future.
            // NOTE(review): no CoalesceBatches rule appears in this list any more; confirm
            // whether this early ProjectionPushdown is still required for that reason.
            Arc::new(ProjectionPushdown::new()),
            // Remove the ancillary output requirement operator since we are done with the planning
            // phase.
            Arc::new(OutputRequirements::new_remove_mode()),
            // The aggregation limiter will try to find situations where the accumulator count
            // is not tied to the cardinality, i.e. when the output of the aggregation is passed
            // into an `order by max(x) limit y`. In this case it will copy the limit value down
            // to the aggregation, allowing it to use only y number of accumulators.
            Arc::new(TopKAggregation::new()),
            // Tries to push limits down through window functions, growing as appropriate.
            // This can possibly be combined with [LimitPushdown].
            // It needs to come after [EnforceSorting].
            Arc::new(LimitPushPastWindows::new()),
            // The HashJoinBuffering rule adds a BufferExec node with the configured capacity
            // on the probe side of hash joins. That way, the probe side gets eagerly polled before
            // the build side is completely finished.
            Arc::new(HashJoinBuffering::new()),
            // The LimitPushdown rule tries to push limits down as far as possible,
            // replacing operators with fetching variants, or adding limits
            // past operators that support limit pushdown.
            Arc::new(LimitPushdown::new()),
            // TopKRepartition pushes TopK (Sort with fetch) below Hash
            // repartition when the partition key is a prefix of the sort key.
            // This reduces data volume before a hash shuffle. It must run
            // after LimitPushdown so that the TopK already exists on the SortExec.
            Arc::new(TopKRepartition::new()),
            // The ProjectionPushdown rule tries to push projections towards
            // the sources in the execution plan. As a result of this process,
            // a projection can disappear if it reaches the source providers, and
            // sequential projections can merge into one. Even if these two cases
            // are not present, the load of executors such as join or union will be
            // reduced by narrowing their input tables.
            Arc::new(ProjectionPushdown::new()),
            // PushdownSort: Detect sorts that can be pushed down to data sources.
            Arc::new(PushdownSort::new()),
            // EnsureCooperative (see `crate::ensure_coop`) runs after PushdownSort and
            // before the post-optimization FilterPushdown below.
            Arc::new(EnsureCooperative::new()),
            // This FilterPushdown handles dynamic filters that may have references to the source ExecutionPlan.
            // Therefore, it should be run at the end of the optimization process since any changes to the plan may break the dynamic filter's references.
            // See `FilterPushdownPhase` for more details.
            Arc::new(FilterPushdown::new_post_optimization()),
            // The SanityCheckPlan rule checks whether the order and
            // distribution requirements of each node in the plan
            // are satisfied. It will also reject non-runnable query
            // plans that use pipeline-breaking operators on infinite
            // input(s). The rule generates a diagnostic error
            // message for invalid plans. It makes no changes to the
            // given query plan; i.e. it only acts as a final
            // gatekeeping rule.
            Arc::new(SanityCheckPlan::new()),
        ];

        Self::with_rules(rules)
    }

    /// Create a new optimizer with exactly the given rules.
    ///
    /// No default rules are added. The rules are stored as given, so the
    /// caller controls the order in which they are applied.
    pub fn with_rules(rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>) -> Self {
        Self { rules }
    }
}