// datafusion_physical_optimizer/optimizer.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Physical optimizer traits
19
20use std::fmt::Debug;
21use std::sync::Arc;
22
23use crate::aggregate_statistics::AggregateStatistics;
24use crate::combine_partial_final_agg::CombinePartialFinalAggregate;
25use crate::enforce_distribution::EnforceDistribution;
26use crate::enforce_sorting::EnforceSorting;
27use crate::ensure_coop::EnsureCooperative;
28use crate::filter_pushdown::FilterPushdown;
29use crate::join_selection::JoinSelection;
30use crate::limit_pushdown::LimitPushdown;
31use crate::limited_distinct_aggregation::LimitedDistinctAggregation;
32use crate::output_requirements::OutputRequirements;
33use crate::projection_pushdown::ProjectionPushdown;
34use crate::sanity_checker::SanityCheckPlan;
35use crate::topk_aggregation::TopKAggregation;
36use crate::update_aggr_exprs::OptimizeAggregateOrder;
37
38use crate::limit_pushdown_past_window::LimitPushPastWindows;
39use crate::pushdown_sort::PushdownSort;
40use datafusion_common::Result;
41use datafusion_common::config::ConfigOptions;
42use datafusion_physical_plan::ExecutionPlan;
43
/// `PhysicalOptimizerRule` transforms one [`ExecutionPlan`] into another which
/// computes the same results, but in a potentially more efficient way.
///
/// Use [`SessionState::add_physical_optimizer_rule`] to register additional
/// `PhysicalOptimizerRule`s.
///
/// [`SessionState::add_physical_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html#method.add_physical_optimizer_rule
pub trait PhysicalOptimizerRule: Debug {
    /// Rewrite `plan` to an optimized form, returning the (possibly
    /// unchanged) plan on success or an error if the rewrite fails.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// A human readable name for this optimizer rule, used in diagnostics
    /// and when registering/identifying rules.
    fn name(&self) -> &str;

    /// A flag to indicate whether the physical planner should validate that the rule will not
    /// change the schema of the plan after the rewriting.
    /// Some of the optimization rules might change the nullable properties of the schema
    /// and should disable the schema check.
    fn schema_check(&self) -> bool;
}
68
/// A rule-based physical optimizer.
///
/// Holds an ordered list of [`PhysicalOptimizerRule`]s; ordering is
/// significant, as rules are applied in the order they appear in `rules`
/// (see [`PhysicalOptimizer::new`] for the recommended default list).
#[derive(Clone, Debug)]
pub struct PhysicalOptimizer {
    /// All rules to apply, in application order
    pub rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
}
75
76impl Default for PhysicalOptimizer {
77 fn default() -> Self {
78 Self::new()
79 }
80}
81
82impl PhysicalOptimizer {
83 /// Create a new optimizer using the recommended list of rules
84 pub fn new() -> Self {
85 // NOTEs:
86 // - The order of rules in this list is important, as it determines the
87 // order in which they are applied.
88 // - Adding a new rule here is expensive as it will be applied to all
89 // queries, and will likely increase the optimization time. Please extend
90 // existing rules when possible, rather than adding a new rule.
91 let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
92 // If there is a output requirement of the query, make sure that
93 // this information is not lost across different rules during optimization.
94 Arc::new(OutputRequirements::new_add_mode()),
95 Arc::new(AggregateStatistics::new()),
96 // Statistics-based join selection will change the Auto mode to a real join implementation,
97 // like collect left, or hash join, or future sort merge join, which will influence the
98 // EnforceDistribution and EnforceSorting rules as they decide whether to add additional
99 // repartitioning and local sorting steps to meet distribution and ordering requirements.
100 // Therefore, it should run before EnforceDistribution and EnforceSorting.
101 Arc::new(JoinSelection::new()),
102 // The LimitedDistinctAggregation rule should be applied before the EnforceDistribution rule,
103 // as that rule may inject other operations in between the different AggregateExecs.
104 // Applying the rule early means only directly-connected AggregateExecs must be examined.
105 Arc::new(LimitedDistinctAggregation::new()),
106 // The FilterPushdown rule tries to push down filters as far as it can.
107 // For example, it will push down filtering from a `FilterExec` to `DataSourceExec`.
108 // Note that this does not push down dynamic filters (such as those created by a `SortExec` operator in TopK mode),
109 // those are handled by the later `FilterPushdown` rule.
110 // See `FilterPushdownPhase` for more details.
111 Arc::new(FilterPushdown::new()),
112 // The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution
113 // requirements. Please make sure that the whole plan tree is determined before this rule.
114 // This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at
115 // least one of the operators in the plan benefits from increased parallelism.
116 Arc::new(EnforceDistribution::new()),
117 // The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution rule
118 Arc::new(CombinePartialFinalAggregate::new()),
119 // The EnforceSorting rule is for adding essential local sorting to satisfy the required
120 // ordering. Please make sure that the whole plan tree is determined before this rule.
121 // Note that one should always run this rule after running the EnforceDistribution rule
122 // as the latter may break local sorting requirements.
123 Arc::new(EnforceSorting::new()),
124 // Run once after the local sorting requirement is changed
125 Arc::new(OptimizeAggregateOrder::new()),
126 // TODO: `try_embed_to_hash_join` in the ProjectionPushdown rule would be block by the CoalesceBatches, so add it before CoalesceBatches. Maybe optimize it in the future.
127 Arc::new(ProjectionPushdown::new()),
128 // Remove the ancillary output requirement operator since we are done with the planning
129 // phase.
130 Arc::new(OutputRequirements::new_remove_mode()),
131 // The aggregation limiter will try to find situations where the accumulator count
132 // is not tied to the cardinality, i.e. when the output of the aggregation is passed
133 // into an `order by max(x) limit y`. In this case it will copy the limit value down
134 // to the aggregation, allowing it to use only y number of accumulators.
135 Arc::new(TopKAggregation::new()),
136 // Tries to push limits down through window functions, growing as appropriate
137 // This can possibly be combined with [LimitPushdown]
138 // It needs to come after [EnforceSorting]
139 Arc::new(LimitPushPastWindows::new()),
140 // The LimitPushdown rule tries to push limits down as far as possible,
141 // replacing operators with fetching variants, or adding limits
142 // past operators that support limit pushdown.
143 Arc::new(LimitPushdown::new()),
144 // The ProjectionPushdown rule tries to push projections towards
145 // the sources in the execution plan. As a result of this process,
146 // a projection can disappear if it reaches the source providers, and
147 // sequential projections can merge into one. Even if these two cases
148 // are not present, the load of executors such as join or union will be
149 // reduced by narrowing their input tables.
150 Arc::new(ProjectionPushdown::new()),
151 // PushdownSort: Detect sorts that can be pushed down to data sources.
152 Arc::new(PushdownSort::new()),
153 Arc::new(EnsureCooperative::new()),
154 // This FilterPushdown handles dynamic filters that may have references to the source ExecutionPlan.
155 // Therefore it should be run at the end of the optimization process since any changes to the plan may break the dynamic filter's references.
156 // See `FilterPushdownPhase` for more details.
157 Arc::new(FilterPushdown::new_post_optimization()),
158 // The SanityCheckPlan rule checks whether the order and
159 // distribution requirements of each node in the plan
160 // is satisfied. It will also reject non-runnable query
161 // plans that use pipeline-breaking operators on infinite
162 // input(s). The rule generates a diagnostic error
163 // message for invalid plans. It makes no changes to the
164 // given query plan; i.e. it only acts as a final
165 // gatekeeping rule.
166 Arc::new(SanityCheckPlan::new()),
167 ];
168
169 Self::with_rules(rules)
170 }
171
172 /// Create a new optimizer with the given rules
173 pub fn with_rules(rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>) -> Self {
174 Self { rules }
175 }
176}