1use std::collections::BTreeSet;
20use std::ops::Deref;
21use std::sync::Arc;
22
23use crate::decorrelate::PullUpCorrelatedExpr;
24use crate::optimizer::ApplyOrder;
25use crate::utils::replace_qualified_name;
26use crate::{OptimizerConfig, OptimizerRule};
27
28use datafusion_common::alias::AliasGenerator;
29use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
30use datafusion_common::{Column, Result, assert_or_internal_err, plan_err};
31use datafusion_expr::expr::{Exists, InSubquery};
32use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
33use datafusion_expr::logical_plan::{JoinType, Subquery};
34use datafusion_expr::utils::{conjunction, expr_to_columns, split_conjunction_owned};
35use datafusion_expr::{
36 BinaryExpr, Expr, Filter, LogicalPlan, LogicalPlanBuilder, Operator, exists,
37 in_subquery, lit, not, not_exists, not_in_subquery,
38};
39
40use log::debug;
41
/// Optimizer rule that rewrites `IN (subquery)` and `EXISTS (subquery)`
/// predicates found in `Filter` nodes into semi, anti or mark joins.
#[derive(Default, Debug)]
pub struct DecorrelatePredicateSubquery {}

impl DecorrelatePredicateSubquery {
    // Deliberately left without a doc comment: the `expect` attribute below
    // requires the `missing_docs` lint to fire on this item.
    #[expect(missing_docs)]
    pub fn new() -> Self {
        Self {}
    }
}
52
53impl OptimizerRule for DecorrelatePredicateSubquery {
54 fn supports_rewrite(&self) -> bool {
55 true
56 }
57
58 fn rewrite(
59 &self,
60 plan: LogicalPlan,
61 config: &dyn OptimizerConfig,
62 ) -> Result<Transformed<LogicalPlan>> {
63 let plan = plan
64 .map_subqueries(|subquery| {
65 subquery.transform_down(|p| self.rewrite(p, config))
66 })?
67 .data;
68
69 let LogicalPlan::Filter(filter) = plan else {
70 return Ok(Transformed::no(plan));
71 };
72
73 if !has_subquery(&filter.predicate) {
74 return Ok(Transformed::no(LogicalPlan::Filter(filter)));
75 }
76
77 let (with_subqueries, mut other_exprs): (Vec<_>, Vec<_>) =
78 split_conjunction_owned(filter.predicate)
79 .into_iter()
80 .partition(has_subquery);
81
82 assert_or_internal_err!(
83 !with_subqueries.is_empty(),
84 "can not find expected subqueries in DecorrelatePredicateSubquery"
85 );
86
87 let mut cur_input = Arc::unwrap_or_clone(filter.input);
89 for subquery_expr in with_subqueries {
90 match extract_subquery_info(subquery_expr) {
91 SubqueryPredicate::Top(subquery) => {
93 match build_join_top(&subquery, &cur_input, config.alias_generator())?
94 {
95 Some(plan) => cur_input = plan,
96 None => other_exprs.push(subquery.expr()),
98 }
99 }
100 SubqueryPredicate::Embedded(expr) => {
102 let (plan, expr_without_subqueries) =
103 rewrite_inner_subqueries(cur_input, expr, config)?;
104 cur_input = plan;
105 other_exprs.push(expr_without_subqueries);
106 }
107 }
108 }
109
110 let expr = conjunction(other_exprs);
111 if let Some(expr) = expr {
112 let new_filter = Filter::try_new(expr, Arc::new(cur_input))?;
113 cur_input = LogicalPlan::Filter(new_filter);
114 }
115 Ok(Transformed::yes(cur_input))
116 }
117
118 fn name(&self) -> &str {
119 "decorrelate_predicate_subquery"
120 }
121
122 fn apply_order(&self) -> Option<ApplyOrder> {
123 Some(ApplyOrder::TopDown)
124 }
125}
126
127fn rewrite_inner_subqueries(
128 outer: LogicalPlan,
129 expr: Expr,
130 config: &dyn OptimizerConfig,
131) -> Result<(LogicalPlan, Expr)> {
132 let mut cur_input = outer;
133 let alias = config.alias_generator();
134 let expr_without_subqueries = expr.transform(|e| match e {
135 Expr::Exists(Exists {
136 subquery: Subquery { subquery, .. },
137 negated,
138 }) => match mark_join(&cur_input, &subquery, None, negated, alias)? {
139 Some((plan, exists_expr)) => {
140 cur_input = plan;
141 Ok(Transformed::yes(exists_expr))
142 }
143 None if negated => Ok(Transformed::no(not_exists(subquery))),
144 None => Ok(Transformed::no(exists(subquery))),
145 },
146 Expr::InSubquery(InSubquery {
147 expr,
148 subquery: Subquery { subquery, .. },
149 negated,
150 }) => {
151 let in_predicate = subquery
152 .head_output_expr()?
153 .map_or(plan_err!("single expression required."), |output_expr| {
154 Ok(Expr::eq(*expr.clone(), output_expr))
155 })?;
156 match mark_join(&cur_input, &subquery, Some(&in_predicate), negated, alias)? {
157 Some((plan, exists_expr)) => {
158 cur_input = plan;
159 Ok(Transformed::yes(exists_expr))
160 }
161 None if negated => Ok(Transformed::no(not_in_subquery(*expr, subquery))),
162 None => Ok(Transformed::no(in_subquery(*expr, subquery))),
163 }
164 }
165 _ => Ok(Transformed::no(e)),
166 })?;
167 Ok((cur_input, expr_without_subqueries.data))
168}
169
/// Classification of a filter conjunct that contains a subquery, as produced
/// by `extract_subquery_info`.
enum SubqueryPredicate {
    /// The conjunct is itself an `IN` / `EXISTS` subquery, optionally wrapped
    /// in a single `NOT`; the payload is the extracted subquery description.
    Top(SubqueryInfo),
    /// Any other expression shape. The expression is carried whole so that
    /// subqueries inside it can be rewritten in place.
    Embedded(Expr),
}
178
179fn extract_subquery_info(expr: Expr) -> SubqueryPredicate {
180 match expr {
181 Expr::Not(not_expr) => match *not_expr {
182 Expr::InSubquery(InSubquery {
183 expr,
184 subquery,
185 negated,
186 }) => SubqueryPredicate::Top(SubqueryInfo::new_with_in_expr(
187 subquery, *expr, !negated,
188 )),
189 Expr::Exists(Exists { subquery, negated }) => {
190 SubqueryPredicate::Top(SubqueryInfo::new(subquery, !negated))
191 }
192 expr => SubqueryPredicate::Embedded(not(expr)),
193 },
194 Expr::InSubquery(InSubquery {
195 expr,
196 subquery,
197 negated,
198 }) => SubqueryPredicate::Top(SubqueryInfo::new_with_in_expr(
199 subquery, *expr, negated,
200 )),
201 Expr::Exists(Exists { subquery, negated }) => {
202 SubqueryPredicate::Top(SubqueryInfo::new(subquery, negated))
203 }
204 expr => SubqueryPredicate::Embedded(expr),
205 }
206}
207
208fn has_subquery(expr: &Expr) -> bool {
209 expr.exists(|e| match e {
210 Expr::InSubquery(_) | Expr::Exists(_) => Ok(true),
211 _ => Ok(false),
212 })
213 .unwrap()
214}
215
216fn build_join_top(
247 query_info: &SubqueryInfo,
248 left: &LogicalPlan,
249 alias: &Arc<AliasGenerator>,
250) -> Result<Option<LogicalPlan>> {
251 let where_in_expr_opt = &query_info.where_in_expr;
252 let in_predicate_opt = where_in_expr_opt
253 .clone()
254 .map(|where_in_expr| {
255 query_info
256 .query
257 .subquery
258 .head_output_expr()?
259 .map_or(plan_err!("single expression required."), |expr| {
260 Ok(Expr::eq(where_in_expr, expr))
261 })
262 })
263 .map_or(Ok(None), |v| v.map(Some))?;
264
265 let join_type = match query_info.negated {
266 true => JoinType::LeftAnti,
267 false => JoinType::LeftSemi,
268 };
269 let subquery = query_info.query.subquery.as_ref();
270 let subquery_alias = alias.next("__correlated_sq");
271 build_join(
272 left,
273 subquery,
274 in_predicate_opt.as_ref(),
275 join_type,
276 subquery_alias,
277 )
278}
279
280fn mark_join(
296 left: &LogicalPlan,
297 subquery: &LogicalPlan,
298 in_predicate_opt: Option<&Expr>,
299 negated: bool,
300 alias_generator: &Arc<AliasGenerator>,
301) -> Result<Option<(LogicalPlan, Expr)>> {
302 let alias = alias_generator.next("__correlated_sq");
303
304 let exists_col = Expr::Column(Column::new(Some(alias.clone()), "mark"));
305 let exists_expr = if negated { !exists_col } else { exists_col };
306
307 Ok(
308 build_join(left, subquery, in_predicate_opt, JoinType::LeftMark, alias)?
309 .map(|plan| (plan, exists_expr)),
310 )
311}
312
313fn build_join(
314 left: &LogicalPlan,
315 subquery: &LogicalPlan,
316 in_predicate_opt: Option<&Expr>,
317 join_type: JoinType,
318 alias: String,
319) -> Result<Option<LogicalPlan>> {
320 let mut pull_up = PullUpCorrelatedExpr::new()
321 .with_in_predicate_opt(in_predicate_opt.cloned())
322 .with_exists_sub_query(in_predicate_opt.is_none());
323
324 let new_plan = subquery.clone().rewrite(&mut pull_up).data()?;
325 if !pull_up.can_pull_up {
326 return Ok(None);
327 }
328
329 let sub_query_alias = LogicalPlanBuilder::from(new_plan)
330 .alias(alias.to_string())?
331 .build()?;
332 let mut all_correlated_cols = BTreeSet::new();
333 pull_up
334 .correlated_subquery_cols_map
335 .values()
336 .for_each(|cols| all_correlated_cols.extend(cols.clone()));
337
338 let join_filter_opt = conjunction(pull_up.join_filters)
340 .map_or(Ok(None), |filter| {
341 replace_qualified_name(filter, &all_correlated_cols, &alias).map(Some)
342 })?;
343
344 let join_filter = match (join_filter_opt, in_predicate_opt.cloned()) {
345 (
346 Some(join_filter),
347 Some(Expr::BinaryExpr(BinaryExpr {
348 left,
349 op: Operator::Eq,
350 right,
351 })),
352 ) => {
353 let right_col = create_col_from_scalar_expr(right.deref(), alias)?;
354 let in_predicate = Expr::eq(left.deref().clone(), Expr::Column(right_col));
355 in_predicate.and(join_filter)
356 }
357 (Some(join_filter), _) => join_filter,
358 (
359 _,
360 Some(Expr::BinaryExpr(BinaryExpr {
361 left,
362 op: Operator::Eq,
363 right,
364 })),
365 ) => {
366 let right_col = create_col_from_scalar_expr(right.deref(), alias)?;
367
368 Expr::eq(left.deref().clone(), Expr::Column(right_col))
369 }
370 (None, None) => lit(true),
371 _ => return Ok(None),
372 };
373
374 if matches!(join_type, JoinType::LeftMark | JoinType::RightMark) {
375 let right_schema = sub_query_alias.schema();
376
377 let mut needed = std::collections::HashSet::new();
379 expr_to_columns(&join_filter, &mut needed)?;
380 if let Some(in_pred) = in_predicate_opt {
381 expr_to_columns(in_pred, &mut needed)?;
382 }
383
384 let mut right_cols_idx_and_col: Vec<(usize, Column)> = needed
387 .into_iter()
388 .filter_map(|c| right_schema.index_of_column(&c).ok().map(|idx| (idx, c)))
389 .collect();
390
391 right_cols_idx_and_col.sort_by_key(|(idx, _)| *idx);
392
393 let right_proj_exprs: Vec<Expr> = right_cols_idx_and_col
394 .into_iter()
395 .map(|(_, c)| Expr::Column(c))
396 .collect();
397
398 let right_projected = if !right_proj_exprs.is_empty() {
399 LogicalPlanBuilder::from(sub_query_alias.clone())
400 .project(right_proj_exprs)?
401 .build()?
402 } else {
403 sub_query_alias.clone()
405 };
406 let new_plan = LogicalPlanBuilder::from(left.clone())
407 .join_on(right_projected, join_type, Some(join_filter))?
408 .build()?;
409
410 debug!(
411 "predicate subquery optimized:\n{}",
412 new_plan.display_indent()
413 );
414
415 return Ok(Some(new_plan));
416 }
417
418 let new_plan = LogicalPlanBuilder::from(left.clone())
420 .join_on(sub_query_alias, join_type, Some(join_filter))?
421 .build()?;
422 debug!(
423 "predicate subquery optimized:\n{}",
424 new_plan.display_indent()
425 );
426 Ok(Some(new_plan))
427}
428
429#[derive(Debug)]
430struct SubqueryInfo {
431 query: Subquery,
432 where_in_expr: Option<Expr>,
433 negated: bool,
434}
435
436impl SubqueryInfo {
437 pub fn new(query: Subquery, negated: bool) -> Self {
438 Self {
439 query,
440 where_in_expr: None,
441 negated,
442 }
443 }
444
445 pub fn new_with_in_expr(query: Subquery, expr: Expr, negated: bool) -> Self {
446 Self {
447 query,
448 where_in_expr: Some(expr),
449 negated,
450 }
451 }
452
453 pub fn expr(self) -> Expr {
454 match self.where_in_expr {
455 Some(expr) => match self.negated {
456 true => not_in_subquery(expr, self.query.subquery),
457 false => in_subquery(expr, self.query.subquery),
458 },
459 None => match self.negated {
460 true => not_exists(self.query.subquery),
461 false => exists(self.query.subquery),
462 },
463 }
464 }
465}
466
467#[cfg(test)]
468mod tests {
469 use std::ops::Add;
470
471 use super::*;
472 use crate::test::*;
473
474 use crate::assert_optimized_plan_eq_display_indent_snapshot;
475 use arrow::datatypes::{DataType, Field, Schema};
476 use datafusion_expr::builder::table_source;
477 use datafusion_expr::{and, binary_expr, col, lit, not, out_ref_col, table_scan};
478
479 macro_rules! assert_optimized_plan_equal {
480 (
481 $plan:expr,
482 @ $expected:literal $(,)?
483 ) => {{
484 let rule: Arc<dyn crate::OptimizerRule + Send + Sync> = Arc::new(DecorrelatePredicateSubquery::new());
485 assert_optimized_plan_eq_display_indent_snapshot!(
486 rule,
487 $plan,
488 @ $expected,
489 )
490 }};
491 }
492
493 fn test_subquery_with_name(name: &str) -> Result<Arc<LogicalPlan>> {
494 let table_scan = test_table_scan_with_name(name)?;
495 Ok(Arc::new(
496 LogicalPlanBuilder::from(table_scan)
497 .project(vec![col("c")])?
498 .build()?,
499 ))
500 }
501
502 #[test]
504 fn in_subquery_multiple() -> Result<()> {
505 let table_scan = test_table_scan()?;
506 let plan = LogicalPlanBuilder::from(table_scan)
507 .filter(and(
508 in_subquery(col("c"), test_subquery_with_name("sq_1")?),
509 in_subquery(col("b"), test_subquery_with_name("sq_2")?),
510 ))?
511 .project(vec![col("test.b")])?
512 .build()?;
513
514 assert_optimized_plan_equal!(
515 plan,
516 @r"
517 Projection: test.b [b:UInt32]
518 LeftSemi Join: Filter: test.b = __correlated_sq_2.c [a:UInt32, b:UInt32, c:UInt32]
519 LeftSemi Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
520 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
521 SubqueryAlias: __correlated_sq_1 [c:UInt32]
522 Projection: sq_1.c [c:UInt32]
523 TableScan: sq_1 [a:UInt32, b:UInt32, c:UInt32]
524 SubqueryAlias: __correlated_sq_2 [c:UInt32]
525 Projection: sq_2.c [c:UInt32]
526 TableScan: sq_2 [a:UInt32, b:UInt32, c:UInt32]
527 "
528 )
529 }
530
531 #[test]
533 fn in_subquery_with_and_filters() -> Result<()> {
534 let table_scan = test_table_scan()?;
535 let plan = LogicalPlanBuilder::from(table_scan)
536 .filter(and(
537 in_subquery(col("c"), test_subquery_with_name("sq")?),
538 and(
539 binary_expr(col("a"), Operator::Eq, lit(1_u32)),
540 binary_expr(col("b"), Operator::Lt, lit(30_u32)),
541 ),
542 ))?
543 .project(vec![col("test.b")])?
544 .build()?;
545
546 assert_optimized_plan_equal!(
547 plan,
548 @r"
549 Projection: test.b [b:UInt32]
550 Filter: test.a = UInt32(1) AND test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]
551 LeftSemi Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
552 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
553 SubqueryAlias: __correlated_sq_1 [c:UInt32]
554 Projection: sq.c [c:UInt32]
555 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
556 "
557 )
558 }
559
560 #[test]
562 fn in_subquery_nested() -> Result<()> {
563 let table_scan = test_table_scan()?;
564
565 let subquery = LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
566 .filter(in_subquery(col("a"), test_subquery_with_name("sq_nested")?))?
567 .project(vec![col("a")])?
568 .build()?;
569
570 let plan = LogicalPlanBuilder::from(table_scan)
571 .filter(in_subquery(col("b"), Arc::new(subquery)))?
572 .project(vec![col("test.b")])?
573 .build()?;
574
575 assert_optimized_plan_equal!(
576 plan,
577 @r"
578 Projection: test.b [b:UInt32]
579 LeftSemi Join: Filter: test.b = __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]
580 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
581 SubqueryAlias: __correlated_sq_2 [a:UInt32]
582 Projection: sq.a [a:UInt32]
583 LeftSemi Join: Filter: sq.a = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
584 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
585 SubqueryAlias: __correlated_sq_1 [c:UInt32]
586 Projection: sq_nested.c [c:UInt32]
587 TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]
588 "
589 )
590 }
591
592 #[test]
595 fn multiple_subqueries() -> Result<()> {
596 let orders = Arc::new(
597 LogicalPlanBuilder::from(scan_tpch_table("orders"))
598 .filter(
599 col("orders.o_custkey")
600 .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
601 )?
602 .project(vec![col("orders.o_custkey")])?
603 .build()?,
604 );
605 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
606 .filter(
607 in_subquery(col("customer.c_custkey"), Arc::clone(&orders))
608 .and(in_subquery(col("customer.c_custkey"), orders)),
609 )?
610 .project(vec![col("customer.c_custkey")])?
611 .build()?;
612 debug!("plan to optimize:\n{}", plan.display_indent());
613
614 assert_optimized_plan_equal!(
615 plan,
616 @r"
617 Projection: customer.c_custkey [c_custkey:Int64]
618 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]
619 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
620 TableScan: customer [c_custkey:Int64, c_name:Utf8]
621 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
622 Projection: orders.o_custkey [o_custkey:Int64]
623 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
624 SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]
625 Projection: orders.o_custkey [o_custkey:Int64]
626 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
627 "
628 )
629 }
630
631 #[test]
634 fn recursive_subqueries() -> Result<()> {
635 let lineitem = Arc::new(
636 LogicalPlanBuilder::from(scan_tpch_table("lineitem"))
637 .filter(
638 col("lineitem.l_orderkey")
639 .eq(out_ref_col(DataType::Int64, "orders.o_orderkey")),
640 )?
641 .project(vec![col("lineitem.l_orderkey")])?
642 .build()?,
643 );
644
645 let orders = Arc::new(
646 LogicalPlanBuilder::from(scan_tpch_table("orders"))
647 .filter(
648 in_subquery(col("orders.o_orderkey"), lineitem).and(
649 col("orders.o_custkey")
650 .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
651 ),
652 )?
653 .project(vec![col("orders.o_custkey")])?
654 .build()?,
655 );
656
657 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
658 .filter(in_subquery(col("customer.c_custkey"), orders))?
659 .project(vec![col("customer.c_custkey")])?
660 .build()?;
661
662 assert_optimized_plan_equal!(
663 plan,
664 @r"
665 Projection: customer.c_custkey [c_custkey:Int64]
666 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]
667 TableScan: customer [c_custkey:Int64, c_name:Utf8]
668 SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]
669 Projection: orders.o_custkey [o_custkey:Int64]
670 LeftSemi Join: Filter: orders.o_orderkey = __correlated_sq_1.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
671 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
672 SubqueryAlias: __correlated_sq_1 [l_orderkey:Int64]
673 Projection: lineitem.l_orderkey [l_orderkey:Int64]
674 TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]
675 "
676 )
677 }
678
679 #[test]
681 fn in_subquery_with_subquery_filters() -> Result<()> {
682 let sq = Arc::new(
683 LogicalPlanBuilder::from(scan_tpch_table("orders"))
684 .filter(
685 out_ref_col(DataType::Int64, "customer.c_custkey")
686 .eq(col("orders.o_custkey"))
687 .and(col("o_orderkey").eq(lit(1))),
688 )?
689 .project(vec![col("orders.o_custkey")])?
690 .build()?,
691 );
692
693 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
694 .filter(in_subquery(col("customer.c_custkey"), sq))?
695 .project(vec![col("customer.c_custkey")])?
696 .build()?;
697
698 assert_optimized_plan_equal!(
699 plan,
700 @r"
701 Projection: customer.c_custkey [c_custkey:Int64]
702 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
703 TableScan: customer [c_custkey:Int64, c_name:Utf8]
704 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
705 Projection: orders.o_custkey [o_custkey:Int64]
706 Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
707 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
708 "
709 )
710 }
711
712 #[test]
714 fn in_subquery_no_cols() -> Result<()> {
715 let sq = Arc::new(
716 LogicalPlanBuilder::from(scan_tpch_table("orders"))
717 .filter(
718 out_ref_col(DataType::Int64, "customer.c_custkey")
719 .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
720 )?
721 .project(vec![col("orders.o_custkey")])?
722 .build()?,
723 );
724
725 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
726 .filter(in_subquery(col("customer.c_custkey"), sq))?
727 .project(vec![col("customer.c_custkey")])?
728 .build()?;
729
730 assert_optimized_plan_equal!(
731 plan,
732 @r"
733 Projection: customer.c_custkey [c_custkey:Int64]
734 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
735 TableScan: customer [c_custkey:Int64, c_name:Utf8]
736 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
737 Projection: orders.o_custkey [o_custkey:Int64]
738 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
739 "
740 )
741 }
742
743 #[test]
745 fn in_subquery_with_no_correlated_cols() -> Result<()> {
746 let sq = Arc::new(
747 LogicalPlanBuilder::from(scan_tpch_table("orders"))
748 .filter(col("orders.o_custkey").eq(col("orders.o_custkey")))?
749 .project(vec![col("orders.o_custkey")])?
750 .build()?,
751 );
752
753 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
754 .filter(in_subquery(col("customer.c_custkey"), sq))?
755 .project(vec![col("customer.c_custkey")])?
756 .build()?;
757
758 assert_optimized_plan_equal!(
759 plan,
760 @r"
761 Projection: customer.c_custkey [c_custkey:Int64]
762 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
763 TableScan: customer [c_custkey:Int64, c_name:Utf8]
764 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
765 Projection: orders.o_custkey [o_custkey:Int64]
766 Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
767 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
768 "
769 )
770 }
771
772 #[test]
774 fn in_subquery_where_not_eq() -> Result<()> {
775 let sq = Arc::new(
776 LogicalPlanBuilder::from(scan_tpch_table("orders"))
777 .filter(
778 out_ref_col(DataType::Int64, "customer.c_custkey")
779 .not_eq(col("orders.o_custkey")),
780 )?
781 .project(vec![col("orders.o_custkey")])?
782 .build()?,
783 );
784
785 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
786 .filter(in_subquery(col("customer.c_custkey"), sq))?
787 .project(vec![col("customer.c_custkey")])?
788 .build()?;
789
790 assert_optimized_plan_equal!(
791 plan,
792 @r"
793 Projection: customer.c_custkey [c_custkey:Int64]
794 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND customer.c_custkey != __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
795 TableScan: customer [c_custkey:Int64, c_name:Utf8]
796 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
797 Projection: orders.o_custkey [o_custkey:Int64]
798 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
799 "
800 )
801 }
802
803 #[test]
805 fn in_subquery_where_less_than() -> Result<()> {
806 let sq = Arc::new(
807 LogicalPlanBuilder::from(scan_tpch_table("orders"))
808 .filter(
809 out_ref_col(DataType::Int64, "customer.c_custkey")
810 .lt(col("orders.o_custkey")),
811 )?
812 .project(vec![col("orders.o_custkey")])?
813 .build()?,
814 );
815
816 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
817 .filter(in_subquery(col("customer.c_custkey"), sq))?
818 .project(vec![col("customer.c_custkey")])?
819 .build()?;
820
821 assert_optimized_plan_equal!(
822 plan,
823 @r"
824 Projection: customer.c_custkey [c_custkey:Int64]
825 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND customer.c_custkey < __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
826 TableScan: customer [c_custkey:Int64, c_name:Utf8]
827 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
828 Projection: orders.o_custkey [o_custkey:Int64]
829 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
830 "
831 )
832 }
833
834 #[test]
836 fn in_subquery_with_subquery_disjunction() -> Result<()> {
837 let sq = Arc::new(
838 LogicalPlanBuilder::from(scan_tpch_table("orders"))
839 .filter(
840 out_ref_col(DataType::Int64, "customer.c_custkey")
841 .eq(col("orders.o_custkey"))
842 .or(col("o_orderkey").eq(lit(1))),
843 )?
844 .project(vec![col("orders.o_custkey")])?
845 .build()?,
846 );
847
848 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
849 .filter(in_subquery(col("customer.c_custkey"), sq))?
850 .project(vec![col("customer.c_custkey")])?
851 .build()?;
852
853 assert_optimized_plan_equal!(
854 plan,
855 @r"
856 Projection: customer.c_custkey [c_custkey:Int64]
857 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND (customer.c_custkey = __correlated_sq_1.o_custkey OR __correlated_sq_1.o_orderkey = Int32(1)) [c_custkey:Int64, c_name:Utf8]
858 TableScan: customer [c_custkey:Int64, c_name:Utf8]
859 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64, o_orderkey:Int64]
860 Projection: orders.o_custkey, orders.o_orderkey [o_custkey:Int64, o_orderkey:Int64]
861 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
862 "
863 )
864 }
865
866 #[test]
868 fn in_subquery_no_projection() -> Result<()> {
869 let sq = Arc::new(
870 LogicalPlanBuilder::from(scan_tpch_table("orders"))
871 .filter(col("customer.c_custkey").eq(col("orders.o_custkey")))?
872 .build()?,
873 );
874
875 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
876 .filter(in_subquery(col("customer.c_custkey"), sq))?
877 .project(vec![col("customer.c_custkey")])?
878 .build()?;
879
880 let expected = "Invalid (non-executable) plan after Analyzer\
882 \ncaused by\
883 \nError during planning: InSubquery should only return one column, but found 4";
884 assert_analyzer_check_err(vec![], plan, expected);
885
886 Ok(())
887 }
888
889 #[test]
891 fn in_subquery_join_expr() -> Result<()> {
892 let sq = Arc::new(
893 LogicalPlanBuilder::from(scan_tpch_table("orders"))
894 .filter(
895 out_ref_col(DataType::Int64, "customer.c_custkey")
896 .eq(col("orders.o_custkey")),
897 )?
898 .project(vec![col("orders.o_custkey")])?
899 .build()?,
900 );
901
902 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
903 .filter(in_subquery(col("customer.c_custkey").add(lit(1)), sq))?
904 .project(vec![col("customer.c_custkey")])?
905 .build()?;
906
907 assert_optimized_plan_equal!(
908 plan,
909 @r"
910 Projection: customer.c_custkey [c_custkey:Int64]
911 LeftSemi Join: Filter: customer.c_custkey + Int32(1) = __correlated_sq_1.o_custkey AND customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
912 TableScan: customer [c_custkey:Int64, c_name:Utf8]
913 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
914 Projection: orders.o_custkey [o_custkey:Int64]
915 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
916 "
917 )
918 }
919
920 #[test]
922 fn in_subquery_project_expr() -> Result<()> {
923 let sq = Arc::new(
924 LogicalPlanBuilder::from(scan_tpch_table("orders"))
925 .filter(
926 out_ref_col(DataType::Int64, "customer.c_custkey")
927 .eq(col("orders.o_custkey")),
928 )?
929 .project(vec![col("orders.o_custkey").add(lit(1))])?
930 .build()?,
931 );
932
933 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
934 .filter(in_subquery(col("customer.c_custkey"), sq))?
935 .project(vec![col("customer.c_custkey")])?
936 .build()?;
937
938 assert_optimized_plan_equal!(
939 plan,
940 @r"
941 Projection: customer.c_custkey [c_custkey:Int64]
942 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.orders.o_custkey + Int32(1) AND customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
943 TableScan: customer [c_custkey:Int64, c_name:Utf8]
944 SubqueryAlias: __correlated_sq_1 [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]
945 Projection: orders.o_custkey + Int32(1), orders.o_custkey [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]
946 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
947 "
948 )
949 }
950
951 #[test]
953 fn in_subquery_multi_col() -> Result<()> {
954 let sq = Arc::new(
955 LogicalPlanBuilder::from(scan_tpch_table("orders"))
956 .filter(
957 out_ref_col(DataType::Int64, "customer.c_custkey")
958 .eq(col("orders.o_custkey")),
959 )?
960 .project(vec![col("orders.o_custkey"), col("orders.o_orderkey")])?
961 .build()?,
962 );
963
964 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
965 .filter(
966 in_subquery(col("customer.c_custkey"), sq)
967 .and(col("c_custkey").eq(lit(1))),
968 )?
969 .project(vec![col("customer.c_custkey")])?
970 .build()?;
971
972 let expected = "Invalid (non-executable) plan after Analyzer\
973 \ncaused by\
974 \nError during planning: InSubquery should only return one column";
975 assert_analyzer_check_err(vec![], plan, expected);
976
977 Ok(())
978 }
979
980 #[test]
982 fn should_support_additional_filters() -> Result<()> {
983 let sq = Arc::new(
984 LogicalPlanBuilder::from(scan_tpch_table("orders"))
985 .filter(
986 out_ref_col(DataType::Int64, "customer.c_custkey")
987 .eq(col("orders.o_custkey")),
988 )?
989 .project(vec![col("orders.o_custkey")])?
990 .build()?,
991 );
992
993 let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
994 .filter(
995 in_subquery(col("customer.c_custkey"), sq)
996 .and(col("c_custkey").eq(lit(1))),
997 )?
998 .project(vec![col("customer.c_custkey")])?
999 .build()?;
1000
1001 assert_optimized_plan_equal!(
1002 plan,
1003 @r"
1004 Projection: customer.c_custkey [c_custkey:Int64]
1005 Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]
1006 LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1007 TableScan: customer [c_custkey:Int64, c_name:Utf8]
1008 SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1009 Projection: orders.o_custkey [o_custkey:Int64]
1010 TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1011 "
1012 )
1013 }
1014
1015 #[test]
1017 fn in_subquery_correlated() -> Result<()> {
1018 let sq = Arc::new(
1019 LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
1020 .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq.a")))?
1021 .project(vec![col("c")])?
1022 .build()?,
1023 );
1024
1025 let plan = LogicalPlanBuilder::from(test_table_scan_with_name("test")?)
1026 .filter(in_subquery(col("c"), sq))?
1027 .project(vec![col("test.b")])?
1028 .build()?;
1029
1030 assert_optimized_plan_equal!(
1031 plan,
1032 @r"
1033 Projection: test.b [b:UInt32]
1034 LeftSemi Join: Filter: test.c = __correlated_sq_1.c AND test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]
1035 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1036 SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]
1037 Projection: sq.c, sq.a [c:UInt32, a:UInt32]
1038 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1039 "
1040 )
1041 }
1042
1043 #[test]
1045 fn in_subquery_simple() -> Result<()> {
1046 let table_scan = test_table_scan()?;
1047 let plan = LogicalPlanBuilder::from(table_scan)
1048 .filter(in_subquery(col("c"), test_subquery_with_name("sq")?))?
1049 .project(vec![col("test.b")])?
1050 .build()?;
1051
1052 assert_optimized_plan_equal!(
1053 plan,
1054 @r"
1055 Projection: test.b [b:UInt32]
1056 LeftSemi Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
1057 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1058 SubqueryAlias: __correlated_sq_1 [c:UInt32]
1059 Projection: sq.c [c:UInt32]
1060 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1061 "
1062 )
1063 }
1064
1065 #[test]
1067 fn not_in_subquery_simple() -> Result<()> {
1068 let table_scan = test_table_scan()?;
1069 let plan = LogicalPlanBuilder::from(table_scan)
1070 .filter(not_in_subquery(col("c"), test_subquery_with_name("sq")?))?
1071 .project(vec![col("test.b")])?
1072 .build()?;
1073
1074 assert_optimized_plan_equal!(
1075 plan,
1076 @r"
1077 Projection: test.b [b:UInt32]
1078 LeftAnti Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
1079 TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1080 SubqueryAlias: __correlated_sq_1 [c:UInt32]
1081 Projection: sq.c [c:UInt32]
1082 TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1083 "
1084 )
1085 }
1086
/// `NOT (c IN (<subquery>))` — an IN subquery wrapped in an explicit `not` —
/// is expected to produce the same `LeftAnti` join as a plain NOT IN.
#[test]
fn wrapped_not_in_subquery() -> Result<()> {
    let outer = test_table_scan()?;
    let negated = not(in_subquery(col("c"), test_subquery_with_name("sq")?));
    let plan = LogicalPlanBuilder::from(outer)
        .filter(negated)?
        .project(vec![col("test.b")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.b [b:UInt32]
      LeftAnti Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
        TableScan: test [a:UInt32, b:UInt32, c:UInt32]
        SubqueryAlias: __correlated_sq_1 [c:UInt32]
          Projection: sq.c [c:UInt32]
            TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
    "
    )
}
1107
/// `NOT (c NOT IN (<subquery>))` — a double negation — is expected to collapse
/// back to a `LeftSemi` join.
#[test]
fn wrapped_not_not_in_subquery() -> Result<()> {
    let outer = test_table_scan()?;
    let inner_not_in = not_in_subquery(col("c"), test_subquery_with_name("sq")?);
    let plan = LogicalPlanBuilder::from(outer)
        .filter(not(inner_not_in))?
        .project(vec![col("test.b")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.b [b:UInt32]
      LeftSemi Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
        TableScan: test [a:UInt32, b:UInt32, c:UInt32]
        SubqueryAlias: __correlated_sq_1 [c:UInt32]
          Projection: sq.c [c:UInt32]
            TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
    "
    )
}
1131
/// IN subquery where both the probe side (`c + 1`) and the subquery output
/// (`c * 2`) are expressions rather than bare columns; both expressions are
/// expected to appear in the resulting `LeftSemi` join filter.
#[test]
fn in_subquery_both_side_expr() -> Result<()> {
    let outer = test_table_scan()?;
    let inner_scan = test_table_scan_with_name("sq")?;

    let doubled_c = LogicalPlanBuilder::from(inner_scan)
        .project(vec![col("c") * lit(2u32)])?
        .build()?;

    let predicate = in_subquery(col("c") + lit(1u32), Arc::new(doubled_c));
    let plan = LogicalPlanBuilder::from(outer)
        .filter(predicate)?
        .project(vec![col("test.b")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.b [b:UInt32]
      LeftSemi Join: Filter: test.c + UInt32(1) = __correlated_sq_1.sq.c * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]
        TableScan: test [a:UInt32, b:UInt32, c:UInt32]
        SubqueryAlias: __correlated_sq_1 [sq.c * UInt32(2):UInt32]
          Projection: sq.c * UInt32(2) [sq.c * UInt32(2):UInt32]
            TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
    "
    )
}
1158
/// IN subquery whose inner filter mixes a correlated predicate
/// (`test.a = sq.a`) with a purely local one (`sq.a + 1 = sq.b`). The expected
/// plan moves the correlated part into the join filter and leaves the local
/// part as a `Filter` inside the subquery.
#[test]
fn in_subquery_join_filter_and_inner_filter() -> Result<()> {
    let outer = test_table_scan()?;
    let inner_scan = test_table_scan_with_name("sq")?;

    let correlated = out_ref_col(DataType::UInt32, "test.a").eq(col("sq.a"));
    let local = (col("sq.a") + lit(1u32)).eq(col("sq.b"));
    let sq_plan = LogicalPlanBuilder::from(inner_scan)
        .filter(correlated.and(local))?
        .project(vec![col("c") * lit(2u32)])?
        .build()?;

    let predicate = in_subquery(col("c") + lit(1u32), Arc::new(sq_plan));
    let plan = LogicalPlanBuilder::from(outer)
        .filter(predicate)?
        .project(vec![col("test.b")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.b [b:UInt32]
      LeftSemi Join: Filter: test.c + UInt32(1) = __correlated_sq_1.sq.c * UInt32(2) AND test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]
        TableScan: test [a:UInt32, b:UInt32, c:UInt32]
        SubqueryAlias: __correlated_sq_1 [sq.c * UInt32(2):UInt32, a:UInt32]
          Projection: sq.c * UInt32(2), sq.a [sq.c * UInt32(2):UInt32, a:UInt32]
            Filter: sq.a + UInt32(1) = sq.b [a:UInt32, b:UInt32, c:UInt32]
              TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
    "
    )
}
1191
/// IN subquery whose correlated predicate references several subquery columns
/// (`test.a + test.b = sq.a + sq.b`); the expected plan adds both `sq.a` and
/// `sq.b` to the subquery projection so the join filter can use them.
#[test]
fn in_subquery_multi_project_subquery_cols() -> Result<()> {
    let outer = test_table_scan()?;
    let inner_scan = test_table_scan_with_name("sq")?;

    let outer_sum =
        out_ref_col(DataType::UInt32, "test.a") + out_ref_col(DataType::UInt32, "test.b");
    let correlated = outer_sum.eq(col("sq.a") + col("sq.b"));
    let local = (col("sq.a") + lit(1u32)).eq(col("sq.b"));
    let sq_plan = LogicalPlanBuilder::from(inner_scan)
        .filter(correlated.and(local))?
        .project(vec![col("c") * lit(2u32)])?
        .build()?;

    let predicate = in_subquery(col("c") + lit(1u32), Arc::new(sq_plan));
    let plan = LogicalPlanBuilder::from(outer)
        .filter(predicate)?
        .project(vec![col("test.b")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.b [b:UInt32]
      LeftSemi Join: Filter: test.c + UInt32(1) = __correlated_sq_1.sq.c * UInt32(2) AND test.a + test.b = __correlated_sq_1.a + __correlated_sq_1.b [a:UInt32, b:UInt32, c:UInt32]
        TableScan: test [a:UInt32, b:UInt32, c:UInt32]
        SubqueryAlias: __correlated_sq_1 [sq.c * UInt32(2):UInt32, a:UInt32, b:UInt32]
          Projection: sq.c * UInt32(2), sq.a, sq.b [sq.c * UInt32(2):UInt32, a:UInt32, b:UInt32]
            Filter: sq.a + UInt32(1) = sq.b [a:UInt32, b:UInt32, c:UInt32]
              TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
    "
    )
}
1225
/// Two correlated IN subqueries AND-ed with an ordinary outer predicate.
/// The expected plan stacks two `LeftSemi` joins (one per subquery) and keeps
/// `test.c > 1` as a `Filter` above them.
#[test]
fn two_in_subquery_with_outer_filter() -> Result<()> {
    let outer = test_table_scan()?;
    let sq1_scan = test_table_scan_with_name("sq1")?;
    let sq2_scan = test_table_scan_with_name("sq2")?;

    let sq1 = LogicalPlanBuilder::from(sq1_scan)
        .filter(out_ref_col(DataType::UInt32, "test.a").gt(col("sq1.a")))?
        .project(vec![col("c") * lit(2u32)])?
        .build()?;

    let sq2 = LogicalPlanBuilder::from(sq2_scan)
        .filter(out_ref_col(DataType::UInt32, "test.a").gt(col("sq2.a")))?
        .project(vec![col("c") * lit(2u32)])?
        .build()?;

    // Keep the original nesting: in1 AND (in2 AND test.c > 1).
    let in_sq1 = in_subquery(col("c") + lit(1u32), Arc::new(sq1));
    let in_sq2_and_range = in_subquery(col("c") * lit(2u32), Arc::new(sq2))
        .and(col("test.c").gt(lit(1u32)));

    let plan = LogicalPlanBuilder::from(outer)
        .filter(in_sq1.and(in_sq2_and_range))?
        .project(vec![col("test.b")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.b [b:UInt32]
      Filter: test.c > UInt32(1) [a:UInt32, b:UInt32, c:UInt32]
        LeftSemi Join: Filter: test.c * UInt32(2) = __correlated_sq_2.sq2.c * UInt32(2) AND test.a > __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]
          LeftSemi Join: Filter: test.c + UInt32(1) = __correlated_sq_1.sq1.c * UInt32(2) AND test.a > __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]
            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
            SubqueryAlias: __correlated_sq_1 [sq1.c * UInt32(2):UInt32, a:UInt32]
              Projection: sq1.c * UInt32(2), sq1.a [sq1.c * UInt32(2):UInt32, a:UInt32]
                TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]
          SubqueryAlias: __correlated_sq_2 [sq2.c * UInt32(2):UInt32, a:UInt32]
            Projection: sq2.c * UInt32(2), sq2.a [sq2.c * UInt32(2):UInt32, a:UInt32]
              TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]
    "
    )
}
1269
/// IN subquery that scans the same table (`test`) as the outer query. The
/// subquery filter `test.a > test.b` uses plain columns (no outer reference),
/// and the expected plan keeps it inside the subquery.
#[test]
fn in_subquery_with_same_table() -> Result<()> {
    let outer = test_table_scan()?;
    let inner = test_table_scan()?;
    let sq_plan = LogicalPlanBuilder::from(inner)
        .filter(col("test.a").gt(col("test.b")))?
        .project(vec![col("c")])?
        .build()?;

    let predicate = in_subquery(col("test.a"), Arc::new(sq_plan));
    let plan = LogicalPlanBuilder::from(outer)
        .filter(predicate)?
        .project(vec![col("test.b")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.b [b:UInt32]
      LeftSemi Join: Filter: test.a = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
        TableScan: test [a:UInt32, b:UInt32, c:UInt32]
        SubqueryAlias: __correlated_sq_1 [c:UInt32]
          Projection: test.c [c:UInt32]
            Filter: test.a > test.b [a:UInt32, b:UInt32, c:UInt32]
              TableScan: test [a:UInt32, b:UInt32, c:UInt32]
    "
    )
}
1298
/// The same correlated EXISTS subquery used twice in one conjunction; each
/// occurrence is expected to get its own `LeftSemi` join and its own alias
/// (`__correlated_sq_1`, `__correlated_sq_2`).
#[test]
fn multiple_exists_subqueries() -> Result<()> {
    let correlation =
        col("orders.o_custkey").eq(out_ref_col(DataType::Int64, "customer.c_custkey"));
    let orders = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(correlation)?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let first = exists(Arc::clone(&orders));
    let second = exists(orders);
    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(first.and(second))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: customer.c_custkey [c_custkey:Int64]
      LeftSemi Join: Filter: __correlated_sq_2.o_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]
        LeftSemi Join: Filter: __correlated_sq_1.o_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]
          TableScan: customer [c_custkey:Int64, c_name:Utf8]
          SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
            Projection: orders.o_custkey [o_custkey:Int64]
              TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
        SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]
          Projection: orders.o_custkey [o_custkey:Int64]
            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
    "
    )
}
1333
/// EXISTS subquery nested inside another EXISTS subquery
/// (customer -> orders -> lineitem). The expected plan decorrelates the inner
/// subquery first (`__correlated_sq_1`) and nests that join under the outer
/// one (`__correlated_sq_2`).
#[test]
fn recursive_exists_subqueries() -> Result<()> {
    let lineitem_correlation =
        col("lineitem.l_orderkey").eq(out_ref_col(DataType::Int64, "orders.o_orderkey"));
    let lineitem = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("lineitem"))
            .filter(lineitem_correlation)?
            .project(vec![col("lineitem.l_orderkey")])?
            .build()?,
    );

    let orders_correlation =
        col("orders.o_custkey").eq(out_ref_col(DataType::Int64, "customer.c_custkey"));
    let orders = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(exists(lineitem).and(orders_correlation))?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(orders))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: customer.c_custkey [c_custkey:Int64]
      LeftSemi Join: Filter: __correlated_sq_2.o_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]
        TableScan: customer [c_custkey:Int64, c_name:Utf8]
        SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]
          Projection: orders.o_custkey [o_custkey:Int64]
            LeftSemi Join: Filter: __correlated_sq_1.l_orderkey = orders.o_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
              TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
              SubqueryAlias: __correlated_sq_1 [l_orderkey:Int64]
                Projection: lineitem.l_orderkey [l_orderkey:Int64]
                  TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]
    "
    )
}
1380
/// Correlated EXISTS subquery that also carries a local filter
/// (`o_orderkey = 1`); the expected plan keeps the local filter inside the
/// subquery while the correlation becomes the join filter.
#[test]
fn exists_subquery_with_subquery_filters() -> Result<()> {
    let correlation =
        out_ref_col(DataType::Int64, "customer.c_custkey").eq(col("orders.o_custkey"));
    let local = col("o_orderkey").eq(lit(1));
    let sq = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(correlation.and(local))?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(sq))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: customer.c_custkey [c_custkey:Int64]
      LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
        TableScan: customer [c_custkey:Int64, c_name:Utf8]
        SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
          Projection: orders.o_custkey [o_custkey:Int64]
            Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
              TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
    "
    )
}
1413
/// EXISTS subquery whose only predicate compares an outer column with a
/// literal (no subquery column involved); the expected join filter is
/// `customer.c_custkey = UInt32(1)`.
#[test]
fn exists_subquery_no_cols() -> Result<()> {
    let outer_only = out_ref_col(DataType::Int64, "customer.c_custkey").eq(lit(1u32));
    let sq = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(outer_only)?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(sq))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: customer.c_custkey [c_custkey:Int64]
      LeftSemi Join: Filter: customer.c_custkey = UInt32(1) [c_custkey:Int64, c_name:Utf8]
        TableScan: customer [c_custkey:Int64, c_name:Utf8]
        SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
          Projection: orders.o_custkey [o_custkey:Int64]
            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
    "
    )
}
1441
/// EXISTS subquery with no outer references at all; the expected plan is a
/// `LeftSemi` join with a constant `Boolean(true)` filter, and the subquery's
/// own filter stays inside it.
#[test]
fn exists_subquery_with_no_correlated_cols() -> Result<()> {
    let self_eq = col("orders.o_custkey").eq(col("orders.o_custkey"));
    let sq = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(self_eq)?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(sq))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: customer.c_custkey [c_custkey:Int64]
      LeftSemi Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8]
        TableScan: customer [c_custkey:Int64, c_name:Utf8]
        SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
          Projection: orders.o_custkey [o_custkey:Int64]
            Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
              TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
    "
    )
}
1470
/// Correlated EXISTS subquery whose correlation uses `!=`; the expected plan
/// carries the inequality into the `LeftSemi` join filter.
#[test]
fn exists_subquery_where_not_eq() -> Result<()> {
    let correlation =
        out_ref_col(DataType::Int64, "customer.c_custkey").not_eq(col("orders.o_custkey"));
    let sq = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(correlation)?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(sq))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: customer.c_custkey [c_custkey:Int64]
      LeftSemi Join: Filter: customer.c_custkey != __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
        TableScan: customer [c_custkey:Int64, c_name:Utf8]
        SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
          Projection: orders.o_custkey [o_custkey:Int64]
            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
    "
    )
}
1501
/// Correlated EXISTS subquery whose correlation uses `<`; the expected plan
/// carries the comparison into the `LeftSemi` join filter.
#[test]
fn exists_subquery_where_less_than() -> Result<()> {
    let correlation =
        out_ref_col(DataType::Int64, "customer.c_custkey").lt(col("orders.o_custkey"));
    let sq = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(correlation)?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(sq))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: customer.c_custkey [c_custkey:Int64]
      LeftSemi Join: Filter: customer.c_custkey < __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
        TableScan: customer [c_custkey:Int64, c_name:Utf8]
        SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
          Projection: orders.o_custkey [o_custkey:Int64]
            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
    "
    )
}
1532
/// Correlated EXISTS subquery whose filter is a disjunction of a correlated
/// and a local predicate. The expected plan moves the whole `OR` into the join
/// filter and adds `o_orderkey` to the subquery projection.
#[test]
fn exists_subquery_with_subquery_disjunction() -> Result<()> {
    let correlation =
        out_ref_col(DataType::Int64, "customer.c_custkey").eq(col("orders.o_custkey"));
    let local = col("o_orderkey").eq(lit(1));
    let sq = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(correlation.or(local))?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(sq))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: customer.c_custkey [c_custkey:Int64]
      LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey OR __correlated_sq_1.o_orderkey = Int32(1) [c_custkey:Int64, c_name:Utf8]
        TableScan: customer [c_custkey:Int64, c_name:Utf8]
        SubqueryAlias: __correlated_sq_1 [o_custkey:Int64, o_orderkey:Int64]
          Projection: orders.o_custkey, orders.o_orderkey [o_custkey:Int64, o_orderkey:Int64]
            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
    "
    )
}
1564
/// Correlated EXISTS subquery that has a filter but no projection; the
/// expected plan aliases the bare table scan directly.
#[test]
fn exists_subquery_no_projection() -> Result<()> {
    let correlation =
        out_ref_col(DataType::Int64, "customer.c_custkey").eq(col("orders.o_custkey"));
    let sq = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(correlation)?
            .build()?,
    );

    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(sq))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: customer.c_custkey [c_custkey:Int64]
      LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
        TableScan: customer [c_custkey:Int64, c_name:Utf8]
        SubqueryAlias: __correlated_sq_1 [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
          TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
    "
    )
}
1593
/// Correlated EXISTS subquery that projects an expression (`o_custkey + 1`);
/// the expected plan keeps that expression and appends the plain `o_custkey`
/// column required by the join filter.
#[test]
fn exists_subquery_project_expr() -> Result<()> {
    let correlation =
        out_ref_col(DataType::Int64, "customer.c_custkey").eq(col("orders.o_custkey"));
    let sq = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(correlation)?
            .project(vec![col("orders.o_custkey") + lit(1)])?
            .build()?,
    );

    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(exists(sq))?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: customer.c_custkey [c_custkey:Int64]
      LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
        TableScan: customer [c_custkey:Int64, c_name:Utf8]
        SubqueryAlias: __correlated_sq_1 [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]
          Projection: orders.o_custkey + Int32(1), orders.o_custkey [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]
            TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
    "
    )
}
1624
/// Correlated EXISTS AND-ed with an ordinary outer predicate; the expected
/// plan keeps `c_custkey = 1` as a `Filter` above the `LeftSemi` join.
#[test]
fn exists_subquery_should_support_additional_filters() -> Result<()> {
    let correlation =
        out_ref_col(DataType::Int64, "customer.c_custkey").eq(col("orders.o_custkey"));
    let sq = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(correlation)?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let predicate = exists(sq).and(col("c_custkey").eq(lit(1)));
    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(predicate)?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: customer.c_custkey [c_custkey:Int64]
      Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]
        LeftSemi Join: Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
          TableScan: customer [c_custkey:Int64, c_name:Utf8]
          SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
            Projection: orders.o_custkey [o_custkey:Int64]
              TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
    "
    )
}
1655
/// EXISTS OR-ed with an ordinary outer predicate. The expected plan uses a
/// `LeftMark` join and filters on the generated `mark` column.
///
/// The subquery filter here is written with plain `col(..)` rather than
/// `out_ref_col(..)`, and the expected plan leaves it inside the subquery.
#[test]
fn exists_subquery_disjunction() -> Result<()> {
    let inner_filter = col("customer.c_custkey").eq(col("orders.o_custkey"));
    let sq = Arc::new(
        LogicalPlanBuilder::from(scan_tpch_table("orders"))
            .filter(inner_filter)?
            .project(vec![col("orders.o_custkey")])?
            .build()?,
    );

    let predicate = exists(sq).or(col("customer.c_custkey").eq(lit(1)));
    let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
        .filter(predicate)?
        .project(vec![col("customer.c_custkey")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: customer.c_custkey [c_custkey:Int64]
      Filter: __correlated_sq_1.mark OR customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, mark:Boolean]
        LeftMark Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, mark:Boolean]
          TableScan: customer [c_custkey:Int64, c_name:Utf8]
          SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
            Projection: orders.o_custkey [o_custkey:Int64]
              Filter: customer.c_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
    "
    )
}
1685
/// Correlated EXISTS subquery over the generic test tables; the expected plan
/// joins on `test.a = __correlated_sq_1.a` and adds `sq.a` to the subquery
/// projection.
#[test]
fn exists_subquery_correlated() -> Result<()> {
    let correlation = out_ref_col(DataType::UInt32, "test.a").eq(col("sq.a"));
    let sq = Arc::new(
        LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
            .filter(correlation)?
            .project(vec![col("c")])?
            .build()?,
    );

    let plan = LogicalPlanBuilder::from(test_table_scan_with_name("test")?)
        .filter(exists(sq))?
        .project(vec![col("test.c")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.c [c:UInt32]
      LeftSemi Join: Filter: test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]
        TableScan: test [a:UInt32, b:UInt32, c:UInt32]
        SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]
          Projection: sq.c, sq.a [c:UInt32, a:UInt32]
            TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
    "
    )
}
1713
/// A single `EXISTS (<subquery>)` filter is expected to be rewritten into a
/// `LeftSemi` join with a constant `Boolean(true)` filter.
#[test]
fn exists_subquery_simple() -> Result<()> {
    let outer = test_table_scan()?;
    let predicate = exists(test_subquery_with_name("sq")?);
    let plan = LogicalPlanBuilder::from(outer)
        .filter(predicate)?
        .project(vec![col("test.b")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.b [b:UInt32]
      LeftSemi Join: Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]
        TableScan: test [a:UInt32, b:UInt32, c:UInt32]
        SubqueryAlias: __correlated_sq_1 [c:UInt32]
          Projection: sq.c [c:UInt32]
            TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
    "
    )
}
1735
/// A single `NOT EXISTS (<subquery>)` filter is expected to be rewritten into
/// a `LeftAnti` join with a constant `Boolean(true)` filter.
#[test]
fn not_exists_subquery_simple() -> Result<()> {
    let outer = test_table_scan()?;
    let predicate = not_exists(test_subquery_with_name("sq")?);
    let plan = LogicalPlanBuilder::from(outer)
        .filter(predicate)?
        .project(vec![col("test.b")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.b [b:UInt32]
      LeftAnti Join: Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]
        TableScan: test [a:UInt32, b:UInt32, c:UInt32]
        SubqueryAlias: __correlated_sq_1 [c:UInt32]
          Projection: sq.c [c:UInt32]
            TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
    "
    )
}
1757
/// Two correlated EXISTS subqueries AND-ed with an ordinary outer predicate.
/// The expected plan stacks two `LeftSemi` joins and keeps `test.c > 1` as a
/// `Filter` above them.
#[test]
fn two_exists_subquery_with_outer_filter() -> Result<()> {
    let outer = test_table_scan()?;
    let sq1_scan = test_table_scan_with_name("sq1")?;
    let sq2_scan = test_table_scan_with_name("sq2")?;

    let sq1 = LogicalPlanBuilder::from(sq1_scan)
        .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq1.a")))?
        .project(vec![col("c")])?
        .build()?;

    let sq2 = LogicalPlanBuilder::from(sq2_scan)
        .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq2.a")))?
        .project(vec![col("c")])?
        .build()?;

    // Keep the original nesting: exists1 AND (exists2 AND test.c > 1).
    let exists_sq1 = exists(Arc::new(sq1));
    let exists_sq2_and_range = exists(Arc::new(sq2)).and(col("test.c").gt(lit(1u32)));

    let plan = LogicalPlanBuilder::from(outer)
        .filter(exists_sq1.and(exists_sq2_and_range))?
        .project(vec![col("test.b")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.b [b:UInt32]
      Filter: test.c > UInt32(1) [a:UInt32, b:UInt32, c:UInt32]
        LeftSemi Join: Filter: test.a = __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]
          LeftSemi Join: Filter: test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]
            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
            SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]
              Projection: sq1.c, sq1.a [c:UInt32, a:UInt32]
                TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]
          SubqueryAlias: __correlated_sq_2 [c:UInt32, a:UInt32]
            Projection: sq2.c, sq2.a [c:UInt32, a:UInt32]
              TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]
    "
    )
}
1799
/// Correlated EXISTS subquery whose correlation is an inequality between
/// expressions (`1 + sq.a > test.a * 2`) and whose projection is a literal;
/// the expected plan moves the comparison into the join filter and adds
/// `sq.a` to the subquery projection.
#[test]
fn exists_subquery_expr_filter() -> Result<()> {
    let outer = test_table_scan()?;
    let inner_scan = test_table_scan_with_name("sq")?;

    let inner_side = lit(1u32) + col("sq.a");
    let outer_side = out_ref_col(DataType::UInt32, "test.a") * lit(2u32);
    let sq_plan = LogicalPlanBuilder::from(inner_scan)
        .filter(inner_side.gt(outer_side))?
        .project(vec![lit(1u32)])?
        .build()?;

    let plan = LogicalPlanBuilder::from(outer)
        .filter(exists(Arc::new(sq_plan)))?
        .project(vec![col("test.b")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.b [b:UInt32]
      LeftSemi Join: Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]
        TableScan: test [a:UInt32, b:UInt32, c:UInt32]
        SubqueryAlias: __correlated_sq_1 [UInt32(1):UInt32, a:UInt32]
          Projection: UInt32(1), sq.a [UInt32(1):UInt32, a:UInt32]
            TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
    "
    )
}
1828
/// EXISTS subquery that scans the same table (`test`) as the outer query and
/// filters on plain columns; the expected plan keeps that filter inside the
/// subquery and joins on `Boolean(true)`.
#[test]
fn exists_subquery_with_same_table() -> Result<()> {
    let outer = test_table_scan()?;
    let inner = test_table_scan()?;
    let sq_plan = LogicalPlanBuilder::from(inner)
        .filter(col("test.a").gt(col("test.b")))?
        .project(vec![col("c")])?
        .build()?;

    let plan = LogicalPlanBuilder::from(outer)
        .filter(exists(Arc::new(sq_plan)))?
        .project(vec![col("test.b")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.b [b:UInt32]
      LeftSemi Join: Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]
        TableScan: test [a:UInt32, b:UInt32, c:UInt32]
        SubqueryAlias: __correlated_sq_1 [c:UInt32]
          Projection: test.c [c:UInt32]
            Filter: test.a > test.b [a:UInt32, b:UInt32, c:UInt32]
              TableScan: test [a:UInt32, b:UInt32, c:UInt32]
    "
    )
}
1857
/// Correlated EXISTS subquery ending in `DISTINCT` over a column projection;
/// the expected plan keeps the `Distinct` node and widens it (and the
/// projection below) with the correlated column `sq.a`.
#[test]
fn exists_distinct_subquery() -> Result<()> {
    let outer = test_table_scan()?;
    let inner_scan = test_table_scan_with_name("sq")?;

    let inner_side = lit(1u32) + col("sq.a");
    let outer_side = out_ref_col(DataType::UInt32, "test.a") * lit(2u32);
    let sq_plan = LogicalPlanBuilder::from(inner_scan)
        .filter(inner_side.gt(outer_side))?
        .project(vec![col("sq.c")])?
        .distinct()?
        .build()?;

    let plan = LogicalPlanBuilder::from(outer)
        .filter(exists(Arc::new(sq_plan)))?
        .project(vec![col("test.b")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.b [b:UInt32]
      LeftSemi Join: Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]
        TableScan: test [a:UInt32, b:UInt32, c:UInt32]
        SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]
          Distinct: [c:UInt32, a:UInt32]
            Projection: sq.c, sq.a [c:UInt32, a:UInt32]
              TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
    "
    )
}
1888
/// Correlated EXISTS subquery ending in `DISTINCT` over an expression
/// projection (`sq.b + sq.c`); the expected plan keeps the `Distinct` node and
/// adds the correlated column `sq.a` alongside the expression.
#[test]
fn exists_distinct_expr_subquery() -> Result<()> {
    let outer = test_table_scan()?;
    let inner_scan = test_table_scan_with_name("sq")?;

    let inner_side = lit(1u32) + col("sq.a");
    let outer_side = out_ref_col(DataType::UInt32, "test.a") * lit(2u32);
    let sq_plan = LogicalPlanBuilder::from(inner_scan)
        .filter(inner_side.gt(outer_side))?
        .project(vec![col("sq.b") + col("sq.c")])?
        .distinct()?
        .build()?;

    let plan = LogicalPlanBuilder::from(outer)
        .filter(exists(Arc::new(sq_plan)))?
        .project(vec![col("test.b")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.b [b:UInt32]
      LeftSemi Join: Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]
        TableScan: test [a:UInt32, b:UInt32, c:UInt32]
        SubqueryAlias: __correlated_sq_1 [sq.b + sq.c:UInt32, a:UInt32]
          Distinct: [sq.b + sq.c:UInt32, a:UInt32]
            Projection: sq.b + sq.c, sq.a [sq.b + sq.c:UInt32, a:UInt32]
              TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
    "
    )
}
1919
/// Correlated EXISTS subquery ending in `DISTINCT` over a projection that
/// mixes a literal with a column; the expected plan keeps both and adds the
/// correlated column `sq.a`.
#[test]
fn exists_distinct_subquery_with_literal() -> Result<()> {
    let outer = test_table_scan()?;
    let inner_scan = test_table_scan_with_name("sq")?;

    let inner_side = lit(1u32) + col("sq.a");
    let outer_side = out_ref_col(DataType::UInt32, "test.a") * lit(2u32);
    let sq_plan = LogicalPlanBuilder::from(inner_scan)
        .filter(inner_side.gt(outer_side))?
        .project(vec![lit(1u32), col("sq.c")])?
        .distinct()?
        .build()?;

    let plan = LogicalPlanBuilder::from(outer)
        .filter(exists(Arc::new(sq_plan)))?
        .project(vec![col("test.b")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.b [b:UInt32]
      LeftSemi Join: Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]
        TableScan: test [a:UInt32, b:UInt32, c:UInt32]
        SubqueryAlias: __correlated_sq_1 [UInt32(1):UInt32, c:UInt32, a:UInt32]
          Distinct: [UInt32(1):UInt32, c:UInt32, a:UInt32]
            Projection: UInt32(1), sq.c, sq.a [UInt32(1):UInt32, c:UInt32, a:UInt32]
              TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
    "
    )
}
1950
/// Uncorrelated EXISTS subquery that unnests a list column; the expected plan
/// is a `LeftSemi` join on `Boolean(true)` with the `Unnest` node kept under
/// the subquery alias.
#[test]
fn exists_uncorrelated_unnest() -> Result<()> {
    let list_of_i32 = DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
    let sq_schema = Schema::new(vec![Field::new("arr", list_of_i32, true)]);
    let sq_source = table_source(&sq_schema);
    let sq_plan = LogicalPlanBuilder::scan_with_filters("sq", sq_source, None, vec![])?
        .unnest_column("arr")?
        .build()?;

    let outer = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(outer)
        .filter(exists(Arc::new(sq_plan)))?
        .project(vec![col("test.b")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.b [b:UInt32]
      LeftSemi Join: Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]
        TableScan: test [a:UInt32, b:UInt32, c:UInt32]
        SubqueryAlias: __correlated_sq_1 [arr:Int32;N]
          Unnest: lists[sq.arr|depth=1] structs[] [arr:Int32;N]
            TableScan: sq [arr:List(Field { data_type: Int32, nullable: true });N]
    "
    )
}
1984
/// Correlated EXISTS subquery that filters an unnested list column against an
/// outer column; the expected plan moves the comparison into the join filter
/// and keeps the `Unnest` node under the subquery alias.
#[test]
fn exists_correlated_unnest() -> Result<()> {
    let outer = test_table_scan()?;

    let list_of_u32 = DataType::List(Arc::new(Field::new_list_field(DataType::UInt32, true)));
    let sq_schema = Schema::new(vec![Field::new("a", list_of_u32, true)]);
    let sq_source = table_source(&sq_schema);
    let correlation = col("a").eq(out_ref_col(DataType::UInt32, "test.b"));
    let sq_plan = LogicalPlanBuilder::scan_with_filters("sq", sq_source, None, vec![])?
        .unnest_column("a")?
        .filter(correlation)?
        .build()?;

    let plan = LogicalPlanBuilder::from(outer)
        .filter(exists(Arc::new(sq_plan)))?
        .project(vec![col("test.b")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.b [b:UInt32]
      LeftSemi Join: Filter: __correlated_sq_1.a = test.b [a:UInt32, b:UInt32, c:UInt32]
        TableScan: test [a:UInt32, b:UInt32, c:UInt32]
        SubqueryAlias: __correlated_sq_1 [a:UInt32;N]
          Unnest: lists[sq.a|depth=1] structs[] [a:UInt32;N]
            TableScan: sq [a:List(Field { data_type: UInt32, nullable: true });N]
    "
    )
}
2019
/// Correlated EXISTS subquery over tables and columns with quoted upper-case
/// identifiers; the expected plan preserves the upper-case names in the join
/// filter and in the subquery projection.
#[test]
fn upper_case_ident() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("A", DataType::UInt32, false),
        Field::new("B", DataType::UInt32, false),
    ]);
    let outer_scan = table_scan(Some("\"TEST_A\""), &schema, None)?.build()?;
    let inner_scan = table_scan(Some("\"TEST_B\""), &schema, None)?.build()?;

    let correlation = col("\"A\"").eq(out_ref_col(DataType::UInt32, "\"TEST_A\".\"A\""));
    let sq_plan = LogicalPlanBuilder::from(inner_scan)
        .filter(correlation)?
        .project(vec![lit(1)])?
        .build()?;

    let plan = LogicalPlanBuilder::from(outer_scan)
        .filter(exists(Arc::new(sq_plan)))?
        .project(vec![col("\"TEST_A\".\"B\"")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: TEST_A.B [B:UInt32]
      LeftSemi Join: Filter: __correlated_sq_1.A = TEST_A.A [A:UInt32, B:UInt32]
        TableScan: TEST_A [A:UInt32, B:UInt32]
        SubqueryAlias: __correlated_sq_1 [Int32(1):Int32, A:UInt32]
          Projection: Int32(1), TEST_B.A [Int32(1):Int32, A:UInt32]
            TableScan: TEST_B [A:UInt32, B:UInt32]
    "
    )
}
2053}