datafusion_optimizer/
decorrelate_predicate_subquery.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DecorrelatePredicateSubquery`] converts `IN`/`EXISTS` subquery predicates to `SEMI`/`ANTI` joins
19use std::collections::BTreeSet;
20use std::ops::Deref;
21use std::sync::Arc;
22
23use crate::decorrelate::PullUpCorrelatedExpr;
24use crate::optimizer::ApplyOrder;
25use crate::utils::replace_qualified_name;
26use crate::{OptimizerConfig, OptimizerRule};
27
28use datafusion_common::alias::AliasGenerator;
29use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
30use datafusion_common::{Column, Result, assert_or_internal_err, plan_err};
31use datafusion_expr::expr::{Exists, InSubquery};
32use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
33use datafusion_expr::logical_plan::{JoinType, Subquery};
34use datafusion_expr::utils::{conjunction, expr_to_columns, split_conjunction_owned};
35use datafusion_expr::{
36    BinaryExpr, Expr, Filter, LogicalPlan, LogicalPlanBuilder, Operator, exists,
37    in_subquery, lit, not, not_exists, not_in_subquery,
38};
39
40use log::debug;
41
/// Optimizer rule for rewriting predicate(IN/EXISTS) subquery to left semi/anti joins
///
/// A subquery predicate that is a top-level conjunct of a `Filter` becomes a
/// `LeftSemi` join (or `LeftAnti` when negated). A subquery predicate nested
/// inside a larger expression (for example under an `OR`) becomes a `LeftMark`
/// join whose `mark` column replaces the predicate in the filter expression.
#[derive(Default, Debug)]
pub struct DecorrelatePredicateSubquery {}

impl DecorrelatePredicateSubquery {
    /// Creates a new [`DecorrelatePredicateSubquery`] rule.
    ///
    /// The rule has no fields, so this is equivalent to [`Default::default`].
    pub fn new() -> Self {
        Self::default()
    }
}
52
53impl OptimizerRule for DecorrelatePredicateSubquery {
54    fn supports_rewrite(&self) -> bool {
55        true
56    }
57
58    fn rewrite(
59        &self,
60        plan: LogicalPlan,
61        config: &dyn OptimizerConfig,
62    ) -> Result<Transformed<LogicalPlan>> {
63        let plan = plan
64            .map_subqueries(|subquery| {
65                subquery.transform_down(|p| self.rewrite(p, config))
66            })?
67            .data;
68
69        let LogicalPlan::Filter(filter) = plan else {
70            return Ok(Transformed::no(plan));
71        };
72
73        if !has_subquery(&filter.predicate) {
74            return Ok(Transformed::no(LogicalPlan::Filter(filter)));
75        }
76
77        let (with_subqueries, mut other_exprs): (Vec<_>, Vec<_>) =
78            split_conjunction_owned(filter.predicate)
79                .into_iter()
80                .partition(has_subquery);
81
82        assert_or_internal_err!(
83            !with_subqueries.is_empty(),
84            "can not find expected subqueries in DecorrelatePredicateSubquery"
85        );
86
87        // iterate through all exists clauses in predicate, turning each into a join
88        let mut cur_input = Arc::unwrap_or_clone(filter.input);
89        for subquery_expr in with_subqueries {
90            match extract_subquery_info(subquery_expr) {
91                // The subquery expression is at the top level of the filter
92                SubqueryPredicate::Top(subquery) => {
93                    match build_join_top(&subquery, &cur_input, config.alias_generator())?
94                    {
95                        Some(plan) => cur_input = plan,
96                        // If the subquery can not be converted to a Join, reconstruct the subquery expression and add it to the Filter
97                        None => other_exprs.push(subquery.expr()),
98                    }
99                }
100                // The subquery expression is embedded within another expression
101                SubqueryPredicate::Embedded(expr) => {
102                    let (plan, expr_without_subqueries) =
103                        rewrite_inner_subqueries(cur_input, expr, config)?;
104                    cur_input = plan;
105                    other_exprs.push(expr_without_subqueries);
106                }
107            }
108        }
109
110        let expr = conjunction(other_exprs);
111        if let Some(expr) = expr {
112            let new_filter = Filter::try_new(expr, Arc::new(cur_input))?;
113            cur_input = LogicalPlan::Filter(new_filter);
114        }
115        Ok(Transformed::yes(cur_input))
116    }
117
118    fn name(&self) -> &str {
119        "decorrelate_predicate_subquery"
120    }
121
122    fn apply_order(&self) -> Option<ApplyOrder> {
123        Some(ApplyOrder::TopDown)
124    }
125}
126
127fn rewrite_inner_subqueries(
128    outer: LogicalPlan,
129    expr: Expr,
130    config: &dyn OptimizerConfig,
131) -> Result<(LogicalPlan, Expr)> {
132    let mut cur_input = outer;
133    let alias = config.alias_generator();
134    let expr_without_subqueries = expr.transform(|e| match e {
135        Expr::Exists(Exists {
136            subquery: Subquery { subquery, .. },
137            negated,
138        }) => match mark_join(&cur_input, &subquery, None, negated, alias)? {
139            Some((plan, exists_expr)) => {
140                cur_input = plan;
141                Ok(Transformed::yes(exists_expr))
142            }
143            None if negated => Ok(Transformed::no(not_exists(subquery))),
144            None => Ok(Transformed::no(exists(subquery))),
145        },
146        Expr::InSubquery(InSubquery {
147            expr,
148            subquery: Subquery { subquery, .. },
149            negated,
150        }) => {
151            let in_predicate = subquery
152                .head_output_expr()?
153                .map_or(plan_err!("single expression required."), |output_expr| {
154                    Ok(Expr::eq(*expr.clone(), output_expr))
155                })?;
156            match mark_join(&cur_input, &subquery, Some(&in_predicate), negated, alias)? {
157                Some((plan, exists_expr)) => {
158                    cur_input = plan;
159                    Ok(Transformed::yes(exists_expr))
160                }
161                None if negated => Ok(Transformed::no(not_in_subquery(*expr, subquery))),
162                None => Ok(Transformed::no(in_subquery(*expr, subquery))),
163            }
164        }
165        _ => Ok(Transformed::no(e)),
166    })?;
167    Ok((cur_input, expr_without_subqueries.data))
168}
169
/// Classification of a filter conjunct that contains a subquery, as produced by
/// `extract_subquery_info`.
enum SubqueryPredicate {
    /// The subquery expression is at the top level of the filter and can be fully replaced by a
    /// semi/anti join
    Top(SubqueryInfo),
    /// The subquery expression is embedded within another expression and is replaced using an
    /// existence join
    Embedded(Expr),
}
178
179fn extract_subquery_info(expr: Expr) -> SubqueryPredicate {
180    match expr {
181        Expr::Not(not_expr) => match *not_expr {
182            Expr::InSubquery(InSubquery {
183                expr,
184                subquery,
185                negated,
186            }) => SubqueryPredicate::Top(SubqueryInfo::new_with_in_expr(
187                subquery, *expr, !negated,
188            )),
189            Expr::Exists(Exists { subquery, negated }) => {
190                SubqueryPredicate::Top(SubqueryInfo::new(subquery, !negated))
191            }
192            expr => SubqueryPredicate::Embedded(not(expr)),
193        },
194        Expr::InSubquery(InSubquery {
195            expr,
196            subquery,
197            negated,
198        }) => SubqueryPredicate::Top(SubqueryInfo::new_with_in_expr(
199            subquery, *expr, negated,
200        )),
201        Expr::Exists(Exists { subquery, negated }) => {
202            SubqueryPredicate::Top(SubqueryInfo::new(subquery, negated))
203        }
204        expr => SubqueryPredicate::Embedded(expr),
205    }
206}
207
208fn has_subquery(expr: &Expr) -> bool {
209    expr.exists(|e| match e {
210        Expr::InSubquery(_) | Expr::Exists(_) => Ok(true),
211        _ => Ok(false),
212    })
213    .unwrap()
214}
215
216/// Optimize the subquery to left-anti/left-semi join.
217/// If the subquery is a correlated subquery, we need extract the join predicate from the subquery.
218///
219/// For example, given a query like:
220/// `select t1.a, t1.b from t1 where t1 in (select t2.a from t2 where t1.b = t2.b and t1.c > t2.c)`
221///
222/// The optimized plan will be:
223///
224/// ```text
225/// Projection: t1.a, t1.b
226///   LeftSemi Join:  Filter: t1.a = __correlated_sq_1.a AND t1.b = __correlated_sq_1.b AND t1.c > __correlated_sq_1.c
227///     TableScan: t1
228///     SubqueryAlias: __correlated_sq_1
229///       Projection: t2.a, t2.b, t2.c
230///         TableScan: t2
231/// ```
232///
233/// Given another query like:
234/// `select t1.id from t1 where exists(SELECT t2.id FROM t2 WHERE t1.id = t2.id)`
235///
236/// The optimized plan will be:
237///
238/// ```text
239/// Projection: t1.id
240///   LeftSemi Join:  Filter: t1.id = __correlated_sq_1.id
241///     TableScan: t1
242///     SubqueryAlias: __correlated_sq_1
243///       Projection: t2.id
244///         TableScan: t2
245/// ```
246fn build_join_top(
247    query_info: &SubqueryInfo,
248    left: &LogicalPlan,
249    alias: &Arc<AliasGenerator>,
250) -> Result<Option<LogicalPlan>> {
251    let where_in_expr_opt = &query_info.where_in_expr;
252    let in_predicate_opt = where_in_expr_opt
253        .clone()
254        .map(|where_in_expr| {
255            query_info
256                .query
257                .subquery
258                .head_output_expr()?
259                .map_or(plan_err!("single expression required."), |expr| {
260                    Ok(Expr::eq(where_in_expr, expr))
261                })
262        })
263        .map_or(Ok(None), |v| v.map(Some))?;
264
265    let join_type = match query_info.negated {
266        true => JoinType::LeftAnti,
267        false => JoinType::LeftSemi,
268    };
269    let subquery = query_info.query.subquery.as_ref();
270    let subquery_alias = alias.next("__correlated_sq");
271    build_join(
272        left,
273        subquery,
274        in_predicate_opt.as_ref(),
275        join_type,
276        subquery_alias,
277    )
278}
279
280/// This is used to handle the case when the subquery is embedded in a more complex boolean
281/// expression like and OR. For example
282///
283/// `select t1.id from t1 where t1.id < 0 OR exists(SELECT t2.id FROM t2 WHERE t1.id = t2.id)`
284///
285/// The optimized plan will be:
286///
287/// ```text
288/// Projection: t1.id
289///   Filter: t1.id < 0 OR __correlated_sq_1.mark
290///     LeftMark Join:  Filter: t1.id = __correlated_sq_1.id
291///       TableScan: t1
292///       SubqueryAlias: __correlated_sq_1
293///         Projection: t2.id
294///           TableScan: t2
295fn mark_join(
296    left: &LogicalPlan,
297    subquery: &LogicalPlan,
298    in_predicate_opt: Option<&Expr>,
299    negated: bool,
300    alias_generator: &Arc<AliasGenerator>,
301) -> Result<Option<(LogicalPlan, Expr)>> {
302    let alias = alias_generator.next("__correlated_sq");
303
304    let exists_col = Expr::Column(Column::new(Some(alias.clone()), "mark"));
305    let exists_expr = if negated { !exists_col } else { exists_col };
306
307    Ok(
308        build_join(left, subquery, in_predicate_opt, JoinType::LeftMark, alias)?
309            .map(|plan| (plan, exists_expr)),
310    )
311}
312
313fn build_join(
314    left: &LogicalPlan,
315    subquery: &LogicalPlan,
316    in_predicate_opt: Option<&Expr>,
317    join_type: JoinType,
318    alias: String,
319) -> Result<Option<LogicalPlan>> {
320    let mut pull_up = PullUpCorrelatedExpr::new()
321        .with_in_predicate_opt(in_predicate_opt.cloned())
322        .with_exists_sub_query(in_predicate_opt.is_none());
323
324    let new_plan = subquery.clone().rewrite(&mut pull_up).data()?;
325    if !pull_up.can_pull_up {
326        return Ok(None);
327    }
328
329    let sub_query_alias = LogicalPlanBuilder::from(new_plan)
330        .alias(alias.to_string())?
331        .build()?;
332    let mut all_correlated_cols = BTreeSet::new();
333    pull_up
334        .correlated_subquery_cols_map
335        .values()
336        .for_each(|cols| all_correlated_cols.extend(cols.clone()));
337
338    // alias the join filter
339    let join_filter_opt = conjunction(pull_up.join_filters)
340        .map_or(Ok(None), |filter| {
341            replace_qualified_name(filter, &all_correlated_cols, &alias).map(Some)
342        })?;
343
344    let join_filter = match (join_filter_opt, in_predicate_opt.cloned()) {
345        (
346            Some(join_filter),
347            Some(Expr::BinaryExpr(BinaryExpr {
348                left,
349                op: Operator::Eq,
350                right,
351            })),
352        ) => {
353            let right_col = create_col_from_scalar_expr(right.deref(), alias)?;
354            let in_predicate = Expr::eq(left.deref().clone(), Expr::Column(right_col));
355            in_predicate.and(join_filter)
356        }
357        (Some(join_filter), _) => join_filter,
358        (
359            _,
360            Some(Expr::BinaryExpr(BinaryExpr {
361                left,
362                op: Operator::Eq,
363                right,
364            })),
365        ) => {
366            let right_col = create_col_from_scalar_expr(right.deref(), alias)?;
367
368            Expr::eq(left.deref().clone(), Expr::Column(right_col))
369        }
370        (None, None) => lit(true),
371        _ => return Ok(None),
372    };
373
374    if matches!(join_type, JoinType::LeftMark | JoinType::RightMark) {
375        let right_schema = sub_query_alias.schema();
376
377        // Gather all columns needed for the join filter + predicates
378        let mut needed = std::collections::HashSet::new();
379        expr_to_columns(&join_filter, &mut needed)?;
380        if let Some(in_pred) = in_predicate_opt {
381            expr_to_columns(in_pred, &mut needed)?;
382        }
383
384        // Keep only columns that actually belong to the RIGHT child, and sort by their
385        // position in the right schema for deterministic order.
386        let mut right_cols_idx_and_col: Vec<(usize, Column)> = needed
387            .into_iter()
388            .filter_map(|c| right_schema.index_of_column(&c).ok().map(|idx| (idx, c)))
389            .collect();
390
391        right_cols_idx_and_col.sort_by_key(|(idx, _)| *idx);
392
393        let right_proj_exprs: Vec<Expr> = right_cols_idx_and_col
394            .into_iter()
395            .map(|(_, c)| Expr::Column(c))
396            .collect();
397
398        let right_projected = if !right_proj_exprs.is_empty() {
399            LogicalPlanBuilder::from(sub_query_alias.clone())
400                .project(right_proj_exprs)?
401                .build()?
402        } else {
403            // Degenerate case: no right columns referenced by the predicate(s)
404            sub_query_alias.clone()
405        };
406        let new_plan = LogicalPlanBuilder::from(left.clone())
407            .join_on(right_projected, join_type, Some(join_filter))?
408            .build()?;
409
410        debug!(
411            "predicate subquery optimized:\n{}",
412            new_plan.display_indent()
413        );
414
415        return Ok(Some(new_plan));
416    }
417
418    // join our sub query into the main plan
419    let new_plan = LogicalPlanBuilder::from(left.clone())
420        .join_on(sub_query_alias, join_type, Some(join_filter))?
421        .build()?;
422    debug!(
423        "predicate subquery optimized:\n{}",
424        new_plan.display_indent()
425    );
426    Ok(Some(new_plan))
427}
428
429#[derive(Debug)]
430struct SubqueryInfo {
431    query: Subquery,
432    where_in_expr: Option<Expr>,
433    negated: bool,
434}
435
436impl SubqueryInfo {
437    pub fn new(query: Subquery, negated: bool) -> Self {
438        Self {
439            query,
440            where_in_expr: None,
441            negated,
442        }
443    }
444
445    pub fn new_with_in_expr(query: Subquery, expr: Expr, negated: bool) -> Self {
446        Self {
447            query,
448            where_in_expr: Some(expr),
449            negated,
450        }
451    }
452
453    pub fn expr(self) -> Expr {
454        match self.where_in_expr {
455            Some(expr) => match self.negated {
456                true => not_in_subquery(expr, self.query.subquery),
457                false => in_subquery(expr, self.query.subquery),
458            },
459            None => match self.negated {
460                true => not_exists(self.query.subquery),
461                false => exists(self.query.subquery),
462            },
463        }
464    }
465}
466
467#[cfg(test)]
468mod tests {
469    use std::ops::Add;
470
471    use super::*;
472    use crate::test::*;
473
474    use crate::assert_optimized_plan_eq_display_indent_snapshot;
475    use arrow::datatypes::{DataType, Field, Schema};
476    use datafusion_expr::builder::table_source;
477    use datafusion_expr::{and, binary_expr, col, lit, not, out_ref_col, table_scan};
478
479    macro_rules! assert_optimized_plan_equal {
480        (
481            $plan:expr,
482            @ $expected:literal $(,)?
483        ) => {{
484            let rule: Arc<dyn crate::OptimizerRule + Send + Sync> = Arc::new(DecorrelatePredicateSubquery::new());
485            assert_optimized_plan_eq_display_indent_snapshot!(
486                rule,
487                $plan,
488                @ $expected,
489            )
490        }};
491    }
492
493    fn test_subquery_with_name(name: &str) -> Result<Arc<LogicalPlan>> {
494        let table_scan = test_table_scan_with_name(name)?;
495        Ok(Arc::new(
496            LogicalPlanBuilder::from(table_scan)
497                .project(vec![col("c")])?
498                .build()?,
499        ))
500    }
501
502    /// Test for several IN subquery expressions
503    #[test]
504    fn in_subquery_multiple() -> Result<()> {
505        let table_scan = test_table_scan()?;
506        let plan = LogicalPlanBuilder::from(table_scan)
507            .filter(and(
508                in_subquery(col("c"), test_subquery_with_name("sq_1")?),
509                in_subquery(col("b"), test_subquery_with_name("sq_2")?),
510            ))?
511            .project(vec![col("test.b")])?
512            .build()?;
513
514        assert_optimized_plan_equal!(
515            plan,
516            @r"
517        Projection: test.b [b:UInt32]
518          LeftSemi Join:  Filter: test.b = __correlated_sq_2.c [a:UInt32, b:UInt32, c:UInt32]
519            LeftSemi Join:  Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
520              TableScan: test [a:UInt32, b:UInt32, c:UInt32]
521              SubqueryAlias: __correlated_sq_1 [c:UInt32]
522                Projection: sq_1.c [c:UInt32]
523                  TableScan: sq_1 [a:UInt32, b:UInt32, c:UInt32]
524            SubqueryAlias: __correlated_sq_2 [c:UInt32]
525              Projection: sq_2.c [c:UInt32]
526                TableScan: sq_2 [a:UInt32, b:UInt32, c:UInt32]
527        "
528        )
529    }
530
531    /// Test for IN subquery with additional AND filter
532    #[test]
533    fn in_subquery_with_and_filters() -> Result<()> {
534        let table_scan = test_table_scan()?;
535        let plan = LogicalPlanBuilder::from(table_scan)
536            .filter(and(
537                in_subquery(col("c"), test_subquery_with_name("sq")?),
538                and(
539                    binary_expr(col("a"), Operator::Eq, lit(1_u32)),
540                    binary_expr(col("b"), Operator::Lt, lit(30_u32)),
541                ),
542            ))?
543            .project(vec![col("test.b")])?
544            .build()?;
545
546        assert_optimized_plan_equal!(
547            plan,
548            @r"
549        Projection: test.b [b:UInt32]
550          Filter: test.a = UInt32(1) AND test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]
551            LeftSemi Join:  Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
552              TableScan: test [a:UInt32, b:UInt32, c:UInt32]
553              SubqueryAlias: __correlated_sq_1 [c:UInt32]
554                Projection: sq.c [c:UInt32]
555                  TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
556        "
557        )
558    }
559
560    /// Test for nested IN subqueries
561    #[test]
562    fn in_subquery_nested() -> Result<()> {
563        let table_scan = test_table_scan()?;
564
565        let subquery = LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
566            .filter(in_subquery(col("a"), test_subquery_with_name("sq_nested")?))?
567            .project(vec![col("a")])?
568            .build()?;
569
570        let plan = LogicalPlanBuilder::from(table_scan)
571            .filter(in_subquery(col("b"), Arc::new(subquery)))?
572            .project(vec![col("test.b")])?
573            .build()?;
574
575        assert_optimized_plan_equal!(
576            plan,
577            @r"
578        Projection: test.b [b:UInt32]
579          LeftSemi Join:  Filter: test.b = __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]
580            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
581            SubqueryAlias: __correlated_sq_2 [a:UInt32]
582              Projection: sq.a [a:UInt32]
583                LeftSemi Join:  Filter: sq.a = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
584                  TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
585                  SubqueryAlias: __correlated_sq_1 [c:UInt32]
586                    Projection: sq_nested.c [c:UInt32]
587                      TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]
588        "
589        )
590    }
591
592    /// Test multiple correlated subqueries
593    /// See subqueries.rs where_in_multiple()
594    #[test]
595    fn multiple_subqueries() -> Result<()> {
596        let orders = Arc::new(
597            LogicalPlanBuilder::from(scan_tpch_table("orders"))
598                .filter(
599                    col("orders.o_custkey")
600                        .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
601                )?
602                .project(vec![col("orders.o_custkey")])?
603                .build()?,
604        );
605        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
606            .filter(
607                in_subquery(col("customer.c_custkey"), Arc::clone(&orders))
608                    .and(in_subquery(col("customer.c_custkey"), orders)),
609            )?
610            .project(vec![col("customer.c_custkey")])?
611            .build()?;
612        debug!("plan to optimize:\n{}", plan.display_indent());
613
614        assert_optimized_plan_equal!(
615                plan,
616                @r"
617        Projection: customer.c_custkey [c_custkey:Int64]
618          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]
619            LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
620              TableScan: customer [c_custkey:Int64, c_name:Utf8]
621              SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
622                Projection: orders.o_custkey [o_custkey:Int64]
623                  TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
624            SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]
625              Projection: orders.o_custkey [o_custkey:Int64]
626                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
627        "    
628        )
629    }
630
631    /// Test recursive correlated subqueries
632    /// See subqueries.rs where_in_recursive()
633    #[test]
634    fn recursive_subqueries() -> Result<()> {
635        let lineitem = Arc::new(
636            LogicalPlanBuilder::from(scan_tpch_table("lineitem"))
637                .filter(
638                    col("lineitem.l_orderkey")
639                        .eq(out_ref_col(DataType::Int64, "orders.o_orderkey")),
640                )?
641                .project(vec![col("lineitem.l_orderkey")])?
642                .build()?,
643        );
644
645        let orders = Arc::new(
646            LogicalPlanBuilder::from(scan_tpch_table("orders"))
647                .filter(
648                    in_subquery(col("orders.o_orderkey"), lineitem).and(
649                        col("orders.o_custkey")
650                            .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
651                    ),
652                )?
653                .project(vec![col("orders.o_custkey")])?
654                .build()?,
655        );
656
657        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
658            .filter(in_subquery(col("customer.c_custkey"), orders))?
659            .project(vec![col("customer.c_custkey")])?
660            .build()?;
661
662        assert_optimized_plan_equal!(
663            plan,
664            @r"
665        Projection: customer.c_custkey [c_custkey:Int64]
666          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]
667            TableScan: customer [c_custkey:Int64, c_name:Utf8]
668            SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]
669              Projection: orders.o_custkey [o_custkey:Int64]
670                LeftSemi Join:  Filter: orders.o_orderkey = __correlated_sq_1.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
671                  TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
672                  SubqueryAlias: __correlated_sq_1 [l_orderkey:Int64]
673                    Projection: lineitem.l_orderkey [l_orderkey:Int64]
674                      TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]
675        "
676        )
677    }
678
679    /// Test for correlated IN subquery filter with additional subquery filters
680    #[test]
681    fn in_subquery_with_subquery_filters() -> Result<()> {
682        let sq = Arc::new(
683            LogicalPlanBuilder::from(scan_tpch_table("orders"))
684                .filter(
685                    out_ref_col(DataType::Int64, "customer.c_custkey")
686                        .eq(col("orders.o_custkey"))
687                        .and(col("o_orderkey").eq(lit(1))),
688                )?
689                .project(vec![col("orders.o_custkey")])?
690                .build()?,
691        );
692
693        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
694            .filter(in_subquery(col("customer.c_custkey"), sq))?
695            .project(vec![col("customer.c_custkey")])?
696            .build()?;
697
698        assert_optimized_plan_equal!(
699            plan,
700            @r"
701        Projection: customer.c_custkey [c_custkey:Int64]
702          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
703            TableScan: customer [c_custkey:Int64, c_name:Utf8]
704            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
705              Projection: orders.o_custkey [o_custkey:Int64]
706                Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
707                  TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
708        "
709        )
710    }
711
712    /// Test for correlated IN subquery with no columns in schema
713    #[test]
714    fn in_subquery_no_cols() -> Result<()> {
715        let sq = Arc::new(
716            LogicalPlanBuilder::from(scan_tpch_table("orders"))
717                .filter(
718                    out_ref_col(DataType::Int64, "customer.c_custkey")
719                        .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
720                )?
721                .project(vec![col("orders.o_custkey")])?
722                .build()?,
723        );
724
725        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
726            .filter(in_subquery(col("customer.c_custkey"), sq))?
727            .project(vec![col("customer.c_custkey")])?
728            .build()?;
729
730        assert_optimized_plan_equal!(
731            plan,
732            @r"
733        Projection: customer.c_custkey [c_custkey:Int64]
734          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
735            TableScan: customer [c_custkey:Int64, c_name:Utf8]
736            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
737              Projection: orders.o_custkey [o_custkey:Int64]
738                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
739        "
740        )
741    }
742
743    /// Test for IN subquery with both columns in schema
744    #[test]
745    fn in_subquery_with_no_correlated_cols() -> Result<()> {
746        let sq = Arc::new(
747            LogicalPlanBuilder::from(scan_tpch_table("orders"))
748                .filter(col("orders.o_custkey").eq(col("orders.o_custkey")))?
749                .project(vec![col("orders.o_custkey")])?
750                .build()?,
751        );
752
753        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
754            .filter(in_subquery(col("customer.c_custkey"), sq))?
755            .project(vec![col("customer.c_custkey")])?
756            .build()?;
757
758        assert_optimized_plan_equal!(
759            plan,
760            @r"
761        Projection: customer.c_custkey [c_custkey:Int64]
762          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
763            TableScan: customer [c_custkey:Int64, c_name:Utf8]
764            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
765              Projection: orders.o_custkey [o_custkey:Int64]
766                Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
767                  TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
768        "
769        )
770    }
771
772    /// Test for correlated IN subquery not equal
773    #[test]
774    fn in_subquery_where_not_eq() -> Result<()> {
775        let sq = Arc::new(
776            LogicalPlanBuilder::from(scan_tpch_table("orders"))
777                .filter(
778                    out_ref_col(DataType::Int64, "customer.c_custkey")
779                        .not_eq(col("orders.o_custkey")),
780                )?
781                .project(vec![col("orders.o_custkey")])?
782                .build()?,
783        );
784
785        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
786            .filter(in_subquery(col("customer.c_custkey"), sq))?
787            .project(vec![col("customer.c_custkey")])?
788            .build()?;
789
790        assert_optimized_plan_equal!(
791            plan,
792            @r"
793        Projection: customer.c_custkey [c_custkey:Int64]
794          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND customer.c_custkey != __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
795            TableScan: customer [c_custkey:Int64, c_name:Utf8]
796            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
797              Projection: orders.o_custkey [o_custkey:Int64]
798                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
799        "
800        )
801    }
802
803    /// Test for correlated IN subquery less than
804    #[test]
805    fn in_subquery_where_less_than() -> Result<()> {
806        let sq = Arc::new(
807            LogicalPlanBuilder::from(scan_tpch_table("orders"))
808                .filter(
809                    out_ref_col(DataType::Int64, "customer.c_custkey")
810                        .lt(col("orders.o_custkey")),
811                )?
812                .project(vec![col("orders.o_custkey")])?
813                .build()?,
814        );
815
816        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
817            .filter(in_subquery(col("customer.c_custkey"), sq))?
818            .project(vec![col("customer.c_custkey")])?
819            .build()?;
820
821        assert_optimized_plan_equal!(
822            plan,
823            @r"
824        Projection: customer.c_custkey [c_custkey:Int64]
825          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND customer.c_custkey < __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
826            TableScan: customer [c_custkey:Int64, c_name:Utf8]
827            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
828              Projection: orders.o_custkey [o_custkey:Int64]
829                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
830        "
831        )
832    }
833
834    /// Test for correlated IN subquery filter with subquery disjunction
835    #[test]
836    fn in_subquery_with_subquery_disjunction() -> Result<()> {
837        let sq = Arc::new(
838            LogicalPlanBuilder::from(scan_tpch_table("orders"))
839                .filter(
840                    out_ref_col(DataType::Int64, "customer.c_custkey")
841                        .eq(col("orders.o_custkey"))
842                        .or(col("o_orderkey").eq(lit(1))),
843                )?
844                .project(vec![col("orders.o_custkey")])?
845                .build()?,
846        );
847
848        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
849            .filter(in_subquery(col("customer.c_custkey"), sq))?
850            .project(vec![col("customer.c_custkey")])?
851            .build()?;
852
853        assert_optimized_plan_equal!(
854            plan,
855            @r"
856        Projection: customer.c_custkey [c_custkey:Int64]
857          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND (customer.c_custkey = __correlated_sq_1.o_custkey OR __correlated_sq_1.o_orderkey = Int32(1)) [c_custkey:Int64, c_name:Utf8]
858            TableScan: customer [c_custkey:Int64, c_name:Utf8]
859            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64, o_orderkey:Int64]
860              Projection: orders.o_custkey, orders.o_orderkey [o_custkey:Int64, o_orderkey:Int64]
861                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
862        "
863        )
864    }
865
866    /// Test for correlated IN without projection
867    #[test]
868    fn in_subquery_no_projection() -> Result<()> {
869        let sq = Arc::new(
870            LogicalPlanBuilder::from(scan_tpch_table("orders"))
871                .filter(col("customer.c_custkey").eq(col("orders.o_custkey")))?
872                .build()?,
873        );
874
875        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
876            .filter(in_subquery(col("customer.c_custkey"), sq))?
877            .project(vec![col("customer.c_custkey")])?
878            .build()?;
879
880        // Maybe okay if the table only has a single column?
881        let expected = "Invalid (non-executable) plan after Analyzer\
882        \ncaused by\
883        \nError during planning: InSubquery should only return one column, but found 4";
884        assert_analyzer_check_err(vec![], plan, expected);
885
886        Ok(())
887    }
888
889    /// Test for correlated IN subquery join on expression
890    #[test]
891    fn in_subquery_join_expr() -> Result<()> {
892        let sq = Arc::new(
893            LogicalPlanBuilder::from(scan_tpch_table("orders"))
894                .filter(
895                    out_ref_col(DataType::Int64, "customer.c_custkey")
896                        .eq(col("orders.o_custkey")),
897                )?
898                .project(vec![col("orders.o_custkey")])?
899                .build()?,
900        );
901
902        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
903            .filter(in_subquery(col("customer.c_custkey").add(lit(1)), sq))?
904            .project(vec![col("customer.c_custkey")])?
905            .build()?;
906
907        assert_optimized_plan_equal!(
908            plan,
909            @r"
910        Projection: customer.c_custkey [c_custkey:Int64]
911          LeftSemi Join:  Filter: customer.c_custkey + Int32(1) = __correlated_sq_1.o_custkey AND customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
912            TableScan: customer [c_custkey:Int64, c_name:Utf8]
913            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
914              Projection: orders.o_custkey [o_custkey:Int64]
915                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
916        "
917        )
918    }
919
920    /// Test for correlated IN expressions
921    #[test]
922    fn in_subquery_project_expr() -> Result<()> {
923        let sq = Arc::new(
924            LogicalPlanBuilder::from(scan_tpch_table("orders"))
925                .filter(
926                    out_ref_col(DataType::Int64, "customer.c_custkey")
927                        .eq(col("orders.o_custkey")),
928                )?
929                .project(vec![col("orders.o_custkey").add(lit(1))])?
930                .build()?,
931        );
932
933        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
934            .filter(in_subquery(col("customer.c_custkey"), sq))?
935            .project(vec![col("customer.c_custkey")])?
936            .build()?;
937
938        assert_optimized_plan_equal!(
939            plan,
940            @r"
941        Projection: customer.c_custkey [c_custkey:Int64]
942          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.orders.o_custkey + Int32(1) AND customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
943            TableScan: customer [c_custkey:Int64, c_name:Utf8]
944            SubqueryAlias: __correlated_sq_1 [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]
945              Projection: orders.o_custkey + Int32(1), orders.o_custkey [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]
946                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
947        "
948        )
949    }
950
951    /// Test for correlated IN subquery multiple projected columns
952    #[test]
953    fn in_subquery_multi_col() -> Result<()> {
954        let sq = Arc::new(
955            LogicalPlanBuilder::from(scan_tpch_table("orders"))
956                .filter(
957                    out_ref_col(DataType::Int64, "customer.c_custkey")
958                        .eq(col("orders.o_custkey")),
959                )?
960                .project(vec![col("orders.o_custkey"), col("orders.o_orderkey")])?
961                .build()?,
962        );
963
964        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
965            .filter(
966                in_subquery(col("customer.c_custkey"), sq)
967                    .and(col("c_custkey").eq(lit(1))),
968            )?
969            .project(vec![col("customer.c_custkey")])?
970            .build()?;
971
972        let expected = "Invalid (non-executable) plan after Analyzer\
973        \ncaused by\
974        \nError during planning: InSubquery should only return one column";
975        assert_analyzer_check_err(vec![], plan, expected);
976
977        Ok(())
978    }
979
980    /// Test for correlated IN subquery filter with additional filters
981    #[test]
982    fn should_support_additional_filters() -> Result<()> {
983        let sq = Arc::new(
984            LogicalPlanBuilder::from(scan_tpch_table("orders"))
985                .filter(
986                    out_ref_col(DataType::Int64, "customer.c_custkey")
987                        .eq(col("orders.o_custkey")),
988                )?
989                .project(vec![col("orders.o_custkey")])?
990                .build()?,
991        );
992
993        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
994            .filter(
995                in_subquery(col("customer.c_custkey"), sq)
996                    .and(col("c_custkey").eq(lit(1))),
997            )?
998            .project(vec![col("customer.c_custkey")])?
999            .build()?;
1000
1001        assert_optimized_plan_equal!(
1002            plan,
1003            @r"
1004        Projection: customer.c_custkey [c_custkey:Int64]
1005          Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]
1006            LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1007              TableScan: customer [c_custkey:Int64, c_name:Utf8]
1008              SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1009                Projection: orders.o_custkey [o_custkey:Int64]
1010                  TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1011        "
1012        )
1013    }
1014
1015    /// Test for correlated IN subquery filter
1016    #[test]
1017    fn in_subquery_correlated() -> Result<()> {
1018        let sq = Arc::new(
1019            LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
1020                .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq.a")))?
1021                .project(vec![col("c")])?
1022                .build()?,
1023        );
1024
1025        let plan = LogicalPlanBuilder::from(test_table_scan_with_name("test")?)
1026            .filter(in_subquery(col("c"), sq))?
1027            .project(vec![col("test.b")])?
1028            .build()?;
1029
1030        assert_optimized_plan_equal!(
1031            plan,
1032            @r"
1033        Projection: test.b [b:UInt32]
1034          LeftSemi Join:  Filter: test.c = __correlated_sq_1.c AND test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]
1035            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1036            SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]
1037              Projection: sq.c, sq.a [c:UInt32, a:UInt32]
1038                TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1039        "
1040        )
1041    }
1042
1043    /// Test for single IN subquery filter
1044    #[test]
1045    fn in_subquery_simple() -> Result<()> {
1046        let table_scan = test_table_scan()?;
1047        let plan = LogicalPlanBuilder::from(table_scan)
1048            .filter(in_subquery(col("c"), test_subquery_with_name("sq")?))?
1049            .project(vec![col("test.b")])?
1050            .build()?;
1051
1052        assert_optimized_plan_equal!(
1053            plan,
1054            @r"
1055        Projection: test.b [b:UInt32]
1056          LeftSemi Join:  Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
1057            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1058            SubqueryAlias: __correlated_sq_1 [c:UInt32]
1059              Projection: sq.c [c:UInt32]
1060                TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1061        "
1062        )
1063    }
1064
1065    /// Test for single NOT IN subquery filter
1066    #[test]
1067    fn not_in_subquery_simple() -> Result<()> {
1068        let table_scan = test_table_scan()?;
1069        let plan = LogicalPlanBuilder::from(table_scan)
1070            .filter(not_in_subquery(col("c"), test_subquery_with_name("sq")?))?
1071            .project(vec![col("test.b")])?
1072            .build()?;
1073
1074        assert_optimized_plan_equal!(
1075            plan,
1076            @r"
1077        Projection: test.b [b:UInt32]
1078          LeftAnti Join:  Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
1079            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1080            SubqueryAlias: __correlated_sq_1 [c:UInt32]
1081              Projection: sq.c [c:UInt32]
1082                TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1083        "
1084        )
1085    }
1086
1087    #[test]
1088    fn wrapped_not_in_subquery() -> Result<()> {
1089        let table_scan = test_table_scan()?;
1090        let plan = LogicalPlanBuilder::from(table_scan)
1091            .filter(not(in_subquery(col("c"), test_subquery_with_name("sq")?)))?
1092            .project(vec![col("test.b")])?
1093            .build()?;
1094
1095        assert_optimized_plan_equal!(
1096            plan,
1097            @r"
1098        Projection: test.b [b:UInt32]
1099          LeftAnti Join:  Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
1100            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1101            SubqueryAlias: __correlated_sq_1 [c:UInt32]
1102              Projection: sq.c [c:UInt32]
1103                TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1104        "
1105        )
1106    }
1107
1108    #[test]
1109    fn wrapped_not_not_in_subquery() -> Result<()> {
1110        let table_scan = test_table_scan()?;
1111        let plan = LogicalPlanBuilder::from(table_scan)
1112            .filter(not(not_in_subquery(
1113                col("c"),
1114                test_subquery_with_name("sq")?,
1115            )))?
1116            .project(vec![col("test.b")])?
1117            .build()?;
1118
1119        assert_optimized_plan_equal!(
1120            plan,
1121            @r"
1122        Projection: test.b [b:UInt32]
1123          LeftSemi Join:  Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
1124            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1125            SubqueryAlias: __correlated_sq_1 [c:UInt32]
1126              Projection: sq.c [c:UInt32]
1127                TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1128        "
1129        )
1130    }
1131
1132    #[test]
1133    fn in_subquery_both_side_expr() -> Result<()> {
1134        let table_scan = test_table_scan()?;
1135        let subquery_scan = test_table_scan_with_name("sq")?;
1136
1137        let subquery = LogicalPlanBuilder::from(subquery_scan)
1138            .project(vec![col("c") * lit(2u32)])?
1139            .build()?;
1140
1141        let plan = LogicalPlanBuilder::from(table_scan)
1142            .filter(in_subquery(col("c") + lit(1u32), Arc::new(subquery)))?
1143            .project(vec![col("test.b")])?
1144            .build()?;
1145
1146        assert_optimized_plan_equal!(
1147            plan,
1148            @r"
1149        Projection: test.b [b:UInt32]
1150          LeftSemi Join:  Filter: test.c + UInt32(1) = __correlated_sq_1.sq.c * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]
1151            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1152            SubqueryAlias: __correlated_sq_1 [sq.c * UInt32(2):UInt32]
1153              Projection: sq.c * UInt32(2) [sq.c * UInt32(2):UInt32]
1154                TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1155        "
1156        )
1157    }
1158
1159    #[test]
1160    fn in_subquery_join_filter_and_inner_filter() -> Result<()> {
1161        let table_scan = test_table_scan()?;
1162        let subquery_scan = test_table_scan_with_name("sq")?;
1163
1164        let subquery = LogicalPlanBuilder::from(subquery_scan)
1165            .filter(
1166                out_ref_col(DataType::UInt32, "test.a")
1167                    .eq(col("sq.a"))
1168                    .and(col("sq.a").add(lit(1u32)).eq(col("sq.b"))),
1169            )?
1170            .project(vec![col("c") * lit(2u32)])?
1171            .build()?;
1172
1173        let plan = LogicalPlanBuilder::from(table_scan)
1174            .filter(in_subquery(col("c") + lit(1u32), Arc::new(subquery)))?
1175            .project(vec![col("test.b")])?
1176            .build()?;
1177
1178        assert_optimized_plan_equal!(
1179            plan,
1180            @r"
1181        Projection: test.b [b:UInt32]
1182          LeftSemi Join:  Filter: test.c + UInt32(1) = __correlated_sq_1.sq.c * UInt32(2) AND test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]
1183            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1184            SubqueryAlias: __correlated_sq_1 [sq.c * UInt32(2):UInt32, a:UInt32]
1185              Projection: sq.c * UInt32(2), sq.a [sq.c * UInt32(2):UInt32, a:UInt32]
1186                Filter: sq.a + UInt32(1) = sq.b [a:UInt32, b:UInt32, c:UInt32]
1187                  TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1188        "
1189        )
1190    }
1191
1192    #[test]
1193    fn in_subquery_multi_project_subquery_cols() -> Result<()> {
1194        let table_scan = test_table_scan()?;
1195        let subquery_scan = test_table_scan_with_name("sq")?;
1196
1197        let subquery = LogicalPlanBuilder::from(subquery_scan)
1198            .filter(
1199                out_ref_col(DataType::UInt32, "test.a")
1200                    .add(out_ref_col(DataType::UInt32, "test.b"))
1201                    .eq(col("sq.a").add(col("sq.b")))
1202                    .and(col("sq.a").add(lit(1u32)).eq(col("sq.b"))),
1203            )?
1204            .project(vec![col("c") * lit(2u32)])?
1205            .build()?;
1206
1207        let plan = LogicalPlanBuilder::from(table_scan)
1208            .filter(in_subquery(col("c") + lit(1u32), Arc::new(subquery)))?
1209            .project(vec![col("test.b")])?
1210            .build()?;
1211
1212        assert_optimized_plan_equal!(
1213            plan,
1214            @r"
1215        Projection: test.b [b:UInt32]
1216          LeftSemi Join:  Filter: test.c + UInt32(1) = __correlated_sq_1.sq.c * UInt32(2) AND test.a + test.b = __correlated_sq_1.a + __correlated_sq_1.b [a:UInt32, b:UInt32, c:UInt32]
1217            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1218            SubqueryAlias: __correlated_sq_1 [sq.c * UInt32(2):UInt32, a:UInt32, b:UInt32]
1219              Projection: sq.c * UInt32(2), sq.a, sq.b [sq.c * UInt32(2):UInt32, a:UInt32, b:UInt32]
1220                Filter: sq.a + UInt32(1) = sq.b [a:UInt32, b:UInt32, c:UInt32]
1221                  TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1222        "
1223        )
1224    }
1225
1226    #[test]
1227    fn two_in_subquery_with_outer_filter() -> Result<()> {
1228        let table_scan = test_table_scan()?;
1229        let subquery_scan1 = test_table_scan_with_name("sq1")?;
1230        let subquery_scan2 = test_table_scan_with_name("sq2")?;
1231
1232        let subquery1 = LogicalPlanBuilder::from(subquery_scan1)
1233            .filter(out_ref_col(DataType::UInt32, "test.a").gt(col("sq1.a")))?
1234            .project(vec![col("c") * lit(2u32)])?
1235            .build()?;
1236
1237        let subquery2 = LogicalPlanBuilder::from(subquery_scan2)
1238            .filter(out_ref_col(DataType::UInt32, "test.a").gt(col("sq2.a")))?
1239            .project(vec![col("c") * lit(2u32)])?
1240            .build()?;
1241
1242        let plan = LogicalPlanBuilder::from(table_scan)
1243            .filter(
1244                in_subquery(col("c") + lit(1u32), Arc::new(subquery1)).and(
1245                    in_subquery(col("c") * lit(2u32), Arc::new(subquery2))
1246                        .and(col("test.c").gt(lit(1u32))),
1247                ),
1248            )?
1249            .project(vec![col("test.b")])?
1250            .build()?;
1251
1252        assert_optimized_plan_equal!(
1253            plan,
1254            @r"
1255        Projection: test.b [b:UInt32]
1256          Filter: test.c > UInt32(1) [a:UInt32, b:UInt32, c:UInt32]
1257            LeftSemi Join:  Filter: test.c * UInt32(2) = __correlated_sq_2.sq2.c * UInt32(2) AND test.a > __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]
1258              LeftSemi Join:  Filter: test.c + UInt32(1) = __correlated_sq_1.sq1.c * UInt32(2) AND test.a > __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]
1259                TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1260                SubqueryAlias: __correlated_sq_1 [sq1.c * UInt32(2):UInt32, a:UInt32]
1261                  Projection: sq1.c * UInt32(2), sq1.a [sq1.c * UInt32(2):UInt32, a:UInt32]
1262                    TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]
1263              SubqueryAlias: __correlated_sq_2 [sq2.c * UInt32(2):UInt32, a:UInt32]
1264                Projection: sq2.c * UInt32(2), sq2.a [sq2.c * UInt32(2):UInt32, a:UInt32]
1265                  TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]
1266        "
1267        )
1268    }
1269
1270    #[test]
1271    fn in_subquery_with_same_table() -> Result<()> {
1272        let outer_scan = test_table_scan()?;
1273        let subquery_scan = test_table_scan()?;
1274        let subquery = LogicalPlanBuilder::from(subquery_scan)
1275            .filter(col("test.a").gt(col("test.b")))?
1276            .project(vec![col("c")])?
1277            .build()?;
1278
1279        let plan = LogicalPlanBuilder::from(outer_scan)
1280            .filter(in_subquery(col("test.a"), Arc::new(subquery)))?
1281            .project(vec![col("test.b")])?
1282            .build()?;
1283
1284        // Subquery and outer query refer to the same table.
1285        assert_optimized_plan_equal!(
1286            plan,
1287            @r"
1288        Projection: test.b [b:UInt32]
1289          LeftSemi Join:  Filter: test.a = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
1290            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1291            SubqueryAlias: __correlated_sq_1 [c:UInt32]
1292              Projection: test.c [c:UInt32]
1293                Filter: test.a > test.b [a:UInt32, b:UInt32, c:UInt32]
1294                  TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1295        "
1296        )
1297    }
1298
1299    /// Test for multiple exists subqueries in the same filter expression
1300    #[test]
1301    fn multiple_exists_subqueries() -> Result<()> {
1302        let orders = Arc::new(
1303            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1304                .filter(
1305                    col("orders.o_custkey")
1306                        .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
1307                )?
1308                .project(vec![col("orders.o_custkey")])?
1309                .build()?,
1310        );
1311
1312        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1313            .filter(exists(Arc::clone(&orders)).and(exists(orders)))?
1314            .project(vec![col("customer.c_custkey")])?
1315            .build()?;
1316
1317        assert_optimized_plan_equal!(
1318            plan,
1319            @r"
1320        Projection: customer.c_custkey [c_custkey:Int64]
1321          LeftSemi Join:  Filter: __correlated_sq_2.o_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]
1322            LeftSemi Join:  Filter: __correlated_sq_1.o_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]
1323              TableScan: customer [c_custkey:Int64, c_name:Utf8]
1324              SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1325                Projection: orders.o_custkey [o_custkey:Int64]
1326                  TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1327            SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]
1328              Projection: orders.o_custkey [o_custkey:Int64]
1329                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1330        "
1331        )
1332    }
1333
1334    /// Test recursive correlated subqueries
1335    #[test]
1336    fn recursive_exists_subqueries() -> Result<()> {
1337        let lineitem = Arc::new(
1338            LogicalPlanBuilder::from(scan_tpch_table("lineitem"))
1339                .filter(
1340                    col("lineitem.l_orderkey")
1341                        .eq(out_ref_col(DataType::Int64, "orders.o_orderkey")),
1342                )?
1343                .project(vec![col("lineitem.l_orderkey")])?
1344                .build()?,
1345        );
1346
1347        let orders = Arc::new(
1348            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1349                .filter(
1350                    exists(lineitem).and(
1351                        col("orders.o_custkey")
1352                            .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
1353                    ),
1354                )?
1355                .project(vec![col("orders.o_custkey")])?
1356                .build()?,
1357        );
1358
1359        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1360            .filter(exists(orders))?
1361            .project(vec![col("customer.c_custkey")])?
1362            .build()?;
1363
1364        assert_optimized_plan_equal!(
1365            plan,
1366            @r"
1367        Projection: customer.c_custkey [c_custkey:Int64]
1368          LeftSemi Join:  Filter: __correlated_sq_2.o_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]
1369            TableScan: customer [c_custkey:Int64, c_name:Utf8]
1370            SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]
1371              Projection: orders.o_custkey [o_custkey:Int64]
1372                LeftSemi Join:  Filter: __correlated_sq_1.l_orderkey = orders.o_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1373                  TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1374                  SubqueryAlias: __correlated_sq_1 [l_orderkey:Int64]
1375                    Projection: lineitem.l_orderkey [l_orderkey:Int64]
1376                      TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]
1377        "
1378        )
1379    }
1380
1381    /// Test for correlated exists subquery filter with additional subquery filters
1382    #[test]
1383    fn exists_subquery_with_subquery_filters() -> Result<()> {
1384        let sq = Arc::new(
1385            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1386                .filter(
1387                    out_ref_col(DataType::Int64, "customer.c_custkey")
1388                        .eq(col("orders.o_custkey"))
1389                        .and(col("o_orderkey").eq(lit(1))),
1390                )?
1391                .project(vec![col("orders.o_custkey")])?
1392                .build()?,
1393        );
1394
1395        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1396            .filter(exists(sq))?
1397            .project(vec![col("customer.c_custkey")])?
1398            .build()?;
1399
1400        assert_optimized_plan_equal!(
1401            plan,
1402            @r"
1403        Projection: customer.c_custkey [c_custkey:Int64]
1404          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1405            TableScan: customer [c_custkey:Int64, c_name:Utf8]
1406            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1407              Projection: orders.o_custkey [o_custkey:Int64]
1408                Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1409                  TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1410        "
1411        )
1412    }
1413
1414    #[test]
1415    fn exists_subquery_no_cols() -> Result<()> {
1416        let sq = Arc::new(
1417            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1418                .filter(out_ref_col(DataType::Int64, "customer.c_custkey").eq(lit(1u32)))?
1419                .project(vec![col("orders.o_custkey")])?
1420                .build()?,
1421        );
1422
1423        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1424            .filter(exists(sq))?
1425            .project(vec![col("customer.c_custkey")])?
1426            .build()?;
1427
1428        // Other rule will pushdown `customer.c_custkey = 1`,
1429        assert_optimized_plan_equal!(
1430            plan,
1431            @r"
1432        Projection: customer.c_custkey [c_custkey:Int64]
1433          LeftSemi Join:  Filter: customer.c_custkey = UInt32(1) [c_custkey:Int64, c_name:Utf8]
1434            TableScan: customer [c_custkey:Int64, c_name:Utf8]
1435            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1436              Projection: orders.o_custkey [o_custkey:Int64]
1437                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1438        "
1439        )
1440    }
1441
1442    /// Test for exists subquery with both columns in schema
1443    #[test]
1444    fn exists_subquery_with_no_correlated_cols() -> Result<()> {
1445        let sq = Arc::new(
1446            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1447                .filter(col("orders.o_custkey").eq(col("orders.o_custkey")))?
1448                .project(vec![col("orders.o_custkey")])?
1449                .build()?,
1450        );
1451
1452        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1453            .filter(exists(sq))?
1454            .project(vec![col("customer.c_custkey")])?
1455            .build()?;
1456
1457        assert_optimized_plan_equal!(
1458            plan,
1459            @r"
1460        Projection: customer.c_custkey [c_custkey:Int64]
1461          LeftSemi Join:  Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8]
1462            TableScan: customer [c_custkey:Int64, c_name:Utf8]
1463            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1464              Projection: orders.o_custkey [o_custkey:Int64]
1465                Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1466                  TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1467        "
1468        )
1469    }
1470
1471    /// Test for correlated exists subquery not equal
1472    #[test]
1473    fn exists_subquery_where_not_eq() -> Result<()> {
1474        let sq = Arc::new(
1475            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1476                .filter(
1477                    out_ref_col(DataType::Int64, "customer.c_custkey")
1478                        .not_eq(col("orders.o_custkey")),
1479                )?
1480                .project(vec![col("orders.o_custkey")])?
1481                .build()?,
1482        );
1483
1484        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1485            .filter(exists(sq))?
1486            .project(vec![col("customer.c_custkey")])?
1487            .build()?;
1488
1489        assert_optimized_plan_equal!(
1490            plan,
1491            @r"
1492        Projection: customer.c_custkey [c_custkey:Int64]
1493          LeftSemi Join:  Filter: customer.c_custkey != __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1494            TableScan: customer [c_custkey:Int64, c_name:Utf8]
1495            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1496              Projection: orders.o_custkey [o_custkey:Int64]
1497                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1498        "
1499        )
1500    }
1501
1502    /// Test for correlated exists subquery less than
1503    #[test]
1504    fn exists_subquery_where_less_than() -> Result<()> {
1505        let sq = Arc::new(
1506            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1507                .filter(
1508                    out_ref_col(DataType::Int64, "customer.c_custkey")
1509                        .lt(col("orders.o_custkey")),
1510                )?
1511                .project(vec![col("orders.o_custkey")])?
1512                .build()?,
1513        );
1514
1515        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1516            .filter(exists(sq))?
1517            .project(vec![col("customer.c_custkey")])?
1518            .build()?;
1519
1520        assert_optimized_plan_equal!(
1521            plan,
1522            @r"
1523        Projection: customer.c_custkey [c_custkey:Int64]
1524          LeftSemi Join:  Filter: customer.c_custkey < __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1525            TableScan: customer [c_custkey:Int64, c_name:Utf8]
1526            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1527              Projection: orders.o_custkey [o_custkey:Int64]
1528                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1529        "
1530        )
1531    }
1532
1533    /// Test for correlated exists subquery filter with subquery disjunction
1534    #[test]
1535    fn exists_subquery_with_subquery_disjunction() -> Result<()> {
1536        let sq = Arc::new(
1537            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1538                .filter(
1539                    out_ref_col(DataType::Int64, "customer.c_custkey")
1540                        .eq(col("orders.o_custkey"))
1541                        .or(col("o_orderkey").eq(lit(1))),
1542                )?
1543                .project(vec![col("orders.o_custkey")])?
1544                .build()?,
1545        );
1546
1547        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1548            .filter(exists(sq))?
1549            .project(vec![col("customer.c_custkey")])?
1550            .build()?;
1551
1552        assert_optimized_plan_equal!(
1553            plan,
1554            @r"
1555        Projection: customer.c_custkey [c_custkey:Int64]
1556          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey OR __correlated_sq_1.o_orderkey = Int32(1) [c_custkey:Int64, c_name:Utf8]
1557            TableScan: customer [c_custkey:Int64, c_name:Utf8]
1558            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64, o_orderkey:Int64]
1559              Projection: orders.o_custkey, orders.o_orderkey [o_custkey:Int64, o_orderkey:Int64]
1560                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1561        "
1562        )
1563    }
1564
1565    /// Test for correlated exists without projection
1566    #[test]
1567    fn exists_subquery_no_projection() -> Result<()> {
1568        let sq = Arc::new(
1569            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1570                .filter(
1571                    out_ref_col(DataType::Int64, "customer.c_custkey")
1572                        .eq(col("orders.o_custkey")),
1573                )?
1574                .build()?,
1575        );
1576
1577        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1578            .filter(exists(sq))?
1579            .project(vec![col("customer.c_custkey")])?
1580            .build()?;
1581
1582        assert_optimized_plan_equal!(
1583            plan,
1584            @r"
1585        Projection: customer.c_custkey [c_custkey:Int64]
1586          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1587            TableScan: customer [c_custkey:Int64, c_name:Utf8]
1588            SubqueryAlias: __correlated_sq_1 [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1589              TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1590        "
1591        )
1592    }
1593
1594    /// Test for correlated exists expressions
1595    #[test]
1596    fn exists_subquery_project_expr() -> Result<()> {
1597        let sq = Arc::new(
1598            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1599                .filter(
1600                    out_ref_col(DataType::Int64, "customer.c_custkey")
1601                        .eq(col("orders.o_custkey")),
1602                )?
1603                .project(vec![col("orders.o_custkey").add(lit(1))])?
1604                .build()?,
1605        );
1606
1607        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1608            .filter(exists(sq))?
1609            .project(vec![col("customer.c_custkey")])?
1610            .build()?;
1611
1612        assert_optimized_plan_equal!(
1613            plan,
1614            @r"
1615        Projection: customer.c_custkey [c_custkey:Int64]
1616          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1617            TableScan: customer [c_custkey:Int64, c_name:Utf8]
1618            SubqueryAlias: __correlated_sq_1 [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]
1619              Projection: orders.o_custkey + Int32(1), orders.o_custkey [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]
1620                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1621        "
1622        )
1623    }
1624
1625    /// Test for correlated exists subquery filter with additional filters
1626    #[test]
1627    fn exists_subquery_should_support_additional_filters() -> Result<()> {
1628        let sq = Arc::new(
1629            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1630                .filter(
1631                    out_ref_col(DataType::Int64, "customer.c_custkey")
1632                        .eq(col("orders.o_custkey")),
1633                )?
1634                .project(vec![col("orders.o_custkey")])?
1635                .build()?,
1636        );
1637        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1638            .filter(exists(sq).and(col("c_custkey").eq(lit(1))))?
1639            .project(vec![col("customer.c_custkey")])?
1640            .build()?;
1641
1642        assert_optimized_plan_equal!(
1643            plan,
1644            @r"
1645        Projection: customer.c_custkey [c_custkey:Int64]
1646          Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]
1647            LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1648              TableScan: customer [c_custkey:Int64, c_name:Utf8]
1649              SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1650                Projection: orders.o_custkey [o_custkey:Int64]
1651                  TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1652        "
1653        )
1654    }
1655
1656    /// Test for correlated exists subquery filter with disjunctions
1657    #[test]
1658    fn exists_subquery_disjunction() -> Result<()> {
1659        let sq = Arc::new(
1660            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1661                .filter(col("customer.c_custkey").eq(col("orders.o_custkey")))?
1662                .project(vec![col("orders.o_custkey")])?
1663                .build()?,
1664        );
1665
1666        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1667            .filter(exists(sq).or(col("customer.c_custkey").eq(lit(1))))?
1668            .project(vec![col("customer.c_custkey")])?
1669            .build()?;
1670
1671        assert_optimized_plan_equal!(
1672            plan,
1673            @r"
1674        Projection: customer.c_custkey [c_custkey:Int64]
1675          Filter: __correlated_sq_1.mark OR customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, mark:Boolean]
1676            LeftMark Join:  Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, mark:Boolean]
1677              TableScan: customer [c_custkey:Int64, c_name:Utf8]
1678              SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1679                Projection: orders.o_custkey [o_custkey:Int64]
1680                  Filter: customer.c_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1681                    TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1682        "
1683        )
1684    }
1685
1686    /// Test for correlated EXISTS subquery filter
1687    #[test]
1688    fn exists_subquery_correlated() -> Result<()> {
1689        let sq = Arc::new(
1690            LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
1691                .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq.a")))?
1692                .project(vec![col("c")])?
1693                .build()?,
1694        );
1695
1696        let plan = LogicalPlanBuilder::from(test_table_scan_with_name("test")?)
1697            .filter(exists(sq))?
1698            .project(vec![col("test.c")])?
1699            .build()?;
1700
1701        assert_optimized_plan_equal!(
1702            plan,
1703            @r"
1704        Projection: test.c [c:UInt32]
1705          LeftSemi Join:  Filter: test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]
1706            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1707            SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]
1708              Projection: sq.c, sq.a [c:UInt32, a:UInt32]
1709                TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1710        "
1711        )
1712    }
1713
1714    /// Test for single exists subquery filter
1715    #[test]
1716    fn exists_subquery_simple() -> Result<()> {
1717        let table_scan = test_table_scan()?;
1718        let plan = LogicalPlanBuilder::from(table_scan)
1719            .filter(exists(test_subquery_with_name("sq")?))?
1720            .project(vec![col("test.b")])?
1721            .build()?;
1722
1723        assert_optimized_plan_equal!(
1724            plan,
1725            @r"
1726        Projection: test.b [b:UInt32]
1727          LeftSemi Join:  Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]
1728            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1729            SubqueryAlias: __correlated_sq_1 [c:UInt32]
1730              Projection: sq.c [c:UInt32]
1731                TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1732        "
1733        )
1734    }
1735
1736    /// Test for single NOT exists subquery filter
1737    #[test]
1738    fn not_exists_subquery_simple() -> Result<()> {
1739        let table_scan = test_table_scan()?;
1740        let plan = LogicalPlanBuilder::from(table_scan)
1741            .filter(not_exists(test_subquery_with_name("sq")?))?
1742            .project(vec![col("test.b")])?
1743            .build()?;
1744
1745        assert_optimized_plan_equal!(
1746            plan,
1747            @r"
1748        Projection: test.b [b:UInt32]
1749          LeftAnti Join:  Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]
1750            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1751            SubqueryAlias: __correlated_sq_1 [c:UInt32]
1752              Projection: sq.c [c:UInt32]
1753                TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1754        "
1755        )
1756    }
1757
1758    #[test]
1759    fn two_exists_subquery_with_outer_filter() -> Result<()> {
1760        let table_scan = test_table_scan()?;
1761        let subquery_scan1 = test_table_scan_with_name("sq1")?;
1762        let subquery_scan2 = test_table_scan_with_name("sq2")?;
1763
1764        let subquery1 = LogicalPlanBuilder::from(subquery_scan1)
1765            .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq1.a")))?
1766            .project(vec![col("c")])?
1767            .build()?;
1768
1769        let subquery2 = LogicalPlanBuilder::from(subquery_scan2)
1770            .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq2.a")))?
1771            .project(vec![col("c")])?
1772            .build()?;
1773
1774        let plan = LogicalPlanBuilder::from(table_scan)
1775            .filter(
1776                exists(Arc::new(subquery1))
1777                    .and(exists(Arc::new(subquery2)).and(col("test.c").gt(lit(1u32)))),
1778            )?
1779            .project(vec![col("test.b")])?
1780            .build()?;
1781
1782        assert_optimized_plan_equal!(
1783            plan,
1784            @r"
1785        Projection: test.b [b:UInt32]
1786          Filter: test.c > UInt32(1) [a:UInt32, b:UInt32, c:UInt32]
1787            LeftSemi Join:  Filter: test.a = __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]
1788              LeftSemi Join:  Filter: test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]
1789                TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1790                SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]
1791                  Projection: sq1.c, sq1.a [c:UInt32, a:UInt32]
1792                    TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]
1793              SubqueryAlias: __correlated_sq_2 [c:UInt32, a:UInt32]
1794                Projection: sq2.c, sq2.a [c:UInt32, a:UInt32]
1795                  TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]
1796        "
1797        )
1798    }
1799
1800    #[test]
1801    fn exists_subquery_expr_filter() -> Result<()> {
1802        let table_scan = test_table_scan()?;
1803        let subquery_scan = test_table_scan_with_name("sq")?;
1804        let subquery = LogicalPlanBuilder::from(subquery_scan)
1805            .filter(
1806                (lit(1u32) + col("sq.a"))
1807                    .gt(out_ref_col(DataType::UInt32, "test.a") * lit(2u32)),
1808            )?
1809            .project(vec![lit(1u32)])?
1810            .build()?;
1811        let plan = LogicalPlanBuilder::from(table_scan)
1812            .filter(exists(Arc::new(subquery)))?
1813            .project(vec![col("test.b")])?
1814            .build()?;
1815
1816        assert_optimized_plan_equal!(
1817            plan,
1818            @r"
1819        Projection: test.b [b:UInt32]
1820          LeftSemi Join:  Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]
1821            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1822            SubqueryAlias: __correlated_sq_1 [UInt32(1):UInt32, a:UInt32]
1823              Projection: UInt32(1), sq.a [UInt32(1):UInt32, a:UInt32]
1824                TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1825        "
1826        )
1827    }
1828
1829    #[test]
1830    fn exists_subquery_with_same_table() -> Result<()> {
1831        let outer_scan = test_table_scan()?;
1832        let subquery_scan = test_table_scan()?;
1833        let subquery = LogicalPlanBuilder::from(subquery_scan)
1834            .filter(col("test.a").gt(col("test.b")))?
1835            .project(vec![col("c")])?
1836            .build()?;
1837
1838        let plan = LogicalPlanBuilder::from(outer_scan)
1839            .filter(exists(Arc::new(subquery)))?
1840            .project(vec![col("test.b")])?
1841            .build()?;
1842
1843        // Subquery and outer query refer to the same table.
1844        assert_optimized_plan_equal!(
1845            plan,
1846            @r"
1847        Projection: test.b [b:UInt32]
1848          LeftSemi Join:  Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]
1849            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1850            SubqueryAlias: __correlated_sq_1 [c:UInt32]
1851              Projection: test.c [c:UInt32]
1852                Filter: test.a > test.b [a:UInt32, b:UInt32, c:UInt32]
1853                  TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1854        "
1855        )
1856    }
1857
1858    #[test]
1859    fn exists_distinct_subquery() -> Result<()> {
1860        let table_scan = test_table_scan()?;
1861        let subquery_scan = test_table_scan_with_name("sq")?;
1862        let subquery = LogicalPlanBuilder::from(subquery_scan)
1863            .filter(
1864                (lit(1u32) + col("sq.a"))
1865                    .gt(out_ref_col(DataType::UInt32, "test.a") * lit(2u32)),
1866            )?
1867            .project(vec![col("sq.c")])?
1868            .distinct()?
1869            .build()?;
1870        let plan = LogicalPlanBuilder::from(table_scan)
1871            .filter(exists(Arc::new(subquery)))?
1872            .project(vec![col("test.b")])?
1873            .build()?;
1874
1875        assert_optimized_plan_equal!(
1876            plan,
1877            @r"
1878        Projection: test.b [b:UInt32]
1879          LeftSemi Join:  Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]
1880            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1881            SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]
1882              Distinct: [c:UInt32, a:UInt32]
1883                Projection: sq.c, sq.a [c:UInt32, a:UInt32]
1884                  TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1885        "
1886        )
1887    }
1888
1889    #[test]
1890    fn exists_distinct_expr_subquery() -> Result<()> {
1891        let table_scan = test_table_scan()?;
1892        let subquery_scan = test_table_scan_with_name("sq")?;
1893        let subquery = LogicalPlanBuilder::from(subquery_scan)
1894            .filter(
1895                (lit(1u32) + col("sq.a"))
1896                    .gt(out_ref_col(DataType::UInt32, "test.a") * lit(2u32)),
1897            )?
1898            .project(vec![col("sq.b") + col("sq.c")])?
1899            .distinct()?
1900            .build()?;
1901        let plan = LogicalPlanBuilder::from(table_scan)
1902            .filter(exists(Arc::new(subquery)))?
1903            .project(vec![col("test.b")])?
1904            .build()?;
1905
1906        assert_optimized_plan_equal!(
1907            plan,
1908            @r"
1909        Projection: test.b [b:UInt32]
1910          LeftSemi Join:  Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]
1911            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1912            SubqueryAlias: __correlated_sq_1 [sq.b + sq.c:UInt32, a:UInt32]
1913              Distinct: [sq.b + sq.c:UInt32, a:UInt32]
1914                Projection: sq.b + sq.c, sq.a [sq.b + sq.c:UInt32, a:UInt32]
1915                  TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1916        "
1917        )
1918    }
1919
1920    #[test]
1921    fn exists_distinct_subquery_with_literal() -> Result<()> {
1922        let table_scan = test_table_scan()?;
1923        let subquery_scan = test_table_scan_with_name("sq")?;
1924        let subquery = LogicalPlanBuilder::from(subquery_scan)
1925            .filter(
1926                (lit(1u32) + col("sq.a"))
1927                    .gt(out_ref_col(DataType::UInt32, "test.a") * lit(2u32)),
1928            )?
1929            .project(vec![lit(1u32), col("sq.c")])?
1930            .distinct()?
1931            .build()?;
1932        let plan = LogicalPlanBuilder::from(table_scan)
1933            .filter(exists(Arc::new(subquery)))?
1934            .project(vec![col("test.b")])?
1935            .build()?;
1936
1937        assert_optimized_plan_equal!(
1938            plan,
1939            @r"
1940        Projection: test.b [b:UInt32]
1941          LeftSemi Join:  Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]
1942            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1943            SubqueryAlias: __correlated_sq_1 [UInt32(1):UInt32, c:UInt32, a:UInt32]
1944              Distinct: [UInt32(1):UInt32, c:UInt32, a:UInt32]
1945                Projection: UInt32(1), sq.c, sq.a [UInt32(1):UInt32, c:UInt32, a:UInt32]
1946                  TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1947        "
1948        )
1949    }
1950
1951    #[test]
1952    fn exists_uncorrelated_unnest() -> Result<()> {
1953        let subquery_table_source = table_source(&Schema::new(vec![Field::new(
1954            "arr",
1955            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
1956            true,
1957        )]));
1958        let subquery = LogicalPlanBuilder::scan_with_filters(
1959            "sq",
1960            subquery_table_source,
1961            None,
1962            vec![],
1963        )?
1964        .unnest_column("arr")?
1965        .build()?;
1966        let table_scan = test_table_scan()?;
1967        let plan = LogicalPlanBuilder::from(table_scan)
1968            .filter(exists(Arc::new(subquery)))?
1969            .project(vec![col("test.b")])?
1970            .build()?;
1971
1972        assert_optimized_plan_equal!(
1973            plan,
1974            @r"
1975        Projection: test.b [b:UInt32]
1976          LeftSemi Join:  Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]
1977            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1978            SubqueryAlias: __correlated_sq_1 [arr:Int32;N]
1979              Unnest: lists[sq.arr|depth=1] structs[] [arr:Int32;N]
1980                TableScan: sq [arr:List(Field { data_type: Int32, nullable: true });N]
1981        "
1982        )
1983    }
1984
1985    #[test]
1986    fn exists_correlated_unnest() -> Result<()> {
1987        let table_scan = test_table_scan()?;
1988        let subquery_table_source = table_source(&Schema::new(vec![Field::new(
1989            "a",
1990            DataType::List(Arc::new(Field::new_list_field(DataType::UInt32, true))),
1991            true,
1992        )]));
1993        let subquery = LogicalPlanBuilder::scan_with_filters(
1994            "sq",
1995            subquery_table_source,
1996            None,
1997            vec![],
1998        )?
1999        .unnest_column("a")?
2000        .filter(col("a").eq(out_ref_col(DataType::UInt32, "test.b")))?
2001        .build()?;
2002        let plan = LogicalPlanBuilder::from(table_scan)
2003            .filter(exists(Arc::new(subquery)))?
2004            .project(vec![col("test.b")])?
2005            .build()?;
2006
2007        assert_optimized_plan_equal!(
2008            plan,
2009            @r"
2010        Projection: test.b [b:UInt32]
2011          LeftSemi Join:  Filter: __correlated_sq_1.a = test.b [a:UInt32, b:UInt32, c:UInt32]
2012            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
2013            SubqueryAlias: __correlated_sq_1 [a:UInt32;N]
2014              Unnest: lists[sq.a|depth=1] structs[] [a:UInt32;N]
2015                TableScan: sq [a:List(Field { data_type: UInt32, nullable: true });N]
2016        "
2017        )
2018    }
2019
2020    #[test]
2021    fn upper_case_ident() -> Result<()> {
2022        let fields = vec![
2023            Field::new("A", DataType::UInt32, false),
2024            Field::new("B", DataType::UInt32, false),
2025        ];
2026
2027        let schema = Schema::new(fields);
2028        let table_scan_a = table_scan(Some("\"TEST_A\""), &schema, None)?.build()?;
2029        let table_scan_b = table_scan(Some("\"TEST_B\""), &schema, None)?.build()?;
2030
2031        let subquery = LogicalPlanBuilder::from(table_scan_b)
2032            .filter(col("\"A\"").eq(out_ref_col(DataType::UInt32, "\"TEST_A\".\"A\"")))?
2033            .project(vec![lit(1)])?
2034            .build()?;
2035
2036        let plan = LogicalPlanBuilder::from(table_scan_a)
2037            .filter(exists(Arc::new(subquery)))?
2038            .project(vec![col("\"TEST_A\".\"B\"")])?
2039            .build()?;
2040
2041        assert_optimized_plan_equal!(
2042            plan,
2043            @r"
2044        Projection: TEST_A.B [B:UInt32]
2045          LeftSemi Join:  Filter: __correlated_sq_1.A = TEST_A.A [A:UInt32, B:UInt32]
2046            TableScan: TEST_A [A:UInt32, B:UInt32]
2047            SubqueryAlias: __correlated_sq_1 [Int32(1):Int32, A:UInt32]
2048              Projection: Int32(1), TEST_B.A [Int32(1):Int32, A:UInt32]
2049                TableScan: TEST_B [A:UInt32, B:UInt32]
2050        "
2051        )
2052    }
2053}