datafusion_physical_optimizer/optimizer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Physical optimizer traits

use std::fmt::Debug;
use std::sync::Arc;

use crate::aggregate_statistics::AggregateStatistics;
use crate::coalesce_batches::CoalesceBatches;
use crate::combine_partial_final_agg::CombinePartialFinalAggregate;
use crate::enforce_distribution::EnforceDistribution;
use crate::enforce_sorting::EnforceSorting;
use crate::filter_pushdown::FilterPushdown;
use crate::join_selection::JoinSelection;
use crate::limit_pushdown::LimitPushdown;
use crate::limited_distinct_aggregation::LimitedDistinctAggregation;
use crate::output_requirements::OutputRequirements;
use crate::projection_pushdown::ProjectionPushdown;
use crate::sanity_checker::SanityCheckPlan;
use crate::topk_aggregation::TopKAggregation;
use crate::update_aggr_exprs::OptimizeAggregateOrder;

use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_physical_plan::ExecutionPlan;

/// `PhysicalOptimizerRule` transforms one [`ExecutionPlan`] into another which
/// computes the same results, but in a potentially more efficient way.
///
/// Use [`SessionState::add_physical_optimizer_rule`] to register additional
/// `PhysicalOptimizerRule`s.
///
/// [`SessionState::add_physical_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_physical_optimizer_rule
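///
/// # Example
///
/// A minimal sketch of a custom rule; `NoopRule` is a hypothetical name and
/// the example assumes `PhysicalOptimizerRule` is re-exported at the crate
/// root:
///
/// ```
/// use std::sync::Arc;
///
/// use datafusion_common::config::ConfigOptions;
/// use datafusion_common::Result;
/// use datafusion_physical_optimizer::PhysicalOptimizerRule;
/// use datafusion_physical_plan::ExecutionPlan;
///
/// // A rule that returns the plan unchanged (illustrative only).
/// #[derive(Debug)]
/// struct NoopRule;
///
/// impl PhysicalOptimizerRule for NoopRule {
///     fn optimize(
///         &self,
///         plan: Arc<dyn ExecutionPlan>,
///         _config: &ConfigOptions,
///     ) -> Result<Arc<dyn ExecutionPlan>> {
///         // A real rule would rewrite `plan` here.
///         Ok(plan)
///     }
///
///     fn name(&self) -> &str {
///         "noop_rule"
///     }
///
///     // This rule preserves the schema, so keep the schema check enabled.
///     fn schema_check(&self) -> bool {
///         true
///     }
/// }
/// ```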
pub trait PhysicalOptimizerRule: Debug {
    /// Rewrite `plan` to an optimized form
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// A human-readable name for this optimizer rule
    fn name(&self) -> &str;

    /// A flag to indicate whether the physical planner should validate that the rule
    /// does not change the schema of the plan after rewriting.
    /// Some optimization rules might change the nullable properties of the schema
    /// and should disable the schema check.
    fn schema_check(&self) -> bool;
}

/// A rule-based physical optimizer.
#[derive(Clone, Debug)]
pub struct PhysicalOptimizer {
    /// All rules to apply
    pub rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
}

impl Default for PhysicalOptimizer {
    fn default() -> Self {
        Self::new()
    }
}

impl PhysicalOptimizer {
    /// Create a new optimizer using the recommended list of rules
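    ///
    /// # Example
    ///
    /// A sketch of how a caller (such as the session planner) might apply the
    /// default rules in order; `optimize_all` is a hypothetical helper, not an
    /// API of this crate, and the example assumes `PhysicalOptimizer` is
    /// re-exported at the crate root:
    ///
    /// ```
    /// use std::sync::Arc;
    ///
    /// use datafusion_common::config::ConfigOptions;
    /// use datafusion_common::Result;
    /// use datafusion_physical_optimizer::PhysicalOptimizer;
    /// use datafusion_physical_plan::ExecutionPlan;
    ///
    /// // Run every default rule once, in the order listed below.
    /// fn optimize_all(
    ///     mut plan: Arc<dyn ExecutionPlan>,
    ///     config: &ConfigOptions,
    /// ) -> Result<Arc<dyn ExecutionPlan>> {
    ///     for rule in PhysicalOptimizer::new().rules {
    ///         plan = rule.optimize(plan, config)?;
    ///     }
    ///     Ok(plan)
    /// }
    /// ```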
    pub fn new() -> Self {
        let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
            // If there is an output requirement for the query, make sure that
            // this information is not lost across different rules during optimization.
            Arc::new(OutputRequirements::new_add_mode()),
            Arc::new(AggregateStatistics::new()),
            // Statistics-based join selection will change the Auto mode to a real join implementation,
            // like collect left, hash join, or a future sort merge join, which will influence the
            // EnforceDistribution and EnforceSorting rules as they decide whether to add additional
            // repartitioning and local sorting steps to meet distribution and ordering requirements.
            // Therefore, it should run before EnforceDistribution and EnforceSorting.
            Arc::new(JoinSelection::new()),
            // The LimitedDistinctAggregation rule should be applied before the EnforceDistribution rule,
            // as that rule may inject other operations in between the different AggregateExecs.
            // Applying the rule early means only directly-connected AggregateExecs must be examined.
            Arc::new(LimitedDistinctAggregation::new()),
            // The FilterPushdown rule tries to push down filters as far as it can.
            // For example, it will push down filtering from a `FilterExec` to
            // a `DataSourceExec`, or from a `TopK`'s current state to a `DataSourceExec`.
            Arc::new(FilterPushdown::new()),
            // The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution
            // requirements. Please make sure that the whole plan tree is determined before this rule.
            // This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at
            // least one of the operators in the plan benefits from increased parallelism.
            Arc::new(EnforceDistribution::new()),
            // The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution rule.
            Arc::new(CombinePartialFinalAggregate::new()),
            // The EnforceSorting rule is for adding essential local sorting to satisfy the required
            // ordering. Please make sure that the whole plan tree is determined before this rule.
            // Note that one should always run this rule after running the EnforceDistribution rule
            // as the latter may break local sorting requirements.
            Arc::new(EnforceSorting::new()),
            // Run once after the local sorting requirement is changed.
            Arc::new(OptimizeAggregateOrder::new()),
            // TODO: `try_embed_to_hash_join` in the ProjectionPushdown rule would be blocked by
            // CoalesceBatches, so add it before CoalesceBatches. Maybe optimize it in the future.
            Arc::new(ProjectionPushdown::new()),
            // The CoalesceBatches rule will not influence the distribution and ordering of the
            // whole plan tree. Therefore, to avoid influencing other rules, it should run last.
            Arc::new(CoalesceBatches::new()),
            // Remove the ancillary output requirement operator since we are done with the planning
            // phase.
            Arc::new(OutputRequirements::new_remove_mode()),
            // The aggregation limiter will try to find situations where the accumulator count
            // is not tied to the cardinality, i.e. when the output of the aggregation is passed
            // into an `order by max(x) limit y`. In this case it will copy the limit value down
            // to the aggregation, allowing it to use only `y` accumulators.
            Arc::new(TopKAggregation::new()),
            // The LimitPushdown rule tries to push limits down as far as possible,
            // replacing operators with fetching variants, or adding limits
            // past operators that support limit pushdown.
            Arc::new(LimitPushdown::new()),
            // The ProjectionPushdown rule tries to push projections towards
            // the sources in the execution plan. As a result of this process,
            // a projection can disappear if it reaches the source providers, and
            // sequential projections can merge into one. Even if these two cases
            // are not present, the load of executors such as join or union will be
            // reduced by narrowing their input tables.
            Arc::new(ProjectionPushdown::new()),
            // The SanityCheckPlan rule checks whether the order and
            // distribution requirements of each node in the plan
            // are satisfied. It will also reject non-runnable query
            // plans that use pipeline-breaking operators on infinite
            // input(s). The rule generates a diagnostic error
            // message for invalid plans. It makes no changes to the
            // given query plan; i.e. it only acts as a final
            // gatekeeping rule.
            Arc::new(SanityCheckPlan::new()),
        ];

        Self::with_rules(rules)
    }

    /// Create a new optimizer with the given rules
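    ///
    /// # Example
    ///
    /// A sketch of building an optimizer with a custom rule list, assuming
    /// `PhysicalOptimizer`, `PhysicalOptimizerRule`, and the
    /// `coalesce_batches` module are publicly exported by this crate:
    ///
    /// ```
    /// use std::sync::Arc;
    ///
    /// use datafusion_physical_optimizer::coalesce_batches::CoalesceBatches;
    /// use datafusion_physical_optimizer::{PhysicalOptimizer, PhysicalOptimizerRule};
    ///
    /// // Only coalesce small batches; skip all other default rules.
    /// let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> =
    ///     vec![Arc::new(CoalesceBatches::new())];
    /// let optimizer = PhysicalOptimizer::with_rules(rules);
    /// assert_eq!(optimizer.rules.len(), 1);
    /// ```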
    pub fn with_rules(rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>) -> Self {
        Self { rules }
    }
}