datafusion_optimizer/
push_down_limit.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`PushDownLimit`] pushes `LIMIT` earlier in the query plan
19
20use std::cmp::min;
21use std::sync::Arc;
22
23use crate::optimizer::ApplyOrder;
24use crate::{OptimizerConfig, OptimizerRule};
25
26use datafusion_common::tree_node::Transformed;
27use datafusion_common::utils::combine_limit;
28use datafusion_common::Result;
29use datafusion_expr::logical_plan::{Join, JoinType, Limit, LogicalPlan};
30use datafusion_expr::{lit, FetchType, SkipType};
31
/// Optimization rule that tries to push down `LIMIT`.
///
/// It commutes a limit with projections and subquery aliases, merges a limit
/// with a directly nested child limit, and pushes the number of rows needed
/// (`skip + fetch`) into table scans, sorts, unions, left/right/cross joins,
/// and extension nodes that report `supports_limit_pushdown()`.
#[derive(Default, Debug)]
pub struct PushDownLimit {}

impl PushDownLimit {
    /// Creates a new [`PushDownLimit`] optimizer rule.
    pub fn new() -> Self {
        Self {}
    }
}
44
45/// Push down Limit.
46impl OptimizerRule for PushDownLimit {
47    fn supports_rewrite(&self) -> bool {
48        true
49    }
50
51    fn rewrite(
52        &self,
53        plan: LogicalPlan,
54        _config: &dyn OptimizerConfig,
55    ) -> Result<Transformed<LogicalPlan>> {
56        let LogicalPlan::Limit(mut limit) = plan else {
57            return Ok(Transformed::no(plan));
58        };
59
60        // Currently only rewrite if skip and fetch are both literals
61        let SkipType::Literal(skip) = limit.get_skip_type()? else {
62            return Ok(Transformed::no(LogicalPlan::Limit(limit)));
63        };
64        let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
65            return Ok(Transformed::no(LogicalPlan::Limit(limit)));
66        };
67
68        // Merge the Parent Limit and the Child Limit.
69        if let LogicalPlan::Limit(child) = limit.input.as_ref() {
70            let SkipType::Literal(child_skip) = child.get_skip_type()? else {
71                return Ok(Transformed::no(LogicalPlan::Limit(limit)));
72            };
73            let FetchType::Literal(child_fetch) = child.get_fetch_type()? else {
74                return Ok(Transformed::no(LogicalPlan::Limit(limit)));
75            };
76
77            let (skip, fetch) = combine_limit(skip, fetch, child_skip, child_fetch);
78            let plan = LogicalPlan::Limit(Limit {
79                skip: Some(Box::new(lit(skip as i64))),
80                fetch: fetch.map(|f| Box::new(lit(f as i64))),
81                input: Arc::clone(&child.input),
82            });
83
84            // recursively reapply the rule on the new plan
85            #[allow(clippy::used_underscore_binding)]
86            return self.rewrite(plan, _config);
87        }
88
89        // no fetch to push, so return the original plan
90        let Some(fetch) = fetch else {
91            return Ok(Transformed::no(LogicalPlan::Limit(limit)));
92        };
93
94        match Arc::unwrap_or_clone(limit.input) {
95            LogicalPlan::TableScan(mut scan) => {
96                let rows_needed = if fetch != 0 { fetch + skip } else { 0 };
97                let new_fetch = scan
98                    .fetch
99                    .map(|x| min(x, rows_needed))
100                    .or(Some(rows_needed));
101                if new_fetch == scan.fetch {
102                    original_limit(skip, fetch, LogicalPlan::TableScan(scan))
103                } else {
104                    // push limit into the table scan itself
105                    scan.fetch = scan
106                        .fetch
107                        .map(|x| min(x, rows_needed))
108                        .or(Some(rows_needed));
109                    transformed_limit(skip, fetch, LogicalPlan::TableScan(scan))
110                }
111            }
112            LogicalPlan::Union(mut union) => {
113                // push limits to each input of the union
114                union.inputs = union
115                    .inputs
116                    .into_iter()
117                    .map(|input| make_arc_limit(0, fetch + skip, input))
118                    .collect();
119                transformed_limit(skip, fetch, LogicalPlan::Union(union))
120            }
121
122            LogicalPlan::Join(join) => Ok(push_down_join(join, fetch + skip)
123                .update_data(|join| {
124                    make_limit(skip, fetch, Arc::new(LogicalPlan::Join(join)))
125                })),
126
127            LogicalPlan::Sort(mut sort) => {
128                let new_fetch = {
129                    let sort_fetch = skip + fetch;
130                    Some(sort.fetch.map(|f| f.min(sort_fetch)).unwrap_or(sort_fetch))
131                };
132                if new_fetch == sort.fetch {
133                    if skip > 0 {
134                        original_limit(skip, fetch, LogicalPlan::Sort(sort))
135                    } else {
136                        Ok(Transformed::yes(LogicalPlan::Sort(sort)))
137                    }
138                } else {
139                    sort.fetch = new_fetch;
140                    limit.input = Arc::new(LogicalPlan::Sort(sort));
141                    Ok(Transformed::yes(LogicalPlan::Limit(limit)))
142                }
143            }
144            LogicalPlan::Projection(mut proj) => {
145                // commute
146                limit.input = Arc::clone(&proj.input);
147                let new_limit = LogicalPlan::Limit(limit);
148                proj.input = Arc::new(new_limit);
149                Ok(Transformed::yes(LogicalPlan::Projection(proj)))
150            }
151            LogicalPlan::SubqueryAlias(mut subquery_alias) => {
152                // commute
153                limit.input = Arc::clone(&subquery_alias.input);
154                let new_limit = LogicalPlan::Limit(limit);
155                subquery_alias.input = Arc::new(new_limit);
156                Ok(Transformed::yes(LogicalPlan::SubqueryAlias(subquery_alias)))
157            }
158            LogicalPlan::Extension(extension_plan)
159                if extension_plan.node.supports_limit_pushdown() =>
160            {
161                let new_children = extension_plan
162                    .node
163                    .inputs()
164                    .into_iter()
165                    .map(|child| {
166                        LogicalPlan::Limit(Limit {
167                            skip: None,
168                            fetch: Some(Box::new(lit((fetch + skip) as i64))),
169                            input: Arc::new(child.clone()),
170                        })
171                    })
172                    .collect::<Vec<_>>();
173
174                // Create a new extension node with updated inputs
175                let child_plan = LogicalPlan::Extension(extension_plan);
176                let new_extension =
177                    child_plan.with_new_exprs(child_plan.expressions(), new_children)?;
178
179                transformed_limit(skip, fetch, new_extension)
180            }
181            input => original_limit(skip, fetch, input),
182        }
183    }
184
185    fn name(&self) -> &str {
186        "push_down_limit"
187    }
188
189    fn apply_order(&self) -> Option<ApplyOrder> {
190        Some(ApplyOrder::TopDown)
191    }
192}
193
194/// Wrap the input plan with a limit node
195///
196/// Original:
197/// ```text
198/// input
199/// ```
200///
201/// Return
202/// ```text
203/// Limit: skip=skip, fetch=fetch
204///  input
205/// ```
206fn make_limit(skip: usize, fetch: usize, input: Arc<LogicalPlan>) -> LogicalPlan {
207    LogicalPlan::Limit(Limit {
208        skip: Some(Box::new(lit(skip as i64))),
209        fetch: Some(Box::new(lit(fetch as i64))),
210        input,
211    })
212}
213
214/// Wrap the input plan with a limit node
215fn make_arc_limit(
216    skip: usize,
217    fetch: usize,
218    input: Arc<LogicalPlan>,
219) -> Arc<LogicalPlan> {
220    Arc::new(make_limit(skip, fetch, input))
221}
222
223/// Returns the original limit (non transformed)
224fn original_limit(
225    skip: usize,
226    fetch: usize,
227    input: LogicalPlan,
228) -> Result<Transformed<LogicalPlan>> {
229    Ok(Transformed::no(make_limit(skip, fetch, Arc::new(input))))
230}
231
232/// Returns the a transformed limit
233fn transformed_limit(
234    skip: usize,
235    fetch: usize,
236    input: LogicalPlan,
237) -> Result<Transformed<LogicalPlan>> {
238    Ok(Transformed::yes(make_limit(skip, fetch, Arc::new(input))))
239}
240
241/// Adds a limit to the inputs of a join, if possible
242fn push_down_join(mut join: Join, limit: usize) -> Transformed<Join> {
243    use JoinType::*;
244
245    // Cross join is the special case of inner join where there is no join condition. see [LogicalPlanBuilder::cross_join]
246    fn is_cross_join(join: &Join) -> bool {
247        join.join_type == Inner && join.on.is_empty() && join.filter.is_none()
248    }
249
250    let (left_limit, right_limit) = if is_cross_join(&join) {
251        (Some(limit), Some(limit))
252    } else {
253        match join.join_type {
254            Left => (Some(limit), None),
255            Right => (None, Some(limit)),
256            _ => (None, None),
257        }
258    };
259
260    if left_limit.is_none() && right_limit.is_none() {
261        return Transformed::no(join);
262    }
263    if let Some(limit) = left_limit {
264        join.left = make_arc_limit(0, limit, join.left);
265    }
266    if let Some(limit) = right_limit {
267        join.right = make_arc_limit(0, limit, join.right);
268    }
269    Transformed::yes(join)
270}
271
272#[cfg(test)]
273mod test {
274    use std::cmp::Ordering;
275    use std::fmt::{Debug, Formatter};
276    use std::vec;
277
278    use super::*;
279    use crate::assert_optimized_plan_eq_snapshot;
280    use crate::test::*;
281
282    use crate::OptimizerContext;
283    use datafusion_common::DFSchemaRef;
284    use datafusion_expr::{
285        col, exists, logical_plan::builder::LogicalPlanBuilder, Expr, Extension,
286        UserDefinedLogicalNodeCore,
287    };
288    use datafusion_functions_aggregate::expr_fn::max;
289
290    macro_rules! assert_optimized_plan_equal {
291        (
292            $plan:expr,
293            @ $expected:literal $(,)?
294        ) => {{
295            let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
296            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![Arc::new(PushDownLimit::new())];
297            assert_optimized_plan_eq_snapshot!(
298                optimizer_ctx,
299                rules,
300                $plan,
301                @ $expected,
302            )
303        }};
304    }
305
306    #[derive(Debug, PartialEq, Eq, Hash)]
307    pub struct NoopPlan {
308        input: Vec<LogicalPlan>,
309        schema: DFSchemaRef,
310    }
311
312    // Manual implementation needed because of `schema` field. Comparison excludes this field.
313    impl PartialOrd for NoopPlan {
314        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
315            self.input.partial_cmp(&other.input)
316        }
317    }
318
319    impl UserDefinedLogicalNodeCore for NoopPlan {
320        fn name(&self) -> &str {
321            "NoopPlan"
322        }
323
324        fn inputs(&self) -> Vec<&LogicalPlan> {
325            self.input.iter().collect()
326        }
327
328        fn schema(&self) -> &DFSchemaRef {
329            &self.schema
330        }
331
332        fn expressions(&self) -> Vec<Expr> {
333            self.input
334                .iter()
335                .flat_map(|child| child.expressions())
336                .collect()
337        }
338
339        fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
340            write!(f, "NoopPlan")
341        }
342
343        fn with_exprs_and_inputs(
344            &self,
345            _exprs: Vec<Expr>,
346            inputs: Vec<LogicalPlan>,
347        ) -> Result<Self> {
348            Ok(Self {
349                input: inputs,
350                schema: Arc::clone(&self.schema),
351            })
352        }
353
354        fn supports_limit_pushdown(&self) -> bool {
355            true // Allow limit push-down
356        }
357    }
358
359    #[derive(Debug, PartialEq, Eq, Hash)]
360    struct NoLimitNoopPlan {
361        input: Vec<LogicalPlan>,
362        schema: DFSchemaRef,
363    }
364
365    // Manual implementation needed because of `schema` field. Comparison excludes this field.
366    impl PartialOrd for NoLimitNoopPlan {
367        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
368            self.input.partial_cmp(&other.input)
369        }
370    }
371
372    impl UserDefinedLogicalNodeCore for NoLimitNoopPlan {
373        fn name(&self) -> &str {
374            "NoLimitNoopPlan"
375        }
376
377        fn inputs(&self) -> Vec<&LogicalPlan> {
378            self.input.iter().collect()
379        }
380
381        fn schema(&self) -> &DFSchemaRef {
382            &self.schema
383        }
384
385        fn expressions(&self) -> Vec<Expr> {
386            self.input
387                .iter()
388                .flat_map(|child| child.expressions())
389                .collect()
390        }
391
392        fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
393            write!(f, "NoLimitNoopPlan")
394        }
395
396        fn with_exprs_and_inputs(
397            &self,
398            _exprs: Vec<Expr>,
399            inputs: Vec<LogicalPlan>,
400        ) -> Result<Self> {
401            Ok(Self {
402                input: inputs,
403                schema: Arc::clone(&self.schema),
404            })
405        }
406
407        fn supports_limit_pushdown(&self) -> bool {
408            false // Disallow limit push-down by default
409        }
410    }
411    #[test]
412    fn limit_pushdown_basic() -> Result<()> {
413        let table_scan = test_table_scan()?;
414        let noop_plan = LogicalPlan::Extension(Extension {
415            node: Arc::new(NoopPlan {
416                input: vec![table_scan.clone()],
417                schema: Arc::clone(table_scan.schema()),
418            }),
419        });
420
421        let plan = LogicalPlanBuilder::from(noop_plan)
422            .limit(0, Some(1000))?
423            .build()?;
424
425        assert_optimized_plan_equal!(
426            plan,
427            @r"
428        Limit: skip=0, fetch=1000
429          NoopPlan
430            Limit: skip=0, fetch=1000
431              TableScan: test, fetch=1000
432        "
433        )
434    }
435
436    #[test]
437    fn limit_pushdown_with_skip() -> Result<()> {
438        let table_scan = test_table_scan()?;
439        let noop_plan = LogicalPlan::Extension(Extension {
440            node: Arc::new(NoopPlan {
441                input: vec![table_scan.clone()],
442                schema: Arc::clone(table_scan.schema()),
443            }),
444        });
445
446        let plan = LogicalPlanBuilder::from(noop_plan)
447            .limit(10, Some(1000))?
448            .build()?;
449
450        assert_optimized_plan_equal!(
451            plan,
452            @r"
453        Limit: skip=10, fetch=1000
454          NoopPlan
455            Limit: skip=0, fetch=1010
456              TableScan: test, fetch=1010
457        "
458        )
459    }
460
461    #[test]
462    fn limit_pushdown_multiple_limits() -> Result<()> {
463        let table_scan = test_table_scan()?;
464        let noop_plan = LogicalPlan::Extension(Extension {
465            node: Arc::new(NoopPlan {
466                input: vec![table_scan.clone()],
467                schema: Arc::clone(table_scan.schema()),
468            }),
469        });
470
471        let plan = LogicalPlanBuilder::from(noop_plan)
472            .limit(10, Some(1000))?
473            .limit(20, Some(500))?
474            .build()?;
475
476        assert_optimized_plan_equal!(
477            plan,
478            @r"
479        Limit: skip=30, fetch=500
480          NoopPlan
481            Limit: skip=0, fetch=530
482              TableScan: test, fetch=530
483        "
484        )
485    }
486
487    #[test]
488    fn limit_pushdown_multiple_inputs() -> Result<()> {
489        let table_scan = test_table_scan()?;
490        let noop_plan = LogicalPlan::Extension(Extension {
491            node: Arc::new(NoopPlan {
492                input: vec![table_scan.clone(), table_scan.clone()],
493                schema: Arc::clone(table_scan.schema()),
494            }),
495        });
496
497        let plan = LogicalPlanBuilder::from(noop_plan)
498            .limit(0, Some(1000))?
499            .build()?;
500
501        assert_optimized_plan_equal!(
502            plan,
503            @r"
504        Limit: skip=0, fetch=1000
505          NoopPlan
506            Limit: skip=0, fetch=1000
507              TableScan: test, fetch=1000
508            Limit: skip=0, fetch=1000
509              TableScan: test, fetch=1000
510        "
511        )
512    }
513
514    #[test]
515    fn limit_pushdown_disallowed_noop_plan() -> Result<()> {
516        let table_scan = test_table_scan()?;
517        let no_limit_noop_plan = LogicalPlan::Extension(Extension {
518            node: Arc::new(NoLimitNoopPlan {
519                input: vec![table_scan.clone()],
520                schema: Arc::clone(table_scan.schema()),
521            }),
522        });
523
524        let plan = LogicalPlanBuilder::from(no_limit_noop_plan)
525            .limit(0, Some(1000))?
526            .build()?;
527
528        assert_optimized_plan_equal!(
529            plan,
530            @r"
531        Limit: skip=0, fetch=1000
532          NoLimitNoopPlan
533            TableScan: test
534        "
535        )
536    }
537
538    #[test]
539    fn limit_pushdown_projection_table_provider() -> Result<()> {
540        let table_scan = test_table_scan()?;
541
542        let plan = LogicalPlanBuilder::from(table_scan)
543            .project(vec![col("a")])?
544            .limit(0, Some(1000))?
545            .build()?;
546
547        // Should push the limit down to table provider
548        // When it has a select
549        assert_optimized_plan_equal!(
550            plan,
551            @r"
552        Projection: test.a
553          Limit: skip=0, fetch=1000
554            TableScan: test, fetch=1000
555        "
556        )
557    }
558
559    #[test]
560    fn limit_push_down_take_smaller_limit() -> Result<()> {
561        let table_scan = test_table_scan()?;
562
563        let plan = LogicalPlanBuilder::from(table_scan)
564            .limit(0, Some(1000))?
565            .limit(0, Some(10))?
566            .build()?;
567
568        // Should push down the smallest limit
569        // Towards table scan
570        // This rule doesn't replace multiple limits
571        assert_optimized_plan_equal!(
572            plan,
573            @r"
574        Limit: skip=0, fetch=10
575          TableScan: test, fetch=10
576        "
577        )
578    }
579
580    #[test]
581    fn limit_doesnt_push_down_aggregation() -> Result<()> {
582        let table_scan = test_table_scan()?;
583
584        let plan = LogicalPlanBuilder::from(table_scan)
585            .aggregate(vec![col("a")], vec![max(col("b"))])?
586            .limit(0, Some(1000))?
587            .build()?;
588
589        // Limit should *not* push down aggregate node
590        assert_optimized_plan_equal!(
591            plan,
592            @r"
593        Limit: skip=0, fetch=1000
594          Aggregate: groupBy=[[test.a]], aggr=[[max(test.b)]]
595            TableScan: test
596        "
597        )
598    }
599
600    #[test]
601    fn limit_should_push_down_union() -> Result<()> {
602        let table_scan = test_table_scan()?;
603
604        let plan = LogicalPlanBuilder::from(table_scan.clone())
605            .union(LogicalPlanBuilder::from(table_scan).build()?)?
606            .limit(0, Some(1000))?
607            .build()?;
608
609        // Limit should push down through union
610        assert_optimized_plan_equal!(
611            plan,
612            @r"
613        Limit: skip=0, fetch=1000
614          Union
615            Limit: skip=0, fetch=1000
616              TableScan: test, fetch=1000
617            Limit: skip=0, fetch=1000
618              TableScan: test, fetch=1000
619        "
620        )
621    }
622
623    #[test]
624    fn limit_push_down_sort() -> Result<()> {
625        let table_scan = test_table_scan()?;
626
627        let plan = LogicalPlanBuilder::from(table_scan)
628            .sort_by(vec![col("a")])?
629            .limit(0, Some(10))?
630            .build()?;
631
632        // Should push down limit to sort
633        assert_optimized_plan_equal!(
634            plan,
635            @r"
636        Limit: skip=0, fetch=10
637          Sort: test.a ASC NULLS LAST, fetch=10
638            TableScan: test
639        "
640        )
641    }
642
643    #[test]
644    fn limit_push_down_sort_skip() -> Result<()> {
645        let table_scan = test_table_scan()?;
646
647        let plan = LogicalPlanBuilder::from(table_scan)
648            .sort_by(vec![col("a")])?
649            .limit(5, Some(10))?
650            .build()?;
651
652        // Should push down limit to sort
653        assert_optimized_plan_equal!(
654            plan,
655            @r"
656        Limit: skip=5, fetch=10
657          Sort: test.a ASC NULLS LAST, fetch=15
658            TableScan: test
659        "
660        )
661    }
662
663    #[test]
664    fn multi_stage_limit_recursive_to_deeper_limit() -> Result<()> {
665        let table_scan = test_table_scan()?;
666
667        let plan = LogicalPlanBuilder::from(table_scan)
668            .limit(0, Some(1000))?
669            .aggregate(vec![col("a")], vec![max(col("b"))])?
670            .limit(0, Some(10))?
671            .build()?;
672
673        // Limit should use deeper LIMIT 1000, but Limit 10 shouldn't push down aggregation
674        assert_optimized_plan_equal!(
675            plan,
676            @r"
677        Limit: skip=0, fetch=10
678          Aggregate: groupBy=[[test.a]], aggr=[[max(test.b)]]
679            Limit: skip=0, fetch=1000
680              TableScan: test, fetch=1000
681        "
682        )
683    }
684
685    #[test]
686    fn limit_pushdown_should_not_pushdown_limit_with_offset_only() -> Result<()> {
687        let table_scan = test_table_scan()?;
688        let plan = LogicalPlanBuilder::from(table_scan)
689            .limit(10, None)?
690            .build()?;
691
692        // Should not push any limit down to table provider
693        // When it has a select
694        assert_optimized_plan_equal!(
695            plan,
696            @r"
697        Limit: skip=10, fetch=None
698          TableScan: test
699        "
700        )
701    }
702
703    #[test]
704    fn limit_pushdown_with_offset_projection_table_provider() -> Result<()> {
705        let table_scan = test_table_scan()?;
706
707        let plan = LogicalPlanBuilder::from(table_scan)
708            .project(vec![col("a")])?
709            .limit(10, Some(1000))?
710            .build()?;
711
712        // Should push the limit down to table provider
713        // When it has a select
714        assert_optimized_plan_equal!(
715            plan,
716            @r"
717        Projection: test.a
718          Limit: skip=10, fetch=1000
719            TableScan: test, fetch=1010
720        "
721        )
722    }
723
724    #[test]
725    fn limit_pushdown_with_offset_after_limit() -> Result<()> {
726        let table_scan = test_table_scan()?;
727
728        let plan = LogicalPlanBuilder::from(table_scan)
729            .project(vec![col("a")])?
730            .limit(0, Some(1000))?
731            .limit(10, None)?
732            .build()?;
733
734        assert_optimized_plan_equal!(
735            plan,
736            @r"
737        Projection: test.a
738          Limit: skip=10, fetch=990
739            TableScan: test, fetch=1000
740        "
741        )
742    }
743
744    #[test]
745    fn limit_pushdown_with_limit_after_offset() -> Result<()> {
746        let table_scan = test_table_scan()?;
747
748        let plan = LogicalPlanBuilder::from(table_scan)
749            .project(vec![col("a")])?
750            .limit(10, None)?
751            .limit(0, Some(1000))?
752            .build()?;
753
754        assert_optimized_plan_equal!(
755            plan,
756            @r"
757        Projection: test.a
758          Limit: skip=10, fetch=1000
759            TableScan: test, fetch=1010
760        "
761        )
762    }
763
764    #[test]
765    fn limit_push_down_with_offset_take_smaller_limit() -> Result<()> {
766        let table_scan = test_table_scan()?;
767
768        let plan = LogicalPlanBuilder::from(table_scan)
769            .limit(10, None)?
770            .limit(0, Some(1000))?
771            .limit(0, Some(10))?
772            .build()?;
773
774        assert_optimized_plan_equal!(
775            plan,
776            @r"
777        Limit: skip=10, fetch=10
778          TableScan: test, fetch=20
779        "
780        )
781    }
782
783    #[test]
784    fn limit_doesnt_push_down_with_offset_aggregation() -> Result<()> {
785        let table_scan = test_table_scan()?;
786
787        let plan = LogicalPlanBuilder::from(table_scan)
788            .aggregate(vec![col("a")], vec![max(col("b"))])?
789            .limit(10, Some(1000))?
790            .build()?;
791
792        // Limit should *not* push down aggregate node
793        assert_optimized_plan_equal!(
794            plan,
795            @r"
796        Limit: skip=10, fetch=1000
797          Aggregate: groupBy=[[test.a]], aggr=[[max(test.b)]]
798            TableScan: test
799        "
800        )
801    }
802
803    #[test]
804    fn limit_should_push_down_with_offset_union() -> Result<()> {
805        let table_scan = test_table_scan()?;
806
807        let plan = LogicalPlanBuilder::from(table_scan.clone())
808            .union(LogicalPlanBuilder::from(table_scan).build()?)?
809            .limit(10, Some(1000))?
810            .build()?;
811
812        // Limit should push down through union
813        assert_optimized_plan_equal!(
814            plan,
815            @r"
816        Limit: skip=10, fetch=1000
817          Union
818            Limit: skip=0, fetch=1010
819              TableScan: test, fetch=1010
820            Limit: skip=0, fetch=1010
821              TableScan: test, fetch=1010
822        "
823        )
824    }
825
826    #[test]
827    fn limit_offset_should_not_push_down_with_offset_join() -> Result<()> {
828        let table_scan_1 = test_table_scan()?;
829        let table_scan_2 = test_table_scan_with_name("test2")?;
830
831        let plan = LogicalPlanBuilder::from(table_scan_1)
832            .join(
833                LogicalPlanBuilder::from(table_scan_2).build()?,
834                JoinType::Inner,
835                (vec!["a"], vec!["a"]),
836                None,
837            )?
838            .limit(10, Some(1000))?
839            .build()?;
840
841        // Limit pushdown Not supported in Join
842        assert_optimized_plan_equal!(
843            plan,
844            @r"
845        Limit: skip=10, fetch=1000
846          Inner Join: test.a = test2.a
847            TableScan: test
848            TableScan: test2
849        "
850        )
851    }
852
853    #[test]
854    fn offset_limit_should_not_push_down_with_offset_join() -> Result<()> {
855        let table_scan_1 = test_table_scan()?;
856        let table_scan_2 = test_table_scan_with_name("test2")?;
857
858        let plan = LogicalPlanBuilder::from(table_scan_1)
859            .join(
860                LogicalPlanBuilder::from(table_scan_2).build()?,
861                JoinType::Inner,
862                (vec!["a"], vec!["a"]),
863                None,
864            )?
865            .limit(10, Some(1000))?
866            .build()?;
867
868        // Limit pushdown Not supported in Join
869        assert_optimized_plan_equal!(
870            plan,
871            @r"
872        Limit: skip=10, fetch=1000
873          Inner Join: test.a = test2.a
874            TableScan: test
875            TableScan: test2
876        "
877        )
878    }
879
880    #[test]
881    fn limit_offset_should_not_push_down_with_offset_sub_query() -> Result<()> {
882        let table_scan_1 = test_table_scan_with_name("test1")?;
883        let table_scan_2 = test_table_scan_with_name("test2")?;
884
885        let subquery = LogicalPlanBuilder::from(table_scan_1)
886            .project(vec![col("a")])?
887            .filter(col("a").eq(col("test1.a")))?
888            .build()?;
889
890        let outer_query = LogicalPlanBuilder::from(table_scan_2)
891            .project(vec![col("a")])?
892            .filter(exists(Arc::new(subquery)))?
893            .limit(10, Some(100))?
894            .build()?;
895
896        // Limit pushdown Not supported in sub_query
897        assert_optimized_plan_equal!(
898            outer_query,
899            @r"
900        Limit: skip=10, fetch=100
901          Filter: EXISTS (<subquery>)
902            Subquery:
903              Filter: test1.a = test1.a
904                Projection: test1.a
905                  TableScan: test1
906            Projection: test2.a
907              TableScan: test2
908        "
909        )
910    }
911
912    #[test]
913    fn offset_limit_should_not_push_down_with_offset_sub_query() -> Result<()> {
914        let table_scan_1 = test_table_scan_with_name("test1")?;
915        let table_scan_2 = test_table_scan_with_name("test2")?;
916
917        let subquery = LogicalPlanBuilder::from(table_scan_1)
918            .project(vec![col("a")])?
919            .filter(col("a").eq(col("test1.a")))?
920            .build()?;
921
922        let outer_query = LogicalPlanBuilder::from(table_scan_2)
923            .project(vec![col("a")])?
924            .filter(exists(Arc::new(subquery)))?
925            .limit(10, Some(100))?
926            .build()?;
927
928        // Limit pushdown Not supported in sub_query
929        assert_optimized_plan_equal!(
930            outer_query,
931            @r"
932        Limit: skip=10, fetch=100
933          Filter: EXISTS (<subquery>)
934            Subquery:
935              Filter: test1.a = test1.a
936                Projection: test1.a
937                  TableScan: test1
938            Projection: test2.a
939              TableScan: test2
940        "
941        )
942    }
943
944    #[test]
945    fn limit_should_push_down_left_outer_join_with_offset() -> Result<()> {
946        let table_scan_1 = test_table_scan()?;
947        let table_scan_2 = test_table_scan_with_name("test2")?;
948
949        let plan = LogicalPlanBuilder::from(table_scan_1)
950            .join(
951                LogicalPlanBuilder::from(table_scan_2).build()?,
952                JoinType::Left,
953                (vec!["a"], vec!["a"]),
954                None,
955            )?
956            .limit(10, Some(1000))?
957            .build()?;
958
959        // Limit pushdown Not supported in Join
960        assert_optimized_plan_equal!(
961            plan,
962            @r"
963        Limit: skip=10, fetch=1000
964          Left Join: test.a = test2.a
965            Limit: skip=0, fetch=1010
966              TableScan: test, fetch=1010
967            TableScan: test2
968        "
969        )
970    }
971
972    #[test]
973    fn limit_should_push_down_right_outer_join() -> Result<()> {
974        let table_scan_1 = test_table_scan()?;
975        let table_scan_2 = test_table_scan_with_name("test2")?;
976
977        let plan = LogicalPlanBuilder::from(table_scan_1)
978            .join(
979                LogicalPlanBuilder::from(table_scan_2).build()?,
980                JoinType::Right,
981                (vec!["a"], vec!["a"]),
982                None,
983            )?
984            .limit(0, Some(1000))?
985            .build()?;
986
987        // Limit pushdown Not supported in Join
988        assert_optimized_plan_equal!(
989            plan,
990            @r"
991        Limit: skip=0, fetch=1000
992          Right Join: test.a = test2.a
993            TableScan: test
994            Limit: skip=0, fetch=1000
995              TableScan: test2, fetch=1000
996        "
997        )
998    }
999
1000    #[test]
1001    fn limit_should_push_down_right_outer_join_with_offset() -> Result<()> {
1002        let table_scan_1 = test_table_scan()?;
1003        let table_scan_2 = test_table_scan_with_name("test2")?;
1004
1005        let plan = LogicalPlanBuilder::from(table_scan_1)
1006            .join(
1007                LogicalPlanBuilder::from(table_scan_2).build()?,
1008                JoinType::Right,
1009                (vec!["a"], vec!["a"]),
1010                None,
1011            )?
1012            .limit(10, Some(1000))?
1013            .build()?;
1014
1015        // Limit pushdown with offset supported in right outer join
1016        assert_optimized_plan_equal!(
1017            plan,
1018            @r"
1019        Limit: skip=10, fetch=1000
1020          Right Join: test.a = test2.a
1021            TableScan: test
1022            Limit: skip=0, fetch=1010
1023              TableScan: test2, fetch=1010
1024        "
1025        )
1026    }
1027
1028    #[test]
1029    fn limit_push_down_cross_join() -> Result<()> {
1030        let table_scan_1 = test_table_scan()?;
1031        let table_scan_2 = test_table_scan_with_name("test2")?;
1032
1033        let plan = LogicalPlanBuilder::from(table_scan_1)
1034            .cross_join(LogicalPlanBuilder::from(table_scan_2).build()?)?
1035            .limit(0, Some(1000))?
1036            .build()?;
1037
1038        assert_optimized_plan_equal!(
1039            plan,
1040            @r"
1041        Limit: skip=0, fetch=1000
1042          Cross Join: 
1043            Limit: skip=0, fetch=1000
1044              TableScan: test, fetch=1000
1045            Limit: skip=0, fetch=1000
1046              TableScan: test2, fetch=1000
1047        "
1048        )
1049    }
1050
1051    #[test]
1052    fn skip_limit_push_down_cross_join() -> Result<()> {
1053        let table_scan_1 = test_table_scan()?;
1054        let table_scan_2 = test_table_scan_with_name("test2")?;
1055
1056        let plan = LogicalPlanBuilder::from(table_scan_1)
1057            .cross_join(LogicalPlanBuilder::from(table_scan_2).build()?)?
1058            .limit(1000, Some(1000))?
1059            .build()?;
1060
1061        assert_optimized_plan_equal!(
1062            plan,
1063            @r"
1064        Limit: skip=1000, fetch=1000
1065          Cross Join: 
1066            Limit: skip=0, fetch=2000
1067              TableScan: test, fetch=2000
1068            Limit: skip=0, fetch=2000
1069              TableScan: test2, fetch=2000
1070        "
1071        )
1072    }
1073
1074    #[test]
1075    fn merge_limit_result_empty() -> Result<()> {
1076        let scan = test_table_scan()?;
1077
1078        let plan = LogicalPlanBuilder::from(scan)
1079            .limit(0, Some(1000))?
1080            .limit(1000, None)?
1081            .build()?;
1082
1083        assert_optimized_plan_equal!(
1084            plan,
1085            @r"
1086        Limit: skip=1000, fetch=0
1087          TableScan: test, fetch=0
1088        "
1089        )
1090    }
1091
1092    #[test]
1093    fn skip_great_than_fetch() -> Result<()> {
1094        let scan = test_table_scan()?;
1095
1096        let plan = LogicalPlanBuilder::from(scan)
1097            .limit(0, Some(1))?
1098            .limit(1000, None)?
1099            .build()?;
1100
1101        assert_optimized_plan_equal!(
1102            plan,
1103            @r"
1104        Limit: skip=1000, fetch=0
1105          TableScan: test, fetch=0
1106        "
1107        )
1108    }
1109
1110    #[test]
1111    fn push_down_subquery_alias() -> Result<()> {
1112        let scan = test_table_scan()?;
1113
1114        let plan = LogicalPlanBuilder::from(scan)
1115            .alias("a")?
1116            .limit(0, Some(1))?
1117            .limit(1000, None)?
1118            .build()?;
1119
1120        assert_optimized_plan_equal!(
1121            plan,
1122            @r"
1123        SubqueryAlias: a
1124          Limit: skip=1000, fetch=0
1125            TableScan: test, fetch=0
1126        "
1127        )
1128    }
1129}