Skip to main content

datafusion_optimizer/
eliminate_limit.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`EliminateLimit`] eliminates `LIMIT` when possible
19use crate::optimizer::ApplyOrder;
20use crate::{OptimizerConfig, OptimizerRule};
21use datafusion_common::Result;
22use datafusion_common::tree_node::Transformed;
23use datafusion_expr::logical_plan::{EmptyRelation, FetchType, LogicalPlan, SkipType};
24use std::sync::Arc;
25
/// Optimizer rule that removes `LIMIT` nodes which cannot affect the result.
///
/// On its own the rule performs two rewrites of [`LogicalPlan::Limit`]:
///
/// * a limit whose literal fetch is `0` (`LIMIT 0`) is replaced by an empty
///   relation that keeps the schema of the limit's input and yields no rows;
/// * a limit with no fetch and a literal skip of `0` (`OFFSET 0`) is removed,
///   leaving its input in its place.
///
/// It is designed to cooperate with `propagate_empty_relation` and
/// `limit_push_down`. For example, when it runs after the limit push-down
/// rule, a `LIMIT` whose ancestor `LIMIT` skips at least as many rows as the
/// inner one fetches also ends up as an empty relation.
#[derive(Default, Debug)]
pub struct EliminateLimit;

impl EliminateLimit {
    /// Creates a new [`EliminateLimit`] rule.
    ///
    /// The rule carries no state, so this is equivalent to
    /// `EliminateLimit::default()`.
    pub fn new() -> Self {
        Self
    }
}
42
43impl OptimizerRule for EliminateLimit {
44    fn name(&self) -> &str {
45        "eliminate_limit"
46    }
47
48    fn apply_order(&self) -> Option<ApplyOrder> {
49        Some(ApplyOrder::BottomUp)
50    }
51
52    fn supports_rewrite(&self) -> bool {
53        true
54    }
55
56    fn rewrite(
57        &self,
58        plan: LogicalPlan,
59        _config: &dyn OptimizerConfig,
60    ) -> Result<Transformed<LogicalPlan>, datafusion_common::DataFusionError> {
61        match plan {
62            LogicalPlan::Limit(limit) => {
63                // Only supports rewriting for literal fetch
64                let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
65                    return Ok(Transformed::no(LogicalPlan::Limit(limit)));
66                };
67
68                if let Some(v) = fetch {
69                    if v == 0 {
70                        return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
71                            EmptyRelation {
72                                produce_one_row: false,
73                                schema: Arc::clone(limit.input.schema()),
74                            },
75                        )));
76                    }
77                } else if matches!(limit.get_skip_type()?, SkipType::Literal(0)) {
78                    // If fetch is `None` and skip is 0, then Limit takes no effect and
79                    // we can remove it. Its input also can be Limit, so we should apply again.
80                    #[expect(clippy::used_underscore_binding)]
81                    return self.rewrite(Arc::unwrap_or_clone(limit.input), _config);
82                }
83                Ok(Transformed::no(LogicalPlan::Limit(limit)))
84            }
85            _ => Ok(Transformed::no(plan)),
86        }
87    }
88}
89
#[cfg(test)]
mod tests {
    use super::*;
    use crate::OptimizerContext;
    use crate::test::*;
    use datafusion_common::Column;
    use datafusion_expr::{
        col,
        logical_plan::{JoinType, builder::LogicalPlanBuilder},
    };

    use crate::assert_optimized_plan_eq_snapshot;
    use crate::push_down_limit::PushDownLimit;
    use datafusion_expr::test::function_stub::sum;

    /// Passes `$plan` to `assert_optimized_plan_eq_snapshot!` with
    /// [`EliminateLimit`] as the only rule and a default [`OptimizerContext`],
    /// comparing against the inline snapshot `$expected`.
    macro_rules! assert_optimized_plan_equal {
        (
            $plan:expr,
            @ $expected:literal $(,)?
        ) => {{
            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> =
                vec![Arc::new(EliminateLimit::new())];
            let optimizer_ctx = OptimizerContext::new();
            assert_optimized_plan_eq_snapshot!(
                optimizer_ctx,
                rules,
                $plan,
                @ $expected,
            )
        }};
    }

