1use std::cmp::min;
21use std::sync::Arc;
22
23use crate::optimizer::ApplyOrder;
24use crate::{OptimizerConfig, OptimizerRule};
25
26use datafusion_common::tree_node::Transformed;
27use datafusion_common::utils::combine_limit;
28use datafusion_common::Result;
29use datafusion_expr::logical_plan::{Join, JoinType, Limit, LogicalPlan};
30use datafusion_expr::{lit, FetchType, SkipType};
31
/// Optimizer rule that pushes `LIMIT` (skip / fetch) information further down
/// a logical plan.
///
/// The rule carries no state; all behavior lives in its `OptimizerRule`
/// implementation.
#[derive(Default, Debug)]
pub struct PushDownLimit {}

impl PushDownLimit {
    /// Creates a new `PushDownLimit` rule.
    ///
    /// Equivalent to [`PushDownLimit::default`].
    pub fn new() -> Self {
        Self {}
    }
}
44
45impl OptimizerRule for PushDownLimit {
47 fn supports_rewrite(&self) -> bool {
48 true
49 }
50
51 fn rewrite(
52 &self,
53 plan: LogicalPlan,
54 _config: &dyn OptimizerConfig,
55 ) -> Result<Transformed<LogicalPlan>> {
56 let LogicalPlan::Limit(mut limit) = plan else {
57 return Ok(Transformed::no(plan));
58 };
59
60 let SkipType::Literal(skip) = limit.get_skip_type()? else {
62 return Ok(Transformed::no(LogicalPlan::Limit(limit)));
63 };
64 let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
65 return Ok(Transformed::no(LogicalPlan::Limit(limit)));
66 };
67
68 if let LogicalPlan::Limit(child) = limit.input.as_ref() {
70 let SkipType::Literal(child_skip) = child.get_skip_type()? else {
71 return Ok(Transformed::no(LogicalPlan::Limit(limit)));
72 };
73 let FetchType::Literal(child_fetch) = child.get_fetch_type()? else {
74 return Ok(Transformed::no(LogicalPlan::Limit(limit)));
75 };
76
77 let (skip, fetch) = combine_limit(skip, fetch, child_skip, child_fetch);
78 let plan = LogicalPlan::Limit(Limit {
79 skip: Some(Box::new(lit(skip as i64))),
80 fetch: fetch.map(|f| Box::new(lit(f as i64))),
81 input: Arc::clone(&child.input),
82 });
83
84 #[allow(clippy::used_underscore_binding)]
86 return self.rewrite(plan, _config);
87 }
88
89 let Some(fetch) = fetch else {
91 return Ok(Transformed::no(LogicalPlan::Limit(limit)));
92 };
93
94 match Arc::unwrap_or_clone(limit.input) {
95 LogicalPlan::TableScan(mut scan) => {
96 let rows_needed = if fetch != 0 { fetch + skip } else { 0 };
97 let new_fetch = scan
98 .fetch
99 .map(|x| min(x, rows_needed))
100 .or(Some(rows_needed));
101 if new_fetch == scan.fetch {
102 original_limit(skip, fetch, LogicalPlan::TableScan(scan))
103 } else {
104 scan.fetch = scan
106 .fetch
107 .map(|x| min(x, rows_needed))
108 .or(Some(rows_needed));
109 transformed_limit(skip, fetch, LogicalPlan::TableScan(scan))
110 }
111 }
112 LogicalPlan::Union(mut union) => {
113 union.inputs = union
115 .inputs
116 .into_iter()
117 .map(|input| make_arc_limit(0, fetch + skip, input))
118 .collect();
119 transformed_limit(skip, fetch, LogicalPlan::Union(union))
120 }
121
122 LogicalPlan::Join(join) => Ok(push_down_join(join, fetch + skip)
123 .update_data(|join| {
124 make_limit(skip, fetch, Arc::new(LogicalPlan::Join(join)))
125 })),
126
127 LogicalPlan::Sort(mut sort) => {
128 let new_fetch = {
129 let sort_fetch = skip + fetch;
130 Some(sort.fetch.map(|f| f.min(sort_fetch)).unwrap_or(sort_fetch))
131 };
132 if new_fetch == sort.fetch {
133 if skip > 0 {
134 original_limit(skip, fetch, LogicalPlan::Sort(sort))
135 } else {
136 Ok(Transformed::yes(LogicalPlan::Sort(sort)))
137 }
138 } else {
139 sort.fetch = new_fetch;
140 limit.input = Arc::new(LogicalPlan::Sort(sort));
141 Ok(Transformed::yes(LogicalPlan::Limit(limit)))
142 }
143 }
144 LogicalPlan::Projection(mut proj) => {
145 limit.input = Arc::clone(&proj.input);
147 let new_limit = LogicalPlan::Limit(limit);
148 proj.input = Arc::new(new_limit);
149 Ok(Transformed::yes(LogicalPlan::Projection(proj)))
150 }
151 LogicalPlan::SubqueryAlias(mut subquery_alias) => {
152 limit.input = Arc::clone(&subquery_alias.input);
154 let new_limit = LogicalPlan::Limit(limit);
155 subquery_alias.input = Arc::new(new_limit);
156 Ok(Transformed::yes(LogicalPlan::SubqueryAlias(subquery_alias)))
157 }
158 LogicalPlan::Extension(extension_plan)
159 if extension_plan.node.supports_limit_pushdown() =>
160 {
161 let new_children = extension_plan
162 .node
163 .inputs()
164 .into_iter()
165 .map(|child| {
166 LogicalPlan::Limit(Limit {
167 skip: None,
168 fetch: Some(Box::new(lit((fetch + skip) as i64))),
169 input: Arc::new(child.clone()),
170 })
171 })
172 .collect::<Vec<_>>();
173
174 let child_plan = LogicalPlan::Extension(extension_plan);
176 let new_extension =
177 child_plan.with_new_exprs(child_plan.expressions(), new_children)?;
178
179 transformed_limit(skip, fetch, new_extension)
180 }
181 input => original_limit(skip, fetch, input),
182 }
183 }
184
185 fn name(&self) -> &str {
186 "push_down_limit"
187 }
188
189 fn apply_order(&self) -> Option<ApplyOrder> {
190 Some(ApplyOrder::TopDown)
191 }
192}
193
194fn make_limit(skip: usize, fetch: usize, input: Arc<LogicalPlan>) -> LogicalPlan {
207 LogicalPlan::Limit(Limit {
208 skip: Some(Box::new(lit(skip as i64))),
209 fetch: Some(Box::new(lit(fetch as i64))),
210 input,
211 })
212}
213
214fn make_arc_limit(
216 skip: usize,
217 fetch: usize,
218 input: Arc<LogicalPlan>,
219) -> Arc<LogicalPlan> {
220 Arc::new(make_limit(skip, fetch, input))
221}
222
223fn original_limit(
225 skip: usize,
226 fetch: usize,
227 input: LogicalPlan,
228) -> Result<Transformed<LogicalPlan>> {
229 Ok(Transformed::no(make_limit(skip, fetch, Arc::new(input))))
230}
231
232fn transformed_limit(
234 skip: usize,
235 fetch: usize,
236 input: LogicalPlan,
237) -> Result<Transformed<LogicalPlan>> {
238 Ok(Transformed::yes(make_limit(skip, fetch, Arc::new(input))))
239}
240
241fn push_down_join(mut join: Join, limit: usize) -> Transformed<Join> {
243 use JoinType::*;
244
245 fn is_cross_join(join: &Join) -> bool {
247 join.join_type == Inner && join.on.is_empty() && join.filter.is_none()
248 }
249
250 let (left_limit, right_limit) = if is_cross_join(&join) {
251 (Some(limit), Some(limit))
252 } else {
253 match join.join_type {
254 Left => (Some(limit), None),
255 Right => (None, Some(limit)),
256 _ => (None, None),
257 }
258 };
259
260 if left_limit.is_none() && right_limit.is_none() {
261 return Transformed::no(join);
262 }
263 if let Some(limit) = left_limit {
264 join.left = make_arc_limit(0, limit, join.left);
265 }
266 if let Some(limit) = right_limit {
267 join.right = make_arc_limit(0, limit, join.right);
268 }
269 Transformed::yes(join)
270}
271
#[cfg(test)]
mod test {
    //! Unit tests for `PushDownLimit`.
    //!
    //! Each test builds a logical plan, runs the rule through
    //! `assert_optimized_plan_eq` and compares against the expected indented
    //! plan text (two spaces of indentation per plan depth).

