Skip to main content

datafusion_physical_optimizer/
optimizer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Physical optimizer traits
19
20use std::fmt::Debug;
21use std::sync::Arc;
22
23use crate::aggregate_statistics::AggregateStatistics;
24use crate::combine_partial_final_agg::CombinePartialFinalAggregate;
25use crate::enforce_distribution::EnforceDistribution;
26use crate::enforce_sorting::EnforceSorting;
27use crate::ensure_coop::EnsureCooperative;
28use crate::filter_pushdown::FilterPushdown;
29use crate::join_selection::JoinSelection;
30use crate::limit_pushdown::LimitPushdown;
31use crate::limited_distinct_aggregation::LimitedDistinctAggregation;
32use crate::output_requirements::OutputRequirements;
33use crate::projection_pushdown::ProjectionPushdown;
34use crate::sanity_checker::SanityCheckPlan;
35use crate::topk_aggregation::TopKAggregation;
36use crate::topk_repartition::TopKRepartition;
37use crate::update_aggr_exprs::OptimizeAggregateOrder;
38
39use crate::hash_join_buffering::HashJoinBuffering;
40use crate::limit_pushdown_past_window::LimitPushPastWindows;
41use crate::pushdown_sort::PushdownSort;
42use crate::window_topn::WindowTopN;
43use datafusion_common::Result;
44use datafusion_common::config::ConfigOptions;
45use datafusion_physical_plan::ExecutionPlan;
46use datafusion_physical_plan::operator_statistics::StatisticsRegistry;
47
48/// Context available to physical optimizer rules.
49///
50/// This trait provides access to configuration options and optional statistics
51/// registry for enhanced statistics lookup. It allows optimizer rules to access
52/// extended context without changing the core [`PhysicalOptimizerRule::optimize`]
53/// signature.
54pub trait PhysicalOptimizerContext: Send + Sync {
55    /// Returns the configuration options.
56    fn config_options(&self) -> &ConfigOptions;
57
58    /// Returns the statistics registry for enhanced statistics lookup.
59    ///
60    /// Returns `None` if no registry is configured, in which case rules
61    /// should fall back to using `ExecutionPlan::partition_statistics()`.
62    fn statistics_registry(&self) -> Option<&StatisticsRegistry> {
63        None
64    }
65}
66
67/// Simple context wrapping [`ConfigOptions`] for backward compatibility.
68///
69/// This struct provides a minimal implementation of [`PhysicalOptimizerContext`]
70/// that only supplies configuration options. Used when no statistics registry
71/// is available or needed.
72pub struct ConfigOnlyContext<'a> {
73    config: &'a ConfigOptions,
74}
75
76impl<'a> ConfigOnlyContext<'a> {
77    /// Create a new context wrapping the given config options.
78    pub fn new(config: &'a ConfigOptions) -> Self {
79        Self { config }
80    }
81}
82
83impl PhysicalOptimizerContext for ConfigOnlyContext<'_> {
84    fn config_options(&self) -> &ConfigOptions {
85        self.config
86    }
87}
88
89/// `PhysicalOptimizerRule` transforms one ['ExecutionPlan'] into another which
90/// computes the same results, but in a potentially more efficient way.
91///
92/// Use [`SessionState::add_physical_optimizer_rule`] to register additional
93/// `PhysicalOptimizerRule`s.
94///
95/// [`SessionState::add_physical_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_physical_optimizer_rule
96pub trait PhysicalOptimizerRule: Debug + std::any::Any {
97    /// Rewrite `plan` to an optimized form.
98    ///
99    /// This is the primary optimization method. For rules that need access to
100    /// the statistics registry, override [`optimize_with_context`](Self::optimize_with_context) instead.
101    fn optimize(
102        &self,
103        plan: Arc<dyn ExecutionPlan>,
104        config: &ConfigOptions,
105    ) -> Result<Arc<dyn ExecutionPlan>>;
106
107    /// Rewrite `plan` with access to extended context (statistics registry, etc.).
108    ///
109    /// Override this method if you need access to the statistics registry for
110    /// enhanced statistics lookup. The default implementation simply calls
111    /// [`optimize`](Self::optimize) with the config options from the context.
112    fn optimize_with_context(
113        &self,
114        plan: Arc<dyn ExecutionPlan>,
115        context: &dyn PhysicalOptimizerContext,
116    ) -> Result<Arc<dyn ExecutionPlan>> {
117        self.optimize(plan, context.config_options())
118    }
119
120    /// A human readable name for this optimizer rule
121    fn name(&self) -> &str;
122
123    /// A flag to indicate whether the physical planner should validate that the rule will not
124    /// change the schema of the plan after the rewriting.
125    /// Some of the optimization rules might change the nullable properties of the schema
126    /// and should disable the schema check.
127    fn schema_check(&self) -> bool;
128}
129
130/// A rule-based physical optimizer.
131#[derive(Clone, Debug)]
132pub struct PhysicalOptimizer {
133    /// All rules to apply
134    pub rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
135}
136
137impl Default for PhysicalOptimizer {
138    fn default() -> Self {
139        Self::new()
140    }
141}
142
143impl PhysicalOptimizer {
144    /// Create a new optimizer using the recommended list of rules
145    pub fn new() -> Self {
146        // NOTEs:
147        // - The order of rules in this list is important, as it determines the
148        //   order in which they are applied.
149        // - Adding a new rule here is expensive as it will be applied to all
150        //   queries, and will likely increase the optimization time. Please extend
151        //   existing rules when possible, rather than adding a new rule.
152        let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
153            // If there is a output requirement of the query, make sure that
154            // this information is not lost across different rules during optimization.
155            Arc::new(OutputRequirements::new_add_mode()),
156            Arc::new(AggregateStatistics::new()),
157            // Statistics-based join selection will change the Auto mode to a real join implementation,
158            // like collect left, or hash join, or future sort merge join, which will influence the
159            // EnforceDistribution and EnforceSorting rules as they decide whether to add additional
160            // repartitioning and local sorting steps to meet distribution and ordering requirements.
161            // Therefore, it should run before EnforceDistribution and EnforceSorting.
162            Arc::new(JoinSelection::new()),
163            // The LimitedDistinctAggregation rule should be applied before the EnforceDistribution rule,
164            // as that rule may inject other operations in between the different AggregateExecs.
165            // Applying the rule early means only directly-connected AggregateExecs must be examined.
166            Arc::new(LimitedDistinctAggregation::new()),
167            // The FilterPushdown rule tries to push down filters as far as it can.
168            // For example, it will push down filtering from a `FilterExec` to `DataSourceExec`.
169            // Note that this does not push down dynamic filters (such as those created by a `SortExec` operator in TopK mode),
170            // those are handled by the later `FilterPushdown` rule.
171            // See `FilterPushdownPhase` for more details.
172            Arc::new(FilterPushdown::new()),
173            // The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution
174            // requirements. Please make sure that the whole plan tree is determined before this rule.
175            // This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at
176            // least one of the operators in the plan benefits from increased parallelism.
177            Arc::new(EnforceDistribution::new()),
178            // The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution rule
179            Arc::new(CombinePartialFinalAggregate::new()),
180            // The EnforceSorting rule is for adding essential local sorting to satisfy the required
181            // ordering. Please make sure that the whole plan tree is determined before this rule.
182            // Note that one should always run this rule after running the EnforceDistribution rule
183            // as the latter may break local sorting requirements.
184            Arc::new(EnforceSorting::new()),
185            // Run once after the local sorting requirement is changed
186            Arc::new(OptimizeAggregateOrder::new()),
187            // WindowTopN: replaces Filter(rn<=K) → Window(ROW_NUMBER) → Sort
188            // with Window(ROW_NUMBER) → PartitionedTopKExec(fetch=K).
189            // Must run after EnforceSorting (which inserts SortExec) and before
190            // ProjectionPushdown (which embeds projections into FilterExec).
191            Arc::new(WindowTopN::new()),
192            // TODO: `try_embed_to_hash_join` in the ProjectionPushdown rule would be block by the CoalesceBatches, so add it before CoalesceBatches. Maybe optimize it in the future.
193            Arc::new(ProjectionPushdown::new()),
194            // Remove the ancillary output requirement operator since we are done with the planning
195            // phase.
196            Arc::new(OutputRequirements::new_remove_mode()),
197            // The aggregation limiter will try to find situations where the accumulator count
198            // is not tied to the cardinality, i.e. when the output of the aggregation is passed
199            // into an `order by max(x) limit y`. In this case it will copy the limit value down
200            // to the aggregation, allowing it to use only y number of accumulators.
201            Arc::new(TopKAggregation::new()),
202            // Tries to push limits down through window functions, growing as appropriate
203            // This can possibly be combined with [LimitPushdown]
204            // It needs to come after [EnforceSorting]
205            Arc::new(LimitPushPastWindows::new()),
206            // The HashJoinBuffering rule adds a BufferExec node with the configured capacity
207            // in the prob side of hash joins. That way, the probe side gets eagerly polled before
208            // the build side is completely finished.
209            Arc::new(HashJoinBuffering::new()),
210            // The LimitPushdown rule tries to push limits down as far as possible,
211            // replacing operators with fetching variants, or adding limits
212            // past operators that support limit pushdown.
213            Arc::new(LimitPushdown::new()),
214            // TopKRepartition pushes TopK (Sort with fetch) below Hash
215            // repartition when the partition key is a prefix of the sort key.
216            // This reduces data volume before a hash shuffle. It must run
217            // after LimitPushdown so that the TopK already exists on the SortExec.
218            Arc::new(TopKRepartition::new()),
219            // The ProjectionPushdown rule tries to push projections towards
220            // the sources in the execution plan. As a result of this process,
221            // a projection can disappear if it reaches the source providers, and
222            // sequential projections can merge into one. Even if these two cases
223            // are not present, the load of executors such as join or union will be
224            // reduced by narrowing their input tables.
225            Arc::new(ProjectionPushdown::new()),
226            // PushdownSort: Detect sorts that can be pushed down to data sources.
227            Arc::new(PushdownSort::new()),
228            Arc::new(EnsureCooperative::new()),
229            // This FilterPushdown handles dynamic filters that may have references to the source ExecutionPlan.
230            // Therefore, it should be run at the end of the optimization process since any changes to the plan may break the dynamic filter's references.
231            // See `FilterPushdownPhase` for more details.
232            Arc::new(FilterPushdown::new_post_optimization()),
233            // The SanityCheckPlan rule checks whether the order and
234            // distribution requirements of each node in the plan
235            // is satisfied. It will also reject non-runnable query
236            // plans that use pipeline-breaking operators on infinite
237            // input(s). The rule generates a diagnostic error
238            // message for invalid plans. It makes no changes to the
239            // given query plan; i.e. it only acts as a final
240            // gatekeeping rule.
241            Arc::new(SanityCheckPlan::new()),
242        ];
243
244        Self::with_rules(rules)
245    }
246
247    /// Create a new optimizer with the given rules
248    pub fn with_rules(rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>) -> Self {
249        Self { rules }
250    }
251}