    /// Same as `assert_optimized_plan_equal!`, but the rule list is
    /// [`PushDownLimit`] followed by [`EliminateLimit`] and the context is
    /// configured with `with_max_passes(1)`.
    macro_rules! assert_optimized_plan_eq_with_pushdown {
        (
            $plan:expr,
            @ $expected:literal $(,)?
        ) => {{
            let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![
                Arc::new(PushDownLimit::new()),
                Arc::new(EliminateLimit::new()),
            ];
            assert_optimized_plan_eq_snapshot!(
                optimizer_ctx,
                rules,
                $plan,
                @ $expected,
            )
        }};
    }

    #[test]
    fn limit_0_root() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(0, Some(0))?
            .build()?;
        // No aggregate / scan / limit
        assert_optimized_plan_equal!(
            plan,
            @ "EmptyRelation: rows=0"
        )
    }

    #[test]
    fn limit_0_nested() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan1 = LogicalPlanBuilder::from(table_scan.clone())
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .build()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(0, Some(0))?
            .union(plan1)?
            .build()?;

        // Left side is removed
        assert_optimized_plan_equal!(
            plan,
            @ r"
        Union
          EmptyRelation: rows=0
          Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]]
            TableScan: test
        "
        )
    }

    #[test]
    fn limit_fetch_with_ancestor_limit_skip() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(0, Some(2))?
            .limit(2, None)?
            .build()?;

        // No aggregate / scan / limit
        assert_optimized_plan_eq_with_pushdown!(
            plan,
            @ "EmptyRelation: rows=0"
        )
    }

    #[test]
    fn multi_limit_offset_sort_eliminate() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(0, Some(2))?
            .sort_by(vec![col("a")])?
            .limit(2, Some(1))?
            .build()?;

        // Since the global state was removed, the parent's <skip, fetch> is no
        // longer recorded. The lower Limit therefore knows nothing about its
        // parent and cannot be eliminated.
        assert_optimized_plan_eq_with_pushdown!(
            plan,
            @ r"
        Limit: skip=2, fetch=1
          Sort: test.a ASC NULLS LAST, fetch=3
            Limit: skip=0, fetch=2
              Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]]
                TableScan: test
        "
        )
    }

    #[test]
    fn limit_fetch_with_ancestor_limit_fetch() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(0, Some(2))?
            .sort_by(vec![col("a")])?
            .limit(0, Some(1))?
            .build()?;

        // Both limits have a non-zero fetch, so the plan is left as is.
        assert_optimized_plan_equal!(
            plan,
            @ r"
        Limit: skip=0, fetch=1
          Sort: test.a ASC NULLS LAST
            Limit: skip=0, fetch=2
              Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]]
                TableScan: test
        "
        )
    }

    #[test]
    fn limit_with_ancestor_limit() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(2, Some(1))?
            .sort_by(vec![col("a")])?
            .limit(3, Some(1))?
            .build()?;

        // Both limits have a non-zero fetch, so the plan is left as is.
        assert_optimized_plan_equal!(
            plan,
            @ r"
        Limit: skip=3, fetch=1
          Sort: test.a ASC NULLS LAST
            Limit: skip=2, fetch=1
              Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]]
                TableScan: test
        "
        )
    }

    #[test]
    fn limit_join_with_ancestor_limit() -> Result<()> {
        let table_scan = test_table_scan()?;
        let table_scan_inner = test_table_scan_with_name("test1")?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .limit(2, Some(1))?
            .join_using(
                table_scan_inner,
                JoinType::Inner,
                vec![Column::from_name("a".to_string())],
            )?
            .limit(3, Some(1))?
            .build()?;

        // Both limits have a non-zero fetch, so the plan is left as is.
        assert_optimized_plan_equal!(
            plan,
            @ r"
        Limit: skip=3, fetch=1
          Inner Join: Using test.a = test1.a
            Limit: skip=2, fetch=1
              TableScan: test
            TableScan: test1
        "
        )
    }

    #[test]
    fn remove_zero_offset() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(0, None)?
            .build()?;

        // `OFFSET 0` without a fetch is a no-op, so the Limit disappears.
        assert_optimized_plan_equal!(
            plan,
            @ r"
        Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]]
          TableScan: test
        "
        )
    }
}