    use std::cmp::Ordering;
    use std::fmt::{Debug, Formatter};
    use std::vec;

    use super::*;
    use crate::test::*;

    use datafusion_common::DFSchemaRef;
    use datafusion_expr::{
        col, exists, logical_plan::builder::LogicalPlanBuilder, Expr, Extension,
        UserDefinedLogicalNodeCore,
    };
    use datafusion_functions_aggregate::expr_fn::max;

    /// Runs `PushDownLimit` on `plan` and checks the result against `expected`.
    fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) -> Result<()> {
        assert_optimized_plan_eq(Arc::new(PushDownLimit::new()), plan, expected)
    }

    /// Extension node used by the tests; it reports that limits may be pushed
    /// through it.
    #[derive(Debug, PartialEq, Eq, Hash)]
    pub struct NoopPlan {
        input: Vec<LogicalPlan>,
        schema: DFSchemaRef,
    }

    // Manual implementation: the ordering only looks at `input`; `schema` is
    // not compared.
    impl PartialOrd for NoopPlan {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            self.input.partial_cmp(&other.input)
        }
    }

    impl UserDefinedLogicalNodeCore for NoopPlan {
        fn name(&self) -> &str {
            "NoopPlan"
        }

        fn inputs(&self) -> Vec<&LogicalPlan> {
            self.input.iter().collect()
        }

        fn schema(&self) -> &DFSchemaRef {
            &self.schema
        }

        fn expressions(&self) -> Vec<Expr> {
            self.input
                .iter()
                .flat_map(|child| child.expressions())
                .collect()
        }

        fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
            write!(f, "NoopPlan")
        }

        fn with_exprs_and_inputs(
            &self,
            _exprs: Vec<Expr>,
            inputs: Vec<LogicalPlan>,
        ) -> Result<Self> {
            Ok(Self {
                input: inputs,
                schema: Arc::clone(&self.schema),
            })
        }

        fn supports_limit_pushdown(&self) -> bool {
            // Allow the rule to push limits through this node.
            true
        }
    }

    /// Extension node used by the tests; it reports that limits must NOT be
    /// pushed through it.
    #[derive(Debug, PartialEq, Eq, Hash)]
    struct NoLimitNoopPlan {
        input: Vec<LogicalPlan>,
        schema: DFSchemaRef,
    }

    // Manual implementation: the ordering only looks at `input`; `schema` is
    // not compared.
    impl PartialOrd for NoLimitNoopPlan {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            self.input.partial_cmp(&other.input)
        }
    }

    impl UserDefinedLogicalNodeCore for NoLimitNoopPlan {
        fn name(&self) -> &str {
            "NoLimitNoopPlan"
        }

        fn inputs(&self) -> Vec<&LogicalPlan> {
            self.input.iter().collect()
        }

        fn schema(&self) -> &DFSchemaRef {
            &self.schema
        }

        fn expressions(&self) -> Vec<Expr> {
            self.input
                .iter()
                .flat_map(|child| child.expressions())
                .collect()
        }

        fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
            write!(f, "NoLimitNoopPlan")
        }

        fn with_exprs_and_inputs(
            &self,
            _exprs: Vec<Expr>,
            inputs: Vec<LogicalPlan>,
        ) -> Result<Self> {
            Ok(Self {
                input: inputs,
                schema: Arc::clone(&self.schema),
            })
        }

        fn supports_limit_pushdown(&self) -> bool {
            // Keep the rule from pushing limits through this node.
            false
        }
    }

    /// A limit above a push-down-capable extension node is copied below it.
    #[test]
    fn limit_pushdown_basic() -> Result<()> {
        let table_scan = test_table_scan()?;
        let noop_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(NoopPlan {
                input: vec![table_scan.clone()],
                schema: Arc::clone(table_scan.schema()),
            }),
        });

        let plan = LogicalPlanBuilder::from(noop_plan)
            .limit(0, Some(1000))?
            .build()?;

        let expected = "Limit: skip=0, fetch=1000\
        \n  NoopPlan\
        \n    Limit: skip=0, fetch=1000\
        \n      TableScan: test, fetch=1000";

        assert_optimized_plan_equal(plan, expected)
    }

    /// With a skip, the pushed limit fetches `skip + fetch` rows.
    #[test]
    fn limit_pushdown_with_skip() -> Result<()> {
        let table_scan = test_table_scan()?;
        let noop_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(NoopPlan {
                input: vec![table_scan.clone()],
                schema: Arc::clone(table_scan.schema()),
            }),
        });

        let plan = LogicalPlanBuilder::from(noop_plan)
            .limit(10, Some(1000))?
            .build()?;

        let expected = "Limit: skip=10, fetch=1000\
        \n  NoopPlan\
        \n    Limit: skip=0, fetch=1010\
        \n      TableScan: test, fetch=1010";

        assert_optimized_plan_equal(plan, expected)
    }

    /// Stacked limits are merged before being pushed through the extension.
    #[test]
    fn limit_pushdown_multiple_limits() -> Result<()> {
        let table_scan = test_table_scan()?;
        let noop_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(NoopPlan {
                input: vec![table_scan.clone()],
                schema: Arc::clone(table_scan.schema()),
            }),
        });

        let plan = LogicalPlanBuilder::from(noop_plan)
            .limit(10, Some(1000))?
            .limit(20, Some(500))?
            .build()?;

        let expected = "Limit: skip=30, fetch=500\
        \n  NoopPlan\
        \n    Limit: skip=0, fetch=530\
        \n      TableScan: test, fetch=530";

        assert_optimized_plan_equal(plan, expected)
    }

    /// Every input of a multi-input extension node receives its own limit.
    #[test]
    fn limit_pushdown_multiple_inputs() -> Result<()> {
        let table_scan = test_table_scan()?;
        let noop_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(NoopPlan {
                input: vec![table_scan.clone(), table_scan.clone()],
                schema: Arc::clone(table_scan.schema()),
            }),
        });

        let plan = LogicalPlanBuilder::from(noop_plan)
            .limit(0, Some(1000))?
            .build()?;

        let expected = "Limit: skip=0, fetch=1000\
        \n  NoopPlan\
        \n    Limit: skip=0, fetch=1000\
        \n      TableScan: test, fetch=1000\
        \n    Limit: skip=0, fetch=1000\
        \n      TableScan: test, fetch=1000";

        assert_optimized_plan_equal(plan, expected)
    }

    /// An extension node that disallows pushdown leaves its input untouched.
    #[test]
    fn limit_pushdown_disallowed_noop_plan() -> Result<()> {
        let table_scan = test_table_scan()?;
        let no_limit_noop_plan = LogicalPlan::Extension(Extension {
            node: Arc::new(NoLimitNoopPlan {
                input: vec![table_scan.clone()],
                schema: Arc::clone(table_scan.schema()),
            }),
        });

        let plan = LogicalPlanBuilder::from(no_limit_noop_plan)
            .limit(0, Some(1000))?
            .build()?;

        let expected = "Limit: skip=0, fetch=1000\
        \n  NoLimitNoopPlan\
        \n    TableScan: test";

        assert_optimized_plan_equal(plan, expected)
    }

    /// The limit moves below a projection and into the table scan's fetch.
    #[test]
    fn limit_pushdown_projection_table_provider() -> Result<()> {
        let table_scan = test_table_scan()?;

        let plan = LogicalPlanBuilder::from(table_scan)
            .project(vec![col("a")])?
            .limit(0, Some(1000))?
            .build()?;

        let expected = "Projection: test.a\
        \n  Limit: skip=0, fetch=1000\
        \n    TableScan: test, fetch=1000";

        assert_optimized_plan_equal(plan, expected)
    }

    /// Two stacked limits collapse to the smaller fetch.
    #[test]
    fn limit_push_down_take_smaller_limit() -> Result<()> {
        let table_scan = test_table_scan()?;

        let plan = LogicalPlanBuilder::from(table_scan)
            .limit(0, Some(1000))?
            .limit(0, Some(10))?
            .build()?;

        let expected = "Limit: skip=0, fetch=10\
        \n  TableScan: test, fetch=10";

        assert_optimized_plan_equal(plan, expected)
    }

    /// A limit is not pushed below an aggregate.
    #[test]
    fn limit_doesnt_push_down_aggregation() -> Result<()> {
        let table_scan = test_table_scan()?;

        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![max(col("b"))])?
            .limit(0, Some(1000))?
            .build()?;

        let expected = "Limit: skip=0, fetch=1000\
        \n  Aggregate: groupBy=[[test.a]], aggr=[[max(test.b)]]\
        \n    TableScan: test";

        assert_optimized_plan_equal(plan, expected)
    }

    /// A limit above a union is copied onto every union input.
    #[test]
    fn limit_should_push_down_union() -> Result<()> {
        let table_scan = test_table_scan()?;

        let plan = LogicalPlanBuilder::from(table_scan.clone())
            .union(LogicalPlanBuilder::from(table_scan).build()?)?
            .limit(0, Some(1000))?
            .build()?;

        let expected = "Limit: skip=0, fetch=1000\
        \n  Union\
        \n    Limit: skip=0, fetch=1000\
        \n      TableScan: test, fetch=1000\
        \n    Limit: skip=0, fetch=1000\
        \n      TableScan: test, fetch=1000";

        assert_optimized_plan_equal(plan, expected)
    }

    /// The fetch is recorded on the sort; nothing is pushed below the sort.
    #[test]
    fn limit_push_down_sort() -> Result<()> {
        let table_scan = test_table_scan()?;

        let plan = LogicalPlanBuilder::from(table_scan)
            .sort_by(vec![col("a")])?
            .limit(0, Some(10))?
            .build()?;

        let expected = "Limit: skip=0, fetch=10\
        \n  Sort: test.a ASC NULLS LAST, fetch=10\
        \n    TableScan: test";

        assert_optimized_plan_equal(plan, expected)
    }

    /// With a skip, the sort's fetch becomes `skip + fetch`.
    #[test]
    fn limit_push_down_sort_skip() -> Result<()> {
        let table_scan = test_table_scan()?;

        let plan = LogicalPlanBuilder::from(table_scan)
            .sort_by(vec![col("a")])?
            .limit(5, Some(10))?
            .build()?;

        let expected = "Limit: skip=5, fetch=10\
        \n  Sort: test.a ASC NULLS LAST, fetch=15\
        \n    TableScan: test";

        assert_optimized_plan_equal(plan, expected)
    }

    /// The limit below the aggregate is still pushed into the scan even though
    /// the limit above the aggregate cannot move.
    #[test]
    fn multi_stage_limit_recursive_to_deeper_limit() -> Result<()> {
        let table_scan = test_table_scan()?;

        let plan = LogicalPlanBuilder::from(table_scan)
            .limit(0, Some(1000))?
            .aggregate(vec![col("a")], vec![max(col("b"))])?
            .limit(0, Some(10))?
            .build()?;

        let expected = "Limit: skip=0, fetch=10\
        \n  Aggregate: groupBy=[[test.a]], aggr=[[max(test.b)]]\
        \n    Limit: skip=0, fetch=1000\
        \n      TableScan: test, fetch=1000";

        assert_optimized_plan_equal(plan, expected)
    }

    /// A limit with only a skip (no fetch) is left in place.
    #[test]
    fn limit_pushdown_should_not_pushdown_limit_with_offset_only() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .limit(10, None)?
            .build()?;

        let expected = "Limit: skip=10, fetch=None\
        \n  TableScan: test";

        assert_optimized_plan_equal(plan, expected)
    }

    /// Skip + fetch through a projection: the scan fetches `skip + fetch`.
    #[test]
    fn limit_pushdown_with_offset_projection_table_provider() -> Result<()> {
        let table_scan = test_table_scan()?;

        let plan = LogicalPlanBuilder::from(table_scan)
            .project(vec![col("a")])?
            .limit(10, Some(1000))?
            .build()?;

        let expected = "Projection: test.a\
        \n  Limit: skip=10, fetch=1000\
        \n    TableScan: test, fetch=1010";

        assert_optimized_plan_equal(plan, expected)
    }

    /// A skip applied after a fetch reduces the merged fetch by the skip.
    #[test]
    fn limit_pushdown_with_offset_after_limit() -> Result<()> {
        let table_scan = test_table_scan()?;

        let plan = LogicalPlanBuilder::from(table_scan)
            .project(vec![col("a")])?
            .limit(0, Some(1000))?
            .limit(10, None)?
            .build()?;

        let expected = "Projection: test.a\
        \n  Limit: skip=10, fetch=990\
        \n    TableScan: test, fetch=1000";

        assert_optimized_plan_equal(plan, expected)
    }

    /// A fetch applied after a skip keeps the full fetch.
    #[test]
    fn limit_pushdown_with_limit_after_offset() -> Result<()> {
        let table_scan = test_table_scan()?;

        let plan = LogicalPlanBuilder::from(table_scan)
            .project(vec![col("a")])?
            .limit(10, None)?
            .limit(0, Some(1000))?
            .build()?;

        let expected = "Projection: test.a\
        \n  Limit: skip=10, fetch=1000\
        \n    TableScan: test, fetch=1010";

        assert_optimized_plan_equal(plan, expected)
    }

    /// Three stacked limits (skip, large fetch, small fetch) merge into one.
    #[test]
    fn limit_push_down_with_offset_take_smaller_limit() -> Result<()> {
        let table_scan = test_table_scan()?;

        let plan = LogicalPlanBuilder::from(table_scan)
            .limit(10, None)?
            .limit(0, Some(1000))?
            .limit(0, Some(10))?
            .build()?;

        let expected = "Limit: skip=10, fetch=10\
        \n  TableScan: test, fetch=20";

        assert_optimized_plan_equal(plan, expected)
    }

    /// A limit with skip is not pushed below an aggregate.
    #[test]
    fn limit_doesnt_push_down_with_offset_aggregation() -> Result<()> {
        let table_scan = test_table_scan()?;

        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![max(col("b"))])?
            .limit(10, Some(1000))?
            .build()?;

        let expected = "Limit: skip=10, fetch=1000\
        \n  Aggregate: groupBy=[[test.a]], aggr=[[max(test.b)]]\
        \n    TableScan: test";

        assert_optimized_plan_equal(plan, expected)
    }

    /// With a skip, every union input is limited to `skip + fetch` rows.
    #[test]
    fn limit_should_push_down_with_offset_union() -> Result<()> {
        let table_scan = test_table_scan()?;

        let plan = LogicalPlanBuilder::from(table_scan.clone())
            .union(LogicalPlanBuilder::from(table_scan).build()?)?
            .limit(10, Some(1000))?
            .build()?;

        let expected = "Limit: skip=10, fetch=1000\
        \n  Union\
        \n    Limit: skip=0, fetch=1010\
        \n      TableScan: test, fetch=1010\
        \n    Limit: skip=0, fetch=1010\
        \n      TableScan: test, fetch=1010";

        assert_optimized_plan_equal(plan, expected)
    }

    /// An inner join with a join key receives no limit on either side.
    #[test]
    fn limit_offset_should_not_push_down_with_offset_join() -> Result<()> {
        let table_scan_1 = test_table_scan()?;
        let table_scan_2 = test_table_scan_with_name("test2")?;

        let plan = LogicalPlanBuilder::from(table_scan_1)
            .join(
                LogicalPlanBuilder::from(table_scan_2).build()?,
                JoinType::Inner,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .limit(10, Some(1000))?
            .build()?;

        let expected = "Limit: skip=10, fetch=1000\
        \n  Inner Join: test.a = test2.a\
        \n    TableScan: test\
        \n    TableScan: test2";

        assert_optimized_plan_equal(plan, expected)
    }

    /// Builds the same plan and expectation as
    /// `limit_offset_should_not_push_down_with_offset_join`.
    #[test]
    fn offset_limit_should_not_push_down_with_offset_join() -> Result<()> {
        let table_scan_1 = test_table_scan()?;
        let table_scan_2 = test_table_scan_with_name("test2")?;

        let plan = LogicalPlanBuilder::from(table_scan_1)
            .join(
                LogicalPlanBuilder::from(table_scan_2).build()?,
                JoinType::Inner,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .limit(10, Some(1000))?
            .build()?;

        let expected = "Limit: skip=10, fetch=1000\
        \n  Inner Join: test.a = test2.a\
        \n    TableScan: test\
        \n    TableScan: test2";

        assert_optimized_plan_equal(plan, expected)
    }

    /// A limit above a filter with an EXISTS subquery stays where it is.
    #[test]
    fn limit_offset_should_not_push_down_with_offset_sub_query() -> Result<()> {
        let table_scan_1 = test_table_scan_with_name("test1")?;
        let table_scan_2 = test_table_scan_with_name("test2")?;

        let subquery = LogicalPlanBuilder::from(table_scan_1)
            .project(vec![col("a")])?
            .filter(col("a").eq(col("test1.a")))?
            .build()?;

        let outer_query = LogicalPlanBuilder::from(table_scan_2)
            .project(vec![col("a")])?
            .filter(exists(Arc::new(subquery)))?
            .limit(10, Some(100))?
            .build()?;

        let expected = "Limit: skip=10, fetch=100\
        \n  Filter: EXISTS (<subquery>)\
        \n    Subquery:\
        \n      Filter: test1.a = test1.a\
        \n        Projection: test1.a\
        \n          TableScan: test1\
        \n    Projection: test2.a\
        \n      TableScan: test2";

        assert_optimized_plan_equal(outer_query, expected)
    }

    /// Builds the same plan and expectation as
    /// `limit_offset_should_not_push_down_with_offset_sub_query`.
    #[test]
    fn offset_limit_should_not_push_down_with_offset_sub_query() -> Result<()> {
        let table_scan_1 = test_table_scan_with_name("test1")?;
        let table_scan_2 = test_table_scan_with_name("test2")?;

        let subquery = LogicalPlanBuilder::from(table_scan_1)
            .project(vec![col("a")])?
            .filter(col("a").eq(col("test1.a")))?
            .build()?;

        let outer_query = LogicalPlanBuilder::from(table_scan_2)
            .project(vec![col("a")])?
            .filter(exists(Arc::new(subquery)))?
            .limit(10, Some(100))?
            .build()?;

        let expected = "Limit: skip=10, fetch=100\
        \n  Filter: EXISTS (<subquery>)\
        \n    Subquery:\
        \n      Filter: test1.a = test1.a\
        \n        Projection: test1.a\
        \n          TableScan: test1\
        \n    Projection: test2.a\
        \n      TableScan: test2";

        assert_optimized_plan_equal(outer_query, expected)
    }

    /// A left join gets the limit (`skip + fetch`) on its left input only.
    #[test]
    fn limit_should_push_down_left_outer_join_with_offset() -> Result<()> {
        let table_scan_1 = test_table_scan()?;
        let table_scan_2 = test_table_scan_with_name("test2")?;

        let plan = LogicalPlanBuilder::from(table_scan_1)
            .join(
                LogicalPlanBuilder::from(table_scan_2).build()?,
                JoinType::Left,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .limit(10, Some(1000))?
            .build()?;

        let expected = "Limit: skip=10, fetch=1000\
        \n  Left Join: test.a = test2.a\
        \n    Limit: skip=0, fetch=1010\
        \n      TableScan: test, fetch=1010\
        \n    TableScan: test2";

        assert_optimized_plan_equal(plan, expected)
    }

    /// A right join gets the limit on its right input only.
    #[test]
    fn limit_should_push_down_right_outer_join() -> Result<()> {
        let table_scan_1 = test_table_scan()?;
        let table_scan_2 = test_table_scan_with_name("test2")?;

        let plan = LogicalPlanBuilder::from(table_scan_1)
            .join(
                LogicalPlanBuilder::from(table_scan_2).build()?,
                JoinType::Right,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .limit(0, Some(1000))?
            .build()?;

        let expected = "Limit: skip=0, fetch=1000\
        \n  Right Join: test.a = test2.a\
        \n    TableScan: test\
        \n    Limit: skip=0, fetch=1000\
        \n      TableScan: test2, fetch=1000";

        assert_optimized_plan_equal(plan, expected)
    }

    /// A right join with skip gets `skip + fetch` on its right input only.
    #[test]
    fn limit_should_push_down_right_outer_join_with_offset() -> Result<()> {
        let table_scan_1 = test_table_scan()?;
        let table_scan_2 = test_table_scan_with_name("test2")?;

        let plan = LogicalPlanBuilder::from(table_scan_1)
            .join(
                LogicalPlanBuilder::from(table_scan_2).build()?,
                JoinType::Right,
                (vec!["a"], vec!["a"]),
                None,
            )?
            .limit(10, Some(1000))?
            .build()?;

        let expected = "Limit: skip=10, fetch=1000\
        \n  Right Join: test.a = test2.a\
        \n    TableScan: test\
        \n    Limit: skip=0, fetch=1010\
        \n      TableScan: test2, fetch=1010";

        assert_optimized_plan_equal(plan, expected)
    }

    /// A cross join gets the limit on both inputs.
    #[test]
    fn limit_push_down_cross_join() -> Result<()> {
        let table_scan_1 = test_table_scan()?;
        let table_scan_2 = test_table_scan_with_name("test2")?;

        let plan = LogicalPlanBuilder::from(table_scan_1)
            .cross_join(LogicalPlanBuilder::from(table_scan_2).build()?)?
            .limit(0, Some(1000))?
            .build()?;

        let expected = "Limit: skip=0, fetch=1000\
        \n  Cross Join: \
        \n    Limit: skip=0, fetch=1000\
        \n      TableScan: test, fetch=1000\
        \n    Limit: skip=0, fetch=1000\
        \n      TableScan: test2, fetch=1000";

        assert_optimized_plan_equal(plan, expected)
    }

    /// A cross join with skip gets `skip + fetch` on both inputs.
    #[test]
    fn skip_limit_push_down_cross_join() -> Result<()> {
        let table_scan_1 = test_table_scan()?;
        let table_scan_2 = test_table_scan_with_name("test2")?;

        let plan = LogicalPlanBuilder::from(table_scan_1)
            .cross_join(LogicalPlanBuilder::from(table_scan_2).build()?)?
            .limit(1000, Some(1000))?
            .build()?;

        let expected = "Limit: skip=1000, fetch=1000\
        \n  Cross Join: \
        \n    Limit: skip=0, fetch=2000\
        \n      TableScan: test, fetch=2000\
        \n    Limit: skip=0, fetch=2000\
        \n      TableScan: test2, fetch=2000";

        assert_optimized_plan_equal(plan, expected)
    }

    /// Skipping as many rows as were fetched merges into a fetch of 0.
    #[test]
    fn merge_limit_result_empty() -> Result<()> {
        let scan = test_table_scan()?;

        let plan = LogicalPlanBuilder::from(scan)
            .limit(0, Some(1000))?
            .limit(1000, None)?
            .build()?;

        let expected = "Limit: skip=1000, fetch=0\
        \n  TableScan: test, fetch=0";

        assert_optimized_plan_equal(plan, expected)
    }

    /// Skipping more rows than were fetched also merges into a fetch of 0.
    #[test]
    fn skip_great_than_fetch() -> Result<()> {
        let scan = test_table_scan()?;

        let plan = LogicalPlanBuilder::from(scan)
            .limit(0, Some(1))?
            .limit(1000, None)?
            .build()?;

        let expected = "Limit: skip=1000, fetch=0\
        \n  TableScan: test, fetch=0";

        assert_optimized_plan_equal(plan, expected)
    }

    /// The merged limit moves below a subquery alias and into the scan.
    #[test]
    fn push_down_subquery_alias() -> Result<()> {
        let scan = test_table_scan()?;

        let plan = LogicalPlanBuilder::from(scan)
            .alias("a")?
            .limit(0, Some(1))?
            .limit(1000, None)?
            .build()?;

        let expected = "SubqueryAlias: a\
        \n  Limit: skip=1000, fetch=0\
        \n    TableScan: test, fetch=0";

        assert_optimized_plan_equal(plan, expected)
    }
}