datafusion_physical_optimizer/
aggregate_statistics.rs1use datafusion_common::config::ConfigOptions;
20use datafusion_common::scalar::ScalarValue;
21use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
22use datafusion_common::Result;
23use datafusion_physical_plan::aggregates::AggregateExec;
24use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
25use datafusion_physical_plan::projection::ProjectionExec;
26use datafusion_physical_plan::udaf::{AggregateFunctionExpr, StatisticsArgs};
27use datafusion_physical_plan::{expressions, ExecutionPlan};
28use std::sync::Arc;
29
30use crate::PhysicalOptimizerRule;
31
/// Physical optimizer rule that answers aggregates from input statistics.
///
/// When every aggregate expression of a matching aggregation can be computed from the
/// statistics of its input, the aggregation is replaced with a projection of literal values
/// over a single placeholder row. The rule itself carries no state.
#[derive(Debug, Default)]
pub struct AggregateStatistics {}

impl AggregateStatistics {
    /// Creates the rule; equivalent to [`AggregateStatistics::default`].
    pub fn new() -> Self {
        Self::default()
    }
}
42
43impl PhysicalOptimizerRule for AggregateStatistics {
44 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
45 #[allow(clippy::only_used_in_recursion)] fn optimize(
47 &self,
48 plan: Arc<dyn ExecutionPlan>,
49 config: &ConfigOptions,
50 ) -> Result<Arc<dyn ExecutionPlan>> {
51 if let Some(partial_agg_exec) = take_optimizable(&*plan) {
52 let partial_agg_exec = partial_agg_exec
53 .as_any()
54 .downcast_ref::<AggregateExec>()
55 .expect("take_optimizable() ensures that this is a AggregateExec");
56 let stats = partial_agg_exec.input().partition_statistics(None)?;
57 let mut projections = vec![];
58 for expr in partial_agg_exec.aggr_expr() {
59 let field = expr.field();
60 let args = expr.expressions();
61 let statistics_args = StatisticsArgs {
62 statistics: &stats,
63 return_type: field.data_type(),
64 is_distinct: expr.is_distinct(),
65 exprs: args.as_slice(),
66 };
67 if let Some((optimizable_statistic, name)) =
68 take_optimizable_value_from_statistics(&statistics_args, expr)
69 {
70 projections
71 .push((expressions::lit(optimizable_statistic), name.to_owned()));
72 } else {
73 break;
75 }
76 }
77
78 if projections.len() == partial_agg_exec.aggr_expr().len() {
80 Ok(Arc::new(ProjectionExec::try_new(
82 projections,
83 Arc::new(PlaceholderRowExec::new(plan.schema())),
84 )?))
85 } else {
86 plan.map_children(|child| {
87 self.optimize(child, config).map(Transformed::yes)
88 })
89 .data()
90 }
91 } else {
92 plan.map_children(|child| self.optimize(child, config).map(Transformed::yes))
93 .data()
94 }
95 }
96
97 fn name(&self) -> &str {
98 "aggregate_statistics"
99 }
100
101 fn schema_check(&self) -> bool {
103 false
104 }
105}
106
107fn take_optimizable(node: &dyn ExecutionPlan) -> Option<Arc<dyn ExecutionPlan>> {
115 if let Some(final_agg_exec) = node.as_any().downcast_ref::<AggregateExec>() {
116 if !final_agg_exec.mode().is_first_stage()
117 && final_agg_exec.group_expr().is_empty()
118 {
119 let mut child = Arc::clone(final_agg_exec.input());
120 loop {
121 if let Some(partial_agg_exec) =
122 child.as_any().downcast_ref::<AggregateExec>()
123 {
124 if partial_agg_exec.mode().is_first_stage()
125 && partial_agg_exec.group_expr().is_empty()
126 && partial_agg_exec.filter_expr().iter().all(|e| e.is_none())
127 {
128 return Some(child);
129 }
130 }
131 if let [childrens_child] = child.children().as_slice() {
132 child = Arc::clone(childrens_child);
133 } else {
134 break;
135 }
136 }
137 }
138 }
139 None
140}
141
142fn take_optimizable_value_from_statistics(
144 statistics_args: &StatisticsArgs,
145 agg_expr: &AggregateFunctionExpr,
146) -> Option<(ScalarValue, String)> {
147 let value = agg_expr.fun().value_from_stats(statistics_args);
148 value.map(|val| (val, agg_expr.name().to_string()))
149}
150
151