Skip to main content

datafusion_physical_optimizer/
limited_distinct_aggregation.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A special-case optimizer rule that pushes limit into a grouped aggregation
19//! which has no aggregate expressions or sorting requirements
20
21use std::sync::Arc;
22
23use datafusion_physical_plan::aggregates::{AggregateExec, LimitOptions};
24use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
25use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
26
27use datafusion_common::Result;
28use datafusion_common::config::ConfigOptions;
29use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
30
31use crate::PhysicalOptimizerRule;
32use itertools::Itertools;
33
/// Physical optimizer rule that forwards a `limit` hint to grouped aggregations for which
/// processing every row of every group is not needed to produce a correct result.
///
/// Queries of the following shape are candidates:
/// - `SELECT distinct l_orderkey FROM lineitem LIMIT 10;`
/// - `SELECT l_orderkey FROM lineitem GROUP BY l_orderkey LIMIT 10;`
#[derive(Debug)]
pub struct LimitedDistinctAggregation {}
40
41impl LimitedDistinctAggregation {
42    /// Create a new `LimitedDistinctAggregation`
43    pub fn new() -> Self {
44        Self {}
45    }
46
47    fn transform_agg(
48        aggr: &AggregateExec,
49        limit: usize,
50    ) -> Option<Arc<dyn ExecutionPlan>> {
51        // rules for transforming this Aggregate are held in this method
52        if !aggr.is_unordered_unfiltered_group_by_distinct() {
53            return None;
54        }
55
56        // We found what we want: clone, copy the limit down, and return modified node
57        let new_aggr = aggr.with_new_limit_options(Some(LimitOptions::new(limit)));
58
59        Some(Arc::new(new_aggr))
60    }
61
62    /// transform_limit matches an `AggregateExec` as the child of a `LocalLimitExec`
63    /// or `GlobalLimitExec` and pushes the limit into the aggregation as a soft limit when
64    /// there is a group by, but no sorting, no aggregate expressions, and no filters in the
65    /// aggregation
66    fn transform_limit(plan: Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
67        let limit: usize;
68        let mut global_fetch: Option<usize> = None;
69        let mut global_skip: usize = 0;
70        let children: Vec<Arc<dyn ExecutionPlan>>;
71        let mut is_global_limit = false;
72        if let Some(local_limit) = plan.downcast_ref::<LocalLimitExec>() {
73            limit = local_limit.fetch();
74            children = local_limit.children().into_iter().cloned().collect();
75        } else if let Some(global_limit) = plan.downcast_ref::<GlobalLimitExec>() {
76            global_fetch = global_limit.fetch();
77            global_fetch?;
78            global_skip = global_limit.skip();
79            // the aggregate must read at least fetch+skip number of rows
80            limit = global_fetch.unwrap() + global_skip;
81            children = global_limit.children().into_iter().cloned().collect();
82            is_global_limit = true
83        } else {
84            return None;
85        }
86        let child = children.iter().exactly_one().ok()?;
87        // ensure there is no output ordering; can this rule be relaxed?
88        if plan.output_ordering().is_some() {
89            return None;
90        }
91        // ensure no ordering is required on the input
92        if plan.required_input_ordering()[0].is_some() {
93            return None;
94        }
95
96        // if found_match_aggr is true, match_aggr holds a parent aggregation whose group_by
97        // must match that of a child aggregation in order to rewrite the child aggregation
98        let mut match_aggr: Arc<dyn ExecutionPlan> = plan;
99        let mut found_match_aggr = false;
100
101        let mut rewrite_applicable = true;
102        let closure = |plan: Arc<dyn ExecutionPlan>| {
103            if !rewrite_applicable {
104                return Ok(Transformed::no(plan));
105            }
106            if let Some(aggr) = plan.downcast_ref::<AggregateExec>() {
107                if found_match_aggr
108                    && let Some(parent_aggr) = match_aggr.downcast_ref::<AggregateExec>()
109                    && !parent_aggr.group_expr().eq(aggr.group_expr())
110                {
111                    // a partial and final aggregation with different groupings disqualifies
112                    // rewriting the child aggregation
113                    rewrite_applicable = false;
114                    return Ok(Transformed::no(plan));
115                }
116                // either we run into an Aggregate and transform it, or disable the rewrite
117                // for subsequent children
118                match Self::transform_agg(aggr, limit) {
119                    None => {}
120                    Some(new_aggr) => {
121                        match_aggr = plan;
122                        found_match_aggr = true;
123                        return Ok(Transformed::yes(new_aggr));
124                    }
125                }
126            }
127            rewrite_applicable = false;
128            Ok(Transformed::no(plan))
129        };
130        let child = child.to_owned().transform_down(closure).data().ok()?;
131        if is_global_limit {
132            return Some(Arc::new(GlobalLimitExec::new(
133                child,
134                global_skip,
135                global_fetch,
136            )));
137        }
138        Some(Arc::new(LocalLimitExec::new(child, limit)))
139    }
140}
141
142impl Default for LimitedDistinctAggregation {
143    fn default() -> Self {
144        Self::new()
145    }
146}
147
148impl PhysicalOptimizerRule for LimitedDistinctAggregation {
149    fn optimize(
150        &self,
151        plan: Arc<dyn ExecutionPlan>,
152        config: &ConfigOptions,
153    ) -> Result<Arc<dyn ExecutionPlan>> {
154        if config.optimizer.enable_distinct_aggregation_soft_limit {
155            plan.transform_down(|plan| {
156                Ok(
157                    if let Some(plan) =
158                        LimitedDistinctAggregation::transform_limit(plan.to_owned())
159                    {
160                        Transformed::yes(plan)
161                    } else {
162                        Transformed::no(plan)
163                    },
164                )
165            })
166            .data()
167        } else {
168            Ok(plan)
169        }
170    }
171
172    fn name(&self) -> &str {
173        "LimitedDistinctAggregation"
174    }
175
176    fn schema_check(&self) -> bool {
177        true
178    }
179}
180
181// See tests in datafusion/core/tests/physical_optimizer