datafusion_physical_optimizer/
update_aggr_exprs.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! An optimizer rule that checks ordering requirements of aggregate expressions
19//! and modifies the expressions to work more efficiently if possible.
20
21use std::sync::Arc;
22
23use datafusion_common::config::ConfigOptions;
24use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
25use datafusion_common::{plan_datafusion_err, Result};
26use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
27use datafusion_physical_expr::LexRequirement;
28use datafusion_physical_expr::{
29    reverse_order_bys, EquivalenceProperties, PhysicalSortRequirement,
30};
31use datafusion_physical_expr_common::sort_expr::LexOrdering;
32use datafusion_physical_plan::aggregates::concat_slices;
33use datafusion_physical_plan::windows::get_ordered_partition_by_indices;
34use datafusion_physical_plan::{
35    aggregates::AggregateExec, ExecutionPlan, ExecutionPlanProperties,
36};
37
38use crate::PhysicalOptimizerRule;
39
/// Physical optimizer rule that inspects the ordering requirements of
/// aggregate expressions.
///
/// With respect to input ordering, aggregators fall into three groups:
/// - `AggregateOrderSensitivity::Insensitive`: the input ordering does not
///   matter.
/// - `AggregateOrderSensitivity::HardRequirement`: the aggregator needs its
///   input in a specific ordering.
/// - `AggregateOrderSensitivity::Beneficial`: the aggregator accepts
///   unordered input, but runs more efficiently when the input conforms to
///   a specific ordering.
///
/// The rule looks at `Beneficial` aggregate expressions and checks whether
/// their input ordering requirements are already met. When they are, the
/// aggregators are switched to their more efficient mode.
#[derive(Debug, Default)]
pub struct OptimizeAggregateOrder {}
56
57impl OptimizeAggregateOrder {
58    #[allow(missing_docs)]
59    pub fn new() -> Self {
60        Self::default()
61    }
62}
63
64impl PhysicalOptimizerRule for OptimizeAggregateOrder {
65    /// Applies the `OptimizeAggregateOrder` rule to the provided execution plan.
66    ///
67    /// This function traverses the execution plan tree, identifies `AggregateExec` nodes,
68    /// and optimizes their aggregate expressions based on existing input orderings.
69    /// If optimizations are applied, it returns a modified execution plan.
70    ///
71    /// # Arguments
72    ///
73    /// * `plan` - The root of the execution plan to optimize.
74    /// * `_config` - Configuration options (currently unused).
75    ///
76    /// # Returns
77    ///
78    /// A `Result` containing the potentially optimized execution plan or an error.
79    fn optimize(
80        &self,
81        plan: Arc<dyn ExecutionPlan>,
82        _config: &ConfigOptions,
83    ) -> Result<Arc<dyn ExecutionPlan>> {
84        plan.transform_up(|plan| {
85            if let Some(aggr_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
86                // Final stage implementations do not rely on ordering -- those
87                // ordering fields may be pruned out by first stage aggregates.
88                // Hence, necessary information for proper merge is added during
89                // the first stage to the state field, which the final stage uses.
90                if !aggr_exec.mode().is_first_stage() {
91                    return Ok(Transformed::no(plan));
92                }
93                let input = aggr_exec.input();
94                let mut aggr_expr = aggr_exec.aggr_expr().to_vec();
95
96                let groupby_exprs = aggr_exec.group_expr().input_exprs();
97                // If the existing ordering satisfies a prefix of the GROUP BY
98                // expressions, prefix requirements with this section. In this
99                // case, aggregation will work more efficiently.
100                let indices = get_ordered_partition_by_indices(&groupby_exprs, input);
101                let requirement = indices
102                    .iter()
103                    .map(|&idx| {
104                        PhysicalSortRequirement::new(
105                            Arc::<dyn datafusion_physical_plan::PhysicalExpr>::clone(
106                                &groupby_exprs[idx],
107                            ),
108                            None,
109                        )
110                    })
111                    .collect::<Vec<_>>();
112
113                aggr_expr = try_convert_aggregate_if_better(
114                    aggr_expr,
115                    &requirement,
116                    input.equivalence_properties(),
117                )?;
118
119                let aggr_exec = aggr_exec.with_new_aggr_exprs(aggr_expr);
120
121                Ok(Transformed::yes(Arc::new(aggr_exec) as _))
122            } else {
123                Ok(Transformed::no(plan))
124            }
125        })
126        .data()
127    }
128
129    fn name(&self) -> &str {
130        "OptimizeAggregateOrder"
131    }
132
133    fn schema_check(&self) -> bool {
134        true
135    }
136}
137
138/// Tries to convert each aggregate expression to a potentially more efficient
139/// version.
140///
141/// # Parameters
142///
143/// * `aggr_exprs` - A vector of `AggregateFunctionExpr` representing the
144///   aggregate expressions to be optimized.
145/// * `prefix_requirement` - An array slice representing the ordering
146///   requirements preceding the aggregate expressions.
147/// * `eq_properties` - A reference to the `EquivalenceProperties` object
148///   containing ordering information.
149///
150/// # Returns
151///
152/// Returns `Ok(converted_aggr_exprs)` if the conversion process completes
153/// successfully. Any errors occurring during the conversion process are
154/// passed through.
155fn try_convert_aggregate_if_better(
156    aggr_exprs: Vec<Arc<AggregateFunctionExpr>>,
157    prefix_requirement: &[PhysicalSortRequirement],
158    eq_properties: &EquivalenceProperties,
159) -> Result<Vec<Arc<AggregateFunctionExpr>>> {
160    aggr_exprs
161        .into_iter()
162        .map(|aggr_expr| {
163            let aggr_sort_exprs = aggr_expr
164                .order_bys()
165                .unwrap_or_else(|| LexOrdering::empty());
166            let reverse_aggr_sort_exprs = reverse_order_bys(aggr_sort_exprs);
167            let aggr_sort_reqs = LexRequirement::from(aggr_sort_exprs.clone());
168            let reverse_aggr_req = LexRequirement::from(reverse_aggr_sort_exprs);
169
170            // If the aggregate expression benefits from input ordering, and
171            // there is an actual ordering enabling this, try to update the
172            // aggregate expression to benefit from the existing ordering.
173            // Otherwise, leave it as is.
174            if aggr_expr.order_sensitivity().is_beneficial() && !aggr_sort_reqs.is_empty()
175            {
176                let reqs = LexRequirement {
177                    inner: concat_slices(prefix_requirement, &aggr_sort_reqs),
178                };
179
180                let prefix_requirement = LexRequirement {
181                    inner: prefix_requirement.to_vec(),
182                };
183
184                if eq_properties.ordering_satisfy_requirement(&reqs) {
185                    // Existing ordering satisfies the aggregator requirements:
186                    aggr_expr.with_beneficial_ordering(true)?.map(Arc::new)
187                } else if eq_properties.ordering_satisfy_requirement(&LexRequirement {
188                    inner: concat_slices(&prefix_requirement, &reverse_aggr_req),
189                }) {
190                    // Converting to reverse enables more efficient execution
191                    // given the existing ordering (if possible):
192                    aggr_expr
193                        .reverse_expr()
194                        .map(Arc::new)
195                        .unwrap_or(aggr_expr)
196                        .with_beneficial_ordering(true)?
197                        .map(Arc::new)
198                } else {
199                    // There is no beneficial ordering present -- aggregation
200                    // will still work albeit in a less efficient mode.
201                    aggr_expr.with_beneficial_ordering(false)?.map(Arc::new)
202                }
203                .ok_or_else(|| {
204                    plan_datafusion_err!(
205                    "Expects an aggregate expression that can benefit from input ordering"
206                )
207                })
208            } else {
209                Ok(aggr_expr)
210            }
211        })
212        .collect()
213}