datafusion_physical_optimizer/update_aggr_exprs.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! An optimizer rule that checks ordering requirements of aggregate expressions
19//! and modifies the expressions to work more efficiently if possible.
20
21use std::sync::Arc;
22
23use datafusion_common::config::ConfigOptions;
24use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
25use datafusion_common::{Result, plan_datafusion_err};
26use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
27use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement};
28use datafusion_physical_plan::aggregates::{
29 AggregateExec, AggregateInputMode, concat_slices,
30};
31use datafusion_physical_plan::windows::get_ordered_partition_by_indices;
32use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
33
34use crate::PhysicalOptimizerRule;
35
/// Physical optimizer rule that inspects the ordering requirements of
/// aggregate expressions and, where possible, switches them to a more
/// efficient execution mode.
///
/// With respect to input ordering, aggregators fall into three categories:
/// - `AggregateOrderSensitivity::Insensitive`: the order of the input rows
///   does not matter.
/// - `AggregateOrderSensitivity::HardRequirement`: the aggregator needs its
///   input in a specific order.
/// - `AggregateOrderSensitivity::Beneficial`: the aggregator accepts input in
///   any order, but runs more efficiently when the input follows a specific
///   ordering.
///
/// Only `Beneficial` aggregators are rewritten. The rule checks whether the
/// ordering already present on the aggregate's input meets their requirement
/// (directly or in reverse) and, if so, switches them to the more efficient
/// mode.
#[derive(Debug, Default)]
pub struct OptimizeAggregateOrder {}
52
53impl OptimizeAggregateOrder {
54 #[expect(missing_docs)]
55 pub fn new() -> Self {
56 Self::default()
57 }
58}
59
60impl PhysicalOptimizerRule for OptimizeAggregateOrder {
61 /// Applies the `OptimizeAggregateOrder` rule to the provided execution plan.
62 ///
63 /// This function traverses the execution plan tree, identifies `AggregateExec` nodes,
64 /// and optimizes their aggregate expressions based on existing input orderings.
65 /// If optimizations are applied, it returns a modified execution plan.
66 ///
67 /// # Arguments
68 ///
69 /// * `plan` - The root of the execution plan to optimize.
70 /// * `_config` - Configuration options (currently unused).
71 ///
72 /// # Returns
73 ///
74 /// A `Result` containing the potentially optimized execution plan or an error.
75 fn optimize(
76 &self,
77 plan: Arc<dyn ExecutionPlan>,
78 _config: &ConfigOptions,
79 ) -> Result<Arc<dyn ExecutionPlan>> {
80 plan.transform_up(|plan| {
81 if let Some(aggr_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
82 // Final stage implementations do not rely on ordering -- those
83 // ordering fields may be pruned out by first stage aggregates.
84 // Hence, necessary information for proper merge is added during
85 // the first stage to the state field, which the final stage uses.
86 if aggr_exec.mode().input_mode() == AggregateInputMode::Partial {
87 return Ok(Transformed::no(plan));
88 }
89 let input = aggr_exec.input();
90 let mut aggr_exprs = aggr_exec.aggr_expr().to_vec();
91
92 let groupby_exprs = aggr_exec.group_expr().input_exprs();
93 // If the existing ordering satisfies a prefix of the GROUP BY
94 // expressions, prefix requirements with this section. In this
95 // case, aggregation will work more efficiently.
96 let indices = get_ordered_partition_by_indices(&groupby_exprs, input)?;
97 let requirement = indices
98 .iter()
99 .map(|&idx| {
100 PhysicalSortRequirement::new(
101 Arc::clone(&groupby_exprs[idx]),
102 None,
103 )
104 })
105 .collect::<Vec<_>>();
106
107 aggr_exprs = try_convert_aggregate_if_better(
108 aggr_exprs,
109 &requirement,
110 input.equivalence_properties(),
111 )?;
112
113 let aggr_exec = aggr_exec.with_new_aggr_exprs(aggr_exprs);
114
115 Ok(Transformed::yes(Arc::new(aggr_exec) as _))
116 } else {
117 Ok(Transformed::no(plan))
118 }
119 })
120 .data()
121 }
122
123 fn name(&self) -> &str {
124 "OptimizeAggregateOrder"
125 }
126
127 fn schema_check(&self) -> bool {
128 true
129 }
130}
131
132/// Tries to convert each aggregate expression to a potentially more efficient
133/// version.
134///
135/// # Parameters
136///
137/// * `aggr_exprs` - A vector of `AggregateFunctionExpr` representing the
138/// aggregate expressions to be optimized.
139/// * `prefix_requirement` - An array slice representing the ordering
140/// requirements preceding the aggregate expressions.
141/// * `eq_properties` - A reference to the `EquivalenceProperties` object
142/// containing ordering information.
143///
144/// # Returns
145///
146/// Returns `Ok(converted_aggr_exprs)` if the conversion process completes
147/// successfully. Any errors occurring during the conversion process are
148/// passed through.
149fn try_convert_aggregate_if_better(
150 aggr_exprs: Vec<Arc<AggregateFunctionExpr>>,
151 prefix_requirement: &[PhysicalSortRequirement],
152 eq_properties: &EquivalenceProperties,
153) -> Result<Vec<Arc<AggregateFunctionExpr>>> {
154 aggr_exprs
155 .into_iter()
156 .map(|aggr_expr| {
157 let order_bys = aggr_expr.order_bys();
158 // If the aggregate expression benefits from input ordering, and
159 // there is an actual ordering enabling this, try to update the
160 // aggregate expression to benefit from the existing ordering.
161 // Otherwise, leave it as is.
162 if !aggr_expr.order_sensitivity().is_beneficial() {
163 Ok(aggr_expr)
164 } else if !order_bys.is_empty() {
165 if eq_properties.ordering_satisfy_requirement(concat_slices(
166 prefix_requirement,
167 &order_bys
168 .iter()
169 .map(|e| e.clone().into())
170 .collect::<Vec<_>>(),
171 ))? {
172 // Existing ordering satisfies the aggregator requirements:
173 aggr_expr.with_beneficial_ordering(true)?.map(Arc::new)
174 } else if eq_properties.ordering_satisfy_requirement(concat_slices(
175 prefix_requirement,
176 &order_bys
177 .iter()
178 .map(|e| e.reverse().into())
179 .collect::<Vec<_>>(),
180 ))? {
181 // Converting to reverse enables more efficient execution
182 // given the existing ordering (if possible):
183 aggr_expr
184 .reverse_expr()
185 .map(Arc::new)
186 .unwrap_or(aggr_expr)
187 .with_beneficial_ordering(true)?
188 .map(Arc::new)
189 } else {
190 // There is no beneficial ordering present -- aggregation
191 // will still work albeit in a less efficient mode.
192 aggr_expr.with_beneficial_ordering(false)?.map(Arc::new)
193 }
194 .ok_or_else(|| {
195 plan_datafusion_err!(
196 "Expects an aggregate expression that can benefit from input ordering"
197 )
198 })
199 } else {
200 Ok(aggr_expr)
201 }
202 })
203 .collect()
204}