1use std::cmp::min;
21use std::sync::Arc;
22
23use crate::optimizer::ApplyOrder;
24use crate::{OptimizerConfig, OptimizerRule};
25
26use datafusion_common::tree_node::Transformed;
27use datafusion_common::utils::combine_limit;
28use datafusion_common::Result;
29use datafusion_expr::logical_plan::{Join, JoinType, Limit, LogicalPlan};
30use datafusion_expr::{lit, FetchType, SkipType};
31
32#[derive(Default, Debug)]
35pub struct PushDownLimit {}
36
37impl PushDownLimit {
38 #[allow(missing_docs)]
39 pub fn new() -> Self {
40 Self {}
41 }
42}
43
44impl OptimizerRule for PushDownLimit {
46 fn supports_rewrite(&self) -> bool {
47 true
48 }
49
50 fn rewrite(
51 &self,
52 plan: LogicalPlan,
53 _config: &dyn OptimizerConfig,
54 ) -> Result<Transformed<LogicalPlan>> {
55 let LogicalPlan::Limit(mut limit) = plan else {
56 return Ok(Transformed::no(plan));
57 };
58
59 let SkipType::Literal(skip) = limit.get_skip_type()? else {
61 return Ok(Transformed::no(LogicalPlan::Limit(limit)));
62 };
63 let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
64 return Ok(Transformed::no(LogicalPlan::Limit(limit)));
65 };
66
67 if let LogicalPlan::Limit(child) = limit.input.as_ref() {
69 let SkipType::Literal(child_skip) = child.get_skip_type()? else {
70 return Ok(Transformed::no(LogicalPlan::Limit(limit)));
71 };
72 let FetchType::Literal(child_fetch) = child.get_fetch_type()? else {
73 return Ok(Transformed::no(LogicalPlan::Limit(limit)));
74 };
75
76 let (skip, fetch) = combine_limit(skip, fetch, child_skip, child_fetch);
77 let plan = LogicalPlan::Limit(Limit {
78 skip: Some(Box::new(lit(skip as i64))),
79 fetch: fetch.map(|f| Box::new(lit(f as i64))),
80 input: Arc::clone(&child.input),
81 });
82
83 #[allow(clippy::used_underscore_binding)]
85 return self.rewrite(plan, _config);
86 }
87
88 let Some(fetch) = fetch else {
90 return Ok(Transformed::no(LogicalPlan::Limit(limit)));
91 };
92
93 match Arc::unwrap_or_clone(limit.input) {
94 LogicalPlan::TableScan(mut scan) => {
95 let rows_needed = if fetch != 0 { fetch + skip } else { 0 };
96 let new_fetch = scan
97 .fetch
98 .map(|x| min(x, rows_needed))
99 .or(Some(rows_needed));
100 if new_fetch == scan.fetch {
101 original_limit(skip, fetch, LogicalPlan::TableScan(scan))
102 } else {
103 scan.fetch = scan
105 .fetch
106 .map(|x| min(x, rows_needed))
107 .or(Some(rows_needed));
108 transformed_limit(skip, fetch, LogicalPlan::TableScan(scan))
109 }
110 }
111 LogicalPlan::Union(mut union) => {
112 union.inputs = union
114 .inputs
115 .into_iter()
116 .map(|input| make_arc_limit(0, fetch + skip, input))
117 .collect();
118 transformed_limit(skip, fetch, LogicalPlan::Union(union))
119 }
120
121 LogicalPlan::Join(join) => Ok(push_down_join(join, fetch + skip)
122 .update_data(|join| {
123 make_limit(skip, fetch, Arc::new(LogicalPlan::Join(join)))
124 })),
125
126 LogicalPlan::Sort(mut sort) => {
127 let new_fetch = {
128 let sort_fetch = skip + fetch;
129 Some(sort.fetch.map(|f| f.min(sort_fetch)).unwrap_or(sort_fetch))
130 };
131 if new_fetch == sort.fetch {
132 if skip > 0 {
133 original_limit(skip, fetch, LogicalPlan::Sort(sort))
134 } else {
135 Ok(Transformed::yes(LogicalPlan::Sort(sort)))
136 }
137 } else {
138 sort.fetch = new_fetch;
139 limit.input = Arc::new(LogicalPlan::Sort(sort));
140 Ok(Transformed::yes(LogicalPlan::Limit(limit)))
141 }
142 }
143 LogicalPlan::Projection(mut proj) => {
144 limit.input = Arc::clone(&proj.input);
146 let new_limit = LogicalPlan::Limit(limit);
147 proj.input = Arc::new(new_limit);
148 Ok(Transformed::yes(LogicalPlan::Projection(proj)))
149 }
150 LogicalPlan::SubqueryAlias(mut subquery_alias) => {
151 limit.input = Arc::clone(&subquery_alias.input);
153 let new_limit = LogicalPlan::Limit(limit);
154 subquery_alias.input = Arc::new(new_limit);
155 Ok(Transformed::yes(LogicalPlan::SubqueryAlias(subquery_alias)))
156 }
157 LogicalPlan::Extension(extension_plan)
158 if extension_plan.node.supports_limit_pushdown() =>
159 {
160 let new_children = extension_plan
161 .node
162 .inputs()
163 .into_iter()
164 .map(|child| {
165 LogicalPlan::Limit(Limit {
166 skip: None,
167 fetch: Some(Box::new(lit((fetch + skip) as i64))),
168 input: Arc::new(child.clone()),
169 })
170 })
171 .collect::<Vec<_>>();
172
173 let child_plan = LogicalPlan::Extension(extension_plan);
175 let new_extension =
176 child_plan.with_new_exprs(child_plan.expressions(), new_children)?;
177
178 transformed_limit(skip, fetch, new_extension)
179 }
180 input => original_limit(skip, fetch, input),
181 }
182 }
183
184 fn name(&self) -> &str {
185 "push_down_limit"
186 }
187
188 fn apply_order(&self) -> Option<ApplyOrder> {
189 Some(ApplyOrder::TopDown)
190 }
191}
192
193fn make_limit(skip: usize, fetch: usize, input: Arc<LogicalPlan>) -> LogicalPlan {
206 LogicalPlan::Limit(Limit {
207 skip: Some(Box::new(lit(skip as i64))),
208 fetch: Some(Box::new(lit(fetch as i64))),
209 input,
210 })
211}
212
213fn make_arc_limit(
215 skip: usize,
216 fetch: usize,
217 input: Arc<LogicalPlan>,
218) -> Arc<LogicalPlan> {
219 Arc::new(make_limit(skip, fetch, input))
220}
221
222fn original_limit(
224 skip: usize,
225 fetch: usize,
226 input: LogicalPlan,
227) -> Result<Transformed<LogicalPlan>> {
228 Ok(Transformed::no(make_limit(skip, fetch, Arc::new(input))))
229}
230
231fn transformed_limit(
233 skip: usize,
234 fetch: usize,
235 input: LogicalPlan,
236) -> Result<Transformed<LogicalPlan>> {
237 Ok(Transformed::yes(make_limit(skip, fetch, Arc::new(input))))
238}
239
240fn push_down_join(mut join: Join, limit: usize) -> Transformed<Join> {
242 use JoinType::*;
243
244 fn is_cross_join(join: &Join) -> bool {
246 join.join_type == Inner && join.on.is_empty() && join.filter.is_none()
247 }
248
249 let (left_limit, right_limit) = if is_cross_join(&join) {
250 (Some(limit), Some(limit))
251 } else {
252 match join.join_type {
253 Left => (Some(limit), None),
254 Right => (None, Some(limit)),
255 _ => (None, None),
256 }
257 };
258
259 if left_limit.is_none() && right_limit.is_none() {
260 return Transformed::no(join);
261 }
262 if let Some(limit) = left_limit {
263 join.left = make_arc_limit(0, limit, join.left);
264 }
265 if let Some(limit) = right_limit {
266 join.right = make_arc_limit(0, limit, join.right);
267 }
268 Transformed::yes(join)
269}
270
271#[cfg(test)]
272mod test {
273 use std::cmp::Ordering;
274 use std::fmt::{Debug, Formatter};
275 use std::vec;
276
277 use super::*;
278 use crate::assert_optimized_plan_eq_snapshot;
279 use crate::test::*;
280
281 use crate::OptimizerContext;
282 use datafusion_common::DFSchemaRef;
283 use datafusion_expr::{
284 col, exists, logical_plan::builder::LogicalPlanBuilder, Expr, Extension,
285 UserDefinedLogicalNodeCore,
286 };
287 use datafusion_functions_aggregate::expr_fn::max;
288
289 macro_rules! assert_optimized_plan_equal {
290 (
291 $plan:expr,
292 @ $expected:literal $(,)?
293 ) => {{
294 let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
295 let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![Arc::new(PushDownLimit::new())];
296 assert_optimized_plan_eq_snapshot!(
297 optimizer_ctx,
298 rules,
299 $plan,
300 @ $expected,
301 )
302 }};
303 }
304
305 #[derive(Debug, PartialEq, Eq, Hash)]
306 pub struct NoopPlan {
307 input: Vec<LogicalPlan>,
308 schema: DFSchemaRef,
309 }
310
311 impl PartialOrd for NoopPlan {
313 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
314 self.input
315 .partial_cmp(&other.input)
316 .filter(|cmp| *cmp != Ordering::Equal || self == other)
318 }
319 }
320
321 impl UserDefinedLogicalNodeCore for NoopPlan {
322 fn name(&self) -> &str {
323 "NoopPlan"
324 }
325
326 fn inputs(&self) -> Vec<&LogicalPlan> {
327 self.input.iter().collect()
328 }
329
330 fn schema(&self) -> &DFSchemaRef {
331 &self.schema
332 }
333
334 fn expressions(&self) -> Vec<Expr> {
335 self.input
336 .iter()
337 .flat_map(|child| child.expressions())
338 .collect()
339 }
340
341 fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
342 write!(f, "NoopPlan")
343 }
344
345 fn with_exprs_and_inputs(
346 &self,
347 _exprs: Vec<Expr>,
348 inputs: Vec<LogicalPlan>,
349 ) -> Result<Self> {
350 Ok(Self {
351 input: inputs,
352 schema: Arc::clone(&self.schema),
353 })
354 }
355
356 fn supports_limit_pushdown(&self) -> bool {
357 true }
359 }
360
361 #[derive(Debug, PartialEq, Eq, Hash)]
362 struct NoLimitNoopPlan {
363 input: Vec<LogicalPlan>,
364 schema: DFSchemaRef,
365 }
366
367 impl PartialOrd for NoLimitNoopPlan {
369 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
370 self.input
371 .partial_cmp(&other.input)
372 .filter(|cmp| *cmp != Ordering::Equal || self == other)
374 }
375 }
376
377 impl UserDefinedLogicalNodeCore for NoLimitNoopPlan {
378 fn name(&self) -> &str {
379 "NoLimitNoopPlan"
380 }
381
382 fn inputs(&self) -> Vec<&LogicalPlan> {
383 self.input.iter().collect()
384 }
385
386 fn schema(&self) -> &DFSchemaRef {
387 &self.schema
388 }
389
390 fn expressions(&self) -> Vec<Expr> {
391 self.input
392 .iter()
393 .flat_map(|child| child.expressions())
394 .collect()
395 }
396
397 fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
398 write!(f, "NoLimitNoopPlan")
399 }
400
401 fn with_exprs_and_inputs(
402 &self,
403 _exprs: Vec<Expr>,
404 inputs: Vec<LogicalPlan>,
405 ) -> Result<Self> {
406 Ok(Self {
407 input: inputs,
408 schema: Arc::clone(&self.schema),
409 })
410 }
411
412 fn supports_limit_pushdown(&self) -> bool {
413 false }
415 }
416 #[test]
417 fn limit_pushdown_basic() -> Result<()> {
418 let table_scan = test_table_scan()?;
419 let noop_plan = LogicalPlan::Extension(Extension {
420 node: Arc::new(NoopPlan {
421 input: vec![table_scan.clone()],
422 schema: Arc::clone(table_scan.schema()),
423 }),
424 });
425
426 let plan = LogicalPlanBuilder::from(noop_plan)
427 .limit(0, Some(1000))?
428 .build()?;
429
430 assert_optimized_plan_equal!(
431 plan,
432 @r"
433 Limit: skip=0, fetch=1000
434 NoopPlan
435 Limit: skip=0, fetch=1000
436 TableScan: test, fetch=1000
437 "
438 )
439 }
440
441 #[test]
442 fn limit_pushdown_with_skip() -> Result<()> {
443 let table_scan = test_table_scan()?;
444 let noop_plan = LogicalPlan::Extension(Extension {
445 node: Arc::new(NoopPlan {
446 input: vec![table_scan.clone()],
447 schema: Arc::clone(table_scan.schema()),
448 }),
449 });
450
451 let plan = LogicalPlanBuilder::from(noop_plan)
452 .limit(10, Some(1000))?
453 .build()?;
454
455 assert_optimized_plan_equal!(
456 plan,
457 @r"
458 Limit: skip=10, fetch=1000
459 NoopPlan
460 Limit: skip=0, fetch=1010
461 TableScan: test, fetch=1010
462 "
463 )
464 }
465
466 #[test]
467 fn limit_pushdown_multiple_limits() -> Result<()> {
468 let table_scan = test_table_scan()?;
469 let noop_plan = LogicalPlan::Extension(Extension {
470 node: Arc::new(NoopPlan {
471 input: vec![table_scan.clone()],
472 schema: Arc::clone(table_scan.schema()),
473 }),
474 });
475
476 let plan = LogicalPlanBuilder::from(noop_plan)
477 .limit(10, Some(1000))?
478 .limit(20, Some(500))?
479 .build()?;
480
481 assert_optimized_plan_equal!(
482 plan,
483 @r"
484 Limit: skip=30, fetch=500
485 NoopPlan
486 Limit: skip=0, fetch=530
487 TableScan: test, fetch=530
488 "
489 )
490 }
491
492 #[test]
493 fn limit_pushdown_multiple_inputs() -> Result<()> {
494 let table_scan = test_table_scan()?;
495 let noop_plan = LogicalPlan::Extension(Extension {
496 node: Arc::new(NoopPlan {
497 input: vec![table_scan.clone(), table_scan.clone()],
498 schema: Arc::clone(table_scan.schema()),
499 }),
500 });
501
502 let plan = LogicalPlanBuilder::from(noop_plan)
503 .limit(0, Some(1000))?
504 .build()?;
505
506 assert_optimized_plan_equal!(
507 plan,
508 @r"
509 Limit: skip=0, fetch=1000
510 NoopPlan
511 Limit: skip=0, fetch=1000
512 TableScan: test, fetch=1000
513 Limit: skip=0, fetch=1000
514 TableScan: test, fetch=1000
515 "
516 )
517 }
518
519 #[test]
520 fn limit_pushdown_disallowed_noop_plan() -> Result<()> {
521 let table_scan = test_table_scan()?;
522 let no_limit_noop_plan = LogicalPlan::Extension(Extension {
523 node: Arc::new(NoLimitNoopPlan {
524 input: vec![table_scan.clone()],
525 schema: Arc::clone(table_scan.schema()),
526 }),
527 });
528
529 let plan = LogicalPlanBuilder::from(no_limit_noop_plan)
530 .limit(0, Some(1000))?
531 .build()?;
532
533 assert_optimized_plan_equal!(
534 plan,
535 @r"
536 Limit: skip=0, fetch=1000
537 NoLimitNoopPlan
538 TableScan: test
539 "
540 )
541 }
542
543 #[test]
544 fn limit_pushdown_projection_table_provider() -> Result<()> {
545 let table_scan = test_table_scan()?;
546
547 let plan = LogicalPlanBuilder::from(table_scan)
548 .project(vec![col("a")])?
549 .limit(0, Some(1000))?
550 .build()?;
551
552 assert_optimized_plan_equal!(
555 plan,
556 @r"
557 Projection: test.a
558 Limit: skip=0, fetch=1000
559 TableScan: test, fetch=1000
560 "
561 )
562 }
563
564 #[test]
565 fn limit_push_down_take_smaller_limit() -> Result<()> {
566 let table_scan = test_table_scan()?;
567
568 let plan = LogicalPlanBuilder::from(table_scan)
569 .limit(0, Some(1000))?
570 .limit(0, Some(10))?
571 .build()?;
572
573 assert_optimized_plan_equal!(
577 plan,
578 @r"
579 Limit: skip=0, fetch=10
580 TableScan: test, fetch=10
581 "
582 )
583 }
584
585 #[test]
586 fn limit_doesnt_push_down_aggregation() -> Result<()> {
587 let table_scan = test_table_scan()?;
588
589 let plan = LogicalPlanBuilder::from(table_scan)
590 .aggregate(vec![col("a")], vec![max(col("b"))])?
591 .limit(0, Some(1000))?
592 .build()?;
593
594 assert_optimized_plan_equal!(
596 plan,
597 @r"
598 Limit: skip=0, fetch=1000
599 Aggregate: groupBy=[[test.a]], aggr=[[max(test.b)]]
600 TableScan: test
601 "
602 )
603 }
604
605 #[test]
606 fn limit_should_push_down_union() -> Result<()> {
607 let table_scan = test_table_scan()?;
608
609 let plan = LogicalPlanBuilder::from(table_scan.clone())
610 .union(LogicalPlanBuilder::from(table_scan).build()?)?
611 .limit(0, Some(1000))?
612 .build()?;
613
614 assert_optimized_plan_equal!(
616 plan,
617 @r"
618 Limit: skip=0, fetch=1000
619 Union
620 Limit: skip=0, fetch=1000
621 TableScan: test, fetch=1000
622 Limit: skip=0, fetch=1000
623 TableScan: test, fetch=1000
624 "
625 )
626 }
627
628 #[test]
629 fn limit_push_down_sort() -> Result<()> {
630 let table_scan = test_table_scan()?;
631
632 let plan = LogicalPlanBuilder::from(table_scan)
633 .sort_by(vec![col("a")])?
634 .limit(0, Some(10))?
635 .build()?;
636
637 assert_optimized_plan_equal!(
639 plan,
640 @r"
641 Limit: skip=0, fetch=10
642 Sort: test.a ASC NULLS LAST, fetch=10
643 TableScan: test
644 "
645 )
646 }
647
648 #[test]
649 fn limit_push_down_sort_skip() -> Result<()> {
650 let table_scan = test_table_scan()?;
651
652 let plan = LogicalPlanBuilder::from(table_scan)
653 .sort_by(vec![col("a")])?
654 .limit(5, Some(10))?
655 .build()?;
656
657 assert_optimized_plan_equal!(
659 plan,
660 @r"
661 Limit: skip=5, fetch=10
662 Sort: test.a ASC NULLS LAST, fetch=15
663 TableScan: test
664 "
665 )
666 }
667
668 #[test]
669 fn multi_stage_limit_recursive_to_deeper_limit() -> Result<()> {
670 let table_scan = test_table_scan()?;
671
672 let plan = LogicalPlanBuilder::from(table_scan)
673 .limit(0, Some(1000))?
674 .aggregate(vec![col("a")], vec![max(col("b"))])?
675 .limit(0, Some(10))?
676 .build()?;
677
678 assert_optimized_plan_equal!(
680 plan,
681 @r"
682 Limit: skip=0, fetch=10
683 Aggregate: groupBy=[[test.a]], aggr=[[max(test.b)]]
684 Limit: skip=0, fetch=1000
685 TableScan: test, fetch=1000
686 "
687 )
688 }
689
690 #[test]
691 fn limit_pushdown_should_not_pushdown_limit_with_offset_only() -> Result<()> {
692 let table_scan = test_table_scan()?;
693 let plan = LogicalPlanBuilder::from(table_scan)
694 .limit(10, None)?
695 .build()?;
696
697 assert_optimized_plan_equal!(
700 plan,
701 @r"
702 Limit: skip=10, fetch=None
703 TableScan: test
704 "
705 )
706 }
707
708 #[test]
709 fn limit_pushdown_with_offset_projection_table_provider() -> Result<()> {
710 let table_scan = test_table_scan()?;
711
712 let plan = LogicalPlanBuilder::from(table_scan)
713 .project(vec![col("a")])?
714 .limit(10, Some(1000))?
715 .build()?;
716
717 assert_optimized_plan_equal!(
720 plan,
721 @r"
722 Projection: test.a
723 Limit: skip=10, fetch=1000
724 TableScan: test, fetch=1010
725 "
726 )
727 }
728
729 #[test]
730 fn limit_pushdown_with_offset_after_limit() -> Result<()> {
731 let table_scan = test_table_scan()?;
732
733 let plan = LogicalPlanBuilder::from(table_scan)
734 .project(vec![col("a")])?
735 .limit(0, Some(1000))?
736 .limit(10, None)?
737 .build()?;
738
739 assert_optimized_plan_equal!(
740 plan,
741 @r"
742 Projection: test.a
743 Limit: skip=10, fetch=990
744 TableScan: test, fetch=1000
745 "
746 )
747 }
748
749 #[test]
750 fn limit_pushdown_with_limit_after_offset() -> Result<()> {
751 let table_scan = test_table_scan()?;
752
753 let plan = LogicalPlanBuilder::from(table_scan)
754 .project(vec![col("a")])?
755 .limit(10, None)?
756 .limit(0, Some(1000))?
757 .build()?;
758
759 assert_optimized_plan_equal!(
760 plan,
761 @r"
762 Projection: test.a
763 Limit: skip=10, fetch=1000
764 TableScan: test, fetch=1010
765 "
766 )
767 }
768
769 #[test]
770 fn limit_push_down_with_offset_take_smaller_limit() -> Result<()> {
771 let table_scan = test_table_scan()?;
772
773 let plan = LogicalPlanBuilder::from(table_scan)
774 .limit(10, None)?
775 .limit(0, Some(1000))?
776 .limit(0, Some(10))?
777 .build()?;
778
779 assert_optimized_plan_equal!(
780 plan,
781 @r"
782 Limit: skip=10, fetch=10
783 TableScan: test, fetch=20
784 "
785 )
786 }
787
788 #[test]
789 fn limit_doesnt_push_down_with_offset_aggregation() -> Result<()> {
790 let table_scan = test_table_scan()?;
791
792 let plan = LogicalPlanBuilder::from(table_scan)
793 .aggregate(vec![col("a")], vec![max(col("b"))])?
794 .limit(10, Some(1000))?
795 .build()?;
796
797 assert_optimized_plan_equal!(
799 plan,
800 @r"
801 Limit: skip=10, fetch=1000
802 Aggregate: groupBy=[[test.a]], aggr=[[max(test.b)]]
803 TableScan: test
804 "
805 )
806 }
807
808 #[test]
809 fn limit_should_push_down_with_offset_union() -> Result<()> {
810 let table_scan = test_table_scan()?;
811
812 let plan = LogicalPlanBuilder::from(table_scan.clone())
813 .union(LogicalPlanBuilder::from(table_scan).build()?)?
814 .limit(10, Some(1000))?
815 .build()?;
816
817 assert_optimized_plan_equal!(
819 plan,
820 @r"
821 Limit: skip=10, fetch=1000
822 Union
823 Limit: skip=0, fetch=1010
824 TableScan: test, fetch=1010
825 Limit: skip=0, fetch=1010
826 TableScan: test, fetch=1010
827 "
828 )
829 }
830
831 #[test]
832 fn limit_offset_should_not_push_down_with_offset_join() -> Result<()> {
833 let table_scan_1 = test_table_scan()?;
834 let table_scan_2 = test_table_scan_with_name("test2")?;
835
836 let plan = LogicalPlanBuilder::from(table_scan_1)
837 .join(
838 LogicalPlanBuilder::from(table_scan_2).build()?,
839 JoinType::Inner,
840 (vec!["a"], vec!["a"]),
841 None,
842 )?
843 .limit(10, Some(1000))?
844 .build()?;
845
846 assert_optimized_plan_equal!(
848 plan,
849 @r"
850 Limit: skip=10, fetch=1000
851 Inner Join: test.a = test2.a
852 TableScan: test
853 TableScan: test2
854 "
855 )
856 }
857
858 #[test]
859 fn offset_limit_should_not_push_down_with_offset_join() -> Result<()> {
860 let table_scan_1 = test_table_scan()?;
861 let table_scan_2 = test_table_scan_with_name("test2")?;
862
863 let plan = LogicalPlanBuilder::from(table_scan_1)
864 .join(
865 LogicalPlanBuilder::from(table_scan_2).build()?,
866 JoinType::Inner,
867 (vec!["a"], vec!["a"]),
868 None,
869 )?
870 .limit(10, Some(1000))?
871 .build()?;
872
873 assert_optimized_plan_equal!(
875 plan,
876 @r"
877 Limit: skip=10, fetch=1000
878 Inner Join: test.a = test2.a
879 TableScan: test
880 TableScan: test2
881 "
882 )
883 }
884
885 #[test]
886 fn limit_offset_should_not_push_down_with_offset_sub_query() -> Result<()> {
887 let table_scan_1 = test_table_scan_with_name("test1")?;
888 let table_scan_2 = test_table_scan_with_name("test2")?;
889
890 let subquery = LogicalPlanBuilder::from(table_scan_1)
891 .project(vec![col("a")])?
892 .filter(col("a").eq(col("test1.a")))?
893 .build()?;
894
895 let outer_query = LogicalPlanBuilder::from(table_scan_2)
896 .project(vec![col("a")])?
897 .filter(exists(Arc::new(subquery)))?
898 .limit(10, Some(100))?
899 .build()?;
900
901 assert_optimized_plan_equal!(
903 outer_query,
904 @r"
905 Limit: skip=10, fetch=100
906 Filter: EXISTS (<subquery>)
907 Subquery:
908 Filter: test1.a = test1.a
909 Projection: test1.a
910 TableScan: test1
911 Projection: test2.a
912 TableScan: test2
913 "
914 )
915 }
916
917 #[test]
918 fn offset_limit_should_not_push_down_with_offset_sub_query() -> Result<()> {
919 let table_scan_1 = test_table_scan_with_name("test1")?;
920 let table_scan_2 = test_table_scan_with_name("test2")?;
921
922 let subquery = LogicalPlanBuilder::from(table_scan_1)
923 .project(vec![col("a")])?
924 .filter(col("a").eq(col("test1.a")))?
925 .build()?;
926
927 let outer_query = LogicalPlanBuilder::from(table_scan_2)
928 .project(vec![col("a")])?
929 .filter(exists(Arc::new(subquery)))?
930 .limit(10, Some(100))?
931 .build()?;
932
933 assert_optimized_plan_equal!(
935 outer_query,
936 @r"
937 Limit: skip=10, fetch=100
938 Filter: EXISTS (<subquery>)
939 Subquery:
940 Filter: test1.a = test1.a
941 Projection: test1.a
942 TableScan: test1
943 Projection: test2.a
944 TableScan: test2
945 "
946 )
947 }
948
949 #[test]
950 fn limit_should_push_down_left_outer_join_with_offset() -> Result<()> {
951 let table_scan_1 = test_table_scan()?;
952 let table_scan_2 = test_table_scan_with_name("test2")?;
953
954 let plan = LogicalPlanBuilder::from(table_scan_1)
955 .join(
956 LogicalPlanBuilder::from(table_scan_2).build()?,
957 JoinType::Left,
958 (vec!["a"], vec!["a"]),
959 None,
960 )?
961 .limit(10, Some(1000))?
962 .build()?;
963
964 assert_optimized_plan_equal!(
966 plan,
967 @r"
968 Limit: skip=10, fetch=1000
969 Left Join: test.a = test2.a
970 Limit: skip=0, fetch=1010
971 TableScan: test, fetch=1010
972 TableScan: test2
973 "
974 )
975 }
976
977 #[test]
978 fn limit_should_push_down_right_outer_join() -> Result<()> {
979 let table_scan_1 = test_table_scan()?;
980 let table_scan_2 = test_table_scan_with_name("test2")?;
981
982 let plan = LogicalPlanBuilder::from(table_scan_1)
983 .join(
984 LogicalPlanBuilder::from(table_scan_2).build()?,
985 JoinType::Right,
986 (vec!["a"], vec!["a"]),
987 None,
988 )?
989 .limit(0, Some(1000))?
990 .build()?;
991
992 assert_optimized_plan_equal!(
994 plan,
995 @r"
996 Limit: skip=0, fetch=1000
997 Right Join: test.a = test2.a
998 TableScan: test
999 Limit: skip=0, fetch=1000
1000 TableScan: test2, fetch=1000
1001 "
1002 )
1003 }
1004
1005 #[test]
1006 fn limit_should_push_down_right_outer_join_with_offset() -> Result<()> {
1007 let table_scan_1 = test_table_scan()?;
1008 let table_scan_2 = test_table_scan_with_name("test2")?;
1009
1010 let plan = LogicalPlanBuilder::from(table_scan_1)
1011 .join(
1012 LogicalPlanBuilder::from(table_scan_2).build()?,
1013 JoinType::Right,
1014 (vec!["a"], vec!["a"]),
1015 None,
1016 )?
1017 .limit(10, Some(1000))?
1018 .build()?;
1019
1020 assert_optimized_plan_equal!(
1022 plan,
1023 @r"
1024 Limit: skip=10, fetch=1000
1025 Right Join: test.a = test2.a
1026 TableScan: test
1027 Limit: skip=0, fetch=1010
1028 TableScan: test2, fetch=1010
1029 "
1030 )
1031 }
1032
1033 #[test]
1034 fn limit_push_down_cross_join() -> Result<()> {
1035 let table_scan_1 = test_table_scan()?;
1036 let table_scan_2 = test_table_scan_with_name("test2")?;
1037
1038 let plan = LogicalPlanBuilder::from(table_scan_1)
1039 .cross_join(LogicalPlanBuilder::from(table_scan_2).build()?)?
1040 .limit(0, Some(1000))?
1041 .build()?;
1042
1043 assert_optimized_plan_equal!(
1044 plan,
1045 @r"
1046 Limit: skip=0, fetch=1000
1047 Cross Join:
1048 Limit: skip=0, fetch=1000
1049 TableScan: test, fetch=1000
1050 Limit: skip=0, fetch=1000
1051 TableScan: test2, fetch=1000
1052 "
1053 )
1054 }
1055
1056 #[test]
1057 fn skip_limit_push_down_cross_join() -> Result<()> {
1058 let table_scan_1 = test_table_scan()?;
1059 let table_scan_2 = test_table_scan_with_name("test2")?;
1060
1061 let plan = LogicalPlanBuilder::from(table_scan_1)
1062 .cross_join(LogicalPlanBuilder::from(table_scan_2).build()?)?
1063 .limit(1000, Some(1000))?
1064 .build()?;
1065
1066 assert_optimized_plan_equal!(
1067 plan,
1068 @r"
1069 Limit: skip=1000, fetch=1000
1070 Cross Join:
1071 Limit: skip=0, fetch=2000
1072 TableScan: test, fetch=2000
1073 Limit: skip=0, fetch=2000
1074 TableScan: test2, fetch=2000
1075 "
1076 )
1077 }
1078
1079 #[test]
1080 fn merge_limit_result_empty() -> Result<()> {
1081 let scan = test_table_scan()?;
1082
1083 let plan = LogicalPlanBuilder::from(scan)
1084 .limit(0, Some(1000))?
1085 .limit(1000, None)?
1086 .build()?;
1087
1088 assert_optimized_plan_equal!(
1089 plan,
1090 @r"
1091 Limit: skip=1000, fetch=0
1092 TableScan: test, fetch=0
1093 "
1094 )
1095 }
1096
1097 #[test]
1098 fn skip_great_than_fetch() -> Result<()> {
1099 let scan = test_table_scan()?;
1100
1101 let plan = LogicalPlanBuilder::from(scan)
1102 .limit(0, Some(1))?
1103 .limit(1000, None)?
1104 .build()?;
1105
1106 assert_optimized_plan_equal!(
1107 plan,
1108 @r"
1109 Limit: skip=1000, fetch=0
1110 TableScan: test, fetch=0
1111 "
1112 )
1113 }
1114
1115 #[test]
1116 fn push_down_subquery_alias() -> Result<()> {
1117 let scan = test_table_scan()?;
1118
1119 let plan = LogicalPlanBuilder::from(scan)
1120 .alias("a")?
1121 .limit(0, Some(1))?
1122 .limit(1000, None)?
1123 .build()?;
1124
1125 assert_optimized_plan_equal!(
1126 plan,
1127 @r"
1128 SubqueryAlias: a
1129 Limit: skip=1000, fetch=0
1130 TableScan: test, fetch=0
1131 "
1132 )
1133 }
1134}