datafusion_physical_optimizer/update_aggr_exprs.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! An optimizer rule that checks ordering requirements of aggregate expressions
19//! and modifies the expressions to work more efficiently if possible.
20
21use std::sync::Arc;
22
23use datafusion_common::config::ConfigOptions;
24use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
25use datafusion_common::{plan_datafusion_err, Result};
26use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
27use datafusion_physical_expr::{
28 reverse_order_bys, EquivalenceProperties, PhysicalSortRequirement,
29};
30use datafusion_physical_expr::{LexOrdering, LexRequirement};
31use datafusion_physical_plan::aggregates::concat_slices;
32use datafusion_physical_plan::windows::get_ordered_partition_by_indices;
33use datafusion_physical_plan::{
34 aggregates::AggregateExec, ExecutionPlan, ExecutionPlanProperties,
35};
36
37use crate::PhysicalOptimizerRule;
38
/// Physical optimizer rule that inspects the ordering requirements of
/// aggregate expressions.
///
/// With respect to input ordering, aggregators fall into three groups:
/// - `AggregateOrderSensitivity::Insensitive`: the order of the input does
///   not matter.
/// - `AggregateOrderSensitivity::HardRequirement`: the aggregator needs its
///   input in a specific order.
/// - `AggregateOrderSensitivity::Beneficial`: the aggregator accepts input in
///   any order, but runs more efficiently when the input follows a particular
///   ordering.
///
/// For `Beneficial` aggregate expressions, this rule checks whether the
/// orderings already present in the input satisfy the expression's ordering
/// requirement (or its reverse). It then updates the expression so it can run
/// in its more efficient mode whenever possible.
#[derive(Default, Debug)]
pub struct OptimizeAggregateOrder {}

