Skip to main content

datafusion_optimizer/
decorrelate_predicate_subquery.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DecorrelatePredicateSubquery`] converts `IN`/`EXISTS` subquery predicates to `SEMI`/`ANTI` joins
use std::collections::BTreeSet;
use std::ops::Deref;
use std::sync::Arc;

use crate::decorrelate::PullUpCorrelatedExpr;
use crate::optimizer::ApplyOrder;
use crate::utils::replace_qualified_name;
use crate::{OptimizerConfig, OptimizerRule};

use datafusion_common::alias::AliasGenerator;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{
    Column, DFSchemaRef, ExprSchema, NullEquality, Result, assert_or_internal_err,
    plan_err,
};
use datafusion_expr::expr::{Exists, InSubquery};
use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
use datafusion_expr::logical_plan::{JoinType, Subquery};
use datafusion_expr::utils::{
    conjunction, expr_to_columns, split_conjunction, split_conjunction_owned,
};
use datafusion_expr::{
    BinaryExpr, Expr, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder, Operator,
    exists, in_subquery, lit, not, not_exists, not_in_subquery,
};

use log::debug;
44
/// Optimizer rule for rewriting predicate (`IN` / `EXISTS`) subqueries into joins.
///
/// Top-level predicates become left semi/anti joins; subqueries embedded in larger
/// boolean expressions become left mark joins.
#[derive(Default, Debug)]
pub struct DecorrelatePredicateSubquery {}

impl DecorrelatePredicateSubquery {
    /// Creates a new [`DecorrelatePredicateSubquery`] rule.
    pub fn new() -> Self {
        Self::default()
    }
}
55
56impl OptimizerRule for DecorrelatePredicateSubquery {
57    fn supports_rewrite(&self) -> bool {
58        true
59    }
60
61    fn rewrite(
62        &self,
63        plan: LogicalPlan,
64        config: &dyn OptimizerConfig,
65    ) -> Result<Transformed<LogicalPlan>> {
66        let plan = plan
67            .map_subqueries(|subquery| {
68                subquery.transform_down(|p| self.rewrite(p, config))
69            })?
70            .data;
71
72        let LogicalPlan::Filter(filter) = plan else {
73            return Ok(Transformed::no(plan));
74        };
75
76        if !has_subquery(&filter.predicate) {
77            return Ok(Transformed::no(LogicalPlan::Filter(filter)));
78        }
79
80        let (with_subqueries, mut other_exprs): (Vec<_>, Vec<_>) =
81            split_conjunction_owned(filter.predicate)
82                .into_iter()
83                .partition(has_subquery);
84
85        assert_or_internal_err!(
86            !with_subqueries.is_empty(),
87            "can not find expected subqueries in DecorrelatePredicateSubquery"
88        );
89
90        // iterate through all exists clauses in predicate, turning each into a join
91        let mut cur_input = Arc::unwrap_or_clone(filter.input);
92        let original_schema = cur_input.schema().columns();
93        for subquery_expr in with_subqueries {
94            match extract_subquery_info(subquery_expr) {
95                // The subquery expression is at the top level of the filter
96                SubqueryPredicate::Top(subquery) => {
97                    match build_join_top(&subquery, &cur_input, config.alias_generator())?
98                    {
99                        Some(plan) => cur_input = plan,
100                        // If the subquery can not be converted to a Join, reconstruct the subquery expression and add it to the Filter
101                        None => other_exprs.push(subquery.expr()),
102                    }
103                }
104                // The subquery expression is embedded within another expression
105                SubqueryPredicate::Embedded(expr) => {
106                    let (plan, expr_without_subqueries) =
107                        rewrite_inner_subqueries(cur_input, expr, config)?;
108                    cur_input = plan;
109                    other_exprs.push(expr_without_subqueries);
110                }
111            }
112        }
113
114        let expr = conjunction(other_exprs);
115        if let Some(expr) = expr {
116            let new_filter = Filter::try_new(expr, Arc::new(cur_input))?;
117            cur_input = LogicalPlan::Filter(new_filter);
118        }
119
120        if cur_input.schema().fields().len() != original_schema.len() {
121            cur_input = LogicalPlanBuilder::from(cur_input)
122                .project(original_schema.into_iter().map(Expr::from))?
123                .build()?;
124        }
125
126        Ok(Transformed::yes(cur_input))
127    }
128
129    fn name(&self) -> &str {
130        "decorrelate_predicate_subquery"
131    }
132
133    fn apply_order(&self) -> Option<ApplyOrder> {
134        Some(ApplyOrder::TopDown)
135    }
136}
137
138fn rewrite_inner_subqueries(
139    outer: LogicalPlan,
140    expr: Expr,
141    config: &dyn OptimizerConfig,
142) -> Result<(LogicalPlan, Expr)> {
143    let mut cur_input = outer;
144    let alias = config.alias_generator();
145    let expr_without_subqueries = expr.transform(|e| match e {
146        Expr::Exists(Exists {
147            subquery: Subquery { subquery, .. },
148            negated,
149        }) => match mark_join(&cur_input, &subquery, None, negated, alias)? {
150            Some((plan, exists_expr)) => {
151                cur_input = plan;
152                Ok(Transformed::yes(exists_expr))
153            }
154            None if negated => Ok(Transformed::no(not_exists(subquery))),
155            None => Ok(Transformed::no(exists(subquery))),
156        },
157        Expr::InSubquery(InSubquery {
158            expr,
159            subquery: Subquery { subquery, .. },
160            negated,
161        }) => {
162            let in_predicate = subquery
163                .head_output_expr()?
164                .map_or(plan_err!("single expression required."), |output_expr| {
165                    Ok(Expr::eq(*expr.clone(), output_expr))
166                })?;
167            match mark_join(&cur_input, &subquery, Some(&in_predicate), negated, alias)? {
168                Some((plan, exists_expr)) => {
169                    cur_input = plan;
170                    Ok(Transformed::yes(exists_expr))
171                }
172                None if negated => Ok(Transformed::no(not_in_subquery(*expr, subquery))),
173                None => Ok(Transformed::no(in_subquery(*expr, subquery))),
174            }
175        }
176        _ => Ok(Transformed::no(e)),
177    })?;
178    Ok((cur_input, expr_without_subqueries.data))
179}
180
181enum SubqueryPredicate {
182    // The subquery expression is at the top level of the filter and can be fully replaced by a
183    // semi/anti join
184    Top(SubqueryInfo),
185    // The subquery expression is embedded within another expression and is replaced using an
186    // existence join
187    Embedded(Expr),
188}
189
190fn extract_subquery_info(expr: Expr) -> SubqueryPredicate {
191    match expr {
192        Expr::Not(not_expr) => match *not_expr {
193            Expr::InSubquery(InSubquery {
194                expr,
195                subquery,
196                negated,
197            }) => SubqueryPredicate::Top(SubqueryInfo::new_with_in_expr(
198                subquery, *expr, !negated,
199            )),
200            Expr::Exists(Exists { subquery, negated }) => {
201                SubqueryPredicate::Top(SubqueryInfo::new(subquery, !negated))
202            }
203            expr => SubqueryPredicate::Embedded(not(expr)),
204        },
205        Expr::InSubquery(InSubquery {
206            expr,
207            subquery,
208            negated,
209        }) => SubqueryPredicate::Top(SubqueryInfo::new_with_in_expr(
210            subquery, *expr, negated,
211        )),
212        Expr::Exists(Exists { subquery, negated }) => {
213            SubqueryPredicate::Top(SubqueryInfo::new(subquery, negated))
214        }
215        expr => SubqueryPredicate::Embedded(expr),
216    }
217}
218
219fn has_subquery(expr: &Expr) -> bool {
220    expr.exists(|e| match e {
221        Expr::InSubquery(_) | Expr::Exists(_) => Ok(true),
222        _ => Ok(false),
223    })
224    .unwrap()
225}
226
227/// Optimize the subquery to left-anti/left-semi join.
228/// If the subquery is a correlated subquery, we need extract the join predicate from the subquery.
229///
230/// For example, given a query like:
231/// `select t1.a, t1.b from t1 where t1 in (select t2.a from t2 where t1.b = t2.b and t1.c > t2.c)`
232///
233/// The optimized plan will be:
234///
235/// ```text
236/// Projection: t1.a, t1.b
237///   LeftSemi Join:  Filter: t1.a = __correlated_sq_1.a AND t1.b = __correlated_sq_1.b AND t1.c > __correlated_sq_1.c
238///     TableScan: t1
239///     SubqueryAlias: __correlated_sq_1
240///       Projection: t2.a, t2.b, t2.c
241///         TableScan: t2
242/// ```
243///
244/// Given another query like:
245/// `select t1.id from t1 where exists(SELECT t2.id FROM t2 WHERE t1.id = t2.id)`
246///
247/// The optimized plan will be:
248///
249/// ```text
250/// Projection: t1.id
251///   LeftSemi Join:  Filter: t1.id = __correlated_sq_1.id
252///     TableScan: t1
253///     SubqueryAlias: __correlated_sq_1
254///       Projection: t2.id
255///         TableScan: t2
256/// ```
257fn build_join_top(
258    query_info: &SubqueryInfo,
259    left: &LogicalPlan,
260    alias: &Arc<AliasGenerator>,
261) -> Result<Option<LogicalPlan>> {
262    let where_in_expr_opt = &query_info.where_in_expr;
263    let in_predicate_opt = where_in_expr_opt
264        .clone()
265        .map(|where_in_expr| {
266            query_info
267                .query
268                .subquery
269                .head_output_expr()?
270                .map_or(plan_err!("single expression required."), |expr| {
271                    Ok(Expr::eq(where_in_expr, expr))
272                })
273        })
274        .map_or(Ok(None), |v| v.map(Some))?;
275
276    let join_type = match query_info.negated {
277        true => JoinType::LeftAnti,
278        false => JoinType::LeftSemi,
279    };
280    let subquery = query_info.query.subquery.as_ref();
281    let subquery_alias = alias.next("__correlated_sq");
282    build_join(
283        left,
284        subquery,
285        in_predicate_opt.as_ref(),
286        join_type,
287        subquery_alias,
288    )
289}
290
291/// This is used to handle the case when the subquery is embedded in a more complex boolean
292/// expression like and OR. For example
293///
294/// `select t1.id from t1 where t1.id < 0 OR exists(SELECT t2.id FROM t2 WHERE t1.id = t2.id)`
295///
296/// The optimized plan will be:
297///
298/// ```text
299/// Projection: t1.id
300///   Filter: t1.id < 0 OR __correlated_sq_1.mark
301///     LeftMark Join:  Filter: t1.id = __correlated_sq_1.id
302///       TableScan: t1
303///       SubqueryAlias: __correlated_sq_1
304///         Projection: t2.id
305///           TableScan: t2
306fn mark_join(
307    left: &LogicalPlan,
308    subquery: &LogicalPlan,
309    in_predicate_opt: Option<&Expr>,
310    negated: bool,
311    alias_generator: &Arc<AliasGenerator>,
312) -> Result<Option<(LogicalPlan, Expr)>> {
313    let alias = alias_generator.next("__correlated_sq");
314
315    let exists_col = Expr::Column(Column::new(Some(alias.clone()), "mark"));
316    let exists_expr = if negated { !exists_col } else { exists_col };
317
318    Ok(
319        build_join(left, subquery, in_predicate_opt, JoinType::LeftMark, alias)?
320            .map(|plan| (plan, exists_expr)),
321    )
322}
323
324/// Check if join keys in the join filter may contain NULL values
325///
326/// Returns true if any join key column is nullable on either side.
327/// This is used to optimize null-aware anti joins: if all join keys are non-nullable,
328/// we can use a regular anti join instead of the more expensive null-aware variant.
329fn join_keys_may_be_null(
330    join_filter: &Expr,
331    left_schema: &DFSchemaRef,
332    right_schema: &DFSchemaRef,
333) -> Result<bool> {
334    // Extract columns from the join filter
335    let mut columns = std::collections::HashSet::new();
336    expr_to_columns(join_filter, &mut columns)?;
337
338    // Check if any column is nullable
339    for col in columns {
340        // Check in left schema
341        if let Ok(field) = left_schema.field_from_column(&col)
342            && field.as_ref().is_nullable()
343        {
344            return Ok(true);
345        }
346        // Check in right schema
347        if let Ok(field) = right_schema.field_from_column(&col)
348            && field.as_ref().is_nullable()
349        {
350            return Ok(true);
351        }
352    }
353
354    Ok(false)
355}
356
357fn build_join(
358    left: &LogicalPlan,
359    subquery: &LogicalPlan,
360    in_predicate_opt: Option<&Expr>,
361    join_type: JoinType,
362    alias: String,
363) -> Result<Option<LogicalPlan>> {
364    let mut pull_up = PullUpCorrelatedExpr::new()
365        .with_in_predicate_opt(in_predicate_opt.cloned())
366        .with_exists_sub_query(in_predicate_opt.is_none());
367
368    let new_plan = subquery.clone().rewrite(&mut pull_up).data()?;
369    if !pull_up.can_pull_up {
370        return Ok(None);
371    }
372
373    let sub_query_alias = LogicalPlanBuilder::from(new_plan)
374        .alias(alias.to_string())?
375        .build()?;
376    let mut all_correlated_cols = BTreeSet::new();
377    pull_up
378        .correlated_subquery_cols_map
379        .values()
380        .for_each(|cols| all_correlated_cols.extend(cols.clone()));
381
382    // alias the join filter
383    let join_filter_opt = conjunction(pull_up.join_filters)
384        .map_or(Ok(None), |filter| {
385            replace_qualified_name(filter, &all_correlated_cols, &alias).map(Some)
386        })?;
387
388    let join_filter = match (join_filter_opt, in_predicate_opt.cloned()) {
389        (
390            Some(join_filter),
391            Some(Expr::BinaryExpr(BinaryExpr {
392                left,
393                op: Operator::Eq,
394                right,
395            })),
396        ) => {
397            let right_col = create_col_from_scalar_expr(right.deref(), alias)?;
398            let in_predicate = Expr::eq(left.deref().clone(), Expr::Column(right_col));
399            in_predicate.and(join_filter)
400        }
401        (Some(join_filter), _) => join_filter,
402        (
403            _,
404            Some(Expr::BinaryExpr(BinaryExpr {
405                left,
406                op: Operator::Eq,
407                right,
408            })),
409        ) => {
410            let right_col = create_col_from_scalar_expr(right.deref(), alias)?;
411
412            Expr::eq(left.deref().clone(), Expr::Column(right_col))
413        }
414        (None, None) => lit(true),
415        _ => return Ok(None),
416    };
417
418    if matches!(join_type, JoinType::LeftMark | JoinType::RightMark) {
419        let right_schema = sub_query_alias.schema();
420
421        // Gather all columns needed for the join filter + predicates
422        let mut needed = std::collections::HashSet::new();
423        expr_to_columns(&join_filter, &mut needed)?;
424        if let Some(in_pred) = in_predicate_opt {
425            expr_to_columns(in_pred, &mut needed)?;
426        }
427
428        // Keep only columns that actually belong to the RIGHT child, and sort by their
429        // position in the right schema for deterministic order.
430        let mut right_cols_idx_and_col: Vec<(usize, Column)> = needed
431            .into_iter()
432            .filter_map(|c| right_schema.index_of_column(&c).ok().map(|idx| (idx, c)))
433            .collect();
434
435        right_cols_idx_and_col.sort_by_key(|(idx, _)| *idx);
436
437        let right_proj_exprs: Vec<Expr> = right_cols_idx_and_col
438            .into_iter()
439            .map(|(_, c)| Expr::Column(c))
440            .collect();
441
442        let right_projected = if !right_proj_exprs.is_empty() {
443            LogicalPlanBuilder::from(sub_query_alias.clone())
444                .project(right_proj_exprs)?
445                .build()?
446        } else {
447            // Degenerate case: no right columns referenced by the predicate(s)
448            sub_query_alias.clone()
449        };
450
451        // Mark joins don't use null-aware semantics (they use three-valued logic with mark column)
452        let new_plan = LogicalPlanBuilder::from(left.clone())
453            .join_on(right_projected, join_type, Some(join_filter))?
454            .build()?;
455
456        debug!(
457            "predicate subquery optimized:\n{}",
458            new_plan.display_indent()
459        );
460
461        return Ok(Some(new_plan));
462    }
463
464    // Determine if this should be a null-aware anti join
465    // Null-aware semantics are only needed for NOT IN subqueries, not NOT EXISTS:
466    // - NOT IN: Uses three-valued logic, requires null-aware handling
467    // - NOT EXISTS: Uses two-valued logic, regular anti join is correct
468    // We can distinguish them: NOT IN has in_predicate_opt, NOT EXISTS does not
469    //
470    // Additionally, if the join keys are non-nullable on both sides, we don't need
471    // null-aware semantics because NULLs cannot exist in the data.
472    let null_aware = join_type == JoinType::LeftAnti
473        && in_predicate_opt.is_some()
474        && join_keys_may_be_null(&join_filter, left.schema(), sub_query_alias.schema())?;
475
476    // join our sub query into the main plan
477    let new_plan = if null_aware {
478        // Use join_detailed_with_options to set null_aware flag
479        LogicalPlanBuilder::from(left.clone())
480            .join_detailed_with_options(
481                sub_query_alias,
482                join_type,
483                (Vec::<Column>::new(), Vec::<Column>::new()), // No equijoin keys, filter-based join
484                Some(join_filter),
485                NullEquality::NullEqualsNothing,
486                true, // null_aware
487            )?
488            .build()?
489    } else {
490        LogicalPlanBuilder::from(left.clone())
491            .join_on(sub_query_alias, join_type, Some(join_filter))?
492            .build()?
493    };
494    debug!(
495        "predicate subquery optimized:\n{}",
496        new_plan.display_indent()
497    );
498    Ok(Some(new_plan))
499}
500
501#[derive(Debug)]
502struct SubqueryInfo {
503    query: Subquery,
504    where_in_expr: Option<Expr>,
505    negated: bool,
506}
507
508impl SubqueryInfo {
509    pub fn new(query: Subquery, negated: bool) -> Self {
510        Self {
511            query,
512            where_in_expr: None,
513            negated,
514        }
515    }
516
517    pub fn new_with_in_expr(query: Subquery, expr: Expr, negated: bool) -> Self {
518        Self {
519            query,
520            where_in_expr: Some(expr),
521            negated,
522        }
523    }
524
525    pub fn expr(self) -> Expr {
526        match self.where_in_expr {
527            Some(expr) => match self.negated {
528                true => not_in_subquery(expr, self.query.subquery),
529                false => in_subquery(expr, self.query.subquery),
530            },
531            None => match self.negated {
532                true => not_exists(self.query.subquery),
533                false => exists(self.query.subquery),
534            },
535        }
536    }
537}
538
539#[cfg(test)]
540mod tests {
541    use std::ops::Add;
542
543    use super::*;
544    use crate::test::*;
545
546    use crate::assert_optimized_plan_eq_display_indent_snapshot;
547    use arrow::datatypes::{DataType, Field, Schema};
548    use datafusion_expr::builder::table_source;
549    use datafusion_expr::{and, binary_expr, col, out_ref_col, table_scan};
550
551    macro_rules! assert_optimized_plan_equal {
552        (
553            $plan:expr,
554            @ $expected:literal $(,)?
555        ) => {{
556            let rule: Arc<dyn crate::OptimizerRule + Send + Sync> = Arc::new(DecorrelatePredicateSubquery::new());
557            assert_optimized_plan_eq_display_indent_snapshot!(
558                rule,
559                $plan,
560                @ $expected,
561            )
562        }};
563    }
564
565    fn test_subquery_with_name(name: &str) -> Result<Arc<LogicalPlan>> {
566        let table_scan = test_table_scan_with_name(name)?;
567        Ok(Arc::new(
568            LogicalPlanBuilder::from(table_scan)
569                .project(vec![col("c")])?
570                .build()?,
571        ))
572    }
573
574    /// Test for several IN subquery expressions
575    #[test]
576    fn in_subquery_multiple() -> Result<()> {
577        let table_scan = test_table_scan()?;
578        let plan = LogicalPlanBuilder::from(table_scan)
579            .filter(and(
580                in_subquery(col("c"), test_subquery_with_name("sq_1")?),
581                in_subquery(col("b"), test_subquery_with_name("sq_2")?),
582            ))?
583            .project(vec![col("test.b")])?
584            .build()?;
585
586        assert_optimized_plan_equal!(
587            plan,
588            @r"
589        Projection: test.b [b:UInt32]
590          LeftSemi Join:  Filter: test.b = __correlated_sq_2.c [a:UInt32, b:UInt32, c:UInt32]
591            LeftSemi Join:  Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
592              TableScan: test [a:UInt32, b:UInt32, c:UInt32]
593              SubqueryAlias: __correlated_sq_1 [c:UInt32]
594                Projection: sq_1.c [c:UInt32]
595                  TableScan: sq_1 [a:UInt32, b:UInt32, c:UInt32]
596            SubqueryAlias: __correlated_sq_2 [c:UInt32]
597              Projection: sq_2.c [c:UInt32]
598                TableScan: sq_2 [a:UInt32, b:UInt32, c:UInt32]
599        "
600        )
601    }
602
603    /// Test for IN subquery with additional AND filter
604    #[test]
605    fn in_subquery_with_and_filters() -> Result<()> {
606        let table_scan = test_table_scan()?;
607        let plan = LogicalPlanBuilder::from(table_scan)
608            .filter(and(
609                in_subquery(col("c"), test_subquery_with_name("sq")?),
610                and(
611                    binary_expr(col("a"), Operator::Eq, lit(1_u32)),
612                    binary_expr(col("b"), Operator::Lt, lit(30_u32)),
613                ),
614            ))?
615            .project(vec![col("test.b")])?
616            .build()?;
617
618        assert_optimized_plan_equal!(
619            plan,
620            @r"
621        Projection: test.b [b:UInt32]
622          Filter: test.a = UInt32(1) AND test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]
623            LeftSemi Join:  Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
624              TableScan: test [a:UInt32, b:UInt32, c:UInt32]
625              SubqueryAlias: __correlated_sq_1 [c:UInt32]
626                Projection: sq.c [c:UInt32]
627                  TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
628        "
629        )
630    }
631
632    /// Test for nested IN subqueries
633    #[test]
634    fn in_subquery_nested() -> Result<()> {
635        let table_scan = test_table_scan()?;
636
637        let subquery = LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
638            .filter(in_subquery(col("a"), test_subquery_with_name("sq_nested")?))?
639            .project(vec![col("a")])?
640            .build()?;
641
642        let plan = LogicalPlanBuilder::from(table_scan)
643            .filter(in_subquery(col("b"), Arc::new(subquery)))?
644            .project(vec![col("test.b")])?
645            .build()?;
646
647        assert_optimized_plan_equal!(
648            plan,
649            @r"
650        Projection: test.b [b:UInt32]
651          LeftSemi Join:  Filter: test.b = __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]
652            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
653            SubqueryAlias: __correlated_sq_2 [a:UInt32]
654              Projection: sq.a [a:UInt32]
655                LeftSemi Join:  Filter: sq.a = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
656                  TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
657                  SubqueryAlias: __correlated_sq_1 [c:UInt32]
658                    Projection: sq_nested.c [c:UInt32]
659                      TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]
660        "
661        )
662    }
663
664    /// Test multiple correlated subqueries
665    /// See subqueries.rs where_in_multiple()
666    #[test]
667    fn multiple_subqueries() -> Result<()> {
668        let orders = Arc::new(
669            LogicalPlanBuilder::from(scan_tpch_table("orders"))
670                .filter(
671                    col("orders.o_custkey")
672                        .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
673                )?
674                .project(vec![col("orders.o_custkey")])?
675                .build()?,
676        );
677        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
678            .filter(
679                in_subquery(col("customer.c_custkey"), Arc::clone(&orders))
680                    .and(in_subquery(col("customer.c_custkey"), orders)),
681            )?
682            .project(vec![col("customer.c_custkey")])?
683            .build()?;
684        debug!("plan to optimize:\n{}", plan.display_indent());
685
686        assert_optimized_plan_equal!(
687                plan,
688                @r"
689        Projection: customer.c_custkey [c_custkey:Int64]
690          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]
691            LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
692              TableScan: customer [c_custkey:Int64, c_name:Utf8]
693              SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
694                Projection: orders.o_custkey [o_custkey:Int64]
695                  TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
696            SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]
697              Projection: orders.o_custkey [o_custkey:Int64]
698                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
699        "    
700        )
701    }
702
703    /// Test recursive correlated subqueries
704    /// See subqueries.rs where_in_recursive()
705    #[test]
706    fn recursive_subqueries() -> Result<()> {
707        let lineitem = Arc::new(
708            LogicalPlanBuilder::from(scan_tpch_table("lineitem"))
709                .filter(
710                    col("lineitem.l_orderkey")
711                        .eq(out_ref_col(DataType::Int64, "orders.o_orderkey")),
712                )?
713                .project(vec![col("lineitem.l_orderkey")])?
714                .build()?,
715        );
716
717        let orders = Arc::new(
718            LogicalPlanBuilder::from(scan_tpch_table("orders"))
719                .filter(
720                    in_subquery(col("orders.o_orderkey"), lineitem).and(
721                        col("orders.o_custkey")
722                            .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
723                    ),
724                )?
725                .project(vec![col("orders.o_custkey")])?
726                .build()?,
727        );
728
729        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
730            .filter(in_subquery(col("customer.c_custkey"), orders))?
731            .project(vec![col("customer.c_custkey")])?
732            .build()?;
733
734        assert_optimized_plan_equal!(
735            plan,
736            @r"
737        Projection: customer.c_custkey [c_custkey:Int64]
738          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]
739            TableScan: customer [c_custkey:Int64, c_name:Utf8]
740            SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]
741              Projection: orders.o_custkey [o_custkey:Int64]
742                LeftSemi Join:  Filter: orders.o_orderkey = __correlated_sq_1.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
743                  TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
744                  SubqueryAlias: __correlated_sq_1 [l_orderkey:Int64]
745                    Projection: lineitem.l_orderkey [l_orderkey:Int64]
746                      TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]
747        "
748        )
749    }
750
751    /// Test for correlated IN subquery filter with additional subquery filters
752    #[test]
753    fn in_subquery_with_subquery_filters() -> Result<()> {
754        let sq = Arc::new(
755            LogicalPlanBuilder::from(scan_tpch_table("orders"))
756                .filter(
757                    out_ref_col(DataType::Int64, "customer.c_custkey")
758                        .eq(col("orders.o_custkey"))
759                        .and(col("o_orderkey").eq(lit(1))),
760                )?
761                .project(vec![col("orders.o_custkey")])?
762                .build()?,
763        );
764
765        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
766            .filter(in_subquery(col("customer.c_custkey"), sq))?
767            .project(vec![col("customer.c_custkey")])?
768            .build()?;
769
770        assert_optimized_plan_equal!(
771            plan,
772            @r"
773        Projection: customer.c_custkey [c_custkey:Int64]
774          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
775            TableScan: customer [c_custkey:Int64, c_name:Utf8]
776            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
777              Projection: orders.o_custkey [o_custkey:Int64]
778                Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
779                  TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
780        "
781        )
782    }
783
784    /// Test for correlated IN subquery with no columns in schema
785    #[test]
786    fn in_subquery_no_cols() -> Result<()> {
787        let sq = Arc::new(
788            LogicalPlanBuilder::from(scan_tpch_table("orders"))
789                .filter(
790                    out_ref_col(DataType::Int64, "customer.c_custkey")
791                        .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
792                )?
793                .project(vec![col("orders.o_custkey")])?
794                .build()?,
795        );
796
797        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
798            .filter(in_subquery(col("customer.c_custkey"), sq))?
799            .project(vec![col("customer.c_custkey")])?
800            .build()?;
801
802        assert_optimized_plan_equal!(
803            plan,
804            @r"
805        Projection: customer.c_custkey [c_custkey:Int64]
806          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
807            TableScan: customer [c_custkey:Int64, c_name:Utf8]
808            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
809              Projection: orders.o_custkey [o_custkey:Int64]
810                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
811        "
812        )
813    }
814
815    /// Test for IN subquery with both columns in schema
816    #[test]
817    fn in_subquery_with_no_correlated_cols() -> Result<()> {
818        let sq = Arc::new(
819            LogicalPlanBuilder::from(scan_tpch_table("orders"))
820                .filter(col("orders.o_custkey").eq(col("orders.o_custkey")))?
821                .project(vec![col("orders.o_custkey")])?
822                .build()?,
823        );
824
825        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
826            .filter(in_subquery(col("customer.c_custkey"), sq))?
827            .project(vec![col("customer.c_custkey")])?
828            .build()?;
829
830        assert_optimized_plan_equal!(
831            plan,
832            @r"
833        Projection: customer.c_custkey [c_custkey:Int64]
834          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
835            TableScan: customer [c_custkey:Int64, c_name:Utf8]
836            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
837              Projection: orders.o_custkey [o_custkey:Int64]
838                Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
839                  TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
840        "
841        )
842    }
843
844    /// Test for correlated IN subquery not equal
845    #[test]
846    fn in_subquery_where_not_eq() -> Result<()> {
847        let sq = Arc::new(
848            LogicalPlanBuilder::from(scan_tpch_table("orders"))
849                .filter(
850                    out_ref_col(DataType::Int64, "customer.c_custkey")
851                        .not_eq(col("orders.o_custkey")),
852                )?
853                .project(vec![col("orders.o_custkey")])?
854                .build()?,
855        );
856
857        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
858            .filter(in_subquery(col("customer.c_custkey"), sq))?
859            .project(vec![col("customer.c_custkey")])?
860            .build()?;
861
862        assert_optimized_plan_equal!(
863            plan,
864            @r"
865        Projection: customer.c_custkey [c_custkey:Int64]
866          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND customer.c_custkey != __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
867            TableScan: customer [c_custkey:Int64, c_name:Utf8]
868            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
869              Projection: orders.o_custkey [o_custkey:Int64]
870                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
871        "
872        )
873    }
874
875    /// Test for correlated IN subquery less than
876    #[test]
877    fn in_subquery_where_less_than() -> Result<()> {
878        let sq = Arc::new(
879            LogicalPlanBuilder::from(scan_tpch_table("orders"))
880                .filter(
881                    out_ref_col(DataType::Int64, "customer.c_custkey")
882                        .lt(col("orders.o_custkey")),
883                )?
884                .project(vec![col("orders.o_custkey")])?
885                .build()?,
886        );
887
888        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
889            .filter(in_subquery(col("customer.c_custkey"), sq))?
890            .project(vec![col("customer.c_custkey")])?
891            .build()?;
892
893        assert_optimized_plan_equal!(
894            plan,
895            @r"
896        Projection: customer.c_custkey [c_custkey:Int64]
897          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND customer.c_custkey < __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
898            TableScan: customer [c_custkey:Int64, c_name:Utf8]
899            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
900              Projection: orders.o_custkey [o_custkey:Int64]
901                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
902        "
903        )
904    }
905
906    /// Test for correlated IN subquery filter with subquery disjunction
907    #[test]
908    fn in_subquery_with_subquery_disjunction() -> Result<()> {
909        let sq = Arc::new(
910            LogicalPlanBuilder::from(scan_tpch_table("orders"))
911                .filter(
912                    out_ref_col(DataType::Int64, "customer.c_custkey")
913                        .eq(col("orders.o_custkey"))
914                        .or(col("o_orderkey").eq(lit(1))),
915                )?
916                .project(vec![col("orders.o_custkey")])?
917                .build()?,
918        );
919
920        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
921            .filter(in_subquery(col("customer.c_custkey"), sq))?
922            .project(vec![col("customer.c_custkey")])?
923            .build()?;
924
925        assert_optimized_plan_equal!(
926            plan,
927            @r"
928        Projection: customer.c_custkey [c_custkey:Int64]
929          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey AND (customer.c_custkey = __correlated_sq_1.o_custkey OR __correlated_sq_1.o_orderkey = Int32(1)) [c_custkey:Int64, c_name:Utf8]
930            TableScan: customer [c_custkey:Int64, c_name:Utf8]
931            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64, o_orderkey:Int64]
932              Projection: orders.o_custkey, orders.o_orderkey [o_custkey:Int64, o_orderkey:Int64]
933                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
934        "
935        )
936    }
937
938    /// Test for correlated IN without projection
939    #[test]
940    fn in_subquery_no_projection() -> Result<()> {
941        let sq = Arc::new(
942            LogicalPlanBuilder::from(scan_tpch_table("orders"))
943                .filter(col("customer.c_custkey").eq(col("orders.o_custkey")))?
944                .build()?,
945        );
946
947        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
948            .filter(in_subquery(col("customer.c_custkey"), sq))?
949            .project(vec![col("customer.c_custkey")])?
950            .build()?;
951
952        // Maybe okay if the table only has a single column?
953        let expected = "Invalid (non-executable) plan after Analyzer\
954        \ncaused by\
955        \nError during planning: InSubquery should only return one column, but found 4";
956        assert_analyzer_check_err(vec![], plan, expected);
957
958        Ok(())
959    }
960
961    /// Test for correlated IN subquery join on expression
962    #[test]
963    fn in_subquery_join_expr() -> Result<()> {
964        let sq = Arc::new(
965            LogicalPlanBuilder::from(scan_tpch_table("orders"))
966                .filter(
967                    out_ref_col(DataType::Int64, "customer.c_custkey")
968                        .eq(col("orders.o_custkey")),
969                )?
970                .project(vec![col("orders.o_custkey")])?
971                .build()?,
972        );
973
974        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
975            .filter(in_subquery(col("customer.c_custkey").add(lit(1)), sq))?
976            .project(vec![col("customer.c_custkey")])?
977            .build()?;
978
979        assert_optimized_plan_equal!(
980            plan,
981            @r"
982        Projection: customer.c_custkey [c_custkey:Int64]
983          LeftSemi Join:  Filter: customer.c_custkey + Int32(1) = __correlated_sq_1.o_custkey AND customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
984            TableScan: customer [c_custkey:Int64, c_name:Utf8]
985            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
986              Projection: orders.o_custkey [o_custkey:Int64]
987                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
988        "
989        )
990    }
991
992    /// Test for correlated IN expressions
993    #[test]
994    fn in_subquery_project_expr() -> Result<()> {
995        let sq = Arc::new(
996            LogicalPlanBuilder::from(scan_tpch_table("orders"))
997                .filter(
998                    out_ref_col(DataType::Int64, "customer.c_custkey")
999                        .eq(col("orders.o_custkey")),
1000                )?
1001                .project(vec![col("orders.o_custkey").add(lit(1))])?
1002                .build()?,
1003        );
1004
1005        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1006            .filter(in_subquery(col("customer.c_custkey"), sq))?
1007            .project(vec![col("customer.c_custkey")])?
1008            .build()?;
1009
1010        assert_optimized_plan_equal!(
1011            plan,
1012            @r"
1013        Projection: customer.c_custkey [c_custkey:Int64]
1014          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.orders.o_custkey + Int32(1) AND customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1015            TableScan: customer [c_custkey:Int64, c_name:Utf8]
1016            SubqueryAlias: __correlated_sq_1 [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]
1017              Projection: orders.o_custkey + Int32(1), orders.o_custkey [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]
1018                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1019        "
1020        )
1021    }
1022
1023    /// Test for correlated IN subquery multiple projected columns
1024    #[test]
1025    fn in_subquery_multi_col() -> Result<()> {
1026        let sq = Arc::new(
1027            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1028                .filter(
1029                    out_ref_col(DataType::Int64, "customer.c_custkey")
1030                        .eq(col("orders.o_custkey")),
1031                )?
1032                .project(vec![col("orders.o_custkey"), col("orders.o_orderkey")])?
1033                .build()?,
1034        );
1035
1036        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1037            .filter(
1038                in_subquery(col("customer.c_custkey"), sq)
1039                    .and(col("c_custkey").eq(lit(1))),
1040            )?
1041            .project(vec![col("customer.c_custkey")])?
1042            .build()?;
1043
1044        let expected = "Invalid (non-executable) plan after Analyzer\
1045        \ncaused by\
1046        \nError during planning: InSubquery should only return one column";
1047        assert_analyzer_check_err(vec![], plan, expected);
1048
1049        Ok(())
1050    }
1051
1052    /// Test for correlated IN subquery filter with additional filters
1053    #[test]
1054    fn should_support_additional_filters() -> Result<()> {
1055        let sq = Arc::new(
1056            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1057                .filter(
1058                    out_ref_col(DataType::Int64, "customer.c_custkey")
1059                        .eq(col("orders.o_custkey")),
1060                )?
1061                .project(vec![col("orders.o_custkey")])?
1062                .build()?,
1063        );
1064
1065        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1066            .filter(
1067                in_subquery(col("customer.c_custkey"), sq)
1068                    .and(col("c_custkey").eq(lit(1))),
1069            )?
1070            .project(vec![col("customer.c_custkey")])?
1071            .build()?;
1072
1073        assert_optimized_plan_equal!(
1074            plan,
1075            @r"
1076        Projection: customer.c_custkey [c_custkey:Int64]
1077          Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]
1078            LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1079              TableScan: customer [c_custkey:Int64, c_name:Utf8]
1080              SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1081                Projection: orders.o_custkey [o_custkey:Int64]
1082                  TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1083        "
1084        )
1085    }
1086
1087    /// Test for correlated IN subquery filter
1088    #[test]
1089    fn in_subquery_correlated() -> Result<()> {
1090        let sq = Arc::new(
1091            LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
1092                .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq.a")))?
1093                .project(vec![col("c")])?
1094                .build()?,
1095        );
1096
1097        let plan = LogicalPlanBuilder::from(test_table_scan_with_name("test")?)
1098            .filter(in_subquery(col("c"), sq))?
1099            .project(vec![col("test.b")])?
1100            .build()?;
1101
1102        assert_optimized_plan_equal!(
1103            plan,
1104            @r"
1105        Projection: test.b [b:UInt32]
1106          LeftSemi Join:  Filter: test.c = __correlated_sq_1.c AND test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]
1107            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1108            SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]
1109              Projection: sq.c, sq.a [c:UInt32, a:UInt32]
1110                TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1111        "
1112        )
1113    }
1114
1115    /// Test for single IN subquery filter
1116    #[test]
1117    fn in_subquery_simple() -> Result<()> {
1118        let table_scan = test_table_scan()?;
1119        let plan = LogicalPlanBuilder::from(table_scan)
1120            .filter(in_subquery(col("c"), test_subquery_with_name("sq")?))?
1121            .project(vec![col("test.b")])?
1122            .build()?;
1123
1124        assert_optimized_plan_equal!(
1125            plan,
1126            @r"
1127        Projection: test.b [b:UInt32]
1128          LeftSemi Join:  Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
1129            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1130            SubqueryAlias: __correlated_sq_1 [c:UInt32]
1131              Projection: sq.c [c:UInt32]
1132                TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1133        "
1134        )
1135    }
1136
1137    /// Test for single NOT IN subquery filter
1138    #[test]
1139    fn not_in_subquery_simple() -> Result<()> {
1140        let table_scan = test_table_scan()?;
1141        let plan = LogicalPlanBuilder::from(table_scan)
1142            .filter(not_in_subquery(col("c"), test_subquery_with_name("sq")?))?
1143            .project(vec![col("test.b")])?
1144            .build()?;
1145
1146        assert_optimized_plan_equal!(
1147            plan,
1148            @r"
1149        Projection: test.b [b:UInt32]
1150          LeftAnti Join:  Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
1151            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1152            SubqueryAlias: __correlated_sq_1 [c:UInt32]
1153              Projection: sq.c [c:UInt32]
1154                TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1155        "
1156        )
1157    }
1158
1159    #[test]
1160    fn wrapped_not_in_subquery() -> Result<()> {
1161        let table_scan = test_table_scan()?;
1162        let plan = LogicalPlanBuilder::from(table_scan)
1163            .filter(not(in_subquery(col("c"), test_subquery_with_name("sq")?)))?
1164            .project(vec![col("test.b")])?
1165            .build()?;
1166
1167        assert_optimized_plan_equal!(
1168            plan,
1169            @r"
1170        Projection: test.b [b:UInt32]
1171          LeftAnti Join:  Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
1172            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1173            SubqueryAlias: __correlated_sq_1 [c:UInt32]
1174              Projection: sq.c [c:UInt32]
1175                TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1176        "
1177        )
1178    }
1179
1180    #[test]
1181    fn wrapped_not_not_in_subquery() -> Result<()> {
1182        let table_scan = test_table_scan()?;
1183        let plan = LogicalPlanBuilder::from(table_scan)
1184            .filter(not(not_in_subquery(
1185                col("c"),
1186                test_subquery_with_name("sq")?,
1187            )))?
1188            .project(vec![col("test.b")])?
1189            .build()?;
1190
1191        assert_optimized_plan_equal!(
1192            plan,
1193            @r"
1194        Projection: test.b [b:UInt32]
1195          LeftSemi Join:  Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
1196            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1197            SubqueryAlias: __correlated_sq_1 [c:UInt32]
1198              Projection: sq.c [c:UInt32]
1199                TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1200        "
1201        )
1202    }
1203
1204    #[test]
1205    fn in_subquery_both_side_expr() -> Result<()> {
1206        let table_scan = test_table_scan()?;
1207        let subquery_scan = test_table_scan_with_name("sq")?;
1208
1209        let subquery = LogicalPlanBuilder::from(subquery_scan)
1210            .project(vec![col("c") * lit(2u32)])?
1211            .build()?;
1212
1213        let plan = LogicalPlanBuilder::from(table_scan)
1214            .filter(in_subquery(col("c") + lit(1u32), Arc::new(subquery)))?
1215            .project(vec![col("test.b")])?
1216            .build()?;
1217
1218        assert_optimized_plan_equal!(
1219            plan,
1220            @r"
1221        Projection: test.b [b:UInt32]
1222          LeftSemi Join:  Filter: test.c + UInt32(1) = __correlated_sq_1.sq.c * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]
1223            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1224            SubqueryAlias: __correlated_sq_1 [sq.c * UInt32(2):UInt32]
1225              Projection: sq.c * UInt32(2) [sq.c * UInt32(2):UInt32]
1226                TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1227        "
1228        )
1229    }
1230
1231    #[test]
1232    fn in_subquery_join_filter_and_inner_filter() -> Result<()> {
1233        let table_scan = test_table_scan()?;
1234        let subquery_scan = test_table_scan_with_name("sq")?;
1235
1236        let subquery = LogicalPlanBuilder::from(subquery_scan)
1237            .filter(
1238                out_ref_col(DataType::UInt32, "test.a")
1239                    .eq(col("sq.a"))
1240                    .and(col("sq.a").add(lit(1u32)).eq(col("sq.b"))),
1241            )?
1242            .project(vec![col("c") * lit(2u32)])?
1243            .build()?;
1244
1245        let plan = LogicalPlanBuilder::from(table_scan)
1246            .filter(in_subquery(col("c") + lit(1u32), Arc::new(subquery)))?
1247            .project(vec![col("test.b")])?
1248            .build()?;
1249
1250        assert_optimized_plan_equal!(
1251            plan,
1252            @r"
1253        Projection: test.b [b:UInt32]
1254          LeftSemi Join:  Filter: test.c + UInt32(1) = __correlated_sq_1.sq.c * UInt32(2) AND test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]
1255            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1256            SubqueryAlias: __correlated_sq_1 [sq.c * UInt32(2):UInt32, a:UInt32]
1257              Projection: sq.c * UInt32(2), sq.a [sq.c * UInt32(2):UInt32, a:UInt32]
1258                Filter: sq.a + UInt32(1) = sq.b [a:UInt32, b:UInt32, c:UInt32]
1259                  TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1260        "
1261        )
1262    }
1263
1264    #[test]
1265    fn in_subquery_multi_project_subquery_cols() -> Result<()> {
1266        let table_scan = test_table_scan()?;
1267        let subquery_scan = test_table_scan_with_name("sq")?;
1268
1269        let subquery = LogicalPlanBuilder::from(subquery_scan)
1270            .filter(
1271                out_ref_col(DataType::UInt32, "test.a")
1272                    .add(out_ref_col(DataType::UInt32, "test.b"))
1273                    .eq(col("sq.a").add(col("sq.b")))
1274                    .and(col("sq.a").add(lit(1u32)).eq(col("sq.b"))),
1275            )?
1276            .project(vec![col("c") * lit(2u32)])?
1277            .build()?;
1278
1279        let plan = LogicalPlanBuilder::from(table_scan)
1280            .filter(in_subquery(col("c") + lit(1u32), Arc::new(subquery)))?
1281            .project(vec![col("test.b")])?
1282            .build()?;
1283
1284        assert_optimized_plan_equal!(
1285            plan,
1286            @r"
1287        Projection: test.b [b:UInt32]
1288          LeftSemi Join:  Filter: test.c + UInt32(1) = __correlated_sq_1.sq.c * UInt32(2) AND test.a + test.b = __correlated_sq_1.a + __correlated_sq_1.b [a:UInt32, b:UInt32, c:UInt32]
1289            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1290            SubqueryAlias: __correlated_sq_1 [sq.c * UInt32(2):UInt32, a:UInt32, b:UInt32]
1291              Projection: sq.c * UInt32(2), sq.a, sq.b [sq.c * UInt32(2):UInt32, a:UInt32, b:UInt32]
1292                Filter: sq.a + UInt32(1) = sq.b [a:UInt32, b:UInt32, c:UInt32]
1293                  TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1294        "
1295        )
1296    }
1297
1298    #[test]
1299    fn two_in_subquery_with_outer_filter() -> Result<()> {
1300        let table_scan = test_table_scan()?;
1301        let subquery_scan1 = test_table_scan_with_name("sq1")?;
1302        let subquery_scan2 = test_table_scan_with_name("sq2")?;
1303
1304        let subquery1 = LogicalPlanBuilder::from(subquery_scan1)
1305            .filter(out_ref_col(DataType::UInt32, "test.a").gt(col("sq1.a")))?
1306            .project(vec![col("c") * lit(2u32)])?
1307            .build()?;
1308
1309        let subquery2 = LogicalPlanBuilder::from(subquery_scan2)
1310            .filter(out_ref_col(DataType::UInt32, "test.a").gt(col("sq2.a")))?
1311            .project(vec![col("c") * lit(2u32)])?
1312            .build()?;
1313
1314        let plan = LogicalPlanBuilder::from(table_scan)
1315            .filter(
1316                in_subquery(col("c") + lit(1u32), Arc::new(subquery1)).and(
1317                    in_subquery(col("c") * lit(2u32), Arc::new(subquery2))
1318                        .and(col("test.c").gt(lit(1u32))),
1319                ),
1320            )?
1321            .project(vec![col("test.b")])?
1322            .build()?;
1323
1324        assert_optimized_plan_equal!(
1325            plan,
1326            @r"
1327        Projection: test.b [b:UInt32]
1328          Filter: test.c > UInt32(1) [a:UInt32, b:UInt32, c:UInt32]
1329            LeftSemi Join:  Filter: test.c * UInt32(2) = __correlated_sq_2.sq2.c * UInt32(2) AND test.a > __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]
1330              LeftSemi Join:  Filter: test.c + UInt32(1) = __correlated_sq_1.sq1.c * UInt32(2) AND test.a > __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]
1331                TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1332                SubqueryAlias: __correlated_sq_1 [sq1.c * UInt32(2):UInt32, a:UInt32]
1333                  Projection: sq1.c * UInt32(2), sq1.a [sq1.c * UInt32(2):UInt32, a:UInt32]
1334                    TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]
1335              SubqueryAlias: __correlated_sq_2 [sq2.c * UInt32(2):UInt32, a:UInt32]
1336                Projection: sq2.c * UInt32(2), sq2.a [sq2.c * UInt32(2):UInt32, a:UInt32]
1337                  TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]
1338        "
1339        )
1340    }
1341
1342    #[test]
1343    fn in_subquery_with_same_table() -> Result<()> {
1344        let outer_scan = test_table_scan()?;
1345        let subquery_scan = test_table_scan()?;
1346        let subquery = LogicalPlanBuilder::from(subquery_scan)
1347            .filter(col("test.a").gt(col("test.b")))?
1348            .project(vec![col("c")])?
1349            .build()?;
1350
1351        let plan = LogicalPlanBuilder::from(outer_scan)
1352            .filter(in_subquery(col("test.a"), Arc::new(subquery)))?
1353            .project(vec![col("test.b")])?
1354            .build()?;
1355
1356        // Subquery and outer query refer to the same table.
1357        assert_optimized_plan_equal!(
1358            plan,
1359            @r"
1360        Projection: test.b [b:UInt32]
1361          LeftSemi Join:  Filter: test.a = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]
1362            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1363            SubqueryAlias: __correlated_sq_1 [c:UInt32]
1364              Projection: test.c [c:UInt32]
1365                Filter: test.a > test.b [a:UInt32, b:UInt32, c:UInt32]
1366                  TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1367        "
1368        )
1369    }
1370
1371    /// Test for multiple exists subqueries in the same filter expression
1372    #[test]
1373    fn multiple_exists_subqueries() -> Result<()> {
1374        let orders = Arc::new(
1375            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1376                .filter(
1377                    col("orders.o_custkey")
1378                        .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
1379                )?
1380                .project(vec![col("orders.o_custkey")])?
1381                .build()?,
1382        );
1383
1384        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1385            .filter(exists(Arc::clone(&orders)).and(exists(orders)))?
1386            .project(vec![col("customer.c_custkey")])?
1387            .build()?;
1388
1389        assert_optimized_plan_equal!(
1390            plan,
1391            @r"
1392        Projection: customer.c_custkey [c_custkey:Int64]
1393          LeftSemi Join:  Filter: __correlated_sq_2.o_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]
1394            LeftSemi Join:  Filter: __correlated_sq_1.o_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]
1395              TableScan: customer [c_custkey:Int64, c_name:Utf8]
1396              SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1397                Projection: orders.o_custkey [o_custkey:Int64]
1398                  TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1399            SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]
1400              Projection: orders.o_custkey [o_custkey:Int64]
1401                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1402        "
1403        )
1404    }
1405
1406    /// Test recursive correlated subqueries
1407    #[test]
1408    fn recursive_exists_subqueries() -> Result<()> {
1409        let lineitem = Arc::new(
1410            LogicalPlanBuilder::from(scan_tpch_table("lineitem"))
1411                .filter(
1412                    col("lineitem.l_orderkey")
1413                        .eq(out_ref_col(DataType::Int64, "orders.o_orderkey")),
1414                )?
1415                .project(vec![col("lineitem.l_orderkey")])?
1416                .build()?,
1417        );
1418
1419        let orders = Arc::new(
1420            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1421                .filter(
1422                    exists(lineitem).and(
1423                        col("orders.o_custkey")
1424                            .eq(out_ref_col(DataType::Int64, "customer.c_custkey")),
1425                    ),
1426                )?
1427                .project(vec![col("orders.o_custkey")])?
1428                .build()?,
1429        );
1430
1431        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1432            .filter(exists(orders))?
1433            .project(vec![col("customer.c_custkey")])?
1434            .build()?;
1435
1436        assert_optimized_plan_equal!(
1437            plan,
1438            @r"
1439        Projection: customer.c_custkey [c_custkey:Int64]
1440          LeftSemi Join:  Filter: __correlated_sq_2.o_custkey = customer.c_custkey [c_custkey:Int64, c_name:Utf8]
1441            TableScan: customer [c_custkey:Int64, c_name:Utf8]
1442            SubqueryAlias: __correlated_sq_2 [o_custkey:Int64]
1443              Projection: orders.o_custkey [o_custkey:Int64]
1444                LeftSemi Join:  Filter: __correlated_sq_1.l_orderkey = orders.o_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1445                  TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1446                  SubqueryAlias: __correlated_sq_1 [l_orderkey:Int64]
1447                    Projection: lineitem.l_orderkey [l_orderkey:Int64]
1448                      TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]
1449        "
1450        )
1451    }
1452
1453    /// Test for correlated exists subquery filter with additional subquery filters
1454    #[test]
1455    fn exists_subquery_with_subquery_filters() -> Result<()> {
1456        let sq = Arc::new(
1457            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1458                .filter(
1459                    out_ref_col(DataType::Int64, "customer.c_custkey")
1460                        .eq(col("orders.o_custkey"))
1461                        .and(col("o_orderkey").eq(lit(1))),
1462                )?
1463                .project(vec![col("orders.o_custkey")])?
1464                .build()?,
1465        );
1466
1467        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1468            .filter(exists(sq))?
1469            .project(vec![col("customer.c_custkey")])?
1470            .build()?;
1471
1472        assert_optimized_plan_equal!(
1473            plan,
1474            @r"
1475        Projection: customer.c_custkey [c_custkey:Int64]
1476          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1477            TableScan: customer [c_custkey:Int64, c_name:Utf8]
1478            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1479              Projection: orders.o_custkey [o_custkey:Int64]
1480                Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1481                  TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1482        "
1483        )
1484    }
1485
1486    #[test]
1487    fn exists_subquery_no_cols() -> Result<()> {
1488        let sq = Arc::new(
1489            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1490                .filter(out_ref_col(DataType::Int64, "customer.c_custkey").eq(lit(1u32)))?
1491                .project(vec![col("orders.o_custkey")])?
1492                .build()?,
1493        );
1494
1495        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1496            .filter(exists(sq))?
1497            .project(vec![col("customer.c_custkey")])?
1498            .build()?;
1499
1500        // Other rule will pushdown `customer.c_custkey = 1`,
1501        assert_optimized_plan_equal!(
1502            plan,
1503            @r"
1504        Projection: customer.c_custkey [c_custkey:Int64]
1505          LeftSemi Join:  Filter: customer.c_custkey = UInt32(1) [c_custkey:Int64, c_name:Utf8]
1506            TableScan: customer [c_custkey:Int64, c_name:Utf8]
1507            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1508              Projection: orders.o_custkey [o_custkey:Int64]
1509                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1510        "
1511        )
1512    }
1513
1514    /// Test for exists subquery with both columns in schema
1515    #[test]
1516    fn exists_subquery_with_no_correlated_cols() -> Result<()> {
1517        let sq = Arc::new(
1518            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1519                .filter(col("orders.o_custkey").eq(col("orders.o_custkey")))?
1520                .project(vec![col("orders.o_custkey")])?
1521                .build()?,
1522        );
1523
1524        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1525            .filter(exists(sq))?
1526            .project(vec![col("customer.c_custkey")])?
1527            .build()?;
1528
1529        assert_optimized_plan_equal!(
1530            plan,
1531            @r"
1532        Projection: customer.c_custkey [c_custkey:Int64]
1533          LeftSemi Join:  Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8]
1534            TableScan: customer [c_custkey:Int64, c_name:Utf8]
1535            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1536              Projection: orders.o_custkey [o_custkey:Int64]
1537                Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1538                  TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1539        "
1540        )
1541    }
1542
1543    /// Test for correlated exists subquery not equal
1544    #[test]
1545    fn exists_subquery_where_not_eq() -> Result<()> {
1546        let sq = Arc::new(
1547            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1548                .filter(
1549                    out_ref_col(DataType::Int64, "customer.c_custkey")
1550                        .not_eq(col("orders.o_custkey")),
1551                )?
1552                .project(vec![col("orders.o_custkey")])?
1553                .build()?,
1554        );
1555
1556        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1557            .filter(exists(sq))?
1558            .project(vec![col("customer.c_custkey")])?
1559            .build()?;
1560
1561        assert_optimized_plan_equal!(
1562            plan,
1563            @r"
1564        Projection: customer.c_custkey [c_custkey:Int64]
1565          LeftSemi Join:  Filter: customer.c_custkey != __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1566            TableScan: customer [c_custkey:Int64, c_name:Utf8]
1567            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1568              Projection: orders.o_custkey [o_custkey:Int64]
1569                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1570        "
1571        )
1572    }
1573
1574    /// Test for correlated exists subquery less than
1575    #[test]
1576    fn exists_subquery_where_less_than() -> Result<()> {
1577        let sq = Arc::new(
1578            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1579                .filter(
1580                    out_ref_col(DataType::Int64, "customer.c_custkey")
1581                        .lt(col("orders.o_custkey")),
1582                )?
1583                .project(vec![col("orders.o_custkey")])?
1584                .build()?,
1585        );
1586
1587        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1588            .filter(exists(sq))?
1589            .project(vec![col("customer.c_custkey")])?
1590            .build()?;
1591
1592        assert_optimized_plan_equal!(
1593            plan,
1594            @r"
1595        Projection: customer.c_custkey [c_custkey:Int64]
1596          LeftSemi Join:  Filter: customer.c_custkey < __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1597            TableScan: customer [c_custkey:Int64, c_name:Utf8]
1598            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1599              Projection: orders.o_custkey [o_custkey:Int64]
1600                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1601        "
1602        )
1603    }
1604
1605    /// Test for correlated exists subquery filter with subquery disjunction
1606    #[test]
1607    fn exists_subquery_with_subquery_disjunction() -> Result<()> {
1608        let sq = Arc::new(
1609            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1610                .filter(
1611                    out_ref_col(DataType::Int64, "customer.c_custkey")
1612                        .eq(col("orders.o_custkey"))
1613                        .or(col("o_orderkey").eq(lit(1))),
1614                )?
1615                .project(vec![col("orders.o_custkey")])?
1616                .build()?,
1617        );
1618
1619        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1620            .filter(exists(sq))?
1621            .project(vec![col("customer.c_custkey")])?
1622            .build()?;
1623
1624        assert_optimized_plan_equal!(
1625            plan,
1626            @r"
1627        Projection: customer.c_custkey [c_custkey:Int64]
1628          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey OR __correlated_sq_1.o_orderkey = Int32(1) [c_custkey:Int64, c_name:Utf8]
1629            TableScan: customer [c_custkey:Int64, c_name:Utf8]
1630            SubqueryAlias: __correlated_sq_1 [o_custkey:Int64, o_orderkey:Int64]
1631              Projection: orders.o_custkey, orders.o_orderkey [o_custkey:Int64, o_orderkey:Int64]
1632                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1633        "
1634        )
1635    }
1636
1637    /// Test for correlated exists without projection
1638    #[test]
1639    fn exists_subquery_no_projection() -> Result<()> {
1640        let sq = Arc::new(
1641            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1642                .filter(
1643                    out_ref_col(DataType::Int64, "customer.c_custkey")
1644                        .eq(col("orders.o_custkey")),
1645                )?
1646                .build()?,
1647        );
1648
1649        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1650            .filter(exists(sq))?
1651            .project(vec![col("customer.c_custkey")])?
1652            .build()?;
1653
1654        assert_optimized_plan_equal!(
1655            plan,
1656            @r"
1657        Projection: customer.c_custkey [c_custkey:Int64]
1658          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1659            TableScan: customer [c_custkey:Int64, c_name:Utf8]
1660            SubqueryAlias: __correlated_sq_1 [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1661              TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1662        "
1663        )
1664    }
1665
1666    /// Test for correlated exists expressions
1667    #[test]
1668    fn exists_subquery_project_expr() -> Result<()> {
1669        let sq = Arc::new(
1670            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1671                .filter(
1672                    out_ref_col(DataType::Int64, "customer.c_custkey")
1673                        .eq(col("orders.o_custkey")),
1674                )?
1675                .project(vec![col("orders.o_custkey").add(lit(1))])?
1676                .build()?,
1677        );
1678
1679        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1680            .filter(exists(sq))?
1681            .project(vec![col("customer.c_custkey")])?
1682            .build()?;
1683
1684        assert_optimized_plan_equal!(
1685            plan,
1686            @r"
1687        Projection: customer.c_custkey [c_custkey:Int64]
1688          LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1689            TableScan: customer [c_custkey:Int64, c_name:Utf8]
1690            SubqueryAlias: __correlated_sq_1 [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]
1691              Projection: orders.o_custkey + Int32(1), orders.o_custkey [orders.o_custkey + Int32(1):Int64, o_custkey:Int64]
1692                TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1693        "
1694        )
1695    }
1696
1697    /// Test for correlated exists subquery filter with additional filters
1698    #[test]
1699    fn exists_subquery_should_support_additional_filters() -> Result<()> {
1700        let sq = Arc::new(
1701            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1702                .filter(
1703                    out_ref_col(DataType::Int64, "customer.c_custkey")
1704                        .eq(col("orders.o_custkey")),
1705                )?
1706                .project(vec![col("orders.o_custkey")])?
1707                .build()?,
1708        );
1709        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1710            .filter(exists(sq).and(col("c_custkey").eq(lit(1))))?
1711            .project(vec![col("customer.c_custkey")])?
1712            .build()?;
1713
1714        assert_optimized_plan_equal!(
1715            plan,
1716            @r"
1717        Projection: customer.c_custkey [c_custkey:Int64]
1718          Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]
1719            LeftSemi Join:  Filter: customer.c_custkey = __correlated_sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
1720              TableScan: customer [c_custkey:Int64, c_name:Utf8]
1721              SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1722                Projection: orders.o_custkey [o_custkey:Int64]
1723                  TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1724        "
1725        )
1726    }
1727
1728    /// Test for correlated exists subquery filter with disjunctions
1729    #[test]
1730    fn exists_subquery_disjunction() -> Result<()> {
1731        let sq = Arc::new(
1732            LogicalPlanBuilder::from(scan_tpch_table("orders"))
1733                .filter(col("customer.c_custkey").eq(col("orders.o_custkey")))?
1734                .project(vec![col("orders.o_custkey")])?
1735                .build()?,
1736        );
1737
1738        let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1739            .filter(exists(sq).or(col("customer.c_custkey").eq(lit(1))))?
1740            .project(vec![col("customer.c_custkey")])?
1741            .build()?;
1742
1743        assert_optimized_plan_equal!(
1744            plan,
1745            @r"
1746        Projection: customer.c_custkey [c_custkey:Int64]
1747          Projection: customer.c_custkey, customer.c_name [c_custkey:Int64, c_name:Utf8]
1748            Filter: __correlated_sq_1.mark OR customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8, mark:Boolean]
1749              LeftMark Join:  Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, mark:Boolean]
1750                TableScan: customer [c_custkey:Int64, c_name:Utf8]
1751                SubqueryAlias: __correlated_sq_1 [o_custkey:Int64]
1752                  Projection: orders.o_custkey [o_custkey:Int64]
1753                    Filter: customer.c_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1754                      TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1755        "
1756        )
1757    }
1758
1759    /// Test for correlated EXISTS subquery filter
1760    #[test]
1761    fn exists_subquery_correlated() -> Result<()> {
1762        let sq = Arc::new(
1763            LogicalPlanBuilder::from(test_table_scan_with_name("sq")?)
1764                .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq.a")))?
1765                .project(vec![col("c")])?
1766                .build()?,
1767        );
1768
1769        let plan = LogicalPlanBuilder::from(test_table_scan_with_name("test")?)
1770            .filter(exists(sq))?
1771            .project(vec![col("test.c")])?
1772            .build()?;
1773
1774        assert_optimized_plan_equal!(
1775            plan,
1776            @r"
1777        Projection: test.c [c:UInt32]
1778          LeftSemi Join:  Filter: test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]
1779            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1780            SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]
1781              Projection: sq.c, sq.a [c:UInt32, a:UInt32]
1782                TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1783        "
1784        )
1785    }
1786
1787    /// Test for single exists subquery filter
1788    #[test]
1789    fn exists_subquery_simple() -> Result<()> {
1790        let table_scan = test_table_scan()?;
1791        let plan = LogicalPlanBuilder::from(table_scan)
1792            .filter(exists(test_subquery_with_name("sq")?))?
1793            .project(vec![col("test.b")])?
1794            .build()?;
1795
1796        assert_optimized_plan_equal!(
1797            plan,
1798            @r"
1799        Projection: test.b [b:UInt32]
1800          LeftSemi Join:  Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]
1801            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1802            SubqueryAlias: __correlated_sq_1 [c:UInt32]
1803              Projection: sq.c [c:UInt32]
1804                TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1805        "
1806        )
1807    }
1808
1809    /// Test for single NOT exists subquery filter
1810    #[test]
1811    fn not_exists_subquery_simple() -> Result<()> {
1812        let table_scan = test_table_scan()?;
1813        let plan = LogicalPlanBuilder::from(table_scan)
1814            .filter(not_exists(test_subquery_with_name("sq")?))?
1815            .project(vec![col("test.b")])?
1816            .build()?;
1817
1818        assert_optimized_plan_equal!(
1819            plan,
1820            @r"
1821        Projection: test.b [b:UInt32]
1822          LeftAnti Join:  Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]
1823            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1824            SubqueryAlias: __correlated_sq_1 [c:UInt32]
1825              Projection: sq.c [c:UInt32]
1826                TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1827        "
1828        )
1829    }
1830
1831    #[test]
1832    fn two_exists_subquery_with_outer_filter() -> Result<()> {
1833        let table_scan = test_table_scan()?;
1834        let subquery_scan1 = test_table_scan_with_name("sq1")?;
1835        let subquery_scan2 = test_table_scan_with_name("sq2")?;
1836
1837        let subquery1 = LogicalPlanBuilder::from(subquery_scan1)
1838            .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq1.a")))?
1839            .project(vec![col("c")])?
1840            .build()?;
1841
1842        let subquery2 = LogicalPlanBuilder::from(subquery_scan2)
1843            .filter(out_ref_col(DataType::UInt32, "test.a").eq(col("sq2.a")))?
1844            .project(vec![col("c")])?
1845            .build()?;
1846
1847        let plan = LogicalPlanBuilder::from(table_scan)
1848            .filter(
1849                exists(Arc::new(subquery1))
1850                    .and(exists(Arc::new(subquery2)).and(col("test.c").gt(lit(1u32)))),
1851            )?
1852            .project(vec![col("test.b")])?
1853            .build()?;
1854
1855        assert_optimized_plan_equal!(
1856            plan,
1857            @r"
1858        Projection: test.b [b:UInt32]
1859          Filter: test.c > UInt32(1) [a:UInt32, b:UInt32, c:UInt32]
1860            LeftSemi Join:  Filter: test.a = __correlated_sq_2.a [a:UInt32, b:UInt32, c:UInt32]
1861              LeftSemi Join:  Filter: test.a = __correlated_sq_1.a [a:UInt32, b:UInt32, c:UInt32]
1862                TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1863                SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]
1864                  Projection: sq1.c, sq1.a [c:UInt32, a:UInt32]
1865                    TableScan: sq1 [a:UInt32, b:UInt32, c:UInt32]
1866              SubqueryAlias: __correlated_sq_2 [c:UInt32, a:UInt32]
1867                Projection: sq2.c, sq2.a [c:UInt32, a:UInt32]
1868                  TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]
1869        "
1870        )
1871    }
1872
1873    #[test]
1874    fn exists_subquery_expr_filter() -> Result<()> {
1875        let table_scan = test_table_scan()?;
1876        let subquery_scan = test_table_scan_with_name("sq")?;
1877        let subquery = LogicalPlanBuilder::from(subquery_scan)
1878            .filter(
1879                (lit(1u32) + col("sq.a"))
1880                    .gt(out_ref_col(DataType::UInt32, "test.a") * lit(2u32)),
1881            )?
1882            .project(vec![lit(1u32)])?
1883            .build()?;
1884        let plan = LogicalPlanBuilder::from(table_scan)
1885            .filter(exists(Arc::new(subquery)))?
1886            .project(vec![col("test.b")])?
1887            .build()?;
1888
1889        assert_optimized_plan_equal!(
1890            plan,
1891            @r"
1892        Projection: test.b [b:UInt32]
1893          LeftSemi Join:  Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]
1894            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1895            SubqueryAlias: __correlated_sq_1 [UInt32(1):UInt32, a:UInt32]
1896              Projection: UInt32(1), sq.a [UInt32(1):UInt32, a:UInt32]
1897                TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1898        "
1899        )
1900    }
1901
1902    #[test]
1903    fn exists_subquery_with_same_table() -> Result<()> {
1904        let outer_scan = test_table_scan()?;
1905        let subquery_scan = test_table_scan()?;
1906        let subquery = LogicalPlanBuilder::from(subquery_scan)
1907            .filter(col("test.a").gt(col("test.b")))?
1908            .project(vec![col("c")])?
1909            .build()?;
1910
1911        let plan = LogicalPlanBuilder::from(outer_scan)
1912            .filter(exists(Arc::new(subquery)))?
1913            .project(vec![col("test.b")])?
1914            .build()?;
1915
1916        // Subquery and outer query refer to the same table.
1917        assert_optimized_plan_equal!(
1918            plan,
1919            @r"
1920        Projection: test.b [b:UInt32]
1921          LeftSemi Join:  Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]
1922            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1923            SubqueryAlias: __correlated_sq_1 [c:UInt32]
1924              Projection: test.c [c:UInt32]
1925                Filter: test.a > test.b [a:UInt32, b:UInt32, c:UInt32]
1926                  TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1927        "
1928        )
1929    }
1930
1931    #[test]
1932    fn exists_distinct_subquery() -> Result<()> {
1933        let table_scan = test_table_scan()?;
1934        let subquery_scan = test_table_scan_with_name("sq")?;
1935        let subquery = LogicalPlanBuilder::from(subquery_scan)
1936            .filter(
1937                (lit(1u32) + col("sq.a"))
1938                    .gt(out_ref_col(DataType::UInt32, "test.a") * lit(2u32)),
1939            )?
1940            .project(vec![col("sq.c")])?
1941            .distinct()?
1942            .build()?;
1943        let plan = LogicalPlanBuilder::from(table_scan)
1944            .filter(exists(Arc::new(subquery)))?
1945            .project(vec![col("test.b")])?
1946            .build()?;
1947
1948        assert_optimized_plan_equal!(
1949            plan,
1950            @r"
1951        Projection: test.b [b:UInt32]
1952          LeftSemi Join:  Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]
1953            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1954            SubqueryAlias: __correlated_sq_1 [c:UInt32, a:UInt32]
1955              Distinct: [c:UInt32, a:UInt32]
1956                Projection: sq.c, sq.a [c:UInt32, a:UInt32]
1957                  TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1958        "
1959        )
1960    }
1961
1962    #[test]
1963    fn exists_distinct_expr_subquery() -> Result<()> {
1964        let table_scan = test_table_scan()?;
1965        let subquery_scan = test_table_scan_with_name("sq")?;
1966        let subquery = LogicalPlanBuilder::from(subquery_scan)
1967            .filter(
1968                (lit(1u32) + col("sq.a"))
1969                    .gt(out_ref_col(DataType::UInt32, "test.a") * lit(2u32)),
1970            )?
1971            .project(vec![col("sq.b") + col("sq.c")])?
1972            .distinct()?
1973            .build()?;
1974        let plan = LogicalPlanBuilder::from(table_scan)
1975            .filter(exists(Arc::new(subquery)))?
1976            .project(vec![col("test.b")])?
1977            .build()?;
1978
1979        assert_optimized_plan_equal!(
1980            plan,
1981            @r"
1982        Projection: test.b [b:UInt32]
1983          LeftSemi Join:  Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]
1984            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
1985            SubqueryAlias: __correlated_sq_1 [sq.b + sq.c:UInt32, a:UInt32]
1986              Distinct: [sq.b + sq.c:UInt32, a:UInt32]
1987                Projection: sq.b + sq.c, sq.a [sq.b + sq.c:UInt32, a:UInt32]
1988                  TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
1989        "
1990        )
1991    }
1992
1993    #[test]
1994    fn exists_distinct_subquery_with_literal() -> Result<()> {
1995        let table_scan = test_table_scan()?;
1996        let subquery_scan = test_table_scan_with_name("sq")?;
1997        let subquery = LogicalPlanBuilder::from(subquery_scan)
1998            .filter(
1999                (lit(1u32) + col("sq.a"))
2000                    .gt(out_ref_col(DataType::UInt32, "test.a") * lit(2u32)),
2001            )?
2002            .project(vec![lit(1u32), col("sq.c")])?
2003            .distinct()?
2004            .build()?;
2005        let plan = LogicalPlanBuilder::from(table_scan)
2006            .filter(exists(Arc::new(subquery)))?
2007            .project(vec![col("test.b")])?
2008            .build()?;
2009
2010        assert_optimized_plan_equal!(
2011            plan,
2012            @r"
2013        Projection: test.b [b:UInt32]
2014          LeftSemi Join:  Filter: UInt32(1) + __correlated_sq_1.a > test.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32]
2015            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
2016            SubqueryAlias: __correlated_sq_1 [UInt32(1):UInt32, c:UInt32, a:UInt32]
2017              Distinct: [UInt32(1):UInt32, c:UInt32, a:UInt32]
2018                Projection: UInt32(1), sq.c, sq.a [UInt32(1):UInt32, c:UInt32, a:UInt32]
2019                  TableScan: sq [a:UInt32, b:UInt32, c:UInt32]
2020        "
2021        )
2022    }
2023
2024    #[test]
2025    fn exists_uncorrelated_unnest() -> Result<()> {
2026        let subquery_table_source = table_source(&Schema::new(vec![Field::new(
2027            "arr",
2028            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
2029            true,
2030        )]));
2031        let subquery = LogicalPlanBuilder::scan_with_filters(
2032            "sq",
2033            subquery_table_source,
2034            None,
2035            vec![],
2036        )?
2037        .unnest_column("arr")?
2038        .build()?;
2039        let table_scan = test_table_scan()?;
2040        let plan = LogicalPlanBuilder::from(table_scan)
2041            .filter(exists(Arc::new(subquery)))?
2042            .project(vec![col("test.b")])?
2043            .build()?;
2044
2045        assert_optimized_plan_equal!(
2046            plan,
2047            @r"
2048        Projection: test.b [b:UInt32]
2049          LeftSemi Join:  Filter: Boolean(true) [a:UInt32, b:UInt32, c:UInt32]
2050            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
2051            SubqueryAlias: __correlated_sq_1 [arr:Int32;N]
2052              Unnest: lists[sq.arr|depth=1] structs[] [arr:Int32;N]
2053                TableScan: sq [arr:List(Int32);N]
2054        "
2055        )
2056    }
2057
2058    #[test]
2059    fn exists_correlated_unnest() -> Result<()> {
2060        let table_scan = test_table_scan()?;
2061        let subquery_table_source = table_source(&Schema::new(vec![Field::new(
2062            "a",
2063            DataType::List(Arc::new(Field::new_list_field(DataType::UInt32, true))),
2064            true,
2065        )]));
2066        let subquery = LogicalPlanBuilder::scan_with_filters(
2067            "sq",
2068            subquery_table_source,
2069            None,
2070            vec![],
2071        )?
2072        .unnest_column("a")?
2073        .filter(col("a").eq(out_ref_col(DataType::UInt32, "test.b")))?
2074        .build()?;
2075        let plan = LogicalPlanBuilder::from(table_scan)
2076            .filter(exists(Arc::new(subquery)))?
2077            .project(vec![col("test.b")])?
2078            .build()?;
2079
2080        assert_optimized_plan_equal!(
2081            plan,
2082            @r"
2083        Projection: test.b [b:UInt32]
2084          LeftSemi Join:  Filter: __correlated_sq_1.a = test.b [a:UInt32, b:UInt32, c:UInt32]
2085            TableScan: test [a:UInt32, b:UInt32, c:UInt32]
2086            SubqueryAlias: __correlated_sq_1 [a:UInt32;N]
2087              Unnest: lists[sq.a|depth=1] structs[] [a:UInt32;N]
2088                TableScan: sq [a:List(UInt32);N]
2089        "
2090        )
2091    }
2092
2093    #[test]
2094    fn upper_case_ident() -> Result<()> {
2095        let fields = vec![
2096            Field::new("A", DataType::UInt32, false),
2097            Field::new("B", DataType::UInt32, false),
2098        ];
2099
2100        let schema = Schema::new(fields);
2101        let table_scan_a = table_scan(Some("\"TEST_A\""), &schema, None)?.build()?;
2102        let table_scan_b = table_scan(Some("\"TEST_B\""), &schema, None)?.build()?;
2103
2104        let subquery = LogicalPlanBuilder::from(table_scan_b)
2105            .filter(col("\"A\"").eq(out_ref_col(DataType::UInt32, "\"TEST_A\".\"A\"")))?
2106            .project(vec![lit(1)])?
2107            .build()?;
2108
2109        let plan = LogicalPlanBuilder::from(table_scan_a)
2110            .filter(exists(Arc::new(subquery)))?
2111            .project(vec![col("\"TEST_A\".\"B\"")])?
2112            .build()?;
2113
2114        assert_optimized_plan_equal!(
2115            plan,
2116            @r"
2117        Projection: TEST_A.B [B:UInt32]
2118          LeftSemi Join:  Filter: __correlated_sq_1.A = TEST_A.A [A:UInt32, B:UInt32]
2119            TableScan: TEST_A [A:UInt32, B:UInt32]
2120            SubqueryAlias: __correlated_sq_1 [Int32(1):Int32, A:UInt32]
2121              Projection: Int32(1), TEST_B.A [Int32(1):Int32, A:UInt32]
2122                TableScan: TEST_B [A:UInt32, B:UInt32]
2123        "
2124        )
2125    }
2126}