1use std::collections::BTreeSet;
20use std::ops::Deref;
21use std::sync::Arc;
22
23use crate::decorrelate::PullUpCorrelatedExpr;
24use crate::optimizer::ApplyOrder;
25use crate::utils::replace_qualified_name;
26use crate::{OptimizerConfig, OptimizerRule};
27
28use datafusion_common::alias::AliasGenerator;
29use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
30use datafusion_common::{
31 Column, DFSchemaRef, ExprSchema, NullEquality, Result, assert_or_internal_err,
32 plan_err,
33};
34use datafusion_expr::expr::{Exists, InSubquery};
35use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
36use datafusion_expr::logical_plan::{JoinType, Subquery};
37use datafusion_expr::utils::{conjunction, expr_to_columns, split_conjunction_owned};
38use datafusion_expr::{
39 BinaryExpr, Expr, Filter, LogicalPlan, LogicalPlanBuilder, Operator, exists,
40 in_subquery, lit, not, not_exists, not_in_subquery,
41};
42
43use log::debug;
44
/// Optimizer rule that rewrites `IN` / `EXISTS` predicate subqueries found in
/// `Filter` predicates into joins (semi, anti, or mark joins).
#[derive(Default, Debug)]
pub struct DecorrelatePredicateSubquery {}
48
49impl DecorrelatePredicateSubquery {
50 #[expect(missing_docs)]
51 pub fn new() -> Self {
52 Self::default()
53 }
54}
55
56impl OptimizerRule for DecorrelatePredicateSubquery {
57 fn supports_rewrite(&self) -> bool {
58 true
59 }
60
61 fn rewrite(
62 &self,
63 plan: LogicalPlan,
64 config: &dyn OptimizerConfig,
65 ) -> Result<Transformed<LogicalPlan>> {
66 let plan = plan
67 .map_subqueries(|subquery| {
68 subquery.transform_down(|p| self.rewrite(p, config))
69 })?
70 .data;
71
72 let LogicalPlan::Filter(filter) = plan else {
73 return Ok(Transformed::no(plan));
74 };
75
76 if !has_subquery(&filter.predicate) {
77 return Ok(Transformed::no(LogicalPlan::Filter(filter)));
78 }
79
80 let (with_subqueries, mut other_exprs): (Vec<_>, Vec<_>) =
81 split_conjunction_owned(filter.predicate)
82 .into_iter()
83 .partition(has_subquery);
84
85 assert_or_internal_err!(
86 !with_subqueries.is_empty(),
87 "can not find expected subqueries in DecorrelatePredicateSubquery"
88 );
89
90 let mut cur_input = Arc::unwrap_or_clone(filter.input);
92 let original_schema = cur_input.schema().columns();
93 for subquery_expr in with_subqueries {
94 match extract_subquery_info(subquery_expr) {
95 SubqueryPredicate::Top(subquery) => {
97 match build_join_top(&subquery, &cur_input, config.alias_generator())?
98 {
99 Some(plan) => cur_input = plan,
100 None => other_exprs.push(subquery.expr()),
102 }
103 }
104 SubqueryPredicate::Embedded(expr) => {
106 let (plan, expr_without_subqueries) =
107 rewrite_inner_subqueries(cur_input, expr, config)?;
108 cur_input = plan;
109 other_exprs.push(expr_without_subqueries);
110 }
111 }
112 }
113
114 let expr = conjunction(other_exprs);
115 if let Some(expr) = expr {
116 let new_filter = Filter::try_new(expr, Arc::new(cur_input))?;
117 cur_input = LogicalPlan::Filter(new_filter);
118 }
119
120 if cur_input.schema().fields().len() != original_schema.len() {
121 cur_input = LogicalPlanBuilder::from(cur_input)
122 .project(original_schema.into_iter().map(Expr::from))?
123 .build()?;
124 }
125
126 Ok(Transformed::yes(cur_input))
127 }
128
129 fn name(&self) -> &str {
130 "decorrelate_predicate_subquery"
131 }
132
133 fn apply_order(&self) -> Option<ApplyOrder> {
134 Some(ApplyOrder::TopDown)
135 }
136}
137
138fn rewrite_inner_subqueries(
139 outer: LogicalPlan,
140 expr: Expr,
141 config: &dyn OptimizerConfig,
142) -> Result<(LogicalPlan, Expr)> {
143 let mut cur_input = outer;
144 let alias = config.alias_generator();
145 let expr_without_subqueries = expr.transform(|e| match e {
146 Expr::Exists(Exists {
147 subquery: Subquery { subquery, .. },
148 negated,
149 }) => match mark_join(&cur_input, &subquery, None, negated, alias)? {
150 Some((plan, exists_expr)) => {
151 cur_input = plan;
152 Ok(Transformed::yes(exists_expr))
153 }
154 None if negated => Ok(Transformed::no(not_exists(subquery))),
155 None => Ok(Transformed::no(exists(subquery))),
156 },
157 Expr::InSubquery(InSubquery {
158 expr,
159 subquery: Subquery { subquery, .. },
160 negated,
161 }) => {
162 let in_predicate = subquery
163 .head_output_expr()?
164 .map_or(plan_err!("single expression required."), |output_expr| {
165 Ok(Expr::eq(*expr.clone(), output_expr))
166 })?;
167 match mark_join(&cur_input, &subquery, Some(&in_predicate), negated, alias)? {
168 Some((plan, exists_expr)) => {
169 cur_input = plan;
170 Ok(Transformed::yes(exists_expr))
171 }
172 None if negated => Ok(Transformed::no(not_in_subquery(*expr, subquery))),
173 None => Ok(Transformed::no(in_subquery(*expr, subquery))),
174 }
175 }
176 _ => Ok(Transformed::no(e)),
177 })?;
178 Ok((cur_input, expr_without_subqueries.data))
179}
180
/// Classification of a filter conjunct that contains a subquery, as produced
/// by `extract_subquery_info`.
enum SubqueryPredicate {
    /// The conjunct is itself an `IN` / `EXISTS` subquery, optionally wrapped
    /// in a single `NOT`.
    Top(SubqueryInfo),
    /// The subquery is nested inside a larger expression; the whole
    /// expression is carried along.
    Embedded(Expr),
}
189
190fn extract_subquery_info(expr: Expr) -> SubqueryPredicate {
191 match expr {
192 Expr::Not(not_expr) => match *not_expr {
193 Expr::InSubquery(InSubquery {
194 expr,
195 subquery,
196 negated,
197 }) => SubqueryPredicate::Top(SubqueryInfo::new_with_in_expr(
198 subquery, *expr, !negated,
199 )),
200 Expr::Exists(Exists { subquery, negated }) => {
201 SubqueryPredicate::Top(SubqueryInfo::new(subquery, !negated))
202 }
203 expr => SubqueryPredicate::Embedded(not(expr)),
204 },
205 Expr::InSubquery(InSubquery {
206 expr,
207 subquery,
208 negated,
209 }) => SubqueryPredicate::Top(SubqueryInfo::new_with_in_expr(
210 subquery, *expr, negated,
211 )),
212 Expr::Exists(Exists { subquery, negated }) => {
213 SubqueryPredicate::Top(SubqueryInfo::new(subquery, negated))
214 }
215 expr => SubqueryPredicate::Embedded(expr),
216 }
217}
218
219fn has_subquery(expr: &Expr) -> bool {
220 expr.exists(|e| match e {
221 Expr::InSubquery(_) | Expr::Exists(_) => Ok(true),
222 _ => Ok(false),
223 })
224 .unwrap()
225}
226
227fn build_join_top(
258 query_info: &SubqueryInfo,
259 left: &LogicalPlan,
260 alias: &Arc<AliasGenerator>,
261) -> Result<Option<LogicalPlan>> {
262 let where_in_expr_opt = &query_info.where_in_expr;
263 let in_predicate_opt = where_in_expr_opt
264 .clone()
265 .map(|where_in_expr| {
266 query_info
267 .query
268 .subquery
269 .head_output_expr()?
270 .map_or(plan_err!("single expression required."), |expr| {
271 Ok(Expr::eq(where_in_expr, expr))
272 })
273 })
274 .map_or(Ok(None), |v| v.map(Some))?;
275
276 let join_type = match query_info.negated {
277 true => JoinType::LeftAnti,
278 false => JoinType::LeftSemi,
279 };
280 let subquery = query_info.query.subquery.as_ref();
281 let subquery_alias = alias.next("__correlated_sq");
282 build_join(
283 left,
284 subquery,
285 in_predicate_opt.as_ref(),
286 join_type,
287 subquery_alias,
288 )
289}
290
291fn mark_join(
307 left: &LogicalPlan,
308 subquery: &LogicalPlan,
309 in_predicate_opt: Option<&Expr>,
310 negated: bool,
311 alias_generator: &Arc<AliasGenerator>,
312) -> Result<Option<(LogicalPlan, Expr)>> {
313 let alias = alias_generator.next("__correlated_sq");
314
315 let exists_col = Expr::Column(Column::new(Some(alias.clone()), "mark"));
316 let exists_expr = if negated { !exists_col } else { exists_col };
317
318 Ok(
319 build_join(left, subquery, in_predicate_opt, JoinType::LeftMark, alias)?
320 .map(|plan| (plan, exists_expr)),
321 )
322}
323
324fn join_keys_may_be_null(
330 join_filter: &Expr,
331 left_schema: &DFSchemaRef,
332 right_schema: &DFSchemaRef,
333) -> Result<bool> {
334 let mut columns = std::collections::HashSet::new();
336 expr_to_columns(join_filter, &mut columns)?;
337
338 for col in columns {
340 if let Ok(field) = left_schema.field_from_column(&col)
342 && field.as_ref().is_nullable()
343 {
344 return Ok(true);
345 }
346 if let Ok(field) = right_schema.field_from_column(&col)
348 && field.as_ref().is_nullable()
349 {
350 return Ok(true);
351 }
352 }
353
354 Ok(false)
355}
356
357fn build_join(
358 left: &LogicalPlan,
359 subquery: &LogicalPlan,
360 in_predicate_opt: Option<&Expr>,
361 join_type: JoinType,
362 alias: String,
363) -> Result<Option<LogicalPlan>> {
364 let mut pull_up = PullUpCorrelatedExpr::new()
365 .with_in_predicate_opt(in_predicate_opt.cloned())
366 .with_exists_sub_query(in_predicate_opt.is_none());
367
368 let new_plan = subquery.clone().rewrite(&mut pull_up).data()?;
369 if !pull_up.can_pull_up {
370 return Ok(None);
371 }
372
373 let sub_query_alias = LogicalPlanBuilder::from(new_plan)
374 .alias(alias.to_string())?
375 .build()?;
376 let mut all_correlated_cols = BTreeSet::new();
377 pull_up
378 .correlated_subquery_cols_map
379 .values()
380 .for_each(|cols| all_correlated_cols.extend(cols.clone()));
381
382 let join_filter_opt = conjunction(pull_up.join_filters)
384 .map_or(Ok(None), |filter| {
385 replace_qualified_name(filter, &all_correlated_cols, &alias).map(Some)
386 })?;
387
388 let join_filter = match (join_filter_opt, in_predicate_opt.cloned()) {
389 (
390 Some(join_filter),
391 Some(Expr::BinaryExpr(BinaryExpr {
392 left,
393 op: Operator::Eq,
394 right,
395 })),
396 ) => {
397 let right_col = create_col_from_scalar_expr(right.deref(), alias)?;
398 let in_predicate = Expr::eq(left.deref().clone(), Expr::Column(right_col));
399 in_predicate.and(join_filter)
400 }
401 (Some(join_filter), _) => join_filter,
402 (
403 _,
404 Some(Expr::BinaryExpr(BinaryExpr {
405 left,
406 op: Operator::Eq,
407 right,
408 })),
409 ) => {
410 let right_col = create_col_from_scalar_expr(right.deref(), alias)?;
411
412 Expr::eq(left.deref().clone(), Expr::Column(right_col))
413 }
414 (None, None) => lit(true),
415 _ => return Ok(None),
416 };
417
418 if matches!(join_type, JoinType::LeftMark | JoinType::RightMark) {
419 let right_schema = sub_query_alias.schema();
420
421 let mut needed = std::collections::HashSet::new();
423 expr_to_columns(&join_filter, &mut needed)?;
424 if let Some(in_pred) = in_predicate_opt {
425 expr_to_columns(in_pred, &mut needed)?;
426 }
427
428 let mut right_cols_idx_and_col: Vec<(usize, Column)> = needed
431 .into_iter()
432 .filter_map(|c| right_schema.index_of_column(&c).ok().map(|idx| (idx, c)))
433 .collect();
434
435 right_cols_idx_and_col.sort_by_key(|(idx, _)| *idx);
436
437 let right_proj_exprs: Vec<Expr> = right_cols_idx_and_col
438 .into_iter()
439 .map(|(_, c)| Expr::Column(c))
440 .collect();
441
442 let right_projected = if !right_proj_exprs.is_empty() {
443 LogicalPlanBuilder::from(sub_query_alias.clone())
444 .project(right_proj_exprs)?
445 .build()?
446 } else {
447 sub_query_alias.clone()
449 };
450
451 let new_plan = LogicalPlanBuilder::from(left.clone())
453 .join_on(right_projected, join_type, Some(join_filter))?
454 .build()?;
455
456 debug!(
457 "predicate subquery optimized:\n{}",
458 new_plan.display_indent()
459 );
460
461 return Ok(Some(new_plan));
462 }
463
464 let null_aware = join_type == JoinType::LeftAnti
473 && in_predicate_opt.is_some()
474 && join_keys_may_be_null(&join_filter, left.schema(), sub_query_alias.schema())?;
475
476 let new_plan = if null_aware {
478 LogicalPlanBuilder::from(left.clone())
480 .join_detailed_with_options(
481 sub_query_alias,
482 join_type,
483 (Vec::<Column>::new(), Vec::<Column>::new()), Some(join_filter),
485 NullEquality::NullEqualsNothing,
486 true, )?
488 .build()?
489 } else {
490 LogicalPlanBuilder::from(left.clone())
491 .join_on(sub_query_alias, join_type, Some(join_filter))?
492 .build()?
493 };
494 debug!(
495 "predicate subquery optimized:\n{}",
496 new_plan.display_indent()
497 );
498 Ok(Some(new_plan))
499}
500
/// Description of a top-level `IN` / `EXISTS` subquery predicate.
#[derive(Debug)]
struct SubqueryInfo {
    /// The subquery itself.
    query: Subquery,
    /// Left-hand expression of an `IN` predicate; `None` for `EXISTS`.
    where_in_expr: Option<Expr>,
    /// Whether the predicate is negated (`NOT IN` / `NOT EXISTS`).
    negated: bool,
}
507
508impl SubqueryInfo {
509 pub fn new(query: Subquery, negated: bool) -> Self {
510 Self {
511 query,
512 where_in_expr: None,
513 negated,
514 }
515 }
516
517 pub fn new_with_in_expr(query: Subquery, expr: Expr, negated: bool) -> Self {
518 Self {
519 query,
520 where_in_expr: Some(expr),
521 negated,
522 }
523 }
524
525 pub fn expr(self) -> Expr {
526 match self.where_in_expr {
527 Some(expr) => match self.negated {
528 true => not_in_subquery(expr, self.query.subquery),
529 false => in_subquery(expr, self.query.subquery),
530 },
531 None => match self.negated {
532 true => not_exists(self.query.subquery),
533 false => exists(self.query.subquery),
534 },
535 }
536 }
537}
538
539#[cfg(test)]
540mod tests {
541 use std::ops::Add;
542
543 use super::*;
544 use crate::test::*;
545
546 use crate::assert_optimized_plan_eq_display_indent_snapshot;
547 use arrow::datatypes::{DataType, Field, Schema};
548 use datafusion_expr::builder::table_source;
549 use datafusion_expr::{and, binary_expr, col, out_ref_col, table_scan};
550
551 macro_rules! assert_optimized_plan_equal {
552 (
553 $plan:expr,
554 @ $expected:literal $(,)?
555 ) => {{
556 let rule: Arc<dyn crate::OptimizerRule + Send + Sync> = Arc::new(DecorrelatePredicateSubquery::new());
557 assert_optimized_plan_eq_display_indent_snapshot!(
558 rule,
559 $plan,
560 @ $expected,
561 )
562 }};
563 }
564
565 fn test_subquery_with_name(name: &str) -> Result<Arc<LogicalPlan>> {
566 let table_scan = test_table_scan_with_name(name)?;
567 Ok(Arc::new(
568 LogicalPlanBuilder::from(table_scan)
569 .project(vec![col("c")])?
570 .build()?,
571 ))
572 }
573
574 #[test]
576 fn in_subquery_multiple() -> Result<()> {
577 let table_scan = test_table_scan()?;
578 let plan = LogicalPlanBuilder::from(table_scan)
579 .filter(and(
580 in_subquery(col("c"), test_subquery_with_name("sq_1")?),
581 in_subquery(col("b"), test_subquery_with_name("sq_2")?),
582 ))?
583 .project(vec![col("test.b")])?
584 .build()?;
585
586 assert_optimized_plan_equal!(
587 plan,
588 @r"
589 Projection: test.b [b:UInt32]
590 LeftSemi Join: Filter: test.b = __correlated_sq_2.c [a:UInt32, b:UInt32, c:UInt32]
591 LeftSemi Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
592 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
593 SubqueryAlias: __correlated_sq_1 [c:UInt32]
594 Projection: sq_1.c [c:UInt32]
595 TableScan: sq_1 [a:UInt32, b:UInt32, c:UInt32]
596 SubqueryAlias: __correlated_sq_2 [c:UInt32]
597 Projection: sq_2.c [c:UInt32]
598 TableScan: sq_2 [a:UInt32, b:UInt32, c:UInt32]
599 "
600 )
601 }
602
603 #[test]
605 fn in_subquery_with_and_filters() -> Result<()> {
606 let table_scan = test_table_scan()?;
607 let plan = LogicalPlanBuilder::from(table_scan)
608 .filter(and(
609 in_subquery(col("c"), test_subquery_with_name("sq")?),
610 and(
611 binary_expr(col("a"), Operator::Eq, lit(1_u32)),
612 binary_expr(col("b"), Operator::Lt, lit(30_u32)),
613 ),
614 ))?
615 .project(vec![col("test.b")])?
616 .build()?;
617
618 assert_optimized_plan_equal!(
619 plan,
620 @r"
621 Projection: test.b [b:UInt32]
622 Filter: test.a = UInt32(1) AND test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]
623 LeftSemi Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
624 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
625 SubqueryAlias: __correlated_sq_1 [c:UInt32]
626 Projection: sq.c [c:UInt32]
627 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
628 "
629 )
630 }
631
632 #[test]
634 fn in_subquery_nested() -> Result<()> {
635 let table_scan = test_table_scan()?;
636
637 let subquery = LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
638 .filter(in_subquery(col("a"), test_subquery_with_name("sq_nested")?))?
639 .project(vec![col("a")])?
640 .build()?;
641
642 let plan = LogicalPlanBuilder::from(table_scan)
643 .filter(in_subquery(col("b"), Arc::new(subquery)))?
644 .project(vec![col("test.b")])?
645 .build()?;
646
647 assert_optimized_plan_equal!(
648 plan,
649 @r"
650 Projection: test.b [b:UInt32]
651 LeftSemi Join: Filter: test.b = __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]
652 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
653 SubqueryAlias: __correlated_sq_2 [a:UInt32]
654 Projection: sq.a [a:UInt32]
655 LeftSemi Join: Filter: sq.a = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
656 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
657 SubqueryAlias: __correlated_sq_1 [c:UInt32]
658 Projection: sq_nested.c [c:UInt32]
659 TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]
660 "
661 )
662 }
663
664 #[test]
667 fn multiple_subqueries() -> Result<()> {
668 let orders = Arc::new(
669 LogicalPlanBuilder::from(scan_tpch_table("orders"))
670 .filter(
671 col("orders.o_custkey")
672 .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
673 )?
674 .project(vec![col("orders.o_custkey")])?
675 .build()?,
676 );
677 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
678 .filter(
679 in_subquery(col("customer.c_custkey"), Arc::clone(&orders))
680 .and(in_subquery(col("customer.c_custkey"), orders)),
681 )?
682 .project(vec![col("customer.c_custkey")])?
683 .build()?;
684 debug!("plan to optimize:\n{}", plan.display_indent());
685
686 assert_optimized_plan_equal!(
687 plan,
688 @r"
689 Projection: customer.c_custkey [c_custkey:Int64]
690 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]
691 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
692 TableScan: customer [c_custkey:Int64, c_name:Utf8]
693 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
694 Projection: orders.o_custkey [o_custkey:Int64]
695 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
696 SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]
697 Projection: orders.o_custkey [o_custkey:Int64]
698 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
699 "
700 )
701 }
702
703 #[test]
706 fn recursive_subqueries() -> Result<()> {
707 let lineitem = Arc::new(
708 LogicalPlanBuilder::from(scan_tpch_table("lineitem"))
709 .filter(
710 col("lineitem.l_orderkey")
711 .eq(out_ref_col(DataType::Int64, "orders.o_orderkey")),
712 )?
713 .project(vec![col("lineitem.l_orderkey")])?
714 .build()?,
715 );
716
717 let orders = Arc::new(
718 LogicalPlanBuilder::from(scan_tpch_table("orders"))
719 .filter(
720 in_subquery(col("orders.o_orderkey"), lineitem).and(
721 col("orders.o_custkey")
722 .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
723 ),
724 )?
725 .project(vec![col("orders.o_custkey")])?
726 .build()?,
727 );
728
729 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
730 .filter(in_subquery(col("customer.c_custkey"), orders))?
731 .project(vec![col("customer.c_custkey")])?
732 .build()?;
733
734 assert_optimized_plan_equal!(
735 plan,
736 @r"
737 Projection: customer.c_custkey [c_custkey:Int64]
738 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]
739 TableScan: customer [c_custkey:Int64, c_name:Utf8]
740 SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]
741 Projection: orders.o_custkey [o_custkey:Int64]
742 LeftSemi Join: Filter: orders.o_orderkey = __correlated_sq_1.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
743 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
744 SubqueryAlias: __correlated_sq_1 [l_orderkey:Int64]
745 Projection: lineitem.l_orderkey [l_orderkey:Int64]
746 TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]
747 "
748 )
749 }
750
751 #[test]
753 fn in_subquery_with_subquery_filters() -> Result<()> {
754 let sq = Arc::new(
755 LogicalPlanBuilder::from(scan_tpch_table("orders"))
756 .filter(
757 out_ref_col(DataType::Int64, "customer.c_custkey")
758 .eq(col("orders.o_custkey"))
759 .and(col("o_orderkey").eq(lit(1))),
760 )?
761 .project(vec![col("orders.o_custkey")])?
762 .build()?,
763 );
764
765 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
766 .filter(in_subquery(col("customer.c_custkey"), sq))?
767 .project(vec![col("customer.c_custkey")])?
768 .build()?;
769
770 assert_optimized_plan_equal!(
771 plan,
772 @r"
773 Projection: customer.c_custkey [c_custkey:Int64]
774 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
775 TableScan: customer [c_custkey:Int64, c_name:Utf8]
776 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
777 Projection: orders.o_custkey [o_custkey:Int64]
778 Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
779 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
780 "
781 )
782 }
783
784 #[test]
786 fn in_subquery_no_cols() -> Result<()> {
787 let sq = Arc::new(
788 LogicalPlanBuilder::from(scan_tpch_table("orders"))
789 .filter(
790 out_ref_col(DataType::Int64, "customer.c_custkey")
791 .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
792 )?
793 .project(vec![col("orders.o_custkey")])?
794 .build()?,
795 );
796
797 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
798 .filter(in_subquery(col("customer.c_custkey"), sq))?
799 .project(vec![col("customer.c_custkey")])?
800 .build()?;
801
802 assert_optimized_plan_equal!(
803 plan,
804 @r"
805 Projection: customer.c_custkey [c_custkey:Int64]
806 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
807 TableScan: customer [c_custkey:Int64, c_name:Utf8]
808 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
809 Projection: orders.o_custkey [o_custkey:Int64]
810 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
811 "
812 )
813 }
814
815 #[test]
817 fn in_subquery_with_no_correlated_cols() -> Result<()> {
818 let sq = Arc::new(
819 LogicalPlanBuilder::from(scan_tpch_table("orders"))
820 .filter(col("orders.o_custkey").eq(col("orders.o_custkey")))?
821 .project(vec![col("orders.o_custkey")])?
822 .build()?,
823 );
824
825 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
826 .filter(in_subquery(col("customer.c_custkey"), sq))?
827 .project(vec![col("customer.c_custkey")])?
828 .build()?;
829
830 assert_optimized_plan_equal!(
831 plan,
832 @r"
833 Projection: customer.c_custkey [c_custkey:Int64]
834 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
835 TableScan: customer [c_custkey:Int64, c_name:Utf8]
836 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
837 Projection: orders.o_custkey [o_custkey:Int64]
838 Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
839 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
840 "
841 )
842 }
843
844 #[test]
846 fn in_subquery_where_not_eq() -> Result<()> {
847 let sq = Arc::new(
848 LogicalPlanBuilder::from(scan_tpch_table("orders"))
849 .filter(
850 out_ref_col(DataType::Int64, "customer.c_custkey")
851 .not_eq(col("orders.o_custkey")),
852 )?
853 .project(vec![col("orders.o_custkey")])?
854 .build()?,
855 );
856
857 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
858 .filter(in_subquery(col("customer.c_custkey"), sq))?
859 .project(vec![col("customer.c_custkey")])?
860 .build()?;
861
862 assert_optimized_plan_equal!(
863 plan,
864 @r"
865 Projection: customer.c_custkey [c_custkey:Int64]
866 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND customer.c_custkey != __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
867 TableScan: customer [c_custkey:Int64, c_name:Utf8]
868 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
869 Projection: orders.o_custkey [o_custkey:Int64]
870 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
871 "
872 )
873 }
874
875 #[test]
877 fn in_subquery_where_less_than() -> Result<()> {
878 let sq = Arc::new(
879 LogicalPlanBuilder::from(scan_tpch_table("orders"))
880 .filter(
881 out_ref_col(DataType::Int64, "customer.c_custkey")
882 .lt(col("orders.o_custkey")),
883 )?
884 .project(vec![col("orders.o_custkey")])?
885 .build()?,
886 );
887
888 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
889 .filter(in_subquery(col("customer.c_custkey"), sq))?
890 .project(vec![col("customer.c_custkey")])?
891 .build()?;
892
893 assert_optimized_plan_equal!(
894 plan,
895 @r"
896 Projection: customer.c_custkey [c_custkey:Int64]
897 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND customer.c_custkey < __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
898 TableScan: customer [c_custkey:Int64, c_name:Utf8]
899 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
900 Projection: orders.o_custkey [o_custkey:Int64]
901 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
902 "
903 )
904 }
905
906 #[test]
908 fn in_subquery_with_subquery_disjunction() -> Result<()> {
909 let sq = Arc::new(
910 LogicalPlanBuilder::from(scan_tpch_table("orders"))
911 .filter(
912 out_ref_col(DataType::Int64, "customer.c_custkey")
913 .eq(col("orders.o_custkey"))
914 .or(col("o_orderkey").eq(lit(1))),
915 )?
916 .project(vec![col("orders.o_custkey")])?
917 .build()?,
918 );
919
920 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
921 .filter(in_subquery(col("customer.c_custkey"), sq))?
922 .project(vec![col("customer.c_custkey")])?
923 .build()?;
924
925 assert_optimized_plan_equal!(
926 plan,
927 @r"
928 Projection: customer.c_custkey [c_custkey:Int64]
929 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND (customer.c_custkey = __correlated_sq_1.o_custkey OR __correlated_sq_1.o_orderkey = Int32(1)) [c_custkey:Int64, c_name:Utf8]
930 TableScan: customer [c_custkey:Int64, c_name:Utf8]
931 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64, o_orderkey:Int64]
932 Projection: orders.o_custkey, orders.o_orderkey [o_custkey:Int64, o_orderkey:Int64]
933 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
934 "
935 )
936 }
937
938 #[test]
940 fn in_subquery_no_projection() -> Result<()> {
941 let sq = Arc::new(
942 LogicalPlanBuilder::from(scan_tpch_table("orders"))
943 .filter(col("customer.c_custkey").eq(col("orders.o_custkey")))?
944 .build()?,
945 );
946
947 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
948 .filter(in_subquery(col("customer.c_custkey"), sq))?
949 .project(vec![col("customer.c_custkey")])?
950 .build()?;
951
952 let expected = "Invalid (non-executable) plan after Analyzer\
954 \ncaused by\
955 \nError during planning: InSubquery should only return one column, but found 4";
956 assert_analyzer_check_err(vec![], plan, expected);
957
958 Ok(())
959 }
960
961 #[test]
963 fn in_subquery_join_expr() -> Result<()> {
964 let sq = Arc::new(
965 LogicalPlanBuilder::from(scan_tpch_table("orders"))
966 .filter(
967 out_ref_col(DataType::Int64, "customer.c_custkey")
968 .eq(col("orders.o_custkey")),
969 )?
970 .project(vec![col("orders.o_custkey")])?
971 .build()?,
972 );
973
974 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
975 .filter(in_subquery(col("customer.c_custkey").add(lit(1)), sq))?
976 .project(vec![col("customer.c_custkey")])?
977 .build()?;
978
979 assert_optimized_plan_equal!(
980 plan,
981 @r"
982 Projection: customer.c_custkey [c_custkey:Int64]
983 LeftSemi Join: Filter: customer.c_custkey + Int32(1) = __correlated_sq_1.o_custkey AND customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
984 TableScan: customer [c_custkey:Int64, c_name:Utf8]
985 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
986 Projection: orders.o_custkey [o_custkey:Int64]
987 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
988 "
989 )
990 }
991
992 #[test]
994 fn in_subquery_project_expr() -> Result<()> {
995 let sq = Arc::new(
996 LogicalPlanBuilder::from(scan_tpch_table("orders"))
997 .filter(
998 out_ref_col(DataType::Int64, "customer.c_custkey")
999 .eq(col("orders.o_custkey")),
1000 )?
1001 .project(vec![col("orders.o_custkey").add(lit(1))])?
1002 .build()?,
1003 );
1004
1005 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1006 .filter(in_subquery(col("customer.c_custkey"), sq))?
1007 .project(vec![col("customer.c_custkey")])?
1008 .build()?;
1009
1010 assert_optimized_plan_equal!(
1011 plan,
1012 @r"
1013 Projection: customer.c_custkey [c_custkey:Int64]
1014 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.orders.o_custkey + Int32(1) AND customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1015 TableScan: customer [c_custkey:Int64, c_name:Utf8]
1016 SubqueryAlias: __correlated_sq_1 [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]
1017 Projection: orders.o_custkey + Int32(1), orders.o_custkey [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]
1018 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1019 "
1020 )
1021 }
1022
1023 #[test]
1025 fn in_subquery_multi_col() -> Result<()> {
1026 let sq = Arc::new(
1027 LogicalPlanBuilder::from(scan_tpch_table("orders"))
1028 .filter(
1029 out_ref_col(DataType::Int64, "customer.c_custkey")
1030 .eq(col("orders.o_custkey")),
1031 )?
1032 .project(vec![col("orders.o_custkey"), col("orders.o_orderkey")])?
1033 .build()?,
1034 );
1035
1036 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1037 .filter(
1038 in_subquery(col("customer.c_custkey"), sq)
1039 .and(col("c_custkey").eq(lit(1))),
1040 )?
1041 .project(vec![col("customer.c_custkey")])?
1042 .build()?;
1043
1044 let expected = "Invalid (non-executable) plan after Analyzer\
1045 \ncaused by\
1046 \nError during planning: InSubquery should only return one column";
1047 assert_analyzer_check_err(vec![], plan, expected);
1048
1049 Ok(())
1050 }
1051
1052 #[test]
1054 fn should_support_additional_filters() -> Result<()> {
1055 let sq = Arc::new(
1056 LogicalPlanBuilder::from(scan_tpch_table("orders"))
1057 .filter(
1058 out_ref_col(DataType::Int64, "customer.c_custkey")
1059 .eq(col("orders.o_custkey")),
1060 )?
1061 .project(vec![col("orders.o_custkey")])?
1062 .build()?,
1063 );
1064
1065 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1066 .filter(
1067 in_subquery(col("customer.c_custkey"), sq)
1068 .and(col("c_custkey").eq(lit(1))),
1069 )?
1070 .project(vec![col("customer.c_custkey")])?
1071 .build()?;
1072
1073 assert_optimized_plan_equal!(
1074 plan,
1075 @r"
1076 Projection: customer.c_custkey [c_custkey:Int64]
1077 Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]
1078 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1079 TableScan: customer [c_custkey:Int64, c_name:Utf8]
1080 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1081 Projection: orders.o_custkey [o_custkey:Int64]
1082 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1083 "
1084 )
1085 }
1086
1087 #[test]
1089 fn in_subquery_correlated() -> Result<()> {
1090 let sq = Arc::new(
1091 LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
1092 .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq.a")))?
1093 .project(vec![col("c")])?
1094 .build()?,
1095 );
1096
1097 let plan = LogicalPlanBuilder::from(test_table_scan_with_name("test")?)
1098 .filter(in_subquery(col("c"), sq))?
1099 .project(vec![col("test.b")])?
1100 .build()?;
1101
1102 assert_optimized_plan_equal!(
1103 plan,
1104 @r"
1105 Projection: test.b [b:UInt32]
1106 LeftSemi Join: Filter: test.c = __correlated_sq_1.c AND test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]
1107 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1108 SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]
1109 Projection: sq.c, sq.a [c:UInt32, a:UInt32]
1110 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1111 "
1112 )
1113 }
1114
/// Uncorrelated `IN` subquery: `c IN (<subquery over sq>)` is expected to
/// become a LeftSemi join on `test.c = __correlated_sq_1.c`.
1115 #[test]
1117 fn in_subquery_simple() -> Result<()> {
1118 let table_scan = test_table_scan()?;
1119 let plan = LogicalPlanBuilder::from(table_scan)
1120 .filter(in_subquery(col("c"), test_subquery_with_name("sq")?))?
1121 .project(vec![col("test.b")])?
1122 .build()?;
1123
1124 assert_optimized_plan_equal!(
1125 plan,
1126 @r"
1127 Projection: test.b [b:UInt32]
1128 LeftSemi Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
1129 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1130 SubqueryAlias: __correlated_sq_1 [c:UInt32]
1131 Projection: sq.c [c:UInt32]
1132 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1133 "
1134 )
1135 }
1136
/// Uncorrelated `NOT IN` subquery: expected to become a LeftAnti join on
/// `test.c = __correlated_sq_1.c`.
1137 #[test]
1139 fn not_in_subquery_simple() -> Result<()> {
1140 let table_scan = test_table_scan()?;
1141 let plan = LogicalPlanBuilder::from(table_scan)
1142 .filter(not_in_subquery(col("c"), test_subquery_with_name("sq")?))?
1143 .project(vec![col("test.b")])?
1144 .build()?;
1145
1146 assert_optimized_plan_equal!(
1147 plan,
1148 @r"
1149 Projection: test.b [b:UInt32]
1150 LeftAnti Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
1151 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1152 SubqueryAlias: __correlated_sq_1 [c:UInt32]
1153 Projection: sq.c [c:UInt32]
1154 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1155 "
1156 )
1157 }
1158
/// `NOT (c IN (<subquery>))`, built by wrapping `in_subquery` in `not(..)`
/// rather than using `not_in_subquery`. The expected plan is the same
/// LeftAnti join that the plain `NOT IN` form produces.
1159 #[test]
1160 fn wrapped_not_in_subquery() -> Result<()> {
1161 let table_scan = test_table_scan()?;
1162 let plan = LogicalPlanBuilder::from(table_scan)
1163 .filter(not(in_subquery(col("c"), test_subquery_with_name("sq")?)))?
1164 .project(vec![col("test.b")])?
1165 .build()?;
1166
1167 assert_optimized_plan_equal!(
1168 plan,
1169 @r"
1170 Projection: test.b [b:UInt32]
1171 LeftAnti Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
1172 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1173 SubqueryAlias: __correlated_sq_1 [c:UInt32]
1174 Projection: sq.c [c:UInt32]
1175 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1176 "
1177 )
1178 }
1179
/// `NOT (c NOT IN (<subquery>))`: the two negations cancel, so the expected
/// plan is the LeftSemi join of a plain `IN` subquery.
1180 #[test]
1181 fn wrapped_not_not_in_subquery() -> Result<()> {
1182 let table_scan = test_table_scan()?;
1183 let plan = LogicalPlanBuilder::from(table_scan)
1184 .filter(not(not_in_subquery(
1185 col("c"),
1186 test_subquery_with_name("sq")?,
1187 )))?
1188 .project(vec![col("test.b")])?
1189 .build()?;
1190
1191 assert_optimized_plan_equal!(
1192 plan,
1193 @r"
1194 Projection: test.b [b:UInt32]
1195 LeftSemi Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
1196 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1197 SubqueryAlias: __correlated_sq_1 [c:UInt32]
1198 Projection: sq.c [c:UInt32]
1199 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1200 "
1201 )
1202 }
1203
/// `IN` subquery where both sides are expressions rather than bare columns:
/// `test.c + 1 IN (SELECT sq.c * 2 ...)`. The expected join filter compares
/// the outer expression with the aliased subquery output column.
1204 #[test]
1205 fn in_subquery_both_side_expr() -> Result<()> {
1206 let table_scan = test_table_scan()?;
1207 let subquery_scan = test_table_scan_with_name("sq")?;
1208
1209 let subquery = LogicalPlanBuilder::from(subquery_scan)
1210 .project(vec![col("c") * lit(2u32)])?
1211 .build()?;
1212
1213 let plan = LogicalPlanBuilder::from(table_scan)
1214 .filter(in_subquery(col("c") + lit(1u32), Arc::new(subquery)))?
1215 .project(vec![col("test.b")])?
1216 .build()?;
1217
1218 assert_optimized_plan_equal!(
1219 plan,
1220 @r"
1221 Projection: test.b [b:UInt32]
1222 LeftSemi Join: Filter: test.c + UInt32(1) = __correlated_sq_1.sq.c * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]
1223 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1224 SubqueryAlias: __correlated_sq_1 [sq.c * UInt32(2):UInt32]
1225 Projection: sq.c * UInt32(2) [sq.c * UInt32(2):UInt32]
1226 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1227 "
1228 )
1229 }
1230
/// `IN` subquery whose filter mixes a correlated predicate
/// (`outer test.a = sq.a`) with a purely inner one (`sq.a + 1 = sq.b`).
/// Expected: the correlated predicate moves into the join filter, while the
/// inner predicate stays as a `Filter` inside the subquery.
1231 #[test]
1232 fn in_subquery_join_filter_and_inner_filter() -> Result<()> {
1233 let table_scan = test_table_scan()?;
1234 let subquery_scan = test_table_scan_with_name("sq")?;
1235
1236 let subquery = LogicalPlanBuilder::from(subquery_scan)
1237 .filter(
1238 out_ref_col(DataType::UInt32, "test.a")
1239 .eq(col("sq.a"))
1240 .and(col("sq.a").add(lit(1u32)).eq(col("sq.b"))),
1241 )?
1242 .project(vec![col("c") * lit(2u32)])?
1243 .build()?;
1244
1245 let plan = LogicalPlanBuilder::from(table_scan)
1246 .filter(in_subquery(col("c") + lit(1u32), Arc::new(subquery)))?
1247 .project(vec![col("test.b")])?
1248 .build()?;
1249
1250 assert_optimized_plan_equal!(
1251 plan,
1252 @r"
1253 Projection: test.b [b:UInt32]
1254 LeftSemi Join: Filter: test.c + UInt32(1) = __correlated_sq_1.sq.c * UInt32(2) AND test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]
1255 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1256 SubqueryAlias: __correlated_sq_1 [sq.c * UInt32(2):UInt32, a:UInt32]
1257 Projection: sq.c * UInt32(2), sq.a [sq.c * UInt32(2):UInt32, a:UInt32]
1258 Filter: sq.a + UInt32(1) = sq.b [a:UInt32, b:UInt32, c:UInt32]
1259 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1260 "
1261 )
1262 }
1263
/// `IN` subquery whose correlated predicate references two subquery columns
/// (`outer test.a + outer test.b = sq.a + sq.b`). Expected: both `sq.a` and
/// `sq.b` are appended to the subquery projection so the pulled-up join
/// filter can reference them.
1264 #[test]
1265 fn in_subquery_multi_project_subquery_cols() -> Result<()> {
1266 let table_scan = test_table_scan()?;
1267 let subquery_scan = test_table_scan_with_name("sq")?;
1268
1269 let subquery = LogicalPlanBuilder::from(subquery_scan)
1270 .filter(
1271 out_ref_col(DataType::UInt32, "test.a")
1272 .add(out_ref_col(DataType::UInt32, "test.b"))
1273 .eq(col("sq.a").add(col("sq.b")))
1274 .and(col("sq.a").add(lit(1u32)).eq(col("sq.b"))),
1275 )?
1276 .project(vec![col("c") * lit(2u32)])?
1277 .build()?;
1278
1279 let plan = LogicalPlanBuilder::from(table_scan)
1280 .filter(in_subquery(col("c") + lit(1u32), Arc::new(subquery)))?
1281 .project(vec![col("test.b")])?
1282 .build()?;
1283
1284 assert_optimized_plan_equal!(
1285 plan,
1286 @r"
1287 Projection: test.b [b:UInt32]
1288 LeftSemi Join: Filter: test.c + UInt32(1) = __correlated_sq_1.sq.c * UInt32(2) AND test.a + test.b = __correlated_sq_1.a + __correlated_sq_1.b [a:UInt32, b:UInt32, c:UInt32]
1289 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1290 SubqueryAlias: __correlated_sq_1 [sq.c * UInt32(2):UInt32, a:UInt32, b:UInt32]
1291 Projection: sq.c * UInt32(2), sq.a, sq.b [sq.c * UInt32(2):UInt32, a:UInt32, b:UInt32]
1292 Filter: sq.a + UInt32(1) = sq.b [a:UInt32, b:UInt32, c:UInt32]
1293 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1294 "
1295 )
1296 }
1297
/// Two correlated `IN` subqueries AND-ed together with an ordinary predicate
/// (`test.c > 1`). Expected: two stacked LeftSemi joins (aliases
/// `__correlated_sq_1` and `__correlated_sq_2`), with the ordinary predicate
/// left in a `Filter` above the joins.
1298 #[test]
1299 fn two_in_subquery_with_outer_filter() -> Result<()> {
1300 let table_scan = test_table_scan()?;
1301 let subquery_scan1 = test_table_scan_with_name("sq1")?;
1302 let subquery_scan2 = test_table_scan_with_name("sq2")?;
1303
1304 let subquery1 = LogicalPlanBuilder::from(subquery_scan1)
1305 .filter(out_ref_col(DataType::UInt32, "test.a").gt(col("sq1.a")))?
1306 .project(vec![col("c") * lit(2u32)])?
1307 .build()?;
1308
1309 let subquery2 = LogicalPlanBuilder::from(subquery_scan2)
1310 .filter(out_ref_col(DataType::UInt32, "test.a").gt(col("sq2.a")))?
1311 .project(vec![col("c") * lit(2u32)])?
1312 .build()?;
1313
1314 let plan = LogicalPlanBuilder::from(table_scan)
1315 .filter(
1316 in_subquery(col("c") + lit(1u32), Arc::new(subquery1)).and(
1317 in_subquery(col("c") * lit(2u32), Arc::new(subquery2))
1318 .and(col("test.c").gt(lit(1u32))),
1319 ),
1320 )?
1321 .project(vec![col("test.b")])?
1322 .build()?;
1323
1324 assert_optimized_plan_equal!(
1325 plan,
1326 @r"
1327 Projection: test.b [b:UInt32]
1328 Filter: test.c > UInt32(1) [a:UInt32, b:UInt32, c:UInt32]
1329 LeftSemi Join: Filter: test.c * UInt32(2) = __correlated_sq_2.sq2.c * UInt32(2) AND test.a > __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]
1330 LeftSemi Join: Filter: test.c + UInt32(1) = __correlated_sq_1.sq1.c * UInt32(2) AND test.a > __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]
1331 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1332 SubqueryAlias: __correlated_sq_1 [sq1.c * UInt32(2):UInt32, a:UInt32]
1333 Projection: sq1.c * UInt32(2), sq1.a [sq1.c * UInt32(2):UInt32, a:UInt32]
1334 TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]
1335 SubqueryAlias: __correlated_sq_2 [sq2.c * UInt32(2):UInt32, a:UInt32]
1336 Projection: sq2.c * UInt32(2), sq2.a [sq2.c * UInt32(2):UInt32, a:UInt32]
1337 TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]
1338 "
1339 )
1340 }
1341
/// `IN` subquery that scans the same table (`test`) as the outer query. The
/// subquery filter uses plain columns (not outer references), so it is
/// expected to remain inside the subquery and not be treated as correlated.
1342 #[test]
1343 fn in_subquery_with_same_table() -> Result<()> {
1344 let outer_scan = test_table_scan()?;
1345 let subquery_scan = test_table_scan()?;
1346 let subquery = LogicalPlanBuilder::from(subquery_scan)
1347 .filter(col("test.a").gt(col("test.b")))?
1348 .project(vec![col("c")])?
1349 .build()?;
1350
1351 let plan = LogicalPlanBuilder::from(outer_scan)
1352 .filter(in_subquery(col("test.a"), Arc::new(subquery)))?
1353 .project(vec![col("test.b")])?
1354 .build()?;
1355
// Subquery and outer query refer to the same table; the alias keeps the
// two sides of the join distinguishable.
1356 assert_optimized_plan_equal!(
1358 plan,
1359 @r"
1360 Projection: test.b [b:UInt32]
1361 LeftSemi Join: Filter: test.a = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
1362 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1363 SubqueryAlias: __correlated_sq_1 [c:UInt32]
1364 Projection: test.c [c:UInt32]
1365 Filter: test.a > test.b [a:UInt32, b:UInt32, c:UInt32]
1366 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1367 "
1368 )
1369 }
1370
/// Two `EXISTS` predicates AND-ed together, both built from the same shared
/// correlated subquery plan. Expected: two stacked LeftSemi joins, each with
/// its own generated alias.
1371 #[test]
1373 fn multiple_exists_subqueries() -> Result<()> {
1374 let orders = Arc::new(
1375 LogicalPlanBuilder::from(scan_tpch_table("orders"))
1376 .filter(
1377 col("orders.o_custkey")
1378 .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
1379 )?
1380 .project(vec![col("orders.o_custkey")])?
1381 .build()?,
1382 );
1383
1384 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1385 .filter(exists(Arc::clone(&orders)).and(exists(orders)))?
1386 .project(vec![col("customer.c_custkey")])?
1387 .build()?;
1388
1389 assert_optimized_plan_equal!(
1390 plan,
1391 @r"
1392 Projection: customer.c_custkey [c_custkey:Int64]
1393 LeftSemi Join: Filter: __correlated_sq_2.o_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]
1394 LeftSemi Join: Filter: __correlated_sq_1.o_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]
1395 TableScan: customer [c_custkey:Int64, c_name:Utf8]
1396 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1397 Projection: orders.o_custkey [o_custkey:Int64]
1398 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1399 SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]
1400 Projection: orders.o_custkey [o_custkey:Int64]
1401 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1402 "
1403 )
1404 }
1405
/// Nested `EXISTS`: the `orders` subquery itself contains an `EXISTS` over
/// `lineitem`. Expected: the inner subquery is decorrelated first (alias
/// `__correlated_sq_1`) and ends up as a LeftSemi join nested inside the
/// outer subquery's alias (`__correlated_sq_2`).
1406 #[test]
1408 fn recursive_exists_subqueries() -> Result<()> {
1409 let lineitem = Arc::new(
1410 LogicalPlanBuilder::from(scan_tpch_table("lineitem"))
1411 .filter(
1412 col("lineitem.l_orderkey")
1413 .eq(out_ref_col(DataType::Int64, "orders.o_orderkey")),
1414 )?
1415 .project(vec![col("lineitem.l_orderkey")])?
1416 .build()?,
1417 );
1418
1419 let orders = Arc::new(
1420 LogicalPlanBuilder::from(scan_tpch_table("orders"))
1421 .filter(
1422 exists(lineitem).and(
1423 col("orders.o_custkey")
1424 .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
1425 ),
1426 )?
1427 .project(vec![col("orders.o_custkey")])?
1428 .build()?,
1429 );
1430
1431 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1432 .filter(exists(orders))?
1433 .project(vec![col("customer.c_custkey")])?
1434 .build()?;
1435
1436 assert_optimized_plan_equal!(
1437 plan,
1438 @r"
1439 Projection: customer.c_custkey [c_custkey:Int64]
1440 LeftSemi Join: Filter: __correlated_sq_2.o_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]
1441 TableScan: customer [c_custkey:Int64, c_name:Utf8]
1442 SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]
1443 Projection: orders.o_custkey [o_custkey:Int64]
1444 LeftSemi Join: Filter: __correlated_sq_1.l_orderkey = orders.o_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1445 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1446 SubqueryAlias: __correlated_sq_1 [l_orderkey:Int64]
1447 Projection: lineitem.l_orderkey [l_orderkey:Int64]
1448 TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]
1449 "
1450 )
1451 }
1452
/// Correlated `EXISTS` whose subquery also carries a local filter
/// (`o_orderkey = 1`). Expected: the correlated equality becomes the join
/// filter, and the local filter stays inside the subquery.
1453 #[test]
1455 fn exists_subquery_with_subquery_filters() -> Result<()> {
1456 let sq = Arc::new(
1457 LogicalPlanBuilder::from(scan_tpch_table("orders"))
1458 .filter(
1459 out_ref_col(DataType::Int64, "customer.c_custkey")
1460 .eq(col("orders.o_custkey"))
1461 .and(col("o_orderkey").eq(lit(1))),
1462 )?
1463 .project(vec![col("orders.o_custkey")])?
1464 .build()?,
1465 );
1466
1467 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1468 .filter(exists(sq))?
1469 .project(vec![col("customer.c_custkey")])?
1470 .build()?;
1471
1472 assert_optimized_plan_equal!(
1473 plan,
1474 @r"
1475 Projection: customer.c_custkey [c_custkey:Int64]
1476 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1477 TableScan: customer [c_custkey:Int64, c_name:Utf8]
1478 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1479 Projection: orders.o_custkey [o_custkey:Int64]
1480 Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1481 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1482 "
1483 )
1484 }
1485
/// Correlated `EXISTS` whose only predicate compares an outer column with a
/// literal, so no subquery column takes part in the correlation.
1486 #[test]
1487 fn exists_subquery_no_cols() -> Result<()> {
1488 let sq = Arc::new(
1489 LogicalPlanBuilder::from(scan_tpch_table("orders"))
1490 .filter(out_ref_col(DataType::Int64, "customer.c_custkey").eq(lit(1u32)))?
1491 .project(vec![col("orders.o_custkey")])?
1492 .build()?,
1493 );
1494
1495 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1496 .filter(exists(sq))?
1497 .project(vec![col("customer.c_custkey")])?
1498 .build()?;
1499
// The pulled-up predicate references only the outer table, so this rule
// leaves it as the join filter (`customer.c_custkey = UInt32(1)`).
1500 assert_optimized_plan_equal!(
1502 plan,
1503 @r"
1504 Projection: customer.c_custkey [c_custkey:Int64]
1505 LeftSemi Join: Filter: customer.c_custkey = UInt32(1) [c_custkey:Int64, c_name:Utf8]
1506 TableScan: customer [c_custkey:Int64, c_name:Utf8]
1507 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1508 Projection: orders.o_custkey [o_custkey:Int64]
1509 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1510 "
1511 )
1512 }
1513
/// `EXISTS` subquery with no outer references at all. Expected: a LeftSemi
/// join with the trivial filter `Boolean(true)`, and the subquery's own
/// filter left untouched inside the subquery.
1514 #[test]
1516 fn exists_subquery_with_no_correlated_cols() -> Result<()> {
1517 let sq = Arc::new(
1518 LogicalPlanBuilder::from(scan_tpch_table("orders"))
1519 .filter(col("orders.o_custkey").eq(col("orders.o_custkey")))?
1520 .project(vec![col("orders.o_custkey")])?
1521 .build()?,
1522 );
1523
1524 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1525 .filter(exists(sq))?
1526 .project(vec![col("customer.c_custkey")])?
1527 .build()?;
1528
1529 assert_optimized_plan_equal!(
1530 plan,
1531 @r"
1532 Projection: customer.c_custkey [c_custkey:Int64]
1533 LeftSemi Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8]
1534 TableScan: customer [c_custkey:Int64, c_name:Utf8]
1535 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1536 Projection: orders.o_custkey [o_custkey:Int64]
1537 Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1538 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1539 "
1540 )
1541 }
1542
/// Correlated `EXISTS` whose correlation predicate is an inequality (`!=`).
/// Expected: the `!=` predicate is pulled up as the LeftSemi join filter.
1543 #[test]
1545 fn exists_subquery_where_not_eq() -> Result<()> {
1546 let sq = Arc::new(
1547 LogicalPlanBuilder::from(scan_tpch_table("orders"))
1548 .filter(
1549 out_ref_col(DataType::Int64, "customer.c_custkey")
1550 .not_eq(col("orders.o_custkey")),
1551 )?
1552 .project(vec![col("orders.o_custkey")])?
1553 .build()?,
1554 );
1555
1556 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1557 .filter(exists(sq))?
1558 .project(vec![col("customer.c_custkey")])?
1559 .build()?;
1560
1561 assert_optimized_plan_equal!(
1562 plan,
1563 @r"
1564 Projection: customer.c_custkey [c_custkey:Int64]
1565 LeftSemi Join: Filter: customer.c_custkey != __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1566 TableScan: customer [c_custkey:Int64, c_name:Utf8]
1567 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1568 Projection: orders.o_custkey [o_custkey:Int64]
1569 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1570 "
1571 )
1572 }
1573
/// Correlated `EXISTS` whose correlation predicate is a range comparison
/// (`<`). Expected: the `<` predicate is pulled up as the LeftSemi join filter.
1574 #[test]
1576 fn exists_subquery_where_less_than() -> Result<()> {
1577 let sq = Arc::new(
1578 LogicalPlanBuilder::from(scan_tpch_table("orders"))
1579 .filter(
1580 out_ref_col(DataType::Int64, "customer.c_custkey")
1581 .lt(col("orders.o_custkey")),
1582 )?
1583 .project(vec![col("orders.o_custkey")])?
1584 .build()?,
1585 );
1586
1587 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1588 .filter(exists(sq))?
1589 .project(vec![col("customer.c_custkey")])?
1590 .build()?;
1591
1592 assert_optimized_plan_equal!(
1593 plan,
1594 @r"
1595 Projection: customer.c_custkey [c_custkey:Int64]
1596 LeftSemi Join: Filter: customer.c_custkey < __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1597 TableScan: customer [c_custkey:Int64, c_name:Utf8]
1598 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1599 Projection: orders.o_custkey [o_custkey:Int64]
1600 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1601 "
1602 )
1603 }
1604
/// Correlated `EXISTS` where the correlated predicate sits inside an `OR`
/// with a subquery-local predicate. Expected: the whole disjunction is
/// pulled up into the join filter, and `o_orderkey` is added to the subquery
/// projection so the filter can reference it.
1605 #[test]
1607 fn exists_subquery_with_subquery_disjunction() -> Result<()> {
1608 let sq = Arc::new(
1609 LogicalPlanBuilder::from(scan_tpch_table("orders"))
1610 .filter(
1611 out_ref_col(DataType::Int64, "customer.c_custkey")
1612 .eq(col("orders.o_custkey"))
1613 .or(col("o_orderkey").eq(lit(1))),
1614 )?
1615 .project(vec![col("orders.o_custkey")])?
1616 .build()?,
1617 );
1618
1619 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1620 .filter(exists(sq))?
1621 .project(vec![col("customer.c_custkey")])?
1622 .build()?;
1623
1624 assert_optimized_plan_equal!(
1625 plan,
1626 @r"
1627 Projection: customer.c_custkey [c_custkey:Int64]
1628 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey OR __correlated_sq_1.o_orderkey = Int32(1) [c_custkey:Int64, c_name:Utf8]
1629 TableScan: customer [c_custkey:Int64, c_name:Utf8]
1630 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64, o_orderkey:Int64]
1631 Projection: orders.o_custkey, orders.o_orderkey [o_custkey:Int64, o_orderkey:Int64]
1632 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1633 "
1634 )
1635 }
1636
/// Correlated `EXISTS` whose subquery has no projection. Expected: the
/// subquery alias wraps the table scan directly and exposes all of its columns.
1637 #[test]
1639 fn exists_subquery_no_projection() -> Result<()> {
1640 let sq = Arc::new(
1641 LogicalPlanBuilder::from(scan_tpch_table("orders"))
1642 .filter(
1643 out_ref_col(DataType::Int64, "customer.c_custkey")
1644 .eq(col("orders.o_custkey")),
1645 )?
1646 .build()?,
1647 );
1648
1649 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1650 .filter(exists(sq))?
1651 .project(vec![col("customer.c_custkey")])?
1652 .build()?;
1653
1654 assert_optimized_plan_equal!(
1655 plan,
1656 @r"
1657 Projection: customer.c_custkey [c_custkey:Int64]
1658 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1659 TableScan: customer [c_custkey:Int64, c_name:Utf8]
1660 SubqueryAlias: __correlated_sq_1 [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1661 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1662 "
1663 )
1664 }
1665
/// Correlated `EXISTS` whose subquery projects an expression
/// (`o_custkey + 1`) rather than the correlated column itself. Expected: the
/// correlated column `o_custkey` is appended to the projection so the join
/// filter can reference it.
1666 #[test]
1668 fn exists_subquery_project_expr() -> Result<()> {
1669 let sq = Arc::new(
1670 LogicalPlanBuilder::from(scan_tpch_table("orders"))
1671 .filter(
1672 out_ref_col(DataType::Int64, "customer.c_custkey")
1673 .eq(col("orders.o_custkey")),
1674 )?
1675 .project(vec![col("orders.o_custkey").add(lit(1))])?
1676 .build()?,
1677 );
1678
1679 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1680 .filter(exists(sq))?
1681 .project(vec![col("customer.c_custkey")])?
1682 .build()?;
1683
1684 assert_optimized_plan_equal!(
1685 plan,
1686 @r"
1687 Projection: customer.c_custkey [c_custkey:Int64]
1688 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1689 TableScan: customer [c_custkey:Int64, c_name:Utf8]
1690 SubqueryAlias: __correlated_sq_1 [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]
1691 Projection: orders.o_custkey + Int32(1), orders.o_custkey [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]
1692 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1693 "
1694 )
1695 }
1696
/// Correlated `EXISTS` AND-ed with an ordinary outer predicate
/// (`c_custkey = 1`). Expected: the `EXISTS` becomes a LeftSemi join and the
/// ordinary predicate is kept in a `Filter` above it.
1697 #[test]
1699 fn exists_subquery_should_support_additional_filters() -> Result<()> {
1700 let sq = Arc::new(
1701 LogicalPlanBuilder::from(scan_tpch_table("orders"))
1702 .filter(
1703 out_ref_col(DataType::Int64, "customer.c_custkey")
1704 .eq(col("orders.o_custkey")),
1705 )?
1706 .project(vec![col("orders.o_custkey")])?
1707 .build()?,
1708 );
1709 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1710 .filter(exists(sq).and(col("c_custkey").eq(lit(1))))?
1711 .project(vec![col("customer.c_custkey")])?
1712 .build()?;
1713
1714 assert_optimized_plan_equal!(
1715 plan,
1716 @r"
1717 Projection: customer.c_custkey [c_custkey:Int64]
1718 Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]
1719 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1720 TableScan: customer [c_custkey:Int64, c_name:Utf8]
1721 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1722 Projection: orders.o_custkey [o_custkey:Int64]
1723 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1724 "
1725 )
1726 }
1727
/// `EXISTS (...) OR <predicate>` at the outer level. Because the subquery
/// is one arm of a disjunction, the expected plan uses a LeftMark join and
/// filters on the generated `mark` column, then projects the original
/// columns back out.
///
/// NOTE(review): the subquery filter here uses a plain
/// `col("customer.c_custkey")` rather than `out_ref_col`, and the snapshot
/// shows it staying inside the subquery with a `Boolean(true)` join filter —
/// confirm this is the intended setup.
1728 #[test]
1730 fn exists_subquery_disjunction() -> Result<()> {
1731 let sq = Arc::new(
1732 LogicalPlanBuilder::from(scan_tpch_table("orders"))
1733 .filter(col("customer.c_custkey").eq(col("orders.o_custkey")))?
1734 .project(vec![col("orders.o_custkey")])?
1735 .build()?,
1736 );
1737
1738 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1739 .filter(exists(sq).or(col("customer.c_custkey").eq(lit(1))))?
1740 .project(vec![col("customer.c_custkey")])?
1741 .build()?;
1742
1743 assert_optimized_plan_equal!(
1744 plan,
1745 @r"
1746 Projection: customer.c_custkey [c_custkey:Int64]
1747 Projection: customer.c_custkey, customer.c_name [c_custkey:Int64, c_name:Utf8]
1748 Filter: __correlated_sq_1.mark OR customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, mark:Boolean]
1749 LeftMark Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, mark:Boolean]
1750 TableScan: customer [c_custkey:Int64, c_name:Utf8]
1751 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1752 Projection: orders.o_custkey [o_custkey:Int64]
1753 Filter: customer.c_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1754 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1755 "
1756 )
1757 }
1758
/// Correlated `EXISTS` over the generic test tables: the subquery filters
/// `sq` with an outer reference to `test.a`. Expected: a LeftSemi join on
/// `test.a = __correlated_sq_1.a`, with `sq.a` appended to the subquery
/// projection.
1759 #[test]
1761 fn exists_subquery_correlated() -> Result<()> {
1762 let sq = Arc::new(
1763 LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
1764 .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq.a")))?
1765 .project(vec![col("c")])?
1766 .build()?,
1767 );
1768
1769 let plan = LogicalPlanBuilder::from(test_table_scan_with_name("test")?)
1770 .filter(exists(sq))?
1771 .project(vec![col("test.c")])?
1772 .build()?;
1773
1774 assert_optimized_plan_equal!(
1775 plan,
1776 @r"
1777 Projection: test.c [c:UInt32]
1778 LeftSemi Join: Filter: test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]
1779 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1780 SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]
1781 Projection: sq.c, sq.a [c:UInt32, a:UInt32]
1782 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1783 "
1784 )
1785 }
1786
/// Uncorrelated `EXISTS` subquery: expected to become a LeftSemi join with
/// the trivial filter `Boolean(true)`.
1787 #[test]
1789 fn exists_subquery_simple() -> Result<()> {
1790 let table_scan = test_table_scan()?;
1791 let plan = LogicalPlanBuilder::from(table_scan)
1792 .filter(exists(test_subquery_with_name("sq")?))?
1793 .project(vec![col("test.b")])?
1794 .build()?;
1795
1796 assert_optimized_plan_equal!(
1797 plan,
1798 @r"
1799 Projection: test.b [b:UInt32]
1800 LeftSemi Join: Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]
1801 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1802 SubqueryAlias: __correlated_sq_1 [c:UInt32]
1803 Projection: sq.c [c:UInt32]
1804 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1805 "
1806 )
1807 }
1808
/// Uncorrelated `NOT EXISTS` subquery: expected to become a LeftAnti join
/// with the trivial filter `Boolean(true)`.
1809 #[test]
1811 fn not_exists_subquery_simple() -> Result<()> {
1812 let table_scan = test_table_scan()?;
1813 let plan = LogicalPlanBuilder::from(table_scan)
1814 .filter(not_exists(test_subquery_with_name("sq")?))?
1815 .project(vec![col("test.b")])?
1816 .build()?;
1817
1818 assert_optimized_plan_equal!(
1819 plan,
1820 @r"
1821 Projection: test.b [b:UInt32]
1822 LeftAnti Join: Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]
1823 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1824 SubqueryAlias: __correlated_sq_1 [c:UInt32]
1825 Projection: sq.c [c:UInt32]
1826 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1827 "
1828 )
1829 }
1830
/// Two correlated `EXISTS` subqueries AND-ed together with an ordinary
/// predicate (`test.c > 1`). Expected: two stacked LeftSemi joins with
/// distinct aliases, and the ordinary predicate kept in a `Filter` above them.
1831 #[test]
1832 fn two_exists_subquery_with_outer_filter() -> Result<()> {
1833 let table_scan = test_table_scan()?;
1834 let subquery_scan1 = test_table_scan_with_name("sq1")?;
1835 let subquery_scan2 = test_table_scan_with_name("sq2")?;
1836
1837 let subquery1 = LogicalPlanBuilder::from(subquery_scan1)
1838 .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq1.a")))?
1839 .project(vec![col("c")])?
1840 .build()?;
1841
1842 let subquery2 = LogicalPlanBuilder::from(subquery_scan2)
1843 .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq2.a")))?
1844 .project(vec![col("c")])?
1845 .build()?;
1846
1847 let plan = LogicalPlanBuilder::from(table_scan)
1848 .filter(
1849 exists(Arc::new(subquery1))
1850 .and(exists(Arc::new(subquery2)).and(col("test.c").gt(lit(1u32)))),
1851 )?
1852 .project(vec![col("test.b")])?
1853 .build()?;
1854
1855 assert_optimized_plan_equal!(
1856 plan,
1857 @r"
1858 Projection: test.b [b:UInt32]
1859 Filter: test.c > UInt32(1) [a:UInt32, b:UInt32, c:UInt32]
1860 LeftSemi Join: Filter: test.a = __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]
1861 LeftSemi Join: Filter: test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]
1862 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1863 SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]
1864 Projection: sq1.c, sq1.a [c:UInt32, a:UInt32]
1865 TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]
1866 SubqueryAlias: __correlated_sq_2 [c:UInt32, a:UInt32]
1867 Projection: sq2.c, sq2.a [c:UInt32, a:UInt32]
1868 TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]
1869 "
1870 )
1871 }
1872
/// Correlated `EXISTS` whose correlation predicate compares two arithmetic
/// expressions with `>` (`1 + sq.a > outer test.a * 2`), while the subquery
/// projects only a literal. Expected: the predicate is pulled up as the join
/// filter and `sq.a` is appended to the subquery projection.
1873 #[test]
1874 fn exists_subquery_expr_filter() -> Result<()> {
1875 let table_scan = test_table_scan()?;
1876 let subquery_scan = test_table_scan_with_name("sq")?;
1877 let subquery = LogicalPlanBuilder::from(subquery_scan)
1878 .filter(
1879 (lit(1u32) + col("sq.a"))
1880 .gt(out_ref_col(DataType::UInt32, "test.a") * lit(2u32)),
1881 )?
1882 .project(vec![lit(1u32)])?
1883 .build()?;
1884 let plan = LogicalPlanBuilder::from(table_scan)
1885 .filter(exists(Arc::new(subquery)))?
1886 .project(vec![col("test.b")])?
1887 .build()?;
1888
1889 assert_optimized_plan_equal!(
1890 plan,
1891 @r"
1892 Projection: test.b [b:UInt32]
1893 LeftSemi Join: Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]
1894 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1895 SubqueryAlias: __correlated_sq_1 [UInt32(1):UInt32, a:UInt32]
1896 Projection: UInt32(1), sq.a [UInt32(1):UInt32, a:UInt32]
1897 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1898 "
1899 )
1900 }
1901
/// `EXISTS` subquery that scans the same table (`test`) as the outer query.
/// The subquery filter uses plain columns (not outer references), so it is
/// expected to stay inside the subquery and the join filter is `Boolean(true)`.
1902 #[test]
1903 fn exists_subquery_with_same_table() -> Result<()> {
1904 let outer_scan = test_table_scan()?;
1905 let subquery_scan = test_table_scan()?;
1906 let subquery = LogicalPlanBuilder::from(subquery_scan)
1907 .filter(col("test.a").gt(col("test.b")))?
1908 .project(vec![col("c")])?
1909 .build()?;
1910
1911 let plan = LogicalPlanBuilder::from(outer_scan)
1912 .filter(exists(Arc::new(subquery)))?
1913 .project(vec![col("test.b")])?
1914 .build()?;
1915
// Subquery and outer query refer to the same table.
1916 assert_optimized_plan_equal!(
1918 plan,
1919 @r"
1920 Projection: test.b [b:UInt32]
1921 LeftSemi Join: Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]
1922 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1923 SubqueryAlias: __correlated_sq_1 [c:UInt32]
1924 Projection: test.c [c:UInt32]
1925 Filter: test.a > test.b [a:UInt32, b:UInt32, c:UInt32]
1926 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1927 "
1928 )
1929 }
1930
/// Correlated `EXISTS` over a `DISTINCT` subquery projecting a single
/// column. Expected: the `Distinct` node is kept, and the correlated column
/// `sq.a` is appended to the projection beneath it.
1931 #[test]
1932 fn exists_distinct_subquery() -> Result<()> {
1933 let table_scan = test_table_scan()?;
1934 let subquery_scan = test_table_scan_with_name("sq")?;
1935 let subquery = LogicalPlanBuilder::from(subquery_scan)
1936 .filter(
1937 (lit(1u32) + col("sq.a"))
1938 .gt(out_ref_col(DataType::UInt32, "test.a") * lit(2u32)),
1939 )?
1940 .project(vec![col("sq.c")])?
1941 .distinct()?
1942 .build()?;
1943 let plan = LogicalPlanBuilder::from(table_scan)
1944 .filter(exists(Arc::new(subquery)))?
1945 .project(vec![col("test.b")])?
1946 .build()?;
1947
1948 assert_optimized_plan_equal!(
1949 plan,
1950 @r"
1951 Projection: test.b [b:UInt32]
1952 LeftSemi Join: Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]
1953 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1954 SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]
1955 Distinct: [c:UInt32, a:UInt32]
1956 Projection: sq.c, sq.a [c:UInt32, a:UInt32]
1957 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1958 "
1959 )
1960 }
1961
/// Correlated `EXISTS` over a `DISTINCT` subquery that projects an
/// expression (`sq.b + sq.c`). Expected: the `Distinct` node is kept, and
/// `sq.a` is appended next to the projected expression.
1962 #[test]
1963 fn exists_distinct_expr_subquery() -> Result<()> {
1964 let table_scan = test_table_scan()?;
1965 let subquery_scan = test_table_scan_with_name("sq")?;
1966 let subquery = LogicalPlanBuilder::from(subquery_scan)
1967 .filter(
1968 (lit(1u32) + col("sq.a"))
1969 .gt(out_ref_col(DataType::UInt32, "test.a") * lit(2u32)),
1970 )?
1971 .project(vec![col("sq.b") + col("sq.c")])?
1972 .distinct()?
1973 .build()?;
1974 let plan = LogicalPlanBuilder::from(table_scan)
1975 .filter(exists(Arc::new(subquery)))?
1976 .project(vec![col("test.b")])?
1977 .build()?;
1978
1979 assert_optimized_plan_equal!(
1980 plan,
1981 @r"
1982 Projection: test.b [b:UInt32]
1983 LeftSemi Join: Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]
1984 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1985 SubqueryAlias: __correlated_sq_1 [sq.b + sq.c:UInt32, a:UInt32]
1986 Distinct: [sq.b + sq.c:UInt32, a:UInt32]
1987 Projection: sq.b + sq.c, sq.a [sq.b + sq.c:UInt32, a:UInt32]
1988 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1989 "
1990 )
1991 }
1992
/// Correlated `EXISTS` over a `DISTINCT` subquery that projects a literal
/// together with a column. Expected: the `Distinct` node is kept, and `sq.a`
/// is appended after the literal and the column.
1993 #[test]
1994 fn exists_distinct_subquery_with_literal() -> Result<()> {
1995 let table_scan = test_table_scan()?;
1996 let subquery_scan = test_table_scan_with_name("sq")?;
1997 let subquery = LogicalPlanBuilder::from(subquery_scan)
1998 .filter(
1999 (lit(1u32) + col("sq.a"))
2000 .gt(out_ref_col(DataType::UInt32, "test.a") * lit(2u32)),
2001 )?
2002 .project(vec![lit(1u32), col("sq.c")])?
2003 .distinct()?
2004 .build()?;
2005 let plan = LogicalPlanBuilder::from(table_scan)
2006 .filter(exists(Arc::new(subquery)))?
2007 .project(vec![col("test.b")])?
2008 .build()?;
2009
2010 assert_optimized_plan_equal!(
2011 plan,
2012 @r"
2013 Projection: test.b [b:UInt32]
2014 LeftSemi Join: Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]
2015 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
2016 SubqueryAlias: __correlated_sq_1 [UInt32(1):UInt32, c:UInt32, a:UInt32]
2017 Distinct: [UInt32(1):UInt32, c:UInt32, a:UInt32]
2018 Projection: UInt32(1), sq.c, sq.a [UInt32(1):UInt32, c:UInt32, a:UInt32]
2019 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
2020 "
2021 )
2022 }
2023
/// Uncorrelated `EXISTS` whose subquery unnests a list column. Expected:
/// a LeftSemi join with filter `Boolean(true)`, with the `Unnest` node
/// preserved under the subquery alias.
2024 #[test]
2025 fn exists_uncorrelated_unnest() -> Result<()> {
2026 let subquery_table_source = table_source(&Schema::new(vec![Field::new(
2027 "arr",
2028 DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
2029 true,
2030 )]));
2031 let subquery = LogicalPlanBuilder::scan_with_filters(
2032 "sq",
2033 subquery_table_source,
2034 None,
2035 vec![],
2036 )?
2037 .unnest_column("arr")?
2038 .build()?;
2039 let table_scan = test_table_scan()?;
2040 let plan = LogicalPlanBuilder::from(table_scan)
2041 .filter(exists(Arc::new(subquery)))?
2042 .project(vec![col("test.b")])?
2043 .build()?;
2044
2045 assert_optimized_plan_equal!(
2046 plan,
2047 @r"
2048 Projection: test.b [b:UInt32]
2049 LeftSemi Join: Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]
2050 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
2051 SubqueryAlias: __correlated_sq_1 [arr:Int32;N]
2052 Unnest: lists[sq.arr|depth=1] structs[] [arr:Int32;N]
2053 TableScan: sq [arr:List(Int32);N]
2054 "
2055 )
2056 }
2057
/// `EXISTS` over a correlated subquery that filters the output of an unnest.
///
/// The subquery compares the unnested column `a` with the outer column
/// `test.b`. In the expected plan below that comparison appears as the
/// `LeftSemi` join filter, and only the `Unnest` and the scan remain under
/// the `__correlated_sq_1` alias.
2058 #[test]
2059 fn exists_correlated_unnest() -> Result<()> {
2060 let table_scan = test_table_scan()?;
// Table `sq` has a single nullable column `a` of type List(UInt32).
2061 let subquery_table_source = table_source(&Schema::new(vec![Field::new(
2062 "a",
2063 DataType::List(Arc::new(Field::new_list_field(DataType::UInt32, true))),
2064 true,
2065 )]));
// Scan `sq`, unnest `a`, then keep rows where the unnested value equals the
// outer reference `test.b`.
2066 let subquery = LogicalPlanBuilder::scan_with_filters(
2067 "sq",
2068 subquery_table_source,
2069 None,
2070 vec![],
2071 )?
2072 .unnest_column("a")?
2073 .filter(col("a").eq(out_ref_col(DataType::UInt32, "test.b")))?
2074 .build()?;
// Outer query: filter `test` by EXISTS(subquery) and return only `test.b`.
2075 let plan = LogicalPlanBuilder::from(table_scan)
2076 .filter(exists(Arc::new(subquery)))?
2077 .project(vec![col("test.b")])?
2078 .build()?;
2079
// Inline snapshot of the optimized plan with schemas.
2080 assert_optimized_plan_equal!(
2081 plan,
2082 @r"
2083 Projection: test.b [b:UInt32]
2084 LeftSemi Join: Filter: __correlated_sq_1.a = test.b [a:UInt32, b:UInt32, c:UInt32]
2085 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
2086 SubqueryAlias: __correlated_sq_1 [a:UInt32;N]
2087 Unnest: lists[sq.a|depth=1] structs[] [a:UInt32;N]
2088 TableScan: sq [a:List(UInt32);N]
2089 "
2090 )
2091 }
2092
/// Correlated `EXISTS` where table and column names are upper-case and
/// referenced through quoted identifiers.
///
/// The expected plan below keeps the upper-case names (`TEST_A.A`,
/// `TEST_B.A`) in the `LeftSemi` join filter and in the subquery projection.
/// NOTE(review): presumably this guards against identifier normalization
/// (lower-casing) during decorrelation — confirm against the rule's history.
2093 #[test]
2094 fn upper_case_ident() -> Result<()> {
// Both tables share the same schema: two non-nullable UInt32 columns `A` and `B`.
2095 let fields = vec![
2096 Field::new("A", DataType::UInt32, false),
2097 Field::new("B", DataType::UInt32, false),
2098 ];
2099
2100 let schema = Schema::new(fields);
// Table names are passed with embedded double quotes.
2101 let table_scan_a = table_scan(Some("\"TEST_A\""), &schema, None)?.build()?;
2102 let table_scan_b = table_scan(Some("\"TEST_B\""), &schema, None)?.build()?;
2103
// Subquery over TEST_B: correlated filter `"A" = outer "TEST_A"."A"`,
// projecting only the literal 1.
2104 let subquery = LogicalPlanBuilder::from(table_scan_b)
2105 .filter(col("\"A\"").eq(out_ref_col(DataType::UInt32, "\"TEST_A\".\"A\"")))?
2106 .project(vec![lit(1)])?
2107 .build()?;
2108
// Outer query: filter TEST_A by EXISTS(subquery) and return only `"TEST_A"."B"`.
2109 let plan = LogicalPlanBuilder::from(table_scan_a)
2110 .filter(exists(Arc::new(subquery)))?
2111 .project(vec![col("\"TEST_A\".\"B\"")])?
2112 .build()?;
2113
// Inline snapshot of the optimized plan with schemas; the subquery projection
// gains `TEST_B.A` next to the literal.
2114 assert_optimized_plan_equal!(
2115 plan,
2116 @r"
2117 Projection: TEST_A.B [B:UInt32]
2118 LeftSemi Join: Filter: __correlated_sq_1.A = TEST_A.A [A:UInt32, B:UInt32]
2119 TableScan: TEST_A [A:UInt32, B:UInt32]
2120 SubqueryAlias: __correlated_sq_1 [Int32(1):Int32, A:UInt32]
2121 Projection: Int32(1), TEST_B.A [Int32(1):Int32, A:UInt32]
2122 TableScan: TEST_B [A:UInt32, B:UInt32]
2123 "
2124 )
2125 }
2126}