datafusion_physical_optimizer/
aggregate_statistics.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Utilizing exact statistics from sources to avoid scanning data
19use datafusion_common::config::ConfigOptions;
20use datafusion_common::scalar::ScalarValue;
21use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
22use datafusion_common::Result;
23use datafusion_physical_plan::aggregates::AggregateExec;
24use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
25use datafusion_physical_plan::projection::ProjectionExec;
26use datafusion_physical_plan::udaf::{AggregateFunctionExpr, StatisticsArgs};
27use datafusion_physical_plan::{expressions, ExecutionPlan};
28use std::sync::Arc;
29
30use crate::PhysicalOptimizerRule;
31
/// Physical optimizer rule that answers aggregate functions from available
/// statistics instead of executing the aggregation over its input.
///
/// The rule carries no state; all of its behavior lives in its
/// `PhysicalOptimizerRule` implementation.
#[derive(Default, Debug)]
pub struct AggregateStatistics {}

impl AggregateStatistics {
    /// Creates a new instance of the rule.
    ///
    /// Equivalent to [`AggregateStatistics::default`]; the rule has no
    /// configuration of its own.
    pub fn new() -> Self {
        Self {}
    }
}
42
43impl PhysicalOptimizerRule for AggregateStatistics {
44    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
45    #[allow(clippy::only_used_in_recursion)] // See https://github.com/rust-lang/rust-clippy/issues/14566
46    fn optimize(
47        &self,
48        plan: Arc<dyn ExecutionPlan>,
49        config: &ConfigOptions,
50    ) -> Result<Arc<dyn ExecutionPlan>> {
51        if let Some(partial_agg_exec) = take_optimizable(&*plan) {
52            let partial_agg_exec = partial_agg_exec
53                .as_any()
54                .downcast_ref::<AggregateExec>()
55                .expect("take_optimizable() ensures that this is a AggregateExec");
56            let stats = partial_agg_exec.input().partition_statistics(None)?;
57            let mut projections = vec![];
58            for expr in partial_agg_exec.aggr_expr() {
59                let field = expr.field();
60                let args = expr.expressions();
61                let statistics_args = StatisticsArgs {
62                    statistics: &stats,
63                    return_type: field.data_type(),
64                    is_distinct: expr.is_distinct(),
65                    exprs: args.as_slice(),
66                };
67                if let Some((optimizable_statistic, name)) =
68                    take_optimizable_value_from_statistics(&statistics_args, expr)
69                {
70                    projections
71                        .push((expressions::lit(optimizable_statistic), name.to_owned()));
72                } else {
73                    // TODO: we need all aggr_expr to be resolved (cf TODO fullres)
74                    break;
75                }
76            }
77
78            // TODO fullres: use statistics even if not all aggr_expr could be resolved
79            if projections.len() == partial_agg_exec.aggr_expr().len() {
80                // input can be entirely removed
81                Ok(Arc::new(ProjectionExec::try_new(
82                    projections,
83                    Arc::new(PlaceholderRowExec::new(plan.schema())),
84                )?))
85            } else {
86                plan.map_children(|child| {
87                    self.optimize(child, config).map(Transformed::yes)
88                })
89                .data()
90            }
91        } else {
92            plan.map_children(|child| self.optimize(child, config).map(Transformed::yes))
93                .data()
94        }
95    }
96
97    fn name(&self) -> &str {
98        "aggregate_statistics"
99    }
100
101    /// This rule will change the nullable properties of the schema, disable the schema check.
102    fn schema_check(&self) -> bool {
103        false
104    }
105}
106
107/// assert if the node passed as argument is a final `AggregateExec` node that can be optimized:
108/// - its child (with possible intermediate layers) is a partial `AggregateExec` node
109/// - they both have no grouping expression
110///
111/// If this is the case, return a ref to the partial `AggregateExec`, else `None`.
112/// We would have preferred to return a casted ref to AggregateExec but the recursion requires
113/// the `ExecutionPlan.children()` method that returns an owned reference.
114fn take_optimizable(node: &dyn ExecutionPlan) -> Option<Arc<dyn ExecutionPlan>> {
115    if let Some(final_agg_exec) = node.as_any().downcast_ref::<AggregateExec>() {
116        if !final_agg_exec.mode().is_first_stage()
117            && final_agg_exec.group_expr().is_empty()
118        {
119            let mut child = Arc::clone(final_agg_exec.input());
120            loop {
121                if let Some(partial_agg_exec) =
122                    child.as_any().downcast_ref::<AggregateExec>()
123                {
124                    if partial_agg_exec.mode().is_first_stage()
125                        && partial_agg_exec.group_expr().is_empty()
126                        && partial_agg_exec.filter_expr().iter().all(|e| e.is_none())
127                    {
128                        return Some(child);
129                    }
130                }
131                if let [childrens_child] = child.children().as_slice() {
132                    child = Arc::clone(childrens_child);
133                } else {
134                    break;
135                }
136            }
137        }
138    }
139    None
140}
141
142/// If this agg_expr is a max that is exactly defined in the statistics, return it.
143fn take_optimizable_value_from_statistics(
144    statistics_args: &StatisticsArgs,
145    agg_expr: &AggregateFunctionExpr,
146) -> Option<(ScalarValue, String)> {
147    let value = agg_expr.fun().value_from_stats(statistics_args);
148    value.map(|val| (val, agg_expr.name().to_string()))
149}
150
151// See tests in datafusion/core/tests/physical_optimizer