datafusion_optimizer/
push_down_limit.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`PushDownLimit`] pushes `LIMIT` earlier in the query plan
19
20use std::cmp::min;
21use std::sync::Arc;
22
23use crate::optimizer::ApplyOrder;
24use crate::{OptimizerConfig, OptimizerRule};
25
26use datafusion_common::tree_node::Transformed;
27use datafusion_common::utils::combine_limit;
28use datafusion_common::Result;
29use datafusion_expr::logical_plan::{Join, JoinType, Limit, LogicalPlan};
30use datafusion_expr::{lit, FetchType, SkipType};
31
/// Optimization rule that tries to push down `LIMIT`.
///
/// It will push down through projection, limits (taking the smaller limit)
#[derive(Default, Debug)]
pub struct PushDownLimit {}

impl PushDownLimit {
    /// Creates a new [`PushDownLimit`] rule. The rule carries no state.
    pub fn new() -> Self {
        Self {}
    }
}
43
44/// Push down Limit.
45impl OptimizerRule for PushDownLimit {
46    fn supports_rewrite(&self) -> bool {
47        true
48    }
49
50    fn rewrite(
51        &self,
52        plan: LogicalPlan,
53        _config: &dyn OptimizerConfig,
54    ) -> Result<Transformed<LogicalPlan>> {
55        let LogicalPlan::Limit(mut limit) = plan else {
56            return Ok(Transformed::no(plan));
57        };
58
59        // Currently only rewrite if skip and fetch are both literals
60        let SkipType::Literal(skip) = limit.get_skip_type()? else {
61            return Ok(Transformed::no(LogicalPlan::Limit(limit)));
62        };
63        let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
64            return Ok(Transformed::no(LogicalPlan::Limit(limit)));
65        };
66
67        // Merge the Parent Limit and the Child Limit.
68        if let LogicalPlan::Limit(child) = limit.input.as_ref() {
69            let SkipType::Literal(child_skip) = child.get_skip_type()? else {
70                return Ok(Transformed::no(LogicalPlan::Limit(limit)));
71            };
72            let FetchType::Literal(child_fetch) = child.get_fetch_type()? else {
73                return Ok(Transformed::no(LogicalPlan::Limit(limit)));
74            };
75
76            let (skip, fetch) = combine_limit(skip, fetch, child_skip, child_fetch);
77            let plan = LogicalPlan::Limit(Limit {
78                skip: Some(Box::new(lit(skip as i64))),
79                fetch: fetch.map(|f| Box::new(lit(f as i64))),
80                input: Arc::clone(&child.input),
81            });
82
83            // recursively reapply the rule on the new plan
84            #[allow(clippy::used_underscore_binding)]
85            return self.rewrite(plan, _config);
86        }
87
88        // no fetch to push, so return the original plan
89        let Some(fetch) = fetch else {
90            return Ok(Transformed::no(LogicalPlan::Limit(limit)));
91        };
92
93        match Arc::unwrap_or_clone(limit.input) {
94            LogicalPlan::TableScan(mut scan) => {
95                let rows_needed = if fetch != 0 { fetch + skip } else { 0 };
96                let new_fetch = scan
97                    .fetch
98                    .map(|x| min(x, rows_needed))
99                    .or(Some(rows_needed));
100                if new_fetch == scan.fetch {
101                    original_limit(skip, fetch, LogicalPlan::TableScan(scan))
102                } else {
103                    // push limit into the table scan itself
104                    scan.fetch = scan
105                        .fetch
106                        .map(|x| min(x, rows_needed))
107                        .or(Some(rows_needed));
108                    transformed_limit(skip, fetch, LogicalPlan::TableScan(scan))
109                }
110            }
111            LogicalPlan::Union(mut union) => {
112                // push limits to each input of the union
113                union.inputs = union
114                    .inputs
115                    .into_iter()
116                    .map(|input| make_arc_limit(0, fetch + skip, input))
117                    .collect();
118                transformed_limit(skip, fetch, LogicalPlan::Union(union))
119            }
120
121            LogicalPlan::Join(join) => Ok(push_down_join(join, fetch + skip)
122                .update_data(|join| {
123                    make_limit(skip, fetch, Arc::new(LogicalPlan::Join(join)))
124                })),
125
126            LogicalPlan::Sort(mut sort) => {
127                let new_fetch = {
128                    let sort_fetch = skip + fetch;
129                    Some(sort.fetch.map(|f| f.min(sort_fetch)).unwrap_or(sort_fetch))
130                };
131                if new_fetch == sort.fetch {
132                    if skip > 0 {
133                        original_limit(skip, fetch, LogicalPlan::Sort(sort))
134                    } else {
135                        Ok(Transformed::yes(LogicalPlan::Sort(sort)))
136                    }
137                } else {
138                    sort.fetch = new_fetch;
139                    limit.input = Arc::new(LogicalPlan::Sort(sort));
140                    Ok(Transformed::yes(LogicalPlan::Limit(limit)))
141                }
142            }
143            LogicalPlan::Projection(mut proj) => {
144                // commute
145                limit.input = Arc::clone(&proj.input);
146                let new_limit = LogicalPlan::Limit(limit);
147                proj.input = Arc::new(new_limit);
148                Ok(Transformed::yes(LogicalPlan::Projection(proj)))
149            }
150            LogicalPlan::SubqueryAlias(mut subquery_alias) => {
151                // commute
152                limit.input = Arc::clone(&subquery_alias.input);
153                let new_limit = LogicalPlan::Limit(limit);
154                subquery_alias.input = Arc::new(new_limit);
155                Ok(Transformed::yes(LogicalPlan::SubqueryAlias(subquery_alias)))
156            }
157            LogicalPlan::Extension(extension_plan)
158                if extension_plan.node.supports_limit_pushdown() =>
159            {
160                let new_children = extension_plan
161                    .node
162                    .inputs()
163                    .into_iter()
164                    .map(|child| {
165                        LogicalPlan::Limit(Limit {
166                            skip: None,
167                            fetch: Some(Box::new(lit((fetch + skip) as i64))),
168                            input: Arc::new(child.clone()),
169                        })
170                    })
171                    .collect::<Vec<_>>();
172
173                // Create a new extension node with updated inputs
174                let child_plan = LogicalPlan::Extension(extension_plan);
175                let new_extension =
176                    child_plan.with_new_exprs(child_plan.expressions(), new_children)?;
177
178                transformed_limit(skip, fetch, new_extension)
179            }
180            input => original_limit(skip, fetch, input),
181        }
182    }
183
184    fn name(&self) -> &str {
185        "push_down_limit"
186    }
187
188    fn apply_order(&self) -> Option<ApplyOrder> {
189        Some(ApplyOrder::TopDown)
190    }
191}
192
193/// Wrap the input plan with a limit node
194///
195/// Original:
196/// ```text
197/// input
198/// ```
199///
200/// Return
201/// ```text
202/// Limit: skip=skip, fetch=fetch
203///  input
204/// ```
205fn make_limit(skip: usize, fetch: usize, input: Arc<LogicalPlan>) -> LogicalPlan {
206    LogicalPlan::Limit(Limit {
207        skip: Some(Box::new(lit(skip as i64))),
208        fetch: Some(Box::new(lit(fetch as i64))),
209        input,
210    })
211}
212
213/// Wrap the input plan with a limit node
214fn make_arc_limit(
215    skip: usize,
216    fetch: usize,
217    input: Arc<LogicalPlan>,
218) -> Arc<LogicalPlan> {
219    Arc::new(make_limit(skip, fetch, input))
220}
221
222/// Returns the original limit (non transformed)
223fn original_limit(
224    skip: usize,
225    fetch: usize,
226    input: LogicalPlan,
227) -> Result<Transformed<LogicalPlan>> {
228    Ok(Transformed::no(make_limit(skip, fetch, Arc::new(input))))
229}
230
231/// Returns the a transformed limit
232fn transformed_limit(
233    skip: usize,
234    fetch: usize,
235    input: LogicalPlan,
236) -> Result<Transformed<LogicalPlan>> {
237    Ok(Transformed::yes(make_limit(skip, fetch, Arc::new(input))))
238}
239
240/// Adds a limit to the inputs of a join, if possible
241fn push_down_join(mut join: Join, limit: usize) -> Transformed<Join> {
242    use JoinType::*;
243
244    // Cross join is the special case of inner join where there is no join condition. see [LogicalPlanBuilder::cross_join]
245    fn is_cross_join(join: &Join) -> bool {
246        join.join_type == Inner && join.on.is_empty() && join.filter.is_none()
247    }
248
249    let (left_limit, right_limit) = if is_cross_join(&join) {
250        (Some(limit), Some(limit))
251    } else {
252        match join.join_type {
253            Left => (Some(limit), None),
254            Right => (None, Some(limit)),
255            _ => (None, None),
256        }
257    };
258
259    if left_limit.is_none() && right_limit.is_none() {
260        return Transformed::no(join);
261    }
262    if let Some(limit) = left_limit {
263        join.left = make_arc_limit(0, limit, join.left);
264    }
265    if let Some(limit) = right_limit {
266        join.right = make_arc_limit(0, limit, join.right);
267    }
268    Transformed::yes(join)
269}
270
271#[cfg(test)]
272mod test {
273    use std::cmp::Ordering;
274    use std::fmt::{Debug, Formatter};
275    use std::vec;
276
277    use super::*;
278    use crate::assert_optimized_plan_eq_snapshot;
279    use crate::test::*;
280
281    use crate::OptimizerContext;
282    use datafusion_common::DFSchemaRef;
283    use datafusion_expr::{
284        col, exists, logical_plan::builder::LogicalPlanBuilder, Expr, Extension,
285        UserDefinedLogicalNodeCore,
286    };
287    use datafusion_functions_aggregate::expr_fn::max;
288
289    macro_rules! assert_optimized_plan_equal {
290        (
291            $plan:expr,
292            @ $expected:literal $(,)?
293        ) => {{
294            let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
295            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![Arc::new(PushDownLimit::new())];
296            assert_optimized_plan_eq_snapshot!(
297                optimizer_ctx,
298                rules,
299                $plan,
300                @ $expected,
301            )
302        }};
303    }
304
    /// Extension node used by the tests in this module; its
    /// `supports_limit_pushdown` implementation returns `true`.
    #[derive(Debug, PartialEq, Eq, Hash)]
    pub struct NoopPlan {
        // Child plans reported through `inputs()`
        input: Vec<LogicalPlan>,
        // Schema reported through `schema()`
        schema: DFSchemaRef,
    }
310
311    // Manual implementation needed because of `schema` field. Comparison excludes this field.
312    impl PartialOrd for NoopPlan {
313        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
314            self.input
315                .partial_cmp(&other.input)
316                // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
317                .filter(|cmp| *cmp != Ordering::Equal || self == other)
318        }
319    }
320
321    impl UserDefinedLogicalNodeCore for NoopPlan {
322        fn name(&self) -> &str {
323            "NoopPlan"
324        }
325
326        fn inputs(&self) -> Vec<&LogicalPlan> {
327            self.input.iter().collect()
328        }
329
330        fn schema(&self) -> &DFSchemaRef {
331            &self.schema
332        }
333
334        fn expressions(&self) -> Vec<Expr> {
335            self.input
336                .iter()
337                .flat_map(|child| child.expressions())
338                .collect()
339        }
340
341        fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
342            write!(f, "NoopPlan")
343        }
344
345        fn with_exprs_and_inputs(
346            &self,
347            _exprs: Vec<Expr>,
348            inputs: Vec<LogicalPlan>,
349        ) -> Result<Self> {
350            Ok(Self {
351                input: inputs,
352                schema: Arc::clone(&self.schema),
353            })
354        }
355
356        fn supports_limit_pushdown(&self) -> bool {
357            true // Allow limit push-down
358        }
359    }
360
    /// Extension node used by the tests in this module; its
    /// `supports_limit_pushdown` implementation returns `false`.
    #[derive(Debug, PartialEq, Eq, Hash)]
    struct NoLimitNoopPlan {
        // Child plans reported through `inputs()`
        input: Vec<LogicalPlan>,
        // Schema reported through `schema()`
        schema: DFSchemaRef,
    }
