Skip to main content

datafusion_physical_optimizer/
aggregate_statistics.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Utilizing exact statistics from sources to avoid scanning data
19use datafusion_common::Result;
20use datafusion_common::config::ConfigOptions;
21use datafusion_common::scalar::ScalarValue;
22use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
23use datafusion_physical_plan::aggregates::{
24    AggregateExec, AggregateInputMode, AggregateMode,
25};
26use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
27use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
28use datafusion_physical_plan::udaf::{AggregateFunctionExpr, StatisticsArgs};
29use datafusion_physical_plan::{ExecutionPlan, expressions};
30use std::sync::Arc;
31
32use crate::PhysicalOptimizerRule;
33
/// Physical optimizer rule that answers aggregate functions from source
/// statistics when those statistics make scanning the input unnecessary.
#[derive(Debug, Default)]
pub struct AggregateStatistics {}

impl AggregateStatistics {
    /// Creates the rule; it holds no state, so this is the same value that
    /// `Default` produces.
    pub fn new() -> Self {
        Self::default()
    }
}
44
45impl PhysicalOptimizerRule for AggregateStatistics {
46    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
47    #[expect(clippy::allow_attributes)] // See https://github.com/apache/datafusion/issues/18881#issuecomment-3621545670
48    #[allow(clippy::only_used_in_recursion)] // See https://github.com/rust-lang/rust-clippy/issues/14566
49    fn optimize(
50        &self,
51        plan: Arc<dyn ExecutionPlan>,
52        config: &ConfigOptions,
53    ) -> Result<Arc<dyn ExecutionPlan>> {
54        if let Some(partial_agg_exec) = take_optimizable(&plan) {
55            let partial_agg_exec = partial_agg_exec
56                .downcast_ref::<AggregateExec>()
57                .expect("take_optimizable() ensures that this is a AggregateExec");
58            let stats = partial_agg_exec.input().partition_statistics(None)?;
59            let mut projections = vec![];
60            for expr in partial_agg_exec.aggr_expr() {
61                let field = expr.field();
62                let args = expr.expressions();
63                let statistics_args = StatisticsArgs {
64                    statistics: &stats,
65                    return_type: field.data_type(),
66                    is_distinct: expr.is_distinct(),
67                    exprs: args.as_slice(),
68                };
69                if let Some((optimizable_statistic, name)) =
70                    take_optimizable_value_from_statistics(&statistics_args, expr)
71                {
72                    projections.push(ProjectionExpr {
73                        expr: expressions::lit(optimizable_statistic),
74                        alias: name.to_owned(),
75                    });
76                } else {
77                    // TODO: we need all aggr_expr to be resolved (cf TODO fullres)
78                    break;
79                }
80            }
81
82            // TODO fullres: use statistics even if not all aggr_expr could be resolved
83            if projections.len() == partial_agg_exec.aggr_expr().len() {
84                // input can be entirely removed
85                Ok(Arc::new(ProjectionExec::try_new(
86                    projections,
87                    Arc::new(PlaceholderRowExec::new(plan.schema())),
88                )?))
89            } else {
90                plan.map_children(|child| {
91                    self.optimize(child, config).map(Transformed::yes)
92                })
93                .data()
94            }
95        } else {
96            plan.map_children(|child| self.optimize(child, config).map(Transformed::yes))
97                .data()
98        }
99    }
100
101    fn name(&self) -> &str {
102        "aggregate_statistics"
103    }
104
105    /// This rule will change the nullable properties of the schema, disable the schema check.
106    fn schema_check(&self) -> bool {
107        false
108    }
109}
110
111/// Returns an `AggregateExec` whose statistics can replace the aggregate with
112/// literal values: either a `Single`/`SinglePartitioned` aggregate, or a
113/// `Final` aggregate wrapping a `Partial`. Must have no GROUP BY and no
114/// filters.
115fn take_optimizable(plan: &Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
116    let agg_exec = plan.downcast_ref::<AggregateExec>()?;
117
118    if matches!(
119        agg_exec.mode(),
120        AggregateMode::Single | AggregateMode::SinglePartitioned
121    ) && agg_exec.group_expr().is_empty()
122        && agg_exec.filter_expr().iter().all(|e| e.is_none())
123    {
124        return Some(Arc::clone(plan));
125    }
126
127    if agg_exec.mode().input_mode() == AggregateInputMode::Partial
128        && agg_exec.group_expr().is_empty()
129    {
130        let mut child = Arc::clone(agg_exec.input());
131        loop {
132            if let Some(partial_agg_exec) = child.downcast_ref::<AggregateExec>()
133                && partial_agg_exec.mode().input_mode() == AggregateInputMode::Raw
134                && partial_agg_exec.group_expr().is_empty()
135                && partial_agg_exec.filter_expr().iter().all(|e| e.is_none())
136            {
137                return Some(child);
138            }
139            if let [childrens_child] = child.children().as_slice() {
140                child = Arc::clone(childrens_child);
141            } else {
142                break;
143            }
144        }
145    }
146    None
147}
148
149/// If this agg_expr is a max that is exactly defined in the statistics, return it.
150fn take_optimizable_value_from_statistics(
151    statistics_args: &StatisticsArgs,
152    agg_expr: &AggregateFunctionExpr,
153) -> Option<(ScalarValue, String)> {
154    let value = agg_expr.fun().value_from_stats(statistics_args);
155    value.map(|val| (val, agg_expr.name().to_string()))
156}
157
158// See tests in datafusion/core/tests/physical_optimizer