// datafusion_physical_optimizer/update_aggr_exprs.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! An optimizer rule that checks ordering requirements of aggregate expressions
//! and modifies the expressions to work more efficiently if possible.

21use std::sync::Arc;
22
23use datafusion_common::config::ConfigOptions;
24use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
25use datafusion_common::{Result, plan_datafusion_err};
26use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
27use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement};
28use datafusion_physical_plan::aggregates::{
29    AggregateExec, AggregateInputMode, concat_slices,
30};
31use datafusion_physical_plan::windows::get_ordered_partition_by_indices;
32use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
33
34use crate::PhysicalOptimizerRule;
35
/// Optimizer rule that inspects the ordering requirements of aggregate
/// expressions and switches eligible aggregators into a more efficient
/// execution mode when the input ordering allows it.
///
/// With respect to input ordering, aggregators fall into three categories:
/// - `AggregateOrderSensitivity::Insensitive`: ordering does not matter.
/// - `AggregateOrderSensitivity::HardRequirement`: a specific ordering is
///   mandatory for correctness.
/// - `AggregateOrderSensitivity::Beneficial`: unordered input is acceptable,
///   but a specific ordering enables a faster implementation.
///
/// Only `Beneficial` aggregators are candidates: when the existing input
/// ordering (possibly reversed) already satisfies their preference, they are
/// updated to take advantage of it.
#[derive(Debug, Default)]
pub struct OptimizeAggregateOrder {}
52
53impl OptimizeAggregateOrder {
54    #[expect(missing_docs)]
55    pub fn new() -> Self {
56        Self::default()
57    }
58}
59
60impl PhysicalOptimizerRule for OptimizeAggregateOrder {
61    /// Applies the `OptimizeAggregateOrder` rule to the provided execution plan.
62    ///
63    /// This function traverses the execution plan tree, identifies `AggregateExec` nodes,
64    /// and optimizes their aggregate expressions based on existing input orderings.
65    /// If optimizations are applied, it returns a modified execution plan.
66    ///
67    /// # Arguments
68    ///
69    /// * `plan` - The root of the execution plan to optimize.
70    /// * `_config` - Configuration options (currently unused).
71    ///
72    /// # Returns
73    ///
74    /// A `Result` containing the potentially optimized execution plan or an error.
75    fn optimize(
76        &self,
77        plan: Arc<dyn ExecutionPlan>,
78        _config: &ConfigOptions,
79    ) -> Result<Arc<dyn ExecutionPlan>> {
80        plan.transform_up(|plan| {
81            if let Some(aggr_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
82                // Final stage implementations do not rely on ordering -- those
83                // ordering fields may be pruned out by first stage aggregates.
84                // Hence, necessary information for proper merge is added during
85                // the first stage to the state field, which the final stage uses.
86                if aggr_exec.mode().input_mode() == AggregateInputMode::Partial {
87                    return Ok(Transformed::no(plan));
88                }
89                let input = aggr_exec.input();
90                let mut aggr_exprs = aggr_exec.aggr_expr().to_vec();
91
92                let groupby_exprs = aggr_exec.group_expr().input_exprs();
93                // If the existing ordering satisfies a prefix of the GROUP BY
94                // expressions, prefix requirements with this section. In this
95                // case, aggregation will work more efficiently.
96                let indices = get_ordered_partition_by_indices(&groupby_exprs, input)?;
97                let requirement = indices
98                    .iter()
99                    .map(|&idx| {
100                        PhysicalSortRequirement::new(
101                            Arc::clone(&groupby_exprs[idx]),
102                            None,
103                        )
104                    })
105                    .collect::<Vec<_>>();
106
107                aggr_exprs = try_convert_aggregate_if_better(
108                    aggr_exprs,
109                    &requirement,
110                    input.equivalence_properties(),
111                )?;
112
113                let aggr_exec = aggr_exec.with_new_aggr_exprs(aggr_exprs);
114
115                Ok(Transformed::yes(Arc::new(aggr_exec) as _))
116            } else {
117                Ok(Transformed::no(plan))
118            }
119        })
120        .data()
121    }
122
123    fn name(&self) -> &str {
124        "OptimizeAggregateOrder"
125    }
126
127    fn schema_check(&self) -> bool {
128        true
129    }
130}
131
132/// Tries to convert each aggregate expression to a potentially more efficient
133/// version.
134///
135/// # Parameters
136///
137/// * `aggr_exprs` - A vector of `AggregateFunctionExpr` representing the
138///   aggregate expressions to be optimized.
139/// * `prefix_requirement` - An array slice representing the ordering
140///   requirements preceding the aggregate expressions.
141/// * `eq_properties` - A reference to the `EquivalenceProperties` object
142///   containing ordering information.
143///
144/// # Returns
145///
146/// Returns `Ok(converted_aggr_exprs)` if the conversion process completes
147/// successfully. Any errors occurring during the conversion process are
148/// passed through.
149fn try_convert_aggregate_if_better(
150    aggr_exprs: Vec<Arc<AggregateFunctionExpr>>,
151    prefix_requirement: &[PhysicalSortRequirement],
152    eq_properties: &EquivalenceProperties,
153) -> Result<Vec<Arc<AggregateFunctionExpr>>> {
154    aggr_exprs
155        .into_iter()
156        .map(|aggr_expr| {
157            let order_bys = aggr_expr.order_bys();
158            // If the aggregate expression benefits from input ordering, and
159            // there is an actual ordering enabling this, try to update the
160            // aggregate expression to benefit from the existing ordering.
161            // Otherwise, leave it as is.
162            if !aggr_expr.order_sensitivity().is_beneficial() {
163                Ok(aggr_expr)
164            } else if !order_bys.is_empty() {
165                if eq_properties.ordering_satisfy_requirement(concat_slices(
166                    prefix_requirement,
167                    &order_bys
168                        .iter()
169                        .map(|e| e.clone().into())
170                        .collect::<Vec<_>>(),
171                ))? {
172                    // Existing ordering satisfies the aggregator requirements:
173                    aggr_expr.with_beneficial_ordering(true)?.map(Arc::new)
174                } else if eq_properties.ordering_satisfy_requirement(concat_slices(
175                    prefix_requirement,
176                    &order_bys
177                        .iter()
178                        .map(|e| e.reverse().into())
179                        .collect::<Vec<_>>(),
180                ))? {
181                    // Converting to reverse enables more efficient execution
182                    // given the existing ordering (if possible):
183                    aggr_expr
184                        .reverse_expr()
185                        .map(Arc::new)
186                        .unwrap_or(aggr_expr)
187                        .with_beneficial_ordering(true)?
188                        .map(Arc::new)
189                } else {
190                    // There is no beneficial ordering present -- aggregation
191                    // will still work albeit in a less efficient mode.
192                    aggr_expr.with_beneficial_ordering(false)?.map(Arc::new)
193                }
194                .ok_or_else(|| {
195                    plan_datafusion_err!(
196                    "Expects an aggregate expression that can benefit from input ordering"
197                )
198                })
199            } else {
200                Ok(aggr_expr)
201            }
202        })
203        .collect()
204}