366
367    // Manual implementation needed because of `schema` field. Comparison excludes this field.
368    impl PartialOrd for NoLimitNoopPlan {
369        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
370            self.input
371                .partial_cmp(&other.input)
372                // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
373                .filter(|cmp| *cmp != Ordering::Equal || self == other)
374        }
375    }
376
377    impl UserDefinedLogicalNodeCore for NoLimitNoopPlan {
378        fn name(&self) -> &str {
379            "NoLimitNoopPlan"
380        }
381
382        fn inputs(&self) -> Vec<&LogicalPlan> {
383            self.input.iter().collect()
384        }
385
386        fn schema(&self) -> &DFSchemaRef {
387            &self.schema
388        }
389
390        fn expressions(&self) -> Vec<Expr> {
391            self.input
392                .iter()
393                .flat_map(|child| child.expressions())
394                .collect()
395        }
396
397        fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
398            write!(f, "NoLimitNoopPlan")
399        }
400
401        fn with_exprs_and_inputs(
402            &self,
403            _exprs: Vec<Expr>,
404            inputs: Vec<LogicalPlan>,
405        ) -> Result<Self> {
406            Ok(Self {
407                input: inputs,
408                schema: Arc::clone(&self.schema),
409            })
410        }
411
412        fn supports_limit_pushdown(&self) -> bool {
413            false // Disallow limit push-down by default
414        }
415    }
416    #[test]
417    fn limit_pushdown_basic() -> Result<()> {
418        let table_scan = test_table_scan()?;
419        let noop_plan = LogicalPlan::Extension(Extension {
420            node: Arc::new(NoopPlan {
421                input: vec![table_scan.clone()],
422                schema: Arc::clone(table_scan.schema()),
423            }),
424        });
425
426        let plan = LogicalPlanBuilder::from(noop_plan)
427            .limit(0, Some(1000))?
428            .build()?;
429
430        assert_optimized_plan_equal!(
431            plan,
432            @r"
433        Limit: skip=0, fetch=1000
434          NoopPlan
435            Limit: skip=0, fetch=1000
436              TableScan: test, fetch=1000
437        "
438        )
439    }
440
441    #[test]
442    fn limit_pushdown_with_skip() -> Result<()> {
443        let table_scan = test_table_scan()?;
444        let noop_plan = LogicalPlan::Extension(Extension {
445            node: Arc::new(NoopPlan {
446                input: vec![table_scan.clone()],
447                schema: Arc::clone(table_scan.schema()),
448            }),
449        });
450
451        let plan = LogicalPlanBuilder::from(noop_plan)
452            .limit(10, Some(1000))?
453            .build()?;
454
455        assert_optimized_plan_equal!(
456            plan,
457            @r"
458        Limit: skip=10, fetch=1000
459          NoopPlan
460            Limit: skip=0, fetch=1010
461              TableScan: test, fetch=1010
462        "
463        )
464    }
465
466    #[test]
467    fn limit_pushdown_multiple_limits() -> Result<()> {
468        let table_scan = test_table_scan()?;
469        let noop_plan = LogicalPlan::Extension(Extension {
470            node: Arc::new(NoopPlan {
471                input: vec![table_scan.clone()],
472                schema: Arc::clone(table_scan.schema()),
473            }),
474        });
475
476        let plan = LogicalPlanBuilder::from(noop_plan)
477            .limit(10, Some(1000))?
478            .limit(20, Some(500))?
479            .build()?;
480
481        assert_optimized_plan_equal!(
482            plan,
483            @r"
484        Limit: skip=30, fetch=500
485          NoopPlan
486            Limit: skip=0, fetch=530
487              TableScan: test, fetch=530
488        "
489        )
490    }
491
492    #[test]
493    fn limit_pushdown_multiple_inputs() -> Result<()> {
494        let table_scan = test_table_scan()?;
495        let noop_plan = LogicalPlan::Extension(Extension {
496            node: Arc::new(NoopPlan {
497                input: vec![table_scan.clone(), table_scan.clone()],
498                schema: Arc::clone(table_scan.schema()),
499            }),
500        });
501
502        let plan = LogicalPlanBuilder::from(noop_plan)
503            .limit(0, Some(1000))?
504            .build()?;
505
506        assert_optimized_plan_equal!(
507            plan,
508            @r"
509        Limit: skip=0, fetch=1000
510          NoopPlan
511            Limit: skip=0, fetch=1000
512              TableScan: test, fetch=1000
513            Limit: skip=0, fetch=1000
514              TableScan: test, fetch=1000
515        "
516        )
517    }
518
519    #[test]
520    fn limit_pushdown_disallowed_noop_plan() -> Result<()> {
521        let table_scan = test_table_scan()?;
522        let no_limit_noop_plan = LogicalPlan::Extension(Extension {
523            node: Arc::new(NoLimitNoopPlan {
524                input: vec![table_scan.clone()],
525                schema: Arc::clone(table_scan.schema()),
526            }),
527        });
528
529        let plan = LogicalPlanBuilder::from(no_limit_noop_plan)
530            .limit(0, Some(1000))?
531            .build()?;
532
533        assert_optimized_plan_equal!(
534            plan,
535            @r"
536        Limit: skip=0, fetch=1000
537          NoLimitNoopPlan
538            TableScan: test
539        "
540        )
541    }
542
543    #[test]
544    fn limit_pushdown_projection_table_provider() -> Result<()> {
545        let table_scan = test_table_scan()?;
546
547        let plan = LogicalPlanBuilder::from(table_scan)
548            .project(vec![col("a")])?
549            .limit(0, Some(1000))?
550            .build()?;
551
552        // Should push the limit down to table provider
553        // When it has a select
554        assert_optimized_plan_equal!(
555            plan,
556            @r"
557        Projection: test.a
558          Limit: skip=0, fetch=1000
559            TableScan: test, fetch=1000
560        "
561        )
562    }
563
564    #[test]
565    fn limit_push_down_take_smaller_limit() -> Result<()> {
566        let table_scan = test_table_scan()?;
567
568        let plan = LogicalPlanBuilder::from(table_scan)
569            .limit(0, Some(1000))?
570            .limit(0, Some(10))?
571            .build()?;
572
573        // Should push down the smallest limit
574        // Towards table scan
575        // This rule doesn't replace multiple limits
576        assert_optimized_plan_equal!(
577            plan,
578            @r"
579        Limit: skip=0, fetch=10
580          TableScan: test, fetch=10
581        "
582        )
583    }
584
585    #[test]
586    fn limit_doesnt_push_down_aggregation() -> Result<()> {
587        let table_scan = test_table_scan()?;
588
589        let plan = LogicalPlanBuilder::from(table_scan)
590            .aggregate(vec![col("a")], vec![max(col("b"))])?
591            .limit(0, Some(1000))?
592            .build()?;
593
594        // Limit should *not* push down aggregate node
595        assert_optimized_plan_equal!(
596            plan,
597            @r"
598        Limit: skip=0, fetch=1000
599          Aggregate: groupBy=[[test.a]], aggr=[[max(test.b)]]
600            TableScan: test
601        "
602        )
603    }
604
605    #[test]
606    fn limit_should_push_down_union() -> Result<()> {
607        let table_scan = test_table_scan()?;
608
609        let plan = LogicalPlanBuilder::from(table_scan.clone())
610            .union(LogicalPlanBuilder::from(table_scan).build()?)?
611            .limit(0, Some(1000))?
612            .build()?;
613
614        // Limit should push down through union
615        assert_optimized_plan_equal!(
616            plan,
617            @r"
618        Limit: skip=0, fetch=1000
619          Union
620            Limit: skip=0, fetch=1000
621              TableScan: test, fetch=1000
622            Limit: skip=0, fetch=1000
623              TableScan: test, fetch=1000
624        "
625        )
626    }
627
628    #[test]
629    fn limit_push_down_sort() -> Result<()> {
630        let table_scan = test_table_scan()?;
631
632        let plan = LogicalPlanBuilder::from(table_scan)
633            .sort_by(vec![col("a")])?
634            .limit(0, Some(10))?
635            .build()?;
636
637        // Should push down limit to sort
638        assert_optimized_plan_equal!(
639            plan,
640            @r"
641        Limit: skip=0, fetch=10
642          Sort: test.a ASC NULLS LAST, fetch=10
643            TableScan: test
644        "
645        )
646    }
647
648    #[test]
649    fn limit_push_down_sort_skip() -> Result<()> {
650        let table_scan = test_table_scan()?;
651
652        let plan = LogicalPlanBuilder::from(table_scan)
653            .sort_by(vec![col("a")])?
654            .limit(5, Some(10))?
655            .build()?;
656
657        // Should push down limit to sort
658        assert_optimized_plan_equal!(
659            plan,
660            @r"
661        Limit: skip=5, fetch=10
662          Sort: test.a ASC NULLS LAST, fetch=15
663            TableScan: test
664        "
665        )
666    }
667
668    #[test]
669    fn multi_stage_limit_recursive_to_deeper_limit() -> Result<()> {
670        let table_scan = test_table_scan()?;
671
672        let plan = LogicalPlanBuilder::from(table_scan)
673            .limit(0, Some(1000))?
674            .aggregate(vec![col("a")], vec![max(col("b"))])?
675            .limit(0, Some(10))?
676            .build()?;
677
678        // Limit should use deeper LIMIT 1000, but Limit 10 shouldn't push down aggregation
679        assert_optimized_plan_equal!(
680            plan,
681            @r"
682        Limit: skip=0, fetch=10
683          Aggregate: groupBy=[[test.a]], aggr=[[max(test.b)]]
684            Limit: skip=0, fetch=1000
685              TableScan: test, fetch=1000
686        "
687        )
688    }
689
690    #[test]
691    fn limit_pushdown_should_not_pushdown_limit_with_offset_only() -> Result<()> {
692        let table_scan = test_table_scan()?;
693        let plan = LogicalPlanBuilder::from(table_scan)
694            .limit(10, None)?
695            .build()?;
696
697        // Should not push any limit down to table provider
698        // When it has a select
699        assert_optimized_plan_equal!(
700            plan,
701            @r"
702        Limit: skip=10, fetch=None
703          TableScan: test
704        "
705        )
706    }
707
708    #[test]
709    fn limit_pushdown_with_offset_projection_table_provider() -> Result<()> {
710        let table_scan = test_table_scan()?;
711
712        let plan = LogicalPlanBuilder::from(table_scan)
713            .project(vec![col("a")])?
714            .limit(10, Some(1000))?
715            .build()?;
716
717        // Should push the limit down to table provider
718        // When it has a select
719        assert_optimized_plan_equal!(
720            plan,
721            @r"
722        Projection: test.a
723          Limit: skip=10, fetch=1000
724            TableScan: test, fetch=1010
725        "
726        )
727    }
728
729    #[test]
730    fn limit_pushdown_with_offset_after_limit() -> Result<()> {
731        let table_scan = test_table_scan()?;
732
733        let plan = LogicalPlanBuilder::from(table_scan)
734            .project(vec![col("a")])?
735            .limit(0, Some(1000))?
736            .limit(10, None)?
737            .build()?;
738
739        assert_optimized_plan_equal!(
740            plan,
741            @r"
742        Projection: test.a
743          Limit: skip=10, fetch=990
744            TableScan: test, fetch=1000
745        "
746        )
747    }
748
749    #[test]
750    fn limit_pushdown_with_limit_after_offset() -> Result<()> {
751        let table_scan = test_table_scan()?;
752
753        let plan = LogicalPlanBuilder::from(table_scan)
754            .project(vec![col("a")])?
755            .limit(10, None)?
756            .limit(0, Some(1000))?
757            .build()?;
758
759        assert_optimized_plan_equal!(
760            plan,
761            @r"
762        Projection: test.a
763          Limit: skip=10, fetch=1000
764            TableScan: test, fetch=1010
765        "
766        )
767    }
768
769    #[test]
770    fn limit_push_down_with_offset_take_smaller_limit() -> Result<()> {
771        let table_scan = test_table_scan()?;
772
773        let plan = LogicalPlanBuilder::from(table_scan)
774            .limit(10, None)?
775            .limit(0, Some(1000))?
776            .limit(0, Some(10))?
777            .build()?;
778
779        assert_optimized_plan_equal!(
780            plan,
781            @r"
782        Limit: skip=10, fetch=10
783          TableScan: test, fetch=20
784        "
785        )
786    }
787
788    #[test]
789    fn limit_doesnt_push_down_with_offset_aggregation() -> Result<()> {
790        let table_scan = test_table_scan()?;
791
792        let plan = LogicalPlanBuilder::from(table_scan)
793            .aggregate(vec![col("a")], vec![max(col("b"))])?
794            .limit(10, Some(1000))?
795            .build()?;
796
797        // Limit should *not* push down aggregate node
798        assert_optimized_plan_equal!(
799            plan,
800            @r"
801        Limit: skip=10, fetch=1000
802          Aggregate: groupBy=[[test.a]], aggr=[[max(test.b)]]
803            TableScan: test
804        "
805        )
806    }
807
808    #[test]
809    fn limit_should_push_down_with_offset_union() -> Result<()> {
810        let table_scan = test_table_scan()?;
811
812        let plan = LogicalPlanBuilder::from(table_scan.clone())
813            .union(LogicalPlanBuilder::from(table_scan).build()?)?
814            .limit(10, Some(1000))?
815            .build()?;
816
817        // Limit should push down through union
818        assert_optimized_plan_equal!(
819            plan,
820            @r"
821        Limit: skip=10, fetch=1000
822          Union
823            Limit: skip=0, fetch=1010
824              TableScan: test, fetch=1010
825            Limit: skip=0, fetch=1010
826              TableScan: test, fetch=1010
827        "
828        )
829    }
830
831    #[test]
832    fn limit_offset_should_not_push_down_with_offset_join() -> Result<()> {
833        let table_scan_1 = test_table_scan()?;
834        let table_scan_2 = test_table_scan_with_name("test2")?;
835
836        let plan = LogicalPlanBuilder::from(table_scan_1)
837            .join(
838                LogicalPlanBuilder::from(table_scan_2).build()?,
839                JoinType::Inner,
840                (vec!["a"], vec!["a"]),
841                None,
842            )?
843            .limit(10, Some(1000))?
844            .build()?;
845
846        // Limit pushdown Not supported in Join
847        assert_optimized_plan_equal!(
848            plan,
849            @r"
850        Limit: skip=10, fetch=1000
851          Inner Join: test.a = test2.a
852            TableScan: test
853            TableScan: test2
854        "
855        )
856    }
857
858    #[test]
859    fn offset_limit_should_not_push_down_with_offset_join() -> Result<()> {
860        let table_scan_1 = test_table_scan()?;
861        let table_scan_2 = test_table_scan_with_name("test2")?;
862
863        let plan = LogicalPlanBuilder::from(table_scan_1)
864            .join(
865                LogicalPlanBuilder::from(table_scan_2).build()?,
866                JoinType::Inner,
867                (vec!["a"], vec!["a"]),
868                None,
869            )?
870            .limit(10, Some(1000))?
871            .build()?;
872
873        // Limit pushdown Not supported in Join
874        assert_optimized_plan_equal!(
875            plan,
876            @r"
877        Limit: skip=10, fetch=1000
878          Inner Join: test.a = test2.a
879            TableScan: test
880            TableScan: test2
881        "
882        )
883    }
884
885    #[test]
886    fn limit_offset_should_not_push_down_with_offset_sub_query() -> Result<()> {
887        let table_scan_1 = test_table_scan_with_name("test1")?;
888        let table_scan_2 = test_table_scan_with_name("test2")?;
889
890        let subquery = LogicalPlanBuilder::from(table_scan_1)
891            .project(vec![col("a")])?
892            .filter(col("a").eq(col("test1.a")))?
893            .build()?;
894
895        let outer_query = LogicalPlanBuilder::from(table_scan_2)
896            .project(vec![col("a")])?
897            .filter(exists(Arc::new(subquery)))?
898            .limit(10, Some(100))?
899            .build()?;
900
901        // Limit pushdown Not supported in sub_query
902        assert_optimized_plan_equal!(
903            outer_query,
904            @r"
905        Limit: skip=10, fetch=100
906          Filter: EXISTS (<subquery>)
907            Subquery:
908              Filter: test1.a = test1.a
909                Projection: test1.a
910                  TableScan: test1
911            Projection: test2.a
912              TableScan: test2
913        "
914        )
915    }
916
917    #[test]
918    fn offset_limit_should_not_push_down_with_offset_sub_query() -> Result<()> {
919        let table_scan_1 = test_table_scan_with_name("test1")?;
920        let table_scan_2 = test_table_scan_with_name("test2")?;
921
922        let subquery = LogicalPlanBuilder::from(table_scan_1)
923            .project(vec![col("a")])?
924            .filter(col("a").eq(col("test1.a")))?
925            .build()?;
926
927        let outer_query = LogicalPlanBuilder::from(table_scan_2)
928            .project(vec![col("a")])?
929            .filter(exists(Arc::new(subquery)))?
930            .limit(10, Some(100))?
931            .build()?;
932
933        // Limit pushdown Not supported in sub_query
934        assert_optimized_plan_equal!(
935            outer_query,
936            @r"
937        Limit: skip=10, fetch=100
938          Filter: EXISTS (<subquery>)
939            Subquery:
940              Filter: test1.a = test1.a
941                Projection: test1.a
942                  TableScan: test1
943            Projection: test2.a
944              TableScan: test2
945        "
946        )
947    }
948
949    #[test]
950    fn limit_should_push_down_left_outer_join_with_offset() -> Result<()> {
951        let table_scan_1 = test_table_scan()?;
952        let table_scan_2 = test_table_scan_with_name("test2")?;
953
954        let plan = LogicalPlanBuilder::from(table_scan_1)
955            .join(
956                LogicalPlanBuilder::from(table_scan_2).build()?,
957                JoinType::Left,
958                (vec!["a"], vec!["a"]),
959                None,
960            )?
961            .limit(10, Some(1000))?
962            .build()?;
963
964        // Limit pushdown Not supported in Join
965        assert_optimized_plan_equal!(
966            plan,
967            @r"
968        Limit: skip=10, fetch=1000
969          Left Join: test.a = test2.a
970            Limit: skip=0, fetch=1010
971              TableScan: test, fetch=1010
972            TableScan: test2
973        "
974        )
975    }
976
977    #[test]
978    fn limit_should_push_down_right_outer_join() -> Result<()> {
979        let table_scan_1 = test_table_scan()?;
980        let table_scan_2 = test_table_scan_with_name("test2")?;
981
982        let plan = LogicalPlanBuilder::from(table_scan_1)
983            .join(
984                LogicalPlanBuilder::from(table_scan_2).build()?,
985                JoinType::Right,
986                (vec!["a"], vec!["a"]),
987                None,
988            )?
989            .limit(0, Some(1000))?
990            .build()?;
991
992        // Limit pushdown Not supported in Join
993        assert_optimized_plan_equal!(
994            plan,
995            @r"
996        Limit: skip=0, fetch=1000
997          Right Join: test.a = test2.a
998            TableScan: test
999            Limit: skip=0, fetch=1000
1000              TableScan: test2, fetch=1000
1001        "
1002        )
1003    }
1004
1005    #[test]
1006    fn limit_should_push_down_right_outer_join_with_offset() -> Result<()> {
1007        let table_scan_1 = test_table_scan()?;
1008        let table_scan_2 = test_table_scan_with_name("test2")?;
1009
1010        let plan = LogicalPlanBuilder::from(table_scan_1)
1011            .join(
1012                LogicalPlanBuilder::from(table_scan_2).build()?,
1013                JoinType::Right,
1014                (vec!["a"], vec!["a"]),
1015                None,
1016            )?
1017            .limit(10, Some(1000))?
1018            .build()?;
1019
1020        // Limit pushdown with offset supported in right outer join
1021        assert_optimized_plan_equal!(
1022            plan,
1023            @r"
1024        Limit: skip=10, fetch=1000
1025          Right Join: test.a = test2.a
1026            TableScan: test
1027            Limit: skip=0, fetch=1010
1028              TableScan: test2, fetch=1010
1029        "
1030        )
1031    }
1032
1033    #[test]
1034    fn limit_push_down_cross_join() -> Result<()> {
1035        let table_scan_1 = test_table_scan()?;
1036        let table_scan_2 = test_table_scan_with_name("test2")?;
1037
1038        let plan = LogicalPlanBuilder::from(table_scan_1)
1039            .cross_join(LogicalPlanBuilder::from(table_scan_2).build()?)?
1040            .limit(0, Some(1000))?
1041            .build()?;
1042
1043        assert_optimized_plan_equal!(
1044            plan,
1045            @r"
1046        Limit: skip=0, fetch=1000
1047          Cross Join: 
1048            Limit: skip=0, fetch=1000
1049              TableScan: test, fetch=1000
1050            Limit: skip=0, fetch=1000
1051              TableScan: test2, fetch=1000
1052        "
1053        )
1054    }
1055
1056    #[test]
1057    fn skip_limit_push_down_cross_join() -> Result<()> {
1058        let table_scan_1 = test_table_scan()?;
1059        let table_scan_2 = test_table_scan_with_name("test2")?;
1060
1061        let plan = LogicalPlanBuilder::from(table_scan_1)
1062            .cross_join(LogicalPlanBuilder::from(table_scan_2).build()?)?
1063            .limit(1000, Some(1000))?
1064            .build()?;
1065
1066        assert_optimized_plan_equal!(
1067            plan,
1068            @r"
1069        Limit: skip=1000, fetch=1000
1070          Cross Join: 
1071            Limit: skip=0, fetch=2000
1072              TableScan: test, fetch=2000
1073            Limit: skip=0, fetch=2000
1074              TableScan: test2, fetch=2000
1075        "
1076        )
1077    }
1078
1079    #[test]
1080    fn merge_limit_result_empty() -> Result<()> {
1081        let scan = test_table_scan()?;
1082
1083        let plan = LogicalPlanBuilder::from(scan)
1084            .limit(0, Some(1000))?
1085            .limit(1000, None)?
1086            .build()?;
1087
1088        assert_optimized_plan_equal!(
1089            plan,
1090            @r"
1091        Limit: skip=1000, fetch=0
1092          TableScan: test, fetch=0
1093        "
1094        )
1095    }
1096
1097    #[test]
1098    fn skip_great_than_fetch() -> Result<()> {
1099        let scan = test_table_scan()?;
1100
1101        let plan = LogicalPlanBuilder::from(scan)
1102            .limit(0, Some(1))?
1103            .limit(1000, None)?
1104            .build()?;
1105
1106        assert_optimized_plan_equal!(
1107            plan,
1108            @r"
1109        Limit: skip=1000, fetch=0
1110          TableScan: test, fetch=0
1111        "
1112        )
1113    }
1114
1115    #[test]
1116    fn push_down_subquery_alias() -> Result<()> {
1117        let scan = test_table_scan()?;
1118
1119        let plan = LogicalPlanBuilder::from(scan)
1120            .alias("a")?
1121            .limit(0, Some(1))?
1122            .limit(1000, None)?
1123            .build()?;
1124
1125        assert_optimized_plan_equal!(
1126            plan,
1127            @r"
1128        SubqueryAlias: a
1129          Limit: skip=1000, fetch=0
1130            TableScan: test, fetch=0
1131        "
1132        )
1133    }
1134}