datafusion_physical_optimizer/
limited_distinct_aggregation.rs1use std::sync::Arc;
22
23use datafusion_physical_plan::aggregates::{AggregateExec, LimitOptions};
24use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
25use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
26
27use datafusion_common::Result;
28use datafusion_common::config::ConfigOptions;
29use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
30
31use crate::PhysicalOptimizerRule;
32use itertools::Itertools;
33
/// Physical optimizer rule that pushes the row count of a limit
/// (`LocalLimitExec`, or `GlobalLimitExec` with a `fetch`) down into the
/// aggregation(s) directly beneath it.
///
/// Only aggregations for which
/// `AggregateExec::is_unordered_unfiltered_group_by_distinct` holds are
/// rewritten, e.g. plans for queries such as:
///
/// - `SELECT DISTINCT a FROM t LIMIT 10`
/// - `SELECT a FROM t GROUP BY a LIMIT 10`
///
/// The rewritten aggregation carries `LimitOptions` with the limit's `fetch`
/// (plus `skip` for a global limit). The rule runs only when
/// `optimizer.enable_distinct_aggregation_soft_limit` is enabled.
#[derive(Debug)]
pub struct LimitedDistinctAggregation {}
40
41impl LimitedDistinctAggregation {
42 pub fn new() -> Self {
44 Self {}
45 }
46
47 fn transform_agg(
48 aggr: &AggregateExec,
49 limit: usize,
50 ) -> Option<Arc<dyn ExecutionPlan>> {
51 if !aggr.is_unordered_unfiltered_group_by_distinct() {
53 return None;
54 }
55
56 let new_aggr = aggr.with_new_limit_options(Some(LimitOptions::new(limit)));
58
59 Some(Arc::new(new_aggr))
60 }
61
62 fn transform_limit(plan: Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
67 let limit: usize;
68 let mut global_fetch: Option<usize> = None;
69 let mut global_skip: usize = 0;
70 let children: Vec<Arc<dyn ExecutionPlan>>;
71 let mut is_global_limit = false;
72 if let Some(local_limit) = plan.downcast_ref::<LocalLimitExec>() {
73 limit = local_limit.fetch();
74 children = local_limit.children().into_iter().cloned().collect();
75 } else if let Some(global_limit) = plan.downcast_ref::<GlobalLimitExec>() {
76 global_fetch = global_limit.fetch();
77 global_fetch?;
78 global_skip = global_limit.skip();
79 limit = global_fetch.unwrap() + global_skip;
81 children = global_limit.children().into_iter().cloned().collect();
82 is_global_limit = true
83 } else {
84 return None;
85 }
86 let child = children.iter().exactly_one().ok()?;
87 if plan.output_ordering().is_some() {
89 return None;
90 }
91 if plan.required_input_ordering()[0].is_some() {
93 return None;
94 }
95
96 let mut match_aggr: Arc<dyn ExecutionPlan> = plan;
99 let mut found_match_aggr = false;
100
101 let mut rewrite_applicable = true;
102 let closure = |plan: Arc<dyn ExecutionPlan>| {
103 if !rewrite_applicable {
104 return Ok(Transformed::no(plan));
105 }
106 if let Some(aggr) = plan.downcast_ref::<AggregateExec>() {
107 if found_match_aggr
108 && let Some(parent_aggr) = match_aggr.downcast_ref::<AggregateExec>()
109 && !parent_aggr.group_expr().eq(aggr.group_expr())
110 {
111 rewrite_applicable = false;
114 return Ok(Transformed::no(plan));
115 }
116 match Self::transform_agg(aggr, limit) {
119 None => {}
120 Some(new_aggr) => {
121 match_aggr = plan;
122 found_match_aggr = true;
123 return Ok(Transformed::yes(new_aggr));
124 }
125 }
126 }
127 rewrite_applicable = false;
128 Ok(Transformed::no(plan))
129 };
130 let child = child.to_owned().transform_down(closure).data().ok()?;
131 if is_global_limit {
132 return Some(Arc::new(GlobalLimitExec::new(
133 child,
134 global_skip,
135 global_fetch,
136 )));
137 }
138 Some(Arc::new(LocalLimitExec::new(child, limit)))
139 }
140}
141
142impl Default for LimitedDistinctAggregation {
143 fn default() -> Self {
144 Self::new()
145 }
146}
147
148impl PhysicalOptimizerRule for LimitedDistinctAggregation {
149 fn optimize(
150 &self,
151 plan: Arc<dyn ExecutionPlan>,
152 config: &ConfigOptions,
153 ) -> Result<Arc<dyn ExecutionPlan>> {
154 if config.optimizer.enable_distinct_aggregation_soft_limit {
155 plan.transform_down(|plan| {
156 Ok(
157 if let Some(plan) =
158 LimitedDistinctAggregation::transform_limit(plan.to_owned())
159 {
160 Transformed::yes(plan)
161 } else {
162 Transformed::no(plan)
163 },
164 )
165 })
166 .data()
167 } else {
168 Ok(plan)
169 }
170 }
171
172 fn name(&self) -> &str {
173 "LimitedDistinctAggregation"
174 }
175
176 fn schema_check(&self) -> bool {
177 true
178 }
179}
180
181