datafusion_physical_optimizer/
aggregate_statistics.rs1use datafusion_common::Result;
20use datafusion_common::config::ConfigOptions;
21use datafusion_common::scalar::ScalarValue;
22use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
23use datafusion_physical_plan::aggregates::{
24 AggregateExec, AggregateInputMode, AggregateMode,
25};
26use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
27use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
28use datafusion_physical_plan::udaf::{AggregateFunctionExpr, StatisticsArgs};
29use datafusion_physical_plan::{ExecutionPlan, expressions};
30use std::sync::Arc;
31
32use crate::PhysicalOptimizerRule;
33
/// Physical optimizer rule that replaces aggregates whose results can be computed from
/// input statistics with a projection of literal values over a single placeholder row.
#[derive(Debug, Default)]
pub struct AggregateStatistics {}
37
38impl AggregateStatistics {
39 #[expect(missing_docs)]
40 pub fn new() -> Self {
41 Self {}
42 }
43}
44
45impl PhysicalOptimizerRule for AggregateStatistics {
46 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
47 #[expect(clippy::allow_attributes)] #[allow(clippy::only_used_in_recursion)] fn optimize(
50 &self,
51 plan: Arc<dyn ExecutionPlan>,
52 config: &ConfigOptions,
53 ) -> Result<Arc<dyn ExecutionPlan>> {
54 if let Some(partial_agg_exec) = take_optimizable(&plan) {
55 let partial_agg_exec = partial_agg_exec
56 .downcast_ref::<AggregateExec>()
57 .expect("take_optimizable() ensures that this is a AggregateExec");
58 let stats = partial_agg_exec.input().partition_statistics(None)?;
59 let mut projections = vec![];
60 for expr in partial_agg_exec.aggr_expr() {
61 let field = expr.field();
62 let args = expr.expressions();
63 let statistics_args = StatisticsArgs {
64 statistics: &stats,
65 return_type: field.data_type(),
66 is_distinct: expr.is_distinct(),
67 exprs: args.as_slice(),
68 };
69 if let Some((optimizable_statistic, name)) =
70 take_optimizable_value_from_statistics(&statistics_args, expr)
71 {
72 projections.push(ProjectionExpr {
73 expr: expressions::lit(optimizable_statistic),
74 alias: name.to_owned(),
75 });
76 } else {
77 break;
79 }
80 }
81
82 if projections.len() == partial_agg_exec.aggr_expr().len() {
84 Ok(Arc::new(ProjectionExec::try_new(
86 projections,
87 Arc::new(PlaceholderRowExec::new(plan.schema())),
88 )?))
89 } else {
90 plan.map_children(|child| {
91 self.optimize(child, config).map(Transformed::yes)
92 })
93 .data()
94 }
95 } else {
96 plan.map_children(|child| self.optimize(child, config).map(Transformed::yes))
97 .data()
98 }
99 }
100
101 fn name(&self) -> &str {
102 "aggregate_statistics"
103 }
104
105 fn schema_check(&self) -> bool {
107 false
108 }
109}
110
111fn take_optimizable(plan: &Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
116 let agg_exec = plan.downcast_ref::<AggregateExec>()?;
117
118 if matches!(
119 agg_exec.mode(),
120 AggregateMode::Single | AggregateMode::SinglePartitioned
121 ) && agg_exec.group_expr().is_empty()
122 && agg_exec.filter_expr().iter().all(|e| e.is_none())
123 {
124 return Some(Arc::clone(plan));
125 }
126
127 if agg_exec.mode().input_mode() == AggregateInputMode::Partial
128 && agg_exec.group_expr().is_empty()
129 {
130 let mut child = Arc::clone(agg_exec.input());
131 loop {
132 if let Some(partial_agg_exec) = child.downcast_ref::<AggregateExec>()
133 && partial_agg_exec.mode().input_mode() == AggregateInputMode::Raw
134 && partial_agg_exec.group_expr().is_empty()
135 && partial_agg_exec.filter_expr().iter().all(|e| e.is_none())
136 {
137 return Some(child);
138 }
139 if let [childrens_child] = child.children().as_slice() {
140 child = Arc::clone(childrens_child);
141 } else {
142 break;
143 }
144 }
145 }
146 None
147}
148
149fn take_optimizable_value_from_statistics(
151 statistics_args: &StatisticsArgs,
152 agg_expr: &AggregateFunctionExpr,
153) -> Option<(ScalarValue, String)> {
154 let value = agg_expr.fun().value_from_stats(statistics_args);
155 value.map(|val| (val, agg_expr.name().to_string()))
156}
157
158