datafusion_optimizer/
push_down_limit.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`PushDownLimit`] pushes `LIMIT` earlier in the query plan
19
20use std::cmp::min;
21use std::sync::Arc;
22
23use crate::optimizer::ApplyOrder;
24use crate::{OptimizerConfig, OptimizerRule};
25
26use datafusion_common::tree_node::Transformed;
27use datafusion_common::utils::combine_limit;
28use datafusion_common::Result;
29use datafusion_expr::logical_plan::{Join, JoinType, Limit, LogicalPlan};
30use datafusion_expr::{lit, FetchType, SkipType};
31
/// Optimization rule that tries to push down `LIMIT`.
///
/// It will push down through projection, limits (taking the smaller limit)
#[derive(Default, Debug)]
pub struct PushDownLimit {}

impl PushDownLimit {
    /// Creates a new [`PushDownLimit`] rule.
    ///
    /// The rule carries no state, so this is equivalent to
    /// [`PushDownLimit::default`].
    pub fn new() -> Self {
        Self {}
    }
}
44
45/// Push down Limit.
46impl OptimizerRule for PushDownLimit {
47    fn supports_rewrite(&self) -> bool {
48        true
49    }
50
51    fn rewrite(
52        &self,
53        plan: LogicalPlan,
54        _config: &dyn OptimizerConfig,
55    ) -> Result<Transformed<LogicalPlan>> {
56        let LogicalPlan::Limit(mut limit) = plan else {
57            return Ok(Transformed::no(plan));
58        };
59
60        // Currently only rewrite if skip and fetch are both literals
61        let SkipType::Literal(skip) = limit.get_skip_type()? else {
62            return Ok(Transformed::no(LogicalPlan::Limit(limit)));
63        };
64        let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
65            return Ok(Transformed::no(LogicalPlan::Limit(limit)));
66        };
67
68        // Merge the Parent Limit and the Child Limit.
69        if let LogicalPlan::Limit(child) = limit.input.as_ref() {
70            let SkipType::Literal(child_skip) = child.get_skip_type()? else {
71                return Ok(Transformed::no(LogicalPlan::Limit(limit)));
72            };
73            let FetchType::Literal(child_fetch) = child.get_fetch_type()? else {
74                return Ok(Transformed::no(LogicalPlan::Limit(limit)));
75            };
76
77            let (skip, fetch) = combine_limit(skip, fetch, child_skip, child_fetch);
78            let plan = LogicalPlan::Limit(Limit {
79                skip: Some(Box::new(lit(skip as i64))),
80                fetch: fetch.map(|f| Box::new(lit(f as i64))),
81                input: Arc::clone(&child.input),
82            });
83
84            // recursively reapply the rule on the new plan
85            #[allow(clippy::used_underscore_binding)]
86            return self.rewrite(plan, _config);
87        }
88
89        // no fetch to push, so return the original plan
90        let Some(fetch) = fetch else {
91            return Ok(Transformed::no(LogicalPlan::Limit(limit)));
92        };
93
94        match Arc::unwrap_or_clone(limit.input) {
95            LogicalPlan::TableScan(mut scan) => {
96                let rows_needed = if fetch != 0 { fetch + skip } else { 0 };
97                let new_fetch = scan
98                    .fetch
99                    .map(|x| min(x, rows_needed))
100                    .or(Some(rows_needed));
101                if new_fetch == scan.fetch {
102                    original_limit(skip, fetch, LogicalPlan::TableScan(scan))
103                } else {
104                    // push limit into the table scan itself
105                    scan.fetch = scan
106                        .fetch
107                        .map(|x| min(x, rows_needed))
108                        .or(Some(rows_needed));
109                    transformed_limit(skip, fetch, LogicalPlan::TableScan(scan))
110                }
111            }
112            LogicalPlan::Union(mut union) => {
113                // push limits to each input of the union
114                union.inputs = union
115                    .inputs
116                    .into_iter()
117                    .map(|input| make_arc_limit(0, fetch + skip, input))
118                    .collect();
119                transformed_limit(skip, fetch, LogicalPlan::Union(union))
120            }
121
122            LogicalPlan::Join(join) => Ok(push_down_join(join, fetch + skip)
123                .update_data(|join| {
124                    make_limit(skip, fetch, Arc::new(LogicalPlan::Join(join)))
125                })),
126
127            LogicalPlan::Sort(mut sort) => {
128                let new_fetch = {
129                    let sort_fetch = skip + fetch;
130                    Some(sort.fetch.map(|f| f.min(sort_fetch)).unwrap_or(sort_fetch))
131                };
132                if new_fetch == sort.fetch {
133                    if skip > 0 {
134                        original_limit(skip, fetch, LogicalPlan::Sort(sort))
135                    } else {
136                        Ok(Transformed::yes(LogicalPlan::Sort(sort)))
137                    }
138                } else {
139                    sort.fetch = new_fetch;
140                    limit.input = Arc::new(LogicalPlan::Sort(sort));
141                    Ok(Transformed::yes(LogicalPlan::Limit(limit)))
142                }
143            }
144            LogicalPlan::Projection(mut proj) => {
145                // commute
146                limit.input = Arc::clone(&proj.input);
147                let new_limit = LogicalPlan::Limit(limit);
148                proj.input = Arc::new(new_limit);
149                Ok(Transformed::yes(LogicalPlan::Projection(proj)))
150            }
151            LogicalPlan::SubqueryAlias(mut subquery_alias) => {
152                // commute
153                limit.input = Arc::clone(&subquery_alias.input);
154                let new_limit = LogicalPlan::Limit(limit);
155                subquery_alias.input = Arc::new(new_limit);
156                Ok(Transformed::yes(LogicalPlan::SubqueryAlias(subquery_alias)))
157            }
158            LogicalPlan::Extension(extension_plan)
159                if extension_plan.node.supports_limit_pushdown() =>
160            {
161                let new_children = extension_plan
162                    .node
163                    .inputs()
164                    .into_iter()
165                    .map(|child| {
166                        LogicalPlan::Limit(Limit {
167                            skip: None,
168                            fetch: Some(Box::new(lit((fetch + skip) as i64))),
169                            input: Arc::new(child.clone()),
170                        })
171                    })
172                    .collect::<Vec<_>>();
173
174                // Create a new extension node with updated inputs
175                let child_plan = LogicalPlan::Extension(extension_plan);
176                let new_extension =
177                    child_plan.with_new_exprs(child_plan.expressions(), new_children)?;
178
179                transformed_limit(skip, fetch, new_extension)
180            }
181            input => original_limit(skip, fetch, input),
182        }
183    }
184
185    fn name(&self) -> &str {
186        "push_down_limit"
187    }
188
189    fn apply_order(&self) -> Option<ApplyOrder> {
190        Some(ApplyOrder::TopDown)
191    }
192}
193
194/// Wrap the input plan with a limit node
195///
196/// Original:
197/// ```text
198/// input
199/// ```
200///
201/// Return
202/// ```text
203/// Limit: skip=skip, fetch=fetch
204///  input
205/// ```
206fn make_limit(skip: usize, fetch: usize, input: Arc<LogicalPlan>) -> LogicalPlan {
207    LogicalPlan::Limit(Limit {
208        skip: Some(Box::new(lit(skip as i64))),
209        fetch: Some(Box::new(lit(fetch as i64))),
210        input,
211    })
212}
213
214/// Wrap the input plan with a limit node
215fn make_arc_limit(
216    skip: usize,
217    fetch: usize,
218    input: Arc<LogicalPlan>,
219) -> Arc<LogicalPlan> {
220    Arc::new(make_limit(skip, fetch, input))
221}
222
223/// Returns the original limit (non transformed)
224fn original_limit(
225    skip: usize,
226    fetch: usize,
227    input: LogicalPlan,
228) -> Result<Transformed<LogicalPlan>> {
229    Ok(Transformed::no(make_limit(skip, fetch, Arc::new(input))))
230}
231
232/// Returns the a transformed limit
233fn transformed_limit(
234    skip: usize,
235    fetch: usize,
236    input: LogicalPlan,
237) -> Result<Transformed<LogicalPlan>> {
238    Ok(Transformed::yes(make_limit(skip, fetch, Arc::new(input))))
239}
240
241/// Adds a limit to the inputs of a join, if possible
242fn push_down_join(mut join: Join, limit: usize) -> Transformed<Join> {
243    use JoinType::*;
244
245    // Cross join is the special case of inner join where there is no join condition. see [LogicalPlanBuilder::cross_join]
246    fn is_cross_join(join: &Join) -> bool {
247        join.join_type == Inner && join.on.is_empty() && join.filter.is_none()
248    }
249
250    let (left_limit, right_limit) = if is_cross_join(&join) {
251        (Some(limit), Some(limit))
252    } else {
253        match join.join_type {
254            Left => (Some(limit), None),
255            Right => (None, Some(limit)),
256            _ => (None, None),
257        }
258    };
259
260    if left_limit.is_none() && right_limit.is_none() {
261        return Transformed::no(join);
262    }
263    if let Some(limit) = left_limit {
264        join.left = make_arc_limit(0, limit, join.left);
265    }
266    if let Some(limit) = right_limit {
267        join.right = make_arc_limit(0, limit, join.right);
268    }
269    Transformed::yes(join)
270}
271
272#[cfg(test)]
273mod test {
274    use std::cmp::Ordering;
275    use std::fmt::{Debug, Formatter};
276    use std::vec;
277
278    use super::*;
279    use crate::test::*;
280
281    use datafusion_common::DFSchemaRef;
282    use datafusion_expr::{
283        col, exists, logical_plan::builder::LogicalPlanBuilder, Expr, Extension,
284        UserDefinedLogicalNodeCore,
285    };
286    use datafusion_functions_aggregate::expr_fn::max;
287
288    fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) -> Result<()> {
289        assert_optimized_plan_eq(Arc::new(PushDownLimit::new()), plan, expected)
290    }
291
292    #[derive(Debug, PartialEq, Eq, Hash)]
293    pub struct NoopPlan {
294        input: Vec<LogicalPlan>,
295        schema: DFSchemaRef,
296    }
297
298    // Manual implementation needed because of `schema` field. Comparison excludes this field.
299    impl PartialOrd for NoopPlan {
300        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
301            self.input.partial_cmp(&other.input)
302        }
303    }
304
305    impl UserDefinedLogicalNodeCore for NoopPlan {
306        fn name(&self) -> &str {
307            "NoopPlan"
308        }
309
310        fn inputs(&self) -> Vec<&LogicalPlan> {
311            self.input.iter().collect()
312        }
313
314        fn schema(&self) -> &DFSchemaRef {
315            &self.schema
316        }
317
318        fn expressions(&self) -> Vec<Expr> {
319            self.input
320                .iter()
321                .flat_map(|child| child.expressions())
322                .collect()
323        }
324
325        fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
326            write!(f, "NoopPlan")
327        }
328
329        fn with_exprs_and_inputs(
330            &self,
331            _exprs: Vec<Expr>,
332            inputs: Vec<LogicalPlan>,
333        ) -> Result<Self> {
334            Ok(Self {
335                input: inputs,
336                schema: Arc::clone(&self.schema),
337            })
338        }
339
340        fn supports_limit_pushdown(&self) -> bool {
341            true // Allow limit push-down
342        }
343    }
344
345    #[derive(Debug, PartialEq, Eq, Hash)]
346    struct NoLimitNoopPlan {
347        input: Vec<LogicalPlan>,
348        schema: DFSchemaRef,
349    }
350
351    // Manual implementation needed because of `schema` field. Comparison excludes this field.
352    impl PartialOrd for NoLimitNoopPlan {
353        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
354            self.input.partial_cmp(&other.input)
355        }
356    }
357
358    impl UserDefinedLogicalNodeCore for NoLimitNoopPlan {
359        fn name(&self) -> &str {
360            "NoLimitNoopPlan"
361        }
362
363        fn inputs(&self) -> Vec<&LogicalPlan> {
364            self.input.iter().collect()
365        }
366
367        fn schema(&self) -> &DFSchemaRef {
368            &self.schema
369        }
370
371        fn expressions(&self) -> Vec<Expr> {
372            self.input
373                .iter()
374                .flat_map(|child| child.expressions())
375                .collect()
376        }
377
378        fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
379            write!(f, "NoLimitNoopPlan")
380        }
381
382        fn with_exprs_and_inputs(
383            &self,
384            _exprs: Vec<Expr>,
385            inputs: Vec<LogicalPlan>,
386        ) -> Result<Self> {
387            Ok(Self {
388                input: inputs,
389                schema: Arc::clone(&self.schema),
390            })
391        }
392
393        fn supports_limit_pushdown(&self) -> bool {
394            false // Disallow limit push-down by default
395        }
396    }
397    #[test]
398    fn limit_pushdown_basic() -> Result<()> {
399        let table_scan = test_table_scan()?;
400        let noop_plan = LogicalPlan::Extension(Extension {
401            node: Arc::new(NoopPlan {
402                input: vec![table_scan.clone()],
403                schema: Arc::clone(table_scan.schema()),
404            }),
405        });
406
407        let plan = LogicalPlanBuilder::from(noop_plan)
408            .limit(0, Some(1000))?
409            .build()?;
410
411        let expected = "Limit: skip=0, fetch=1000\
412        \n  NoopPlan\
413        \n    Limit: skip=0, fetch=1000\
414        \n      TableScan: test, fetch=1000";
415
416        assert_optimized_plan_equal(plan, expected)
417    }
418
419    #[test]
420    fn limit_pushdown_with_skip() -> Result<()> {
421        let table_scan = test_table_scan()?;
422        let noop_plan = LogicalPlan::Extension(Extension {
423            node: Arc::new(NoopPlan {
424                input: vec![table_scan.clone()],
425                schema: Arc::clone(table_scan.schema()),
426            }),
427        });
428
429        let plan = LogicalPlanBuilder::from(noop_plan)
430            .limit(10, Some(1000))?
431            .build()?;
432
433        let expected = "Limit: skip=10, fetch=1000\
434        \n  NoopPlan\
435        \n    Limit: skip=0, fetch=1010\
436        \n      TableScan: test, fetch=1010";
437
438        assert_optimized_plan_equal(plan, expected)
439    }
440
441    #[test]
442    fn limit_pushdown_multiple_limits() -> Result<()> {
443        let table_scan = test_table_scan()?;
444        let noop_plan = LogicalPlan::Extension(Extension {
445            node: Arc::new(NoopPlan {
446                input: vec![table_scan.clone()],
447                schema: Arc::clone(table_scan.schema()),
448            }),
449        });
450
451        let plan = LogicalPlanBuilder::from(noop_plan)
452            .limit(10, Some(1000))?
453            .limit(20, Some(500))?
454            .build()?;
455
456        let expected = "Limit: skip=30, fetch=500\
457        \n  NoopPlan\
458        \n    Limit: skip=0, fetch=530\
459        \n      TableScan: test, fetch=530";
460
461        assert_optimized_plan_equal(plan, expected)
462    }
463
464    #[test]
465    fn limit_pushdown_multiple_inputs() -> Result<()> {
466        let table_scan = test_table_scan()?;
467        let noop_plan = LogicalPlan::Extension(Extension {
468            node: Arc::new(NoopPlan {
469                input: vec![table_scan.clone(), table_scan.clone()],
470                schema: Arc::clone(table_scan.schema()),
471            }),
472        });
473
474        let plan = LogicalPlanBuilder::from(noop_plan)
475            .limit(0, Some(1000))?
476            .build()?;
477
478        let expected = "Limit: skip=0, fetch=1000\
479        \n  NoopPlan\
480        \n    Limit: skip=0, fetch=1000\
481        \n      TableScan: test, fetch=1000\
482        \n    Limit: skip=0, fetch=1000\
483        \n      TableScan: test, fetch=1000";
484
485        assert_optimized_plan_equal(plan, expected)
486    }
487
488    #[test]
489    fn limit_pushdown_disallowed_noop_plan() -> Result<()> {
490        let table_scan = test_table_scan()?;
491        let no_limit_noop_plan = LogicalPlan::Extension(Extension {
492            node: Arc::new(NoLimitNoopPlan {
493                input: vec![table_scan.clone()],
494                schema: Arc::clone(table_scan.schema()),
495            }),
496        });
497
498        let plan = LogicalPlanBuilder::from(no_limit_noop_plan)
499            .limit(0, Some(1000))?
500            .build()?;
501
502        let expected = "Limit: skip=0, fetch=1000\
503        \n  NoLimitNoopPlan\
504        \n    TableScan: test";
505
506        assert_optimized_plan_equal(plan, expected)
507    }
508
509    #[test]
510    fn limit_pushdown_projection_table_provider() -> Result<()> {
511        let table_scan = test_table_scan()?;
512
513        let plan = LogicalPlanBuilder::from(table_scan)
514            .project(vec![col("a")])?
515            .limit(0, Some(1000))?
516            .build()?;
517
518        // Should push the limit down to table provider
519        // When it has a select
520        let expected = "Projection: test.a\
521        \n  Limit: skip=0, fetch=1000\
522        \n    TableScan: test, fetch=1000";
523
524        assert_optimized_plan_equal(plan, expected)
525    }
526
527    #[test]
528    fn limit_push_down_take_smaller_limit() -> Result<()> {
529        let table_scan = test_table_scan()?;
530
531        let plan = LogicalPlanBuilder::from(table_scan)
532            .limit(0, Some(1000))?
533            .limit(0, Some(10))?
534            .build()?;
535
536        // Should push down the smallest limit
537        // Towards table scan
538        // This rule doesn't replace multiple limits
539        let expected = "Limit: skip=0, fetch=10\
540        \n  TableScan: test, fetch=10";
541
542        assert_optimized_plan_equal(plan, expected)
543    }
544
545    #[test]
546    fn limit_doesnt_push_down_aggregation() -> Result<()> {
547        let table_scan = test_table_scan()?;
548
549        let plan = LogicalPlanBuilder::from(table_scan)
550            .aggregate(vec![col("a")], vec![max(col("b"))])?
551            .limit(0, Some(1000))?
552            .build()?;
553
554        // Limit should *not* push down aggregate node
555        let expected = "Limit: skip=0, fetch=1000\
556        \n  Aggregate: groupBy=[[test.a]], aggr=[[max(test.b)]]\
557        \n    TableScan: test";
558
559        assert_optimized_plan_equal(plan, expected)
560    }
561
562    #[test]
563    fn limit_should_push_down_union() -> Result<()> {
564        let table_scan = test_table_scan()?;
565
566        let plan = LogicalPlanBuilder::from(table_scan.clone())
567            .union(LogicalPlanBuilder::from(table_scan).build()?)?
568            .limit(0, Some(1000))?
569            .build()?;
570
571        // Limit should push down through union
572        let expected = "Limit: skip=0, fetch=1000\
573        \n  Union\
574        \n    Limit: skip=0, fetch=1000\
575        \n      TableScan: test, fetch=1000\
576        \n    Limit: skip=0, fetch=1000\
577        \n      TableScan: test, fetch=1000";
578
579        assert_optimized_plan_equal(plan, expected)
580    }
581
582    #[test]
583    fn limit_push_down_sort() -> Result<()> {
584        let table_scan = test_table_scan()?;
585
586        let plan = LogicalPlanBuilder::from(table_scan)
587            .sort_by(vec![col("a")])?
588            .limit(0, Some(10))?
589            .build()?;
590
591        // Should push down limit to sort
592        let expected = "Limit: skip=0, fetch=10\
593        \n  Sort: test.a ASC NULLS LAST, fetch=10\
594        \n    TableScan: test";
595
596        assert_optimized_plan_equal(plan, expected)
597    }
598
599    #[test]
600    fn limit_push_down_sort_skip() -> Result<()> {
601        let table_scan = test_table_scan()?;
602
603        let plan = LogicalPlanBuilder::from(table_scan)
604            .sort_by(vec![col("a")])?
605            .limit(5, Some(10))?
606            .build()?;
607
608        // Should push down limit to sort
609        let expected = "Limit: skip=5, fetch=10\
610        \n  Sort: test.a ASC NULLS LAST, fetch=15\
611        \n    TableScan: test";
612
613        assert_optimized_plan_equal(plan, expected)
614    }
615
616    #[test]
617    fn multi_stage_limit_recursive_to_deeper_limit() -> Result<()> {
618        let table_scan = test_table_scan()?;
619
620        let plan = LogicalPlanBuilder::from(table_scan)
621            .limit(0, Some(1000))?
622            .aggregate(vec![col("a")], vec![max(col("b"))])?
623            .limit(0, Some(10))?
624            .build()?;
625
626        // Limit should use deeper LIMIT 1000, but Limit 10 shouldn't push down aggregation
627        let expected = "Limit: skip=0, fetch=10\
628        \n  Aggregate: groupBy=[[test.a]], aggr=[[max(test.b)]]\
629        \n    Limit: skip=0, fetch=1000\
630        \n      TableScan: test, fetch=1000";
631
632        assert_optimized_plan_equal(plan, expected)
633    }
634
635    #[test]
636    fn limit_pushdown_should_not_pushdown_limit_with_offset_only() -> Result<()> {
637        let table_scan = test_table_scan()?;
638        let plan = LogicalPlanBuilder::from(table_scan)
639            .limit(10, None)?
640            .build()?;
641
642        // Should not push any limit down to table provider
643        // When it has a select
644        let expected = "Limit: skip=10, fetch=None\
645        \n  TableScan: test";
646
647        assert_optimized_plan_equal(plan, expected)
648    }
649
650    #[test]
651    fn limit_pushdown_with_offset_projection_table_provider() -> Result<()> {
652        let table_scan = test_table_scan()?;
653
654        let plan = LogicalPlanBuilder::from(table_scan)
655            .project(vec![col("a")])?
656            .limit(10, Some(1000))?
657            .build()?;
658
659        // Should push the limit down to table provider
660        // When it has a select
661        let expected = "Projection: test.a\
662        \n  Limit: skip=10, fetch=1000\
663        \n    TableScan: test, fetch=1010";
664
665        assert_optimized_plan_equal(plan, expected)
666    }
667
668    #[test]
669    fn limit_pushdown_with_offset_after_limit() -> Result<()> {
670        let table_scan = test_table_scan()?;
671
672        let plan = LogicalPlanBuilder::from(table_scan)
673            .project(vec![col("a")])?
674            .limit(0, Some(1000))?
675            .limit(10, None)?
676            .build()?;
677
678        let expected = "Projection: test.a\
679        \n  Limit: skip=10, fetch=990\
680        \n    TableScan: test, fetch=1000";
681
682        assert_optimized_plan_equal(plan, expected)
683    }
684
685    #[test]
686    fn limit_pushdown_with_limit_after_offset() -> Result<()> {
687        let table_scan = test_table_scan()?;
688
689        let plan = LogicalPlanBuilder::from(table_scan)
690            .project(vec![col("a")])?
691            .limit(10, None)?
692            .limit(0, Some(1000))?
693            .build()?;
694
695        let expected = "Projection: test.a\
696        \n  Limit: skip=10, fetch=1000\
697        \n    TableScan: test, fetch=1010";
698
699        assert_optimized_plan_equal(plan, expected)
700    }
701
702    #[test]
703    fn limit_push_down_with_offset_take_smaller_limit() -> Result<()> {
704        let table_scan = test_table_scan()?;
705
706        let plan = LogicalPlanBuilder::from(table_scan)
707            .limit(10, None)?
708            .limit(0, Some(1000))?
709            .limit(0, Some(10))?
710            .build()?;
711
712        let expected = "Limit: skip=10, fetch=10\
713        \n  TableScan: test, fetch=20";
714
715        assert_optimized_plan_equal(plan, expected)
716    }
717
718    #[test]
719    fn limit_doesnt_push_down_with_offset_aggregation() -> Result<()> {
720        let table_scan = test_table_scan()?;
721
722        let plan = LogicalPlanBuilder::from(table_scan)
723            .aggregate(vec![col("a")], vec![max(col("b"))])?
724            .limit(10, Some(1000))?
725            .build()?;
726
727        // Limit should *not* push down aggregate node
728        let expected = "Limit: skip=10, fetch=1000\
729        \n  Aggregate: groupBy=[[test.a]], aggr=[[max(test.b)]]\
730        \n    TableScan: test";
731
732        assert_optimized_plan_equal(plan, expected)
733    }
734
735    #[test]
736    fn limit_should_push_down_with_offset_union() -> Result<()> {
737        let table_scan = test_table_scan()?;
738
739        let plan = LogicalPlanBuilder::from(table_scan.clone())
740            .union(LogicalPlanBuilder::from(table_scan).build()?)?
741            .limit(10, Some(1000))?
742            .build()?;
743
744        // Limit should push down through union
745        let expected = "Limit: skip=10, fetch=1000\
746        \n  Union\
747        \n    Limit: skip=0, fetch=1010\
748        \n      TableScan: test, fetch=1010\
749        \n    Limit: skip=0, fetch=1010\
750        \n      TableScan: test, fetch=1010";
751
752        assert_optimized_plan_equal(plan, expected)
753    }
754
755    #[test]
756    fn limit_offset_should_not_push_down_with_offset_join() -> Result<()> {
757        let table_scan_1 = test_table_scan()?;
758        let table_scan_2 = test_table_scan_with_name("test2")?;
759
760        let plan = LogicalPlanBuilder::from(table_scan_1)
761            .join(
762                LogicalPlanBuilder::from(table_scan_2).build()?,
763                JoinType::Inner,
764                (vec!["a"], vec!["a"]),
765                None,
766            )?
767            .limit(10, Some(1000))?
768            .build()?;
769
770        // Limit pushdown Not supported in Join
771        let expected = "Limit: skip=10, fetch=1000\
772        \n  Inner Join: test.a = test2.a\
773        \n    TableScan: test\
774        \n    TableScan: test2";
775
776        assert_optimized_plan_equal(plan, expected)
777    }
778
779    #[test]
780    fn offset_limit_should_not_push_down_with_offset_join() -> Result<()> {
781        let table_scan_1 = test_table_scan()?;
782        let table_scan_2 = test_table_scan_with_name("test2")?;
783
784        let plan = LogicalPlanBuilder::from(table_scan_1)
785            .join(
786                LogicalPlanBuilder::from(table_scan_2).build()?,
787                JoinType::Inner,
788                (vec!["a"], vec!["a"]),
789                None,
790            )?
791            .limit(10, Some(1000))?
792            .build()?;
793
794        // Limit pushdown Not supported in Join
795        let expected = "Limit: skip=10, fetch=1000\
796        \n  Inner Join: test.a = test2.a\
797        \n    TableScan: test\
798        \n    TableScan: test2";
799
800        assert_optimized_plan_equal(plan, expected)
801    }
802
803    #[test]
804    fn limit_offset_should_not_push_down_with_offset_sub_query() -> Result<()> {
805        let table_scan_1 = test_table_scan_with_name("test1")?;
806        let table_scan_2 = test_table_scan_with_name("test2")?;
807
808        let subquery = LogicalPlanBuilder::from(table_scan_1)
809            .project(vec![col("a")])?
810            .filter(col("a").eq(col("test1.a")))?
811            .build()?;
812
813        let outer_query = LogicalPlanBuilder::from(table_scan_2)
814            .project(vec![col("a")])?
815            .filter(exists(Arc::new(subquery)))?
816            .limit(10, Some(100))?
817            .build()?;
818
819        // Limit pushdown Not supported in sub_query
820        let expected = "Limit: skip=10, fetch=100\
821        \n  Filter: EXISTS (<subquery>)\
822        \n    Subquery:\
823        \n      Filter: test1.a = test1.a\
824        \n        Projection: test1.a\
825        \n          TableScan: test1\
826        \n    Projection: test2.a\
827        \n      TableScan: test2";
828
829        assert_optimized_plan_equal(outer_query, expected)
830    }
831
832    #[test]
833    fn offset_limit_should_not_push_down_with_offset_sub_query() -> Result<()> {
834        let table_scan_1 = test_table_scan_with_name("test1")?;
835        let table_scan_2 = test_table_scan_with_name("test2")?;
836
837        let subquery = LogicalPlanBuilder::from(table_scan_1)
838            .project(vec![col("a")])?
839            .filter(col("a").eq(col("test1.a")))?
840            .build()?;
841
842        let outer_query = LogicalPlanBuilder::from(table_scan_2)
843            .project(vec![col("a")])?
844            .filter(exists(Arc::new(subquery)))?
845            .limit(10, Some(100))?
846            .build()?;
847
848        // Limit pushdown Not supported in sub_query
849        let expected = "Limit: skip=10, fetch=100\
850        \n  Filter: EXISTS (<subquery>)\
851        \n    Subquery:\
852        \n      Filter: test1.a = test1.a\
853        \n        Projection: test1.a\
854        \n          TableScan: test1\
855        \n    Projection: test2.a\
856        \n      TableScan: test2";
857
858        assert_optimized_plan_equal(outer_query, expected)
859    }
860
861    #[test]
862    fn limit_should_push_down_left_outer_join_with_offset() -> Result<()> {
863        let table_scan_1 = test_table_scan()?;
864        let table_scan_2 = test_table_scan_with_name("test2")?;
865
866        let plan = LogicalPlanBuilder::from(table_scan_1)
867            .join(
868                LogicalPlanBuilder::from(table_scan_2).build()?,
869                JoinType::Left,
870                (vec!["a"], vec!["a"]),
871                None,
872            )?
873            .limit(10, Some(1000))?
874            .build()?;
875
876        // Limit pushdown Not supported in Join
877        let expected = "Limit: skip=10, fetch=1000\
878        \n  Left Join: test.a = test2.a\
879        \n    Limit: skip=0, fetch=1010\
880        \n      TableScan: test, fetch=1010\
881        \n    TableScan: test2";
882
883        assert_optimized_plan_equal(plan, expected)
884    }
885
886    #[test]
887    fn limit_should_push_down_right_outer_join() -> Result<()> {
888        let table_scan_1 = test_table_scan()?;
889        let table_scan_2 = test_table_scan_with_name("test2")?;
890
891        let plan = LogicalPlanBuilder::from(table_scan_1)
892            .join(
893                LogicalPlanBuilder::from(table_scan_2).build()?,
894                JoinType::Right,
895                (vec!["a"], vec!["a"]),
896                None,
897            )?
898            .limit(0, Some(1000))?
899            .build()?;
900
901        // Limit pushdown Not supported in Join
902        let expected = "Limit: skip=0, fetch=1000\
903        \n  Right Join: test.a = test2.a\
904        \n    TableScan: test\
905        \n    Limit: skip=0, fetch=1000\
906        \n      TableScan: test2, fetch=1000";
907
908        assert_optimized_plan_equal(plan, expected)
909    }
910
911    #[test]
912    fn limit_should_push_down_right_outer_join_with_offset() -> Result<()> {
913        let table_scan_1 = test_table_scan()?;
914        let table_scan_2 = test_table_scan_with_name("test2")?;
915
916        let plan = LogicalPlanBuilder::from(table_scan_1)
917            .join(
918                LogicalPlanBuilder::from(table_scan_2).build()?,
919                JoinType::Right,
920                (vec!["a"], vec!["a"]),
921                None,
922            )?
923            .limit(10, Some(1000))?
924            .build()?;
925
926        // Limit pushdown with offset supported in right outer join
927        let expected = "Limit: skip=10, fetch=1000\
928        \n  Right Join: test.a = test2.a\
929        \n    TableScan: test\
930        \n    Limit: skip=0, fetch=1010\
931        \n      TableScan: test2, fetch=1010";
932
933        assert_optimized_plan_equal(plan, expected)
934    }
935
936    #[test]
937    fn limit_push_down_cross_join() -> Result<()> {
938        let table_scan_1 = test_table_scan()?;
939        let table_scan_2 = test_table_scan_with_name("test2")?;
940
941        let plan = LogicalPlanBuilder::from(table_scan_1)
942            .cross_join(LogicalPlanBuilder::from(table_scan_2).build()?)?
943            .limit(0, Some(1000))?
944            .build()?;
945
946        let expected = "Limit: skip=0, fetch=1000\
947        \n  Cross Join: \
948        \n    Limit: skip=0, fetch=1000\
949        \n      TableScan: test, fetch=1000\
950        \n    Limit: skip=0, fetch=1000\
951        \n      TableScan: test2, fetch=1000";
952
953        assert_optimized_plan_equal(plan, expected)
954    }
955
956    #[test]
957    fn skip_limit_push_down_cross_join() -> Result<()> {
958        let table_scan_1 = test_table_scan()?;
959        let table_scan_2 = test_table_scan_with_name("test2")?;
960
961        let plan = LogicalPlanBuilder::from(table_scan_1)
962            .cross_join(LogicalPlanBuilder::from(table_scan_2).build()?)?
963            .limit(1000, Some(1000))?
964            .build()?;
965
966        let expected = "Limit: skip=1000, fetch=1000\
967        \n  Cross Join: \
968        \n    Limit: skip=0, fetch=2000\
969        \n      TableScan: test, fetch=2000\
970        \n    Limit: skip=0, fetch=2000\
971        \n      TableScan: test2, fetch=2000";
972
973        assert_optimized_plan_equal(plan, expected)
974    }
975
976    #[test]
977    fn merge_limit_result_empty() -> Result<()> {
978        let scan = test_table_scan()?;
979
980        let plan = LogicalPlanBuilder::from(scan)
981            .limit(0, Some(1000))?
982            .limit(1000, None)?
983            .build()?;
984
985        let expected = "Limit: skip=1000, fetch=0\
986        \n  TableScan: test, fetch=0";
987
988        assert_optimized_plan_equal(plan, expected)
989    }
990
991    #[test]
992    fn skip_great_than_fetch() -> Result<()> {
993        let scan = test_table_scan()?;
994
995        let plan = LogicalPlanBuilder::from(scan)
996            .limit(0, Some(1))?
997            .limit(1000, None)?
998            .build()?;
999
1000        let expected = "Limit: skip=1000, fetch=0\
1001        \n  TableScan: test, fetch=0";
1002
1003        assert_optimized_plan_equal(plan, expected)
1004    }
1005
1006    #[test]
1007    fn push_down_subquery_alias() -> Result<()> {
1008        let scan = test_table_scan()?;
1009
1010        let plan = LogicalPlanBuilder::from(scan)
1011            .alias("a")?
1012            .limit(0, Some(1))?
1013            .limit(1000, None)?
1014            .build()?;
1015
1016        let expected = "SubqueryAlias: a\
1017        \n  Limit: skip=1000, fetch=0\
1018        \n    TableScan: test, fetch=0";
1019
1020        assert_optimized_plan_equal(plan, expected)
1021    }
1022}