impl OptimizeAggregateOrder {
    /// Creates a new instance of the rule.
    ///
    /// The rule carries no state, so this is equivalent to
    /// [`OptimizeAggregateOrder::default`].
    pub fn new() -> Self {
        Self {}
    }
}
62
63impl PhysicalOptimizerRule for OptimizeAggregateOrder {
64 /// Applies the `OptimizeAggregateOrder` rule to the provided execution plan.
65 ///
66 /// This function traverses the execution plan tree, identifies `AggregateExec` nodes,
67 /// and optimizes their aggregate expressions based on existing input orderings.
68 /// If optimizations are applied, it returns a modified execution plan.
69 ///
70 /// # Arguments
71 ///
72 /// * `plan` - The root of the execution plan to optimize.
73 /// * `_config` - Configuration options (currently unused).
74 ///
75 /// # Returns
76 ///
77 /// A `Result` containing the potentially optimized execution plan or an error.
78 fn optimize(
79 &self,
80 plan: Arc<dyn ExecutionPlan>,
81 _config: &ConfigOptions,
82 ) -> Result<Arc<dyn ExecutionPlan>> {
83 plan.transform_up(|plan| {
84 if let Some(aggr_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
85 // Final stage implementations do not rely on ordering -- those
86 // ordering fields may be pruned out by first stage aggregates.
87 // Hence, necessary information for proper merge is added during
88 // the first stage to the state field, which the final stage uses.
89 if !aggr_exec.mode().is_first_stage() {
90 return Ok(Transformed::no(plan));
91 }
92 let input = aggr_exec.input();
93 let mut aggr_expr = aggr_exec.aggr_expr().to_vec();
94
95 let groupby_exprs = aggr_exec.group_expr().input_exprs();
96 // If the existing ordering satisfies a prefix of the GROUP BY
97 // expressions, prefix requirements with this section. In this
98 // case, aggregation will work more efficiently.
99 let indices = get_ordered_partition_by_indices(&groupby_exprs, input);
100 let requirement = indices
101 .iter()
102 .map(|&idx| {
103 PhysicalSortRequirement::new(
104 Arc::<dyn datafusion_physical_plan::PhysicalExpr>::clone(
105 &groupby_exprs[idx],
106 ),
107 None,
108 )
109 })
110 .collect::<Vec<_>>();
111
112 aggr_expr = try_convert_aggregate_if_better(
113 aggr_expr,
114 &requirement,
115 input.equivalence_properties(),
116 )?;
117
118 let aggr_exec = aggr_exec.with_new_aggr_exprs(aggr_expr);
119
120 Ok(Transformed::yes(Arc::new(aggr_exec) as _))
121 } else {
122 Ok(Transformed::no(plan))
123 }
124 })
125 .data()
126 }
127
128 fn name(&self) -> &str {
129 "OptimizeAggregateOrder"
130 }
131
132 fn schema_check(&self) -> bool {
133 true
134 }
135}
136
137/// Tries to convert each aggregate expression to a potentially more efficient
138/// version.
139///
140/// # Parameters
141///
142/// * `aggr_exprs` - A vector of `AggregateFunctionExpr` representing the
143/// aggregate expressions to be optimized.
144/// * `prefix_requirement` - An array slice representing the ordering
145/// requirements preceding the aggregate expressions.
146/// * `eq_properties` - A reference to the `EquivalenceProperties` object
147/// containing ordering information.
148///
149/// # Returns
150///
151/// Returns `Ok(converted_aggr_exprs)` if the conversion process completes
152/// successfully. Any errors occurring during the conversion process are
153/// passed through.
154fn try_convert_aggregate_if_better(
155 aggr_exprs: Vec<Arc<AggregateFunctionExpr>>,
156 prefix_requirement: &[PhysicalSortRequirement],
157 eq_properties: &EquivalenceProperties,
158) -> Result<Vec<Arc<AggregateFunctionExpr>>> {
159 aggr_exprs
160 .into_iter()
161 .map(|aggr_expr| {
162 let aggr_sort_exprs = aggr_expr.order_bys().unwrap_or(LexOrdering::empty());
163 let reverse_aggr_sort_exprs = reverse_order_bys(aggr_sort_exprs);
164 let aggr_sort_reqs = LexRequirement::from(aggr_sort_exprs.clone());
165 let reverse_aggr_req = LexRequirement::from(reverse_aggr_sort_exprs);
166
167 // If the aggregate expression benefits from input ordering, and
168 // there is an actual ordering enabling this, try to update the
169 // aggregate expression to benefit from the existing ordering.
170 // Otherwise, leave it as is.
171 if aggr_expr.order_sensitivity().is_beneficial() && !aggr_sort_reqs.is_empty()
172 {
173 let reqs = LexRequirement {
174 inner: concat_slices(prefix_requirement, &aggr_sort_reqs),
175 };
176
177 let prefix_requirement = LexRequirement {
178 inner: prefix_requirement.to_vec(),
179 };
180
181 if eq_properties.ordering_satisfy_requirement(&reqs) {
182 // Existing ordering satisfies the aggregator requirements:
183 aggr_expr.with_beneficial_ordering(true)?.map(Arc::new)
184 } else if eq_properties.ordering_satisfy_requirement(&LexRequirement {
185 inner: concat_slices(&prefix_requirement, &reverse_aggr_req),
186 }) {
187 // Converting to reverse enables more efficient execution
188 // given the existing ordering (if possible):
189 aggr_expr
190 .reverse_expr()
191 .map(Arc::new)
192 .unwrap_or(aggr_expr)
193 .with_beneficial_ordering(true)?
194 .map(Arc::new)
195 } else {
196 // There is no beneficial ordering present -- aggregation
197 // will still work albeit in a less efficient mode.
198 aggr_expr.with_beneficial_ordering(false)?.map(Arc::new)
199 }
200 .ok_or_else(|| {
201 plan_datafusion_err!(
202 "Expects an aggregate expression that can benefit from input ordering"
203 )
204 })
205 } else {
206 Ok(aggr_expr)
207 }
208 })
209 .collect()
210}