datafusion_optimizer/
eliminate_limit.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`EliminateLimit`] eliminates `LIMIT` when possible
19use crate::optimizer::ApplyOrder;
20use crate::{OptimizerConfig, OptimizerRule};
21use datafusion_common::tree_node::Transformed;
22use datafusion_common::Result;
23use datafusion_expr::logical_plan::{EmptyRelation, FetchType, LogicalPlan, SkipType};
24use std::sync::Arc;
25
/// Optimizer rule that removes `LIMIT` nodes whose outcome is known at
/// planning time.
///
/// Two cases are handled, both only when the values involved are literals:
///
/// * `LIMIT 0` (a `fetch` of `0`) is replaced with an [`EmptyRelation`] that
///   produces no rows and keeps the schema of the limit's input.
/// * `OFFSET 0` without a `fetch` has no effect, so the `Limit` node is
///   removed from the [`LogicalPlan`] and replaced with its input.
///
/// The rule only inspects the `Limit` node it is given; it does not track the
/// skip/fetch of ancestor limits. When combined with `push_down_limit`, a
/// `LIMIT` whose ancestor `LIMIT` skips at least as many rows as it fetches
/// can still end up eliminated (see the tests in this module). The resulting
/// empty relation is meant to cooperate with `propagate_empty_relation`.
#[derive(Default, Debug)]
pub struct EliminateLimit;
35
36impl EliminateLimit {
37    #[allow(missing_docs)]
38    pub fn new() -> Self {
39        Self {}
40    }
41}
42
43impl OptimizerRule for EliminateLimit {
44    fn name(&self) -> &str {
45        "eliminate_limit"
46    }
47
48    fn apply_order(&self) -> Option<ApplyOrder> {
49        Some(ApplyOrder::BottomUp)
50    }
51
52    fn supports_rewrite(&self) -> bool {
53        true
54    }
55
56    fn rewrite(
57        &self,
58        plan: LogicalPlan,
59        _config: &dyn OptimizerConfig,
60    ) -> Result<Transformed<LogicalPlan>, datafusion_common::DataFusionError> {
61        match plan {
62            LogicalPlan::Limit(limit) => {
63                // Only supports rewriting for literal fetch
64                let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
65                    return Ok(Transformed::no(LogicalPlan::Limit(limit)));
66                };
67
68                if let Some(v) = fetch {
69                    if v == 0 {
70                        return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
71                            EmptyRelation {
72                                produce_one_row: false,
73                                schema: Arc::clone(limit.input.schema()),
74                            },
75                        )));
76                    }
77                } else if matches!(limit.get_skip_type()?, SkipType::Literal(0)) {
78                    // If fetch is `None` and skip is 0, then Limit takes no effect and
79                    // we can remove it. Its input also can be Limit, so we should apply again.
80                    #[allow(clippy::used_underscore_binding)]
81                    return self.rewrite(Arc::unwrap_or_clone(limit.input), _config);
82                }
83                Ok(Transformed::no(LogicalPlan::Limit(limit)))
84            }
85            _ => Ok(Transformed::no(plan)),
86        }
87    }
88}
89
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::*;
    use crate::OptimizerContext;
    use datafusion_common::Column;
    use datafusion_expr::{
        col,
        logical_plan::{builder::LogicalPlanBuilder, JoinType},
    };
    use std::sync::Arc;

    use crate::assert_optimized_plan_eq_snapshot;
    use crate::push_down_limit::PushDownLimit;
    use datafusion_expr::test::function_stub::sum;

    /// Optimizes `$plan` with only [`EliminateLimit`] and compares the result
    /// against the inline snapshot `$expected`.
    macro_rules! assert_optimized_plan_equal {
        (
            $plan:expr,
            @ $expected:literal $(,)?
        ) => {{
            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> =
                vec![Arc::new(EliminateLimit::new())];
            let optimizer_ctx = OptimizerContext::new();
            assert_optimized_plan_eq_snapshot!(
                optimizer_ctx,
                rules,
                $plan,
                @ $expected,
            )
        }};
    }

    /// Optimizes `$plan` with [`PushDownLimit`] followed by [`EliminateLimit`],
    /// using an optimizer context limited to a single pass, and compares the
    /// result against the inline snapshot `$expected`.
    macro_rules! assert_optimized_plan_eq_with_pushdown {
        (
            $plan:expr,
            @ $expected:literal $(,)?
        ) => {{
            let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![
                Arc::new(PushDownLimit::new()),
                Arc::new(EliminateLimit::new()),
            ];
            assert_optimized_plan_eq_snapshot!(
                optimizer_ctx,
                rules,
                $plan,
                @ $expected,
            )
        }};
    }

    #[test]
    fn limit_0_root() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(0, Some(0))?
            .build()?;
        // The aggregate, the scan and the limit are all replaced by an empty relation
        assert_optimized_plan_equal!(
            plan,
            @ "EmptyRelation: rows=0"
        )
    }

    #[test]
    fn limit_0_nested() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan1 = LogicalPlanBuilder::from(table_scan.clone())
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .build()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(0, Some(0))?
            .union(plan1)?
            .build()?;

        // Left side is removed
        assert_optimized_plan_equal!(
            plan,
            @ r"
        Union
          EmptyRelation: rows=0
          Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]]
            TableScan: test
        "
        )
    }

    #[test]
    fn limit_fetch_with_ancestor_limit_skip() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(0, Some(2))?
            .limit(2, None)?
            .build()?;

        // The outer limit skips as many rows as the inner one fetches, so with
        // limit push-down the aggregate, the scan and both limits are replaced
        // by an empty relation
        assert_optimized_plan_eq_with_pushdown!(
            plan,
            @ "EmptyRelation: rows=0"
        )
    }

    #[test]
    fn multi_limit_offset_sort_eliminate() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(0, Some(2))?
            .sort_by(vec![col("a")])?
            .limit(2, Some(1))?
            .build()?;

        // Since the global state was removed, the parent's <skip, fetch> is no
        // longer recorded. The inner limit therefore knows nothing about its
        // ancestor and cannot be eliminated.
        assert_optimized_plan_eq_with_pushdown!(
            plan,
            @ r"
        Limit: skip=2, fetch=1
          Sort: test.a ASC NULLS LAST, fetch=3
            Limit: skip=0, fetch=2
              Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]]
                TableScan: test
        "
        )
    }

    #[test]
    fn limit_fetch_with_ancestor_limit_fetch() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(0, Some(2))?
            .sort_by(vec![col("a")])?
            .limit(0, Some(1))?
            .build()?;

        // Neither limit has a zero fetch, so the plan is left unchanged
        assert_optimized_plan_equal!(
            plan,
            @ r"
        Limit: skip=0, fetch=1
          Sort: test.a ASC NULLS LAST
            Limit: skip=0, fetch=2
              Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]]
                TableScan: test
        "
        )
    }

    #[test]
    fn limit_with_ancestor_limit() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(2, Some(1))?
            .sort_by(vec![col("a")])?
            .limit(3, Some(1))?
            .build()?;

        // Neither limit has a zero fetch, so the plan is left unchanged
        assert_optimized_plan_equal!(
            plan,
            @ r"
        Limit: skip=3, fetch=1
          Sort: test.a ASC NULLS LAST
            Limit: skip=2, fetch=1
              Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]]
                TableScan: test
        "
        )
    }

    #[test]
    fn limit_join_with_ancestor_limit() -> Result<()> {
        let table_scan = test_table_scan()?;
        let table_scan_inner = test_table_scan_with_name("test1")?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .limit(2, Some(1))?
            .join_using(
                table_scan_inner,
                JoinType::Inner,
                vec![Column::from_name("a".to_string())],
            )?
            .limit(3, Some(1))?
            .build()?;

        // Neither limit has a zero fetch, so the plan is left unchanged
        assert_optimized_plan_equal!(
            plan,
            @ r"
        Limit: skip=3, fetch=1
          Inner Join: Using test.a = test1.a
            Limit: skip=2, fetch=1
              TableScan: test
            TableScan: test1
        "
        )
    }

    #[test]
    fn remove_zero_offset() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(0, None)?
            .build()?;

        // `OFFSET 0` without a fetch has no effect, so the limit node is removed
        assert_optimized_plan_equal!(
            plan,
            @ r"
        Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]]
          TableScan: test
        "
        )
    }
}