datafusion_physical_optimizer/
limited_distinct_aggregation.rs1use std::sync::Arc;
22
23use datafusion_physical_plan::aggregates::AggregateExec;
24use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
25use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
26
27use datafusion_common::Result;
28use datafusion_common::config::ConfigOptions;
29use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
30
31use crate::PhysicalOptimizerRule;
32use itertools::Itertools;
33
/// Physical optimizer rule that pushes the row limit of a `LocalLimitExec` or
/// `GlobalLimitExec` down into the `AggregateExec` node(s) directly beneath it,
/// attaching it to the aggregation via `AggregateExec::with_limit`.
///
/// Only aggregations accepted by
/// `AggregateExec::is_unordered_unfiltered_group_by_distinct` are rewritten.
#[derive(Debug)]
pub struct LimitedDistinctAggregation {}
40
41impl LimitedDistinctAggregation {
42 pub fn new() -> Self {
44 Self {}
45 }
46
47 fn transform_agg(
48 aggr: &AggregateExec,
49 limit: usize,
50 ) -> Option<Arc<dyn ExecutionPlan>> {
51 if !aggr.is_unordered_unfiltered_group_by_distinct() {
53 return None;
54 }
55
56 let new_aggr = AggregateExec::try_new(
58 *aggr.mode(),
59 aggr.group_expr().clone(),
60 aggr.aggr_expr().to_vec(),
61 aggr.filter_expr().to_vec(),
62 aggr.input().to_owned(),
63 aggr.input_schema(),
64 )
65 .expect("Unable to copy Aggregate!")
66 .with_limit(Some(limit));
67 Some(Arc::new(new_aggr))
68 }
69
70 fn transform_limit(plan: Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
75 let limit: usize;
76 let mut global_fetch: Option<usize> = None;
77 let mut global_skip: usize = 0;
78 let children: Vec<Arc<dyn ExecutionPlan>>;
79 let mut is_global_limit = false;
80 if let Some(local_limit) = plan.as_any().downcast_ref::<LocalLimitExec>() {
81 limit = local_limit.fetch();
82 children = local_limit.children().into_iter().cloned().collect();
83 } else if let Some(global_limit) = plan.as_any().downcast_ref::<GlobalLimitExec>()
84 {
85 global_fetch = global_limit.fetch();
86 global_fetch?;
87 global_skip = global_limit.skip();
88 limit = global_fetch.unwrap() + global_skip;
90 children = global_limit.children().into_iter().cloned().collect();
91 is_global_limit = true
92 } else {
93 return None;
94 }
95 let child = children.iter().exactly_one().ok()?;
96 if plan.output_ordering().is_some() {
98 return None;
99 }
100 if plan.required_input_ordering()[0].is_some() {
102 return None;
103 }
104
105 let mut match_aggr: Arc<dyn ExecutionPlan> = plan;
108 let mut found_match_aggr = false;
109
110 let mut rewrite_applicable = true;
111 let closure = |plan: Arc<dyn ExecutionPlan>| {
112 if !rewrite_applicable {
113 return Ok(Transformed::no(plan));
114 }
115 if let Some(aggr) = plan.as_any().downcast_ref::<AggregateExec>() {
116 if found_match_aggr
117 && let Some(parent_aggr) =
118 match_aggr.as_any().downcast_ref::<AggregateExec>()
119 && !parent_aggr.group_expr().eq(aggr.group_expr())
120 {
121 rewrite_applicable = false;
124 return Ok(Transformed::no(plan));
125 }
126 match Self::transform_agg(aggr, limit) {
129 None => {}
130 Some(new_aggr) => {
131 match_aggr = plan;
132 found_match_aggr = true;
133 return Ok(Transformed::yes(new_aggr));
134 }
135 }
136 }
137 rewrite_applicable = false;
138 Ok(Transformed::no(plan))
139 };
140 let child = child.to_owned().transform_down(closure).data().ok()?;
141 if is_global_limit {
142 return Some(Arc::new(GlobalLimitExec::new(
143 child,
144 global_skip,
145 global_fetch,
146 )));
147 }
148 Some(Arc::new(LocalLimitExec::new(child, limit)))
149 }
150}
151
152impl Default for LimitedDistinctAggregation {
153 fn default() -> Self {
154 Self::new()
155 }
156}
157
158impl PhysicalOptimizerRule for LimitedDistinctAggregation {
159 fn optimize(
160 &self,
161 plan: Arc<dyn ExecutionPlan>,
162 config: &ConfigOptions,
163 ) -> Result<Arc<dyn ExecutionPlan>> {
164 if config.optimizer.enable_distinct_aggregation_soft_limit {
165 plan.transform_down(|plan| {
166 Ok(
167 if let Some(plan) =
168 LimitedDistinctAggregation::transform_limit(plan.to_owned())
169 {
170 Transformed::yes(plan)
171 } else {
172 Transformed::no(plan)
173 },
174 )
175 })
176 .data()
177 } else {
178 Ok(plan)
179 }
180 }
181
182 fn name(&self) -> &str {
183 "LimitedDistinctAggregation"
184 }
185
186 fn schema_check(&self) -> bool {
187 true
188 }
189}
190
191