1use std::cmp::min;
21use std::sync::Arc;
22
23use crate::optimizer::ApplyOrder;
24use crate::{OptimizerConfig, OptimizerRule};
25
26use datafusion_common::tree_node::Transformed;
27use datafusion_common::utils::combine_limit;
28use datafusion_common::Result;
29use datafusion_expr::logical_plan::{Join, JoinType, Limit, LogicalPlan};
30use datafusion_expr::{lit, FetchType, SkipType};
31
/// Optimizer rule that pushes `Limit` nodes (literal `skip` / `fetch`) towards
/// the leaves of a logical plan.
///
/// The rule carries no state; see its `OptimizerRule` implementation for the
/// plan shapes it rewrites.
#[derive(Default, Debug)]
pub struct PushDownLimit {}

impl PushDownLimit {
    /// Creates a new, stateless `PushDownLimit` rule.
    pub fn new() -> Self {
        Self {}
    }
}
44
45impl OptimizerRule for PushDownLimit {
47 fn supports_rewrite(&self) -> bool {
48 true
49 }
50
51 fn rewrite(
52 &self,
53 plan: LogicalPlan,
54 _config: &dyn OptimizerConfig,
55 ) -> Result<Transformed<LogicalPlan>> {
56 let LogicalPlan::Limit(mut limit) = plan else {
57 return Ok(Transformed::no(plan));
58 };
59
60 let SkipType::Literal(skip) = limit.get_skip_type()? else {
62 return Ok(Transformed::no(LogicalPlan::Limit(limit)));
63 };
64 let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
65 return Ok(Transformed::no(LogicalPlan::Limit(limit)));
66 };
67
68 if let LogicalPlan::Limit(child) = limit.input.as_ref() {
70 let SkipType::Literal(child_skip) = child.get_skip_type()? else {
71 return Ok(Transformed::no(LogicalPlan::Limit(limit)));
72 };
73 let FetchType::Literal(child_fetch) = child.get_fetch_type()? else {
74 return Ok(Transformed::no(LogicalPlan::Limit(limit)));
75 };
76
77 let (skip, fetch) = combine_limit(skip, fetch, child_skip, child_fetch);
78 let plan = LogicalPlan::Limit(Limit {
79 skip: Some(Box::new(lit(skip as i64))),
80 fetch: fetch.map(|f| Box::new(lit(f as i64))),
81 input: Arc::clone(&child.input),
82 });
83
84 #[allow(clippy::used_underscore_binding)]
86 return self.rewrite(plan, _config);
87 }
88
89 let Some(fetch) = fetch else {
91 return Ok(Transformed::no(LogicalPlan::Limit(limit)));
92 };
93
94 match Arc::unwrap_or_clone(limit.input) {
95 LogicalPlan::TableScan(mut scan) => {
96 let rows_needed = if fetch != 0 { fetch + skip } else { 0 };
97 let new_fetch = scan
98 .fetch
99 .map(|x| min(x, rows_needed))
100 .or(Some(rows_needed));
101 if new_fetch == scan.fetch {
102 original_limit(skip, fetch, LogicalPlan::TableScan(scan))
103 } else {
104 scan.fetch = scan
106 .fetch
107 .map(|x| min(x, rows_needed))
108 .or(Some(rows_needed));
109 transformed_limit(skip, fetch, LogicalPlan::TableScan(scan))
110 }
111 }
112 LogicalPlan::Union(mut union) => {
113 union.inputs = union
115 .inputs
116 .into_iter()
117 .map(|input| make_arc_limit(0, fetch + skip, input))
118 .collect();
119 transformed_limit(skip, fetch, LogicalPlan::Union(union))
120 }
121
122 LogicalPlan::Join(join) => Ok(push_down_join(join, fetch + skip)
123 .update_data(|join| {
124 make_limit(skip, fetch, Arc::new(LogicalPlan::Join(join)))
125 })),
126
127 LogicalPlan::Sort(mut sort) => {
128 let new_fetch = {
129 let sort_fetch = skip + fetch;
130 Some(sort.fetch.map(|f| f.min(sort_fetch)).unwrap_or(sort_fetch))
131 };
132 if new_fetch == sort.fetch {
133 if skip > 0 {
134 original_limit(skip, fetch, LogicalPlan::Sort(sort))
135 } else {
136 Ok(Transformed::yes(LogicalPlan::Sort(sort)))
137 }
138 } else {
139 sort.fetch = new_fetch;
140 limit.input = Arc::new(LogicalPlan::Sort(sort));
141 Ok(Transformed::yes(LogicalPlan::Limit(limit)))
142 }
143 }
144 LogicalPlan::Projection(mut proj) => {
145 limit.input = Arc::clone(&proj.input);
147 let new_limit = LogicalPlan::Limit(limit);
148 proj.input = Arc::new(new_limit);
149 Ok(Transformed::yes(LogicalPlan::Projection(proj)))
150 }
151 LogicalPlan::SubqueryAlias(mut subquery_alias) => {
152 limit.input = Arc::clone(&subquery_alias.input);
154 let new_limit = LogicalPlan::Limit(limit);
155 subquery_alias.input = Arc::new(new_limit);
156 Ok(Transformed::yes(LogicalPlan::SubqueryAlias(subquery_alias)))
157 }
158 LogicalPlan::Extension(extension_plan)
159 if extension_plan.node.supports_limit_pushdown() =>
160 {
161 let new_children = extension_plan
162 .node
163 .inputs()
164 .into_iter()
165 .map(|child| {
166 LogicalPlan::Limit(Limit {
167 skip: None,
168 fetch: Some(Box::new(lit((fetch + skip) as i64))),
169 input: Arc::new(child.clone()),
170 })
171 })
172 .collect::<Vec<_>>();
173
174 let child_plan = LogicalPlan::Extension(extension_plan);
176 let new_extension =
177 child_plan.with_new_exprs(child_plan.expressions(), new_children)?;
178
179 transformed_limit(skip, fetch, new_extension)
180 }
181 input => original_limit(skip, fetch, input),
182 }
183 }
184
185 fn name(&self) -> &str {
186 "push_down_limit"
187 }
188
189 fn apply_order(&self) -> Option<ApplyOrder> {
190 Some(ApplyOrder::TopDown)
191 }
192}
193
194fn make_limit(skip: usize, fetch: usize, input: Arc<LogicalPlan>) -> LogicalPlan {
207 LogicalPlan::Limit(Limit {
208 skip: Some(Box::new(lit(skip as i64))),
209 fetch: Some(Box::new(lit(fetch as i64))),
210 input,
211 })
212}
213
214fn make_arc_limit(
216 skip: usize,
217 fetch: usize,
218 input: Arc<LogicalPlan>,
219) -> Arc<LogicalPlan> {
220 Arc::new(make_limit(skip, fetch, input))
221}
222
223fn original_limit(
225 skip: usize,
226 fetch: usize,
227 input: LogicalPlan,
228) -> Result<Transformed<LogicalPlan>> {
229 Ok(Transformed::no(make_limit(skip, fetch, Arc::new(input))))
230}
231
232fn transformed_limit(
234 skip: usize,
235 fetch: usize,
236 input: LogicalPlan,
237) -> Result<Transformed<LogicalPlan>> {
238 Ok(Transformed::yes(make_limit(skip, fetch, Arc::new(input))))
239}
240
241fn push_down_join(mut join: Join, limit: usize) -> Transformed<Join> {
243 use JoinType::*;
244
245 fn is_cross_join(join: &Join) -> bool {
247 join.join_type == Inner && join.on.is_empty() && join.filter.is_none()
248 }
249
250 let (left_limit, right_limit) = if is_cross_join(&join) {
251 (Some(limit), Some(limit))
252 } else {
253 match join.join_type {
254 Left => (Some(limit), None),
255 Right => (None, Some(limit)),
256 _ => (None, None),
257 }
258 };
259
260 if left_limit.is_none() && right_limit.is_none() {
261 return Transformed::no(join);
262 }
263 if let Some(limit) = left_limit {
264 join.left = make_arc_limit(0, limit, join.left);
265 }
266 if let Some(limit) = right_limit {
267 join.right = make_arc_limit(0, limit, join.right);
268 }
269 Transformed::yes(join)
270}
271
272#[cfg(test)]
273mod test {
274 use std::cmp::Ordering;
275 use std::fmt::{Debug, Formatter};
276 use std::vec;
277
278 use super::*;
279 use crate::assert_optimized_plan_eq_snapshot;
280 use crate::test::*;
281
282 use crate::OptimizerContext;
283 use datafusion_common::DFSchemaRef;
284 use datafusion_expr::{
285 col, exists, logical_plan::builder::LogicalPlanBuilder, Expr, Extension,
286 UserDefinedLogicalNodeCore,
287 };
288 use datafusion_functions_aggregate::expr_fn::max;
289
    // Test helper: hands `$plan` to `assert_optimized_plan_eq_snapshot!` with
    // `PushDownLimit` as the only rule, an optimizer context limited to one
    // pass, and `$expected` as the inline snapshot.
    macro_rules! assert_optimized_plan_equal {
        (
            $plan:expr,
            @ $expected:literal $(,)?
        ) => {{
            let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![Arc::new(PushDownLimit::new())];
            assert_optimized_plan_eq_snapshot!(
                optimizer_ctx,
                rules,
                $plan,
                @ $expected,
            )
        }};
    }
305
    /// Test-only extension node whose `supports_limit_pushdown` returns `true`.
    #[derive(Debug, PartialEq, Eq, Hash)]
    pub struct NoopPlan {
        // Child plans of this node.
        input: Vec<LogicalPlan>,
        // Schema reported by `schema()`.
        schema: DFSchemaRef,
    }
311
312 impl PartialOrd for NoopPlan {
314 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
315 self.input.partial_cmp(&other.input)
316 }
317 }
318
319 impl UserDefinedLogicalNodeCore for NoopPlan {
320 fn name(&self) -> &str {
321 "NoopPlan"
322 }
323
324 fn inputs(&self) -> Vec<&LogicalPlan> {
325 self.input.iter().collect()
326 }
327
328 fn schema(&self) -> &DFSchemaRef {
329 &self.schema
330 }
331
332 fn expressions(&self) -> Vec<Expr> {
333 self.input
334 .iter()
335 .flat_map(|child| child.expressions())
336 .collect()
337 }
338
339 fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
340 write!(f, "NoopPlan")
341 }
342
343 fn with_exprs_and_inputs(
344 &self,
345 _exprs: Vec<Expr>,
346 inputs: Vec<LogicalPlan>,
347 ) -> Result<Self> {
348 Ok(Self {
349 input: inputs,
350 schema: Arc::clone(&self.schema),
351 })
352 }
353
354 fn supports_limit_pushdown(&self) -> bool {
355 true }
357 }
358
    /// Test-only extension node whose `supports_limit_pushdown` returns `false`.
    #[derive(Debug, PartialEq, Eq, Hash)]
    struct NoLimitNoopPlan {
        // Child plans of this node.
        input: Vec<LogicalPlan>,
        // Schema reported by `schema()`.
        schema: DFSchemaRef,
    }
364
365 impl PartialOrd for NoLimitNoopPlan {
367 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
368 self.input.partial_cmp(&other.input)
369 }
370 }
371
372 impl UserDefinedLogicalNodeCore for NoLimitNoopPlan {
373 fn name(&self) -> &str {
374 "NoLimitNoopPlan"
375 }
376
377 fn inputs(&self) -> Vec<&LogicalPlan> {
378 self.input.iter().collect()
379 }
380
381 fn schema(&self) -> &DFSchemaRef {
382 &self.schema
383 }
384
385 fn expressions(&self) -> Vec<Expr> {
386 self.input
387 .iter()
388 .flat_map(|child| child.expressions())
389 .collect()
390 }
391
392 fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
393 write!(f, "NoLimitNoopPlan")
394 }
395
396 fn with_exprs_and_inputs(
397 &self,
398 _exprs: Vec<Expr>,
399 inputs: Vec<LogicalPlan>,
400 ) -> Result<Self> {
401 Ok(Self {
402 input: inputs,
403 schema: Arc::clone(&self.schema),
404 })
405 }
406
407 fn supports_limit_pushdown(&self) -> bool {
408 false }
410 }
411 #[test]
412 fn limit_pushdown_basic() -> Result<()> {
413 let table_scan = test_table_scan()?;
414 let noop_plan = LogicalPlan::Extension(Extension {
415 node: Arc::new(NoopPlan {
416 input: vec![table_scan.clone()],
417 schema: Arc::clone(table_scan.schema()),
418 }),
419 });
420
421 let plan = LogicalPlanBuilder::from(noop_plan)
422 .limit(0, Some(1000))?
423 .build()?;
424
425 assert_optimized_plan_equal!(
426 plan,
427 @r"
428 Limit: skip=0, fetch=1000
429 NoopPlan
430 Limit: skip=0, fetch=1000
431 TableScan: test, fetch=1000
432 "
433 )
434 }
435
436 #[test]
437 fn limit_pushdown_with_skip() -> Result<()> {
438 let table_scan = test_table_scan()?;
439 let noop_plan = LogicalPlan::Extension(Extension {
440 node: Arc::new(NoopPlan {
441 input: vec![table_scan.clone()],
442 schema: Arc::clone(table_scan.schema()),
443 }),
444 });
445
446 let plan = LogicalPlanBuilder::from(noop_plan)
447 .limit(10, Some(1000))?
448 .build()?;
449
450 assert_optimized_plan_equal!(
451 plan,
452 @r"
453 Limit: skip=10, fetch=1000
454 NoopPlan
455 Limit: skip=0, fetch=1010
456 TableScan: test, fetch=1010
457 "
458 )
459 }
460
461 #[test]
462 fn limit_pushdown_multiple_limits() -> Result<()> {
463 let table_scan = test_table_scan()?;
464 let noop_plan = LogicalPlan::Extension(Extension {
465 node: Arc::new(NoopPlan {
466 input: vec![table_scan.clone()],
467 schema: Arc::clone(table_scan.schema()),
468 }),
469 });
470
471 let plan = LogicalPlanBuilder::from(noop_plan)
472 .limit(10, Some(1000))?
473 .limit(20, Some(500))?
474 .build()?;
475
476 assert_optimized_plan_equal!(
477 plan,
478 @r"
479 Limit: skip=30, fetch=500
480 NoopPlan
481 Limit: skip=0, fetch=530
482 TableScan: test, fetch=530
483 "
484 )
485 }
486
487 #[test]
488 fn limit_pushdown_multiple_inputs() -> Result<()> {
489 let table_scan = test_table_scan()?;
490 let noop_plan = LogicalPlan::Extension(Extension {
491 node: Arc::new(NoopPlan {
492 input: vec![table_scan.clone(), table_scan.clone()],
493 schema: Arc::clone(table_scan.schema()),
494 }),
495 });
496
497 let plan = LogicalPlanBuilder::from(noop_plan)
498 .limit(0, Some(1000))?
499 .build()?;
500
501 assert_optimized_plan_equal!(
502 plan,
503 @r"
504 Limit: skip=0, fetch=1000
505 NoopPlan
506 Limit: skip=0, fetch=1000
507 TableScan: test, fetch=1000
508 Limit: skip=0, fetch=1000
509 TableScan: test, fetch=1000
510 "
511 )
512 }
513
514 #[test]
515 fn limit_pushdown_disallowed_noop_plan() -> Result<()> {
516 let table_scan = test_table_scan()?;
517 let no_limit_noop_plan = LogicalPlan::Extension(Extension {
518 node: Arc::new(NoLimitNoopPlan {
519 input: vec![table_scan.clone()],
520 schema: Arc::clone(table_scan.schema()),
521 }),
522 });
523
524 let plan = LogicalPlanBuilder::from(no_limit_noop_plan)
525 .limit(0, Some(1000))?
526 .build()?;
527
528 assert_optimized_plan_equal!(
529 plan,
530 @r"
531 Limit: skip=0, fetch=1000
532 NoLimitNoopPlan
533 TableScan: test
534 "
535 )
536 }
537
538 #[test]
539 fn limit_pushdown_projection_table_provider() -> Result<()> {
540 let table_scan = test_table_scan()?;
541
542 let plan = LogicalPlanBuilder::from(table_scan)
543 .project(vec![col("a")])?
544 .limit(0, Some(1000))?
545 .build()?;
546
547 assert_optimized_plan_equal!(
550 plan,
551 @r"
552 Projection: test.a
553 Limit: skip=0, fetch=1000
554 TableScan: test, fetch=1000
555 "
556 )
557 }
558
559 #[test]
560 fn limit_push_down_take_smaller_limit() -> Result<()> {
561 let table_scan = test_table_scan()?;
562
563 let plan = LogicalPlanBuilder::from(table_scan)
564 .limit(0, Some(1000))?
565 .limit(0, Some(10))?
566 .build()?;
567
568 assert_optimized_plan_equal!(
572 plan,
573 @r"
574 Limit: skip=0, fetch=10
575 TableScan: test, fetch=10
576 "
577 )
578 }
579
580 #[test]
581 fn limit_doesnt_push_down_aggregation() -> Result<()> {
582 let table_scan = test_table_scan()?;
583
584 let plan = LogicalPlanBuilder::from(table_scan)
585 .aggregate(vec![col("a")], vec![max(col("b"))])?
586 .limit(0, Some(1000))?
587 .build()?;
588
589 assert_optimized_plan_equal!(
591 plan,
592 @r"
593 Limit: skip=0, fetch=1000
594 Aggregate: groupBy=[[test.a]], aggr=[[max(test.b)]]
595 TableScan: test
596 "
597 )
598 }
599
600 #[test]
601 fn limit_should_push_down_union() -> Result<()> {
602 let table_scan = test_table_scan()?;
603
604 let plan = LogicalPlanBuilder::from(table_scan.clone())
605 .union(LogicalPlanBuilder::from(table_scan).build()?)?
606 .limit(0, Some(1000))?
607 .build()?;
608
609 assert_optimized_plan_equal!(
611 plan,
612 @r"
613 Limit: skip=0, fetch=1000
614 Union
615 Limit: skip=0, fetch=1000
616 TableScan: test, fetch=1000
617 Limit: skip=0, fetch=1000
618 TableScan: test, fetch=1000
619 "
620 )
621 }
622
623 #[test]
624 fn limit_push_down_sort() -> Result<()> {
625 let table_scan = test_table_scan()?;
626
627 let plan = LogicalPlanBuilder::from(table_scan)
628 .sort_by(vec![col("a")])?
629 .limit(0, Some(10))?
630 .build()?;
631
632 assert_optimized_plan_equal!(
634 plan,
635 @r"
636 Limit: skip=0, fetch=10
637 Sort: test.a ASC NULLS LAST, fetch=10
638 TableScan: test
639 "
640 )
641 }
642
643 #[test]
644 fn limit_push_down_sort_skip() -> Result<()> {
645 let table_scan = test_table_scan()?;
646
647 let plan = LogicalPlanBuilder::from(table_scan)
648 .sort_by(vec![col("a")])?
649 .limit(5, Some(10))?
650 .build()?;
651
652 assert_optimized_plan_equal!(
654 plan,
655 @r"
656 Limit: skip=5, fetch=10
657 Sort: test.a ASC NULLS LAST, fetch=15
658 TableScan: test
659 "
660 )
661 }
662
663 #[test]
664 fn multi_stage_limit_recursive_to_deeper_limit() -> Result<()> {
665 let table_scan = test_table_scan()?;
666
667 let plan = LogicalPlanBuilder::from(table_scan)
668 .limit(0, Some(1000))?
669 .aggregate(vec![col("a")], vec![max(col("b"))])?
670 .limit(0, Some(10))?
671 .build()?;
672
673 assert_optimized_plan_equal!(
675 plan,
676 @r"
677 Limit: skip=0, fetch=10
678 Aggregate: groupBy=[[test.a]], aggr=[[max(test.b)]]
679 Limit: skip=0, fetch=1000
680 TableScan: test, fetch=1000
681 "
682 )
683 }
684
685 #[test]
686 fn limit_pushdown_should_not_pushdown_limit_with_offset_only() -> Result<()> {
687 let table_scan = test_table_scan()?;
688 let plan = LogicalPlanBuilder::from(table_scan)
689 .limit(10, None)?
690 .build()?;
691
692 assert_optimized_plan_equal!(
695 plan,
696 @r"
697 Limit: skip=10, fetch=None
698 TableScan: test
699 "
700 )
701 }
702
703 #[test]
704 fn limit_pushdown_with_offset_projection_table_provider() -> Result<()> {
705 let table_scan = test_table_scan()?;
706
707 let plan = LogicalPlanBuilder::from(table_scan)
708 .project(vec![col("a")])?
709 .limit(10, Some(1000))?
710 .build()?;
711
712 assert_optimized_plan_equal!(
715 plan,
716 @r"
717 Projection: test.a
718 Limit: skip=10, fetch=1000
719 TableScan: test, fetch=1010
720 "
721 )
722 }
723
724 #[test]
725 fn limit_pushdown_with_offset_after_limit() -> Result<()> {
726 let table_scan = test_table_scan()?;
727
728 let plan = LogicalPlanBuilder::from(table_scan)
729 .project(vec![col("a")])?
730 .limit(0, Some(1000))?
731 .limit(10, None)?
732 .build()?;
733
734 assert_optimized_plan_equal!(
735 plan,
736 @r"
737 Projection: test.a
738 Limit: skip=10, fetch=990
739 TableScan: test, fetch=1000
740 "
741 )
742 }
743
744 #[test]
745 fn limit_pushdown_with_limit_after_offset() -> Result<()> {
746 let table_scan = test_table_scan()?;
747
748 let plan = LogicalPlanBuilder::from(table_scan)
749 .project(vec![col("a")])?
750 .limit(10, None)?
751 .limit(0, Some(1000))?
752 .build()?;
753
754 assert_optimized_plan_equal!(
755 plan,
756 @r"
757 Projection: test.a
758 Limit: skip=10, fetch=1000
759 TableScan: test, fetch=1010
760 "
761 )
762 }
763
764 #[test]
765 fn limit_push_down_with_offset_take_smaller_limit() -> Result<()> {
766 let table_scan = test_table_scan()?;
767
768 let plan = LogicalPlanBuilder::from(table_scan)
769 .limit(10, None)?
770 .limit(0, Some(1000))?
771 .limit(0, Some(10))?
772 .build()?;
773
774 assert_optimized_plan_equal!(
775 plan,
776 @r"
777 Limit: skip=10, fetch=10
778 TableScan: test, fetch=20
779 "
780 )
781 }
782
783 #[test]
784 fn limit_doesnt_push_down_with_offset_aggregation() -> Result<()> {
785 let table_scan = test_table_scan()?;
786
787 let plan = LogicalPlanBuilder::from(table_scan)
788 .aggregate(vec![col("a")], vec![max(col("b"))])?
789 .limit(10, Some(1000))?
790 .build()?;
791
792 assert_optimized_plan_equal!(
794 plan,
795 @r"
796 Limit: skip=10, fetch=1000
797 Aggregate: groupBy=[[test.a]], aggr=[[max(test.b)]]
798 TableScan: test
799 "
800 )
801 }
802
803 #[test]
804 fn limit_should_push_down_with_offset_union() -> Result<()> {
805 let table_scan = test_table_scan()?;
806
807 let plan = LogicalPlanBuilder::from(table_scan.clone())
808 .union(LogicalPlanBuilder::from(table_scan).build()?)?
809 .limit(10, Some(1000))?
810 .build()?;
811
812 assert_optimized_plan_equal!(
814 plan,
815 @r"
816 Limit: skip=10, fetch=1000
817 Union
818 Limit: skip=0, fetch=1010
819 TableScan: test, fetch=1010
820 Limit: skip=0, fetch=1010
821 TableScan: test, fetch=1010
822 "
823 )
824 }
825
826 #[test]
827 fn limit_offset_should_not_push_down_with_offset_join() -> Result<()> {
828 let table_scan_1 = test_table_scan()?;
829 let table_scan_2 = test_table_scan_with_name("test2")?;
830
831 let plan = LogicalPlanBuilder::from(table_scan_1)
832 .join(
833 LogicalPlanBuilder::from(table_scan_2).build()?,
834 JoinType::Inner,
835 (vec!["a"], vec!["a"]),
836 None,
837 )?
838 .limit(10, Some(1000))?
839 .build()?;
840
841 assert_optimized_plan_equal!(
843 plan,
844 @r"
845 Limit: skip=10, fetch=1000
846 Inner Join: test.a = test2.a
847 TableScan: test
848 TableScan: test2
849 "
850 )
851 }
852
853 #[test]
854 fn offset_limit_should_not_push_down_with_offset_join() -> Result<()> {
855 let table_scan_1 = test_table_scan()?;
856 let table_scan_2 = test_table_scan_with_name("test2")?;
857
858 let plan = LogicalPlanBuilder::from(table_scan_1)
859 .join(
860 LogicalPlanBuilder::from(table_scan_2).build()?,
861 JoinType::Inner,
862 (vec!["a"], vec!["a"]),
863 None,
864 )?
865 .limit(10, Some(1000))?
866 .build()?;
867
868 assert_optimized_plan_equal!(
870 plan,
871 @r"
872 Limit: skip=10, fetch=1000
873 Inner Join: test.a = test2.a
874 TableScan: test
875 TableScan: test2
876 "
877 )
878 }
879
880 #[test]
881 fn limit_offset_should_not_push_down_with_offset_sub_query() -> Result<()> {
882 let table_scan_1 = test_table_scan_with_name("test1")?;
883 let table_scan_2 = test_table_scan_with_name("test2")?;
884
885 let subquery = LogicalPlanBuilder::from(table_scan_1)
886 .project(vec![col("a")])?
887 .filter(col("a").eq(col("test1.a")))?
888 .build()?;
889
890 let outer_query = LogicalPlanBuilder::from(table_scan_2)
891 .project(vec![col("a")])?
892 .filter(exists(Arc::new(subquery)))?
893 .limit(10, Some(100))?
894 .build()?;
895
896 assert_optimized_plan_equal!(
898 outer_query,
899 @r"
900 Limit: skip=10, fetch=100
901 Filter: EXISTS (<subquery>)
902 Subquery:
903 Filter: test1.a = test1.a
904 Projection: test1.a
905 TableScan: test1
906 Projection: test2.a
907 TableScan: test2
908 "
909 )
910 }
911
912 #[test]
913 fn offset_limit_should_not_push_down_with_offset_sub_query() -> Result<()> {
914 let table_scan_1 = test_table_scan_with_name("test1")?;
915 let table_scan_2 = test_table_scan_with_name("test2")?;
916
917 let subquery = LogicalPlanBuilder::from(table_scan_1)
918 .project(vec![col("a")])?
919 .filter(col("a").eq(col("test1.a")))?
920 .build()?;
921
922 let outer_query = LogicalPlanBuilder::from(table_scan_2)
923 .project(vec![col("a")])?
924 .filter(exists(Arc::new(subquery)))?
925 .limit(10, Some(100))?
926 .build()?;
927
928 assert_optimized_plan_equal!(
930 outer_query,
931 @r"
932 Limit: skip=10, fetch=100
933 Filter: EXISTS (<subquery>)
934 Subquery:
935 Filter: test1.a = test1.a
936 Projection: test1.a
937 TableScan: test1
938 Projection: test2.a
939 TableScan: test2
940 "
941 )
942 }
943
944 #[test]
945 fn limit_should_push_down_left_outer_join_with_offset() -> Result<()> {
946 let table_scan_1 = test_table_scan()?;
947 let table_scan_2 = test_table_scan_with_name("test2")?;
948
949 let plan = LogicalPlanBuilder::from(table_scan_1)
950 .join(
951 LogicalPlanBuilder::from(table_scan_2).build()?,
952 JoinType::Left,
953 (vec!["a"], vec!["a"]),
954 None,
955 )?
956 .limit(10, Some(1000))?
957 .build()?;
958
959 assert_optimized_plan_equal!(
961 plan,
962 @r"
963 Limit: skip=10, fetch=1000
964 Left Join: test.a = test2.a
965 Limit: skip=0, fetch=1010
966 TableScan: test, fetch=1010
967 TableScan: test2
968 "
969 )
970 }
971
972 #[test]
973 fn limit_should_push_down_right_outer_join() -> Result<()> {
974 let table_scan_1 = test_table_scan()?;
975 let table_scan_2 = test_table_scan_with_name("test2")?;
976
977 let plan = LogicalPlanBuilder::from(table_scan_1)
978 .join(
979 LogicalPlanBuilder::from(table_scan_2).build()?,
980 JoinType::Right,
981 (vec!["a"], vec!["a"]),
982 None,
983 )?
984 .limit(0, Some(1000))?
985 .build()?;
986
987 assert_optimized_plan_equal!(
989 plan,
990 @r"
991 Limit: skip=0, fetch=1000
992 Right Join: test.a = test2.a
993 TableScan: test
994 Limit: skip=0, fetch=1000
995 TableScan: test2, fetch=1000
996 "
997 )
998 }
999
1000 #[test]
1001 fn limit_should_push_down_right_outer_join_with_offset() -> Result<()> {
1002 let table_scan_1 = test_table_scan()?;
1003 let table_scan_2 = test_table_scan_with_name("test2")?;
1004
1005 let plan = LogicalPlanBuilder::from(table_scan_1)
1006 .join(
1007 LogicalPlanBuilder::from(table_scan_2).build()?,
1008 JoinType::Right,
1009 (vec!["a"], vec!["a"]),
1010 None,
1011 )?
1012 .limit(10, Some(1000))?
1013 .build()?;
1014
1015 assert_optimized_plan_equal!(
1017 plan,
1018 @r"
1019 Limit: skip=10, fetch=1000
1020 Right Join: test.a = test2.a
1021 TableScan: test
1022 Limit: skip=0, fetch=1010
1023 TableScan: test2, fetch=1010
1024 "
1025 )
1026 }
1027
1028 #[test]
1029 fn limit_push_down_cross_join() -> Result<()> {
1030 let table_scan_1 = test_table_scan()?;
1031 let table_scan_2 = test_table_scan_with_name("test2")?;
1032
1033 let plan = LogicalPlanBuilder::from(table_scan_1)
1034 .cross_join(LogicalPlanBuilder::from(table_scan_2).build()?)?
1035 .limit(0, Some(1000))?
1036 .build()?;
1037
1038 assert_optimized_plan_equal!(
1039 plan,
1040 @r"
1041 Limit: skip=0, fetch=1000
1042 Cross Join:
1043 Limit: skip=0, fetch=1000
1044 TableScan: test, fetch=1000
1045 Limit: skip=0, fetch=1000
1046 TableScan: test2, fetch=1000
1047 "
1048 )
1049 }
1050
1051 #[test]
1052 fn skip_limit_push_down_cross_join() -> Result<()> {
1053 let table_scan_1 = test_table_scan()?;
1054 let table_scan_2 = test_table_scan_with_name("test2")?;
1055
1056 let plan = LogicalPlanBuilder::from(table_scan_1)
1057 .cross_join(LogicalPlanBuilder::from(table_scan_2).build()?)?
1058 .limit(1000, Some(1000))?
1059 .build()?;
1060
1061 assert_optimized_plan_equal!(
1062 plan,
1063 @r"
1064 Limit: skip=1000, fetch=1000
1065 Cross Join:
1066 Limit: skip=0, fetch=2000
1067 TableScan: test, fetch=2000
1068 Limit: skip=0, fetch=2000
1069 TableScan: test2, fetch=2000
1070 "
1071 )
1072 }
1073
1074 #[test]
1075 fn merge_limit_result_empty() -> Result<()> {
1076 let scan = test_table_scan()?;
1077
1078 let plan = LogicalPlanBuilder::from(scan)
1079 .limit(0, Some(1000))?
1080 .limit(1000, None)?
1081 .build()?;
1082
1083 assert_optimized_plan_equal!(
1084 plan,
1085 @r"
1086 Limit: skip=1000, fetch=0
1087 TableScan: test, fetch=0
1088 "
1089 )
1090 }
1091
1092 #[test]
1093 fn skip_great_than_fetch() -> Result<()> {
1094 let scan = test_table_scan()?;
1095
1096 let plan = LogicalPlanBuilder::from(scan)
1097 .limit(0, Some(1))?
1098 .limit(1000, None)?
1099 .build()?;
1100
1101 assert_optimized_plan_equal!(
1102 plan,
1103 @r"
1104 Limit: skip=1000, fetch=0
1105 TableScan: test, fetch=0
1106 "
1107 )
1108 }
1109
1110 #[test]
1111 fn push_down_subquery_alias() -> Result<()> {
1112 let scan = test_table_scan()?;
1113
1114 let plan = LogicalPlanBuilder::from(scan)
1115 .alias("a")?
1116 .limit(0, Some(1))?
1117 .limit(1000, None)?
1118 .build()?;
1119
1120 assert_optimized_plan_equal!(
1121 plan,
1122 @r"
1123 SubqueryAlias: a
1124 Limit: skip=1000, fetch=0
1125 TableScan: test, fetch=0
1126 "
1127 )
1128 }
1129}