datafusion_physical_optimizer/
projection_pushdown.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This file implements the `ProjectionPushdown` physical optimization rule.
19//! The function [`remove_unnecessary_projections`] tries to push down all
20//! projections one by one if the operator below is amenable to this. If a
21//! projection reaches a source, it can even disappear from the plan entirely.
22
23use crate::PhysicalOptimizerRule;
24use arrow::datatypes::{Fields, Schema, SchemaRef};
25use datafusion_common::alias::AliasGenerator;
26use std::collections::HashSet;
27use std::sync::Arc;
28
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::tree_node::{
31    Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
32};
33use datafusion_common::{JoinSide, JoinType, Result};
34use datafusion_physical_expr::expressions::Column;
35use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
36use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
37use datafusion_physical_plan::joins::NestedLoopJoinExec;
38use datafusion_physical_plan::projection::{
39    remove_unnecessary_projections, ProjectionExec,
40};
41use datafusion_physical_plan::ExecutionPlan;
42
/// This rule inspects the `ProjectionExec`s in the given physical plan and tries to remove
/// them or swap them with their child.
///
/// Furthermore, it tries to push down projections from nested loop join filters that only
/// depend on one side of the join. By pushing these projections down, expressions that only
/// depend on one side of the join are computed once per row of that side, as a projected
/// column, instead of once per row of the cartesian product of the two sides.
#[derive(Default, Debug)]
pub struct ProjectionPushdown {}
51
52impl ProjectionPushdown {
53    #[allow(missing_docs)]
54    pub fn new() -> Self {
55        Self {}
56    }
57}
58
59impl PhysicalOptimizerRule for ProjectionPushdown {
60    fn optimize(
61        &self,
62        plan: Arc<dyn ExecutionPlan>,
63        _config: &ConfigOptions,
64    ) -> Result<Arc<dyn ExecutionPlan>> {
65        let alias_generator = AliasGenerator::new();
66        let plan = plan
67            .transform_up(|plan| {
68                match plan.as_any().downcast_ref::<NestedLoopJoinExec>() {
69                    None => Ok(Transformed::no(plan)),
70                    Some(hash_join) => try_push_down_join_filter(
71                        Arc::clone(&plan),
72                        hash_join,
73                        &alias_generator,
74                    ),
75                }
76            })
77            .map(|t| t.data)?;
78
79        plan.transform_down(remove_unnecessary_projections).data()
80    }
81
82    fn name(&self) -> &str {
83        "ProjectionPushdown"
84    }
85
86    fn schema_check(&self) -> bool {
87        true
88    }
89}
90
91/// Tries to push down parts of the filter.
92///
93/// See [JoinFilterRewriter] for details.
94fn try_push_down_join_filter(
95    original_plan: Arc<dyn ExecutionPlan>,
96    join: &NestedLoopJoinExec,
97    alias_generator: &AliasGenerator,
98) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
99    // Mark joins are currently not supported.
100    if matches!(join.join_type(), JoinType::LeftMark | JoinType::RightMark) {
101        return Ok(Transformed::no(original_plan));
102    }
103
104    let projections = join.projection();
105    let Some(filter) = join.filter() else {
106        return Ok(Transformed::no(original_plan));
107    };
108
109    let original_lhs_length = join.left().schema().fields().len();
110    let original_rhs_length = join.right().schema().fields().len();
111
112    let lhs_rewrite = try_push_down_projection(
113        Arc::clone(&join.right().schema()),
114        Arc::clone(join.left()),
115        JoinSide::Left,
116        filter.clone(),
117        alias_generator,
118    )?;
119    let rhs_rewrite = try_push_down_projection(
120        Arc::clone(&lhs_rewrite.data.0.schema()),
121        Arc::clone(join.right()),
122        JoinSide::Right,
123        lhs_rewrite.data.1,
124        alias_generator,
125    )?;
126    if !lhs_rewrite.transformed && !rhs_rewrite.transformed {
127        return Ok(Transformed::no(original_plan));
128    }
129
130    let join_filter = minimize_join_filter(
131        Arc::clone(rhs_rewrite.data.1.expression()),
132        rhs_rewrite.data.1.column_indices().to_vec(),
133        lhs_rewrite.data.0.schema().as_ref(),
134        rhs_rewrite.data.0.schema().as_ref(),
135    );
136
137    let new_lhs_length = lhs_rewrite.data.0.schema().fields.len();
138    let projections = match projections {
139        None => match join.join_type() {
140            JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
141                // Build projections that ignore the newly projected columns.
142                let mut projections = Vec::new();
143                projections.extend(0..original_lhs_length);
144                projections.extend(new_lhs_length..new_lhs_length + original_rhs_length);
145                projections
146            }
147            JoinType::LeftSemi | JoinType::LeftAnti => {
148                // Only return original left columns
149                let mut projections = Vec::new();
150                projections.extend(0..original_lhs_length);
151                projections
152            }
153            JoinType::RightSemi | JoinType::RightAnti => {
154                // Only return original right columns
155                let mut projections = Vec::new();
156                projections.extend(0..original_rhs_length);
157                projections
158            }
159            _ => unreachable!("Unsupported join type"),
160        },
161        Some(projections) => {
162            let rhs_offset = new_lhs_length - original_lhs_length;
163            projections
164                .iter()
165                .map(|idx| {
166                    if *idx >= original_lhs_length {
167                        idx + rhs_offset
168                    } else {
169                        *idx
170                    }
171                })
172                .collect()
173        }
174    };
175
176    Ok(Transformed::yes(Arc::new(NestedLoopJoinExec::try_new(
177        lhs_rewrite.data.0,
178        rhs_rewrite.data.0,
179        Some(join_filter),
180        join.join_type(),
181        Some(projections),
182    )?)))
183}
184
185/// Tries to push down parts of `expr` into the `join_side`.
186fn try_push_down_projection(
187    other_schema: SchemaRef,
188    plan: Arc<dyn ExecutionPlan>,
189    join_side: JoinSide,
190    join_filter: JoinFilter,
191    alias_generator: &AliasGenerator,
192) -> Result<Transformed<(Arc<dyn ExecutionPlan>, JoinFilter)>> {
193    let expr = Arc::clone(join_filter.expression());
194    let original_plan_schema = plan.schema();
195    let mut rewriter = JoinFilterRewriter::new(
196        join_side,
197        original_plan_schema.as_ref(),
198        join_filter.column_indices().to_vec(),
199        alias_generator,
200    );
201    let new_expr = rewriter.rewrite(expr)?;
202
203    if new_expr.transformed {
204        let new_join_side =
205            ProjectionExec::try_new(rewriter.join_side_projections, plan)?;
206        let new_schema = Arc::clone(&new_join_side.schema());
207
208        let (lhs_schema, rhs_schema) = match join_side {
209            JoinSide::Left => (new_schema, other_schema),
210            JoinSide::Right => (other_schema, new_schema),
211            JoinSide::None => unreachable!("Mark join not supported"),
212        };
213        let intermediate_schema = rewriter
214            .intermediate_column_indices
215            .iter()
216            .map(|ci| match ci.side {
217                JoinSide::Left => Arc::clone(&lhs_schema.fields[ci.index]),
218                JoinSide::Right => Arc::clone(&rhs_schema.fields[ci.index]),
219                JoinSide::None => unreachable!("Mark join not supported"),
220            })
221            .collect::<Fields>();
222
223        let join_filter = JoinFilter::new(
224            new_expr.data,
225            rewriter.intermediate_column_indices,
226            Arc::new(Schema::new(intermediate_schema)),
227        );
228        Ok(Transformed::yes((Arc::new(new_join_side), join_filter)))
229    } else {
230        Ok(Transformed::no((plan, join_filter)))
231    }
232}
233
234/// Creates a new [JoinFilter] and tries to minimize the internal schema.
235///
236/// This could eliminate some columns that were only part of a computation that has been pushed
237/// down. As this computation is now materialized on one side of the join, the original input
238/// columns are not needed anymore.
239fn minimize_join_filter(
240    expr: Arc<dyn PhysicalExpr>,
241    old_column_indices: Vec<ColumnIndex>,
242    lhs_schema: &Schema,
243    rhs_schema: &Schema,
244) -> JoinFilter {
245    let mut used_columns = HashSet::new();
246    expr.apply(|expr| {
247        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
248            used_columns.insert(col.index());
249        }
250        Ok(TreeNodeRecursion::Continue)
251    })
252    .expect("Closure cannot fail");
253
254    let new_column_indices = old_column_indices
255        .iter()
256        .enumerate()
257        .filter(|(idx, _)| used_columns.contains(idx))
258        .map(|(_, ci)| ci.clone())
259        .collect::<Vec<_>>();
260    let fields = new_column_indices
261        .iter()
262        .map(|ci| match ci.side {
263            JoinSide::Left => lhs_schema.field(ci.index).clone(),
264            JoinSide::Right => rhs_schema.field(ci.index).clone(),
265            JoinSide::None => unreachable!("Mark join not supported"),
266        })
267        .collect::<Fields>();
268
269    let final_expr = expr
270        .transform_up(|expr| match expr.as_any().downcast_ref::<Column>() {
271            None => Ok(Transformed::no(expr)),
272            Some(column) => {
273                let new_idx = used_columns
274                    .iter()
275                    .filter(|idx| **idx < column.index())
276                    .count();
277                let new_column = Column::new(column.name(), new_idx);
278                Ok(Transformed::yes(
279                    Arc::new(new_column) as Arc<dyn PhysicalExpr>
280                ))
281            }
282        })
283        .expect("Closure cannot fail");
284
285    JoinFilter::new(
286        final_expr.data,
287        new_column_indices,
288        Arc::new(Schema::new(fields)),
289    )
290}
291
292/// Implements the push-down machinery.
293///
294/// The rewriter starts at the top of the filter expression and traverses the expression tree. For
295/// each (sub-)expression, the rewriter checks whether it only refers to one side of the join. If
296/// this is never the case, no subexpressions of the filter can be pushed down. If there is a
297/// subexpression that can be computed using only one side of the join, the entire subexpression is
298/// pushed down to the join side.
299struct JoinFilterRewriter<'a> {
300    join_side: JoinSide,
301    join_side_schema: &'a Schema,
302    join_side_projections: Vec<(Arc<dyn PhysicalExpr>, String)>,
303    intermediate_column_indices: Vec<ColumnIndex>,
304    alias_generator: &'a AliasGenerator,
305}
306
307impl<'a> JoinFilterRewriter<'a> {
308    /// Creates a new [JoinFilterRewriter].
309    fn new(
310        join_side: JoinSide,
311        join_side_schema: &'a Schema,
312        column_indices: Vec<ColumnIndex>,
313        alias_generator: &'a AliasGenerator,
314    ) -> Self {
315        let projections = join_side_schema
316            .fields()
317            .iter()
318            .enumerate()
319            .map(|(idx, field)| {
320                (
321                    Arc::new(Column::new(field.name(), idx)) as Arc<dyn PhysicalExpr>,
322                    field.name().to_string(),
323                )
324            })
325            .collect();
326
327        Self {
328            join_side,
329            join_side_schema,
330            join_side_projections: projections,
331            intermediate_column_indices: column_indices,
332            alias_generator,
333        }
334    }
335
336    /// Executes the push-down machinery on `expr`.
337    ///
338    /// See the [JoinFilterRewriter] for further information.
339    fn rewrite(
340        &mut self,
341        expr: Arc<dyn PhysicalExpr>,
342    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
343        let depends_on_this_side = self.depends_on_join_side(&expr, self.join_side)?;
344        // We don't push down things that do not depend on this side (other side or no side).
345        if !depends_on_this_side {
346            return Ok(Transformed::no(expr));
347        }
348
349        // Recurse if there is a dependency to both sides or if the entire expression is volatile.
350        let depends_on_other_side =
351            self.depends_on_join_side(&expr, self.join_side.negate())?;
352        let is_volatile = is_volatile_expression_tree(expr.as_ref());
353        if depends_on_other_side || is_volatile {
354            return expr.map_children(|expr| self.rewrite(expr));
355        }
356
357        // There is only a dependency on this side.
358
359        // If this expression has no children, we do not push down, as it should already be a column
360        // reference.
361        if expr.children().is_empty() {
362            return Ok(Transformed::no(expr));
363        }
364
365        // Otherwise, we push down a projection.
366        let alias = self.alias_generator.next("join_proj_push_down");
367        let idx = self.create_new_column(alias.clone(), expr)?;
368
369        Ok(Transformed::yes(
370            Arc::new(Column::new(&alias, idx)) as Arc<dyn PhysicalExpr>
371        ))
372    }
373
374    /// Creates a new column in the current join side.
375    fn create_new_column(
376        &mut self,
377        name: String,
378        expr: Arc<dyn PhysicalExpr>,
379    ) -> Result<usize> {
380        // First, add a new projection. The expression must be rewritten, as it is no longer
381        // executed against the filter schema.
382        let new_idx = self.join_side_projections.len();
383        let rewritten_expr = expr.transform_up(|expr| {
384            Ok(match expr.as_any().downcast_ref::<Column>() {
385                None => Transformed::no(expr),
386                Some(column) => {
387                    let intermediate_column =
388                        &self.intermediate_column_indices[column.index()];
389                    assert_eq!(intermediate_column.side, self.join_side);
390
391                    let join_side_index = intermediate_column.index;
392                    let field = self.join_side_schema.field(join_side_index);
393                    let new_column = Column::new(field.name(), join_side_index);
394                    Transformed::yes(Arc::new(new_column) as Arc<dyn PhysicalExpr>)
395                }
396            })
397        })?;
398        self.join_side_projections.push((rewritten_expr.data, name));
399
400        // Then, update the column indices
401        let new_intermediate_idx = self.intermediate_column_indices.len();
402        let idx = ColumnIndex {
403            index: new_idx,
404            side: self.join_side,
405        };
406        self.intermediate_column_indices.push(idx);
407
408        Ok(new_intermediate_idx)
409    }
410
411    /// Checks whether the entire expression depends on the given `join_side`.
412    fn depends_on_join_side(
413        &mut self,
414        expr: &Arc<dyn PhysicalExpr>,
415        join_side: JoinSide,
416    ) -> Result<bool> {
417        let mut result = false;
418        expr.apply(|expr| match expr.as_any().downcast_ref::<Column>() {
419            None => Ok(TreeNodeRecursion::Continue),
420            Some(c) => {
421                let column_index = &self.intermediate_column_indices[c.index()];
422                if column_index.side == join_side {
423                    result = true;
424                    return Ok(TreeNodeRecursion::Stop);
425                }
426                Ok(TreeNodeRecursion::Continue)
427            }
428        })?;
429
430        Ok(result)
431    }
432}
433
434fn is_volatile_expression_tree(expr: &dyn PhysicalExpr) -> bool {
435    if expr.is_volatile_node() {
436        return true;
437    }
438
439    expr.children()
440        .iter()
441        .map(|expr| is_volatile_expression_tree(expr.as_ref()))
442        .reduce(|lhs, rhs| lhs || rhs)
443        .unwrap_or(false)
444}
445
// Snapshot tests for `ProjectionPushdown`. Each test builds a single `NestedLoopJoinExec`
// over two `EmptyExec` inputs, runs the rule and compares the indented plan display.
#[cfg(test)]
mod test {
    use super::*;
    use arrow::datatypes::{DataType, Field, FieldRef, Schema};
    use datafusion_expr_common::operator::Operator;
    use datafusion_functions::math::random;
    use datafusion_physical_expr::expressions::{binary, lit};
    use datafusion_physical_expr::ScalarFunctionExpr;
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    use datafusion_physical_plan::displayable;
    use datafusion_physical_plan::empty::EmptyExec;
    use insta::assert_snapshot;
    use std::sync::Arc;

    // `a > x` only compares plain columns, so nothing is pushed down and the plan is
    // left unchanged.
    #[tokio::test]
    async fn no_computation_does_not_project() -> Result<()> {
        let (left_schema, right_schema) = create_simple_schemas();
        let optimized_plan = run_test(
            left_schema,
            right_schema,
            a_x(),
            None,
            a_greater_than_x,
            JoinType::Inner,
        )?;

        assert_snapshot!(optimized_plan, @r"
        NestedLoopJoinExec: join_type=Inner, filter=a@0 > x@1
          EmptyExec
          EmptyExec
        ");
        Ok(())
    }

    // `a + 1` only reads the left side and `x + 1` only the right side. Each becomes a
    // projected column on its side, and the join's projection hides the new columns.
    #[tokio::test]
    async fn simple_push_down() -> Result<()> {
        let (left_schema, right_schema) = create_simple_schemas();
        let optimized_plan = run_test(
            left_schema,
            right_schema,
            a_x(),
            None,
            a_plus_one_greater_than_x_plus_one,
            JoinType::Inner,
        )?;

        assert_snapshot!(optimized_plan, @r"
        NestedLoopJoinExec: join_type=Inner, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[a@0, x@2]
          ProjectionExec: expr=[a@0 as a, a@0 + 1 as join_proj_push_down_1]
            EmptyExec
          ProjectionExec: expr=[x@0 as x, x@0 + 1 as join_proj_push_down_2]
            EmptyExec
        ");
        Ok(())
    }

    // NOTE(review): despite its name, the snapshot below shows that `a + 1` and `x + 1`
    // ARE pushed down beneath the short-circuiting `false AND ...`. Confirm whether the
    // name or the expected plan reflects the intended behavior.
    #[tokio::test]
    async fn does_not_push_down_short_circuiting_expressions() -> Result<()> {
        let (left_schema, right_schema) = create_simple_schemas();
        let optimized_plan = run_test(
            left_schema,
            right_schema,
            a_x(),
            None,
            |schema| {
                binary(
                    lit(false),
                    Operator::And,
                    a_plus_one_greater_than_x_plus_one(schema)?,
                    schema,
                )
            },
            JoinType::Inner,
        )?;

        assert_snapshot!(optimized_plan, @r"
        NestedLoopJoinExec: join_type=Inner, filter=false AND join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[a@0, x@2]
          ProjectionExec: expr=[a@0 as a, a@0 + 1 as join_proj_push_down_1]
            EmptyExec
          ProjectionExec: expr=[x@0 as x, x@0 + 1 as join_proj_push_down_2]
            EmptyExec
        ");
        Ok(())
    }

    // `a + rand()` is volatile, so it stays in the filter and the plan is unchanged.
    #[tokio::test]
    async fn does_not_push_down_volatile_functions() -> Result<()> {
        let (left_schema, right_schema) = create_simple_schemas();
        let optimized_plan = run_test(
            left_schema,
            right_schema,
            a_x(),
            None,
            a_plus_rand_greater_than_x,
            JoinType::Inner,
        )?;

        assert_snapshot!(optimized_plan, @r"
        NestedLoopJoinExec: join_type=Inner, filter=a@0 + rand() > x@1
          EmptyExec
          EmptyExec
        ");
        Ok(())
    }

    // Wider inputs: `a + b` is pushed left and `x + z` right. The untouched columns
    // (`c`, `y`) are still passed through by the new projections.
    #[tokio::test]
    async fn complex_schema_push_down() -> Result<()> {
        let (left_schema, right_schema) = create_complex_schemas();

        let optimized_plan = run_test(
            left_schema,
            right_schema,
            a_b_x_z(),
            None,
            a_plus_b_greater_than_x_plus_z,
            JoinType::Inner,
        )?;

        assert_snapshot!(optimized_plan, @r"
        NestedLoopJoinExec: join_type=Inner, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[a@0, b@1, c@2, x@4, y@5, z@6]
          ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, a@0 + b@1 as join_proj_push_down_1]
            EmptyExec
          ProjectionExec: expr=[x@0 as x, y@1 as y, z@2 as z, x@0 + z@2 as join_proj_push_down_2]
            EmptyExec
        ");
        Ok(())
    }

    // The existing projection [1, 3, 5] has its right-side indices shifted past the one
    // new left column, giving [1, 4, 6].
    #[tokio::test]
    async fn push_down_with_existing_projections() -> Result<()> {
        let (left_schema, right_schema) = create_complex_schemas();

        let optimized_plan = run_test(
            left_schema,
            right_schema,
            a_b_x_z(),
            Some(vec![1, 3, 5]), // ("b", "x", "z")
            a_plus_b_greater_than_x_plus_z,
            JoinType::Inner,
        )?;

        assert_snapshot!(optimized_plan, @r"
        NestedLoopJoinExec: join_type=Inner, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[b@1, x@4, z@6]
          ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, a@0 + b@1 as join_proj_push_down_1]
            EmptyExec
          ProjectionExec: expr=[x@0 as x, y@1 as y, z@2 as z, x@0 + z@2 as join_proj_push_down_2]
            EmptyExec
        ");
        Ok(())
    }

    // A left semi join outputs only the left side, so the projection keeps just the
    // original left column.
    #[tokio::test]
    async fn left_semi_join_projection() -> Result<()> {
        let (left_schema, right_schema) = create_simple_schemas();

        let left_semi_join_plan = run_test(
            left_schema.clone(),
            right_schema.clone(),
            a_x(),
            None,
            a_plus_one_greater_than_x_plus_one,
            JoinType::LeftSemi,
        )?;

        assert_snapshot!(left_semi_join_plan, @r"
        NestedLoopJoinExec: join_type=LeftSemi, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[a@0]
          ProjectionExec: expr=[a@0 as a, a@0 + 1 as join_proj_push_down_1]
            EmptyExec
          ProjectionExec: expr=[x@0 as x, x@0 + 1 as join_proj_push_down_2]
            EmptyExec
        ");
        Ok(())
    }

    // A right semi join outputs only the right side, so the projection keeps just the
    // original right column (index 0 of the right input).
    #[tokio::test]
    async fn right_semi_join_projection() -> Result<()> {
        let (left_schema, right_schema) = create_simple_schemas();
        let right_semi_join_plan = run_test(
            left_schema,
            right_schema,
            a_x(),
            None,
            a_plus_one_greater_than_x_plus_one,
            JoinType::RightSemi,
        )?;
        assert_snapshot!(right_semi_join_plan, @r"
        NestedLoopJoinExec: join_type=RightSemi, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[x@0]
          ProjectionExec: expr=[a@0 as a, a@0 + 1 as join_proj_push_down_1]
            EmptyExec
          ProjectionExec: expr=[x@0 as x, x@0 + 1 as join_proj_push_down_2]
            EmptyExec
        ");
        Ok(())
    }

    /// Builds a `NestedLoopJoinExec` of type `join_type` over two `EmptyExec` inputs with the
    /// given schemas and runs `ProjectionPushdown` on it.
    ///
    /// The filter's intermediate schema is derived from `column_indices`, and
    /// `filter_expr_builder` builds the filter expression against that schema.
    /// `existing_projections` becomes the join's projection. Returns the optimized plan
    /// rendered with `displayable(..).indent(false)`.
    fn run_test(
        left_schema: Schema,
        right_schema: Schema,
        column_indices: Vec<ColumnIndex>,
        existing_projections: Option<Vec<usize>>,
        filter_expr_builder: impl FnOnce(&Schema) -> Result<Arc<dyn PhysicalExpr>>,
        join_type: JoinType,
    ) -> Result<String> {
        let left = Arc::new(EmptyExec::new(Arc::new(left_schema.clone())));
        let right = Arc::new(EmptyExec::new(Arc::new(right_schema.clone())));

        // Intermediate filter schema: one field per column index, taken from its side.
        let join_fields: Vec<_> = column_indices
            .iter()
            .map(|ci| match ci.side {
                JoinSide::Left => left_schema.field(ci.index).clone(),
                JoinSide::Right => right_schema.field(ci.index).clone(),
                JoinSide::None => unreachable!(),
            })
            .collect();
        let join_schema = Arc::new(Schema::new(join_fields));

        let filter_expr = filter_expr_builder(join_schema.as_ref())?;

        let join_filter = JoinFilter::new(filter_expr, column_indices, join_schema);

        let join = NestedLoopJoinExec::try_new(
            left,
            right,
            Some(join_filter),
            &join_type,
            existing_projections,
        )?;

        let optimizer = ProjectionPushdown::new();
        let optimized_plan = optimizer.optimize(Arc::new(join), &Default::default())?;

        let displayable_plan = displayable(optimized_plan.as_ref()).indent(false);
        Ok(displayable_plan.to_string())
    }

    /// Left input `(a)`, right input `(x)`; all columns are non-nullable `Int32`.
    fn create_simple_schemas() -> (Schema, Schema) {
        let left_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let right_schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);

        (left_schema, right_schema)
    }

    /// Left input `(a, b, c)`, right input `(x, y, z)`; all columns are non-nullable `Int32`.
    fn create_complex_schemas() -> (Schema, Schema) {
        let left_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]);

        let right_schema = Schema::new(vec![
            Field::new("x", DataType::Int32, false),
            Field::new("y", DataType::Int32, false),
            Field::new("z", DataType::Int32, false),
        ]);

        (left_schema, right_schema)
    }

    /// Intermediate filter columns `[a (left 0), x (right 0)]`.
    fn a_x() -> Vec<ColumnIndex> {
        vec![
            ColumnIndex {
                index: 0,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 0,
                side: JoinSide::Right,
            },
        ]
    }

    /// Intermediate filter columns `[a (left 0), b (left 1), x (right 0), z (right 2)]`.
    fn a_b_x_z() -> Vec<ColumnIndex> {
        vec![
            ColumnIndex {
                index: 0,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 1,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 0,
                side: JoinSide::Right,
            },
            ColumnIndex {
                index: 2,
                side: JoinSide::Right,
            },
        ]
    }

    /// Filter `a@0 + 1 > x@1 + 1` over the `a_x` intermediate schema.
    fn a_plus_one_greater_than_x_plus_one(
        join_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let left_expr = binary(
            Arc::new(Column::new("a", 0)),
            Operator::Plus,
            lit(1),
            join_schema,
        )?;
        let right_expr = binary(
            Arc::new(Column::new("x", 1)),
            Operator::Plus,
            lit(1),
            join_schema,
        )?;
        binary(left_expr, Operator::Gt, right_expr, join_schema)
    }

    /// Filter `a@0 + rand() > x@1` over the `a_x` intermediate schema (volatile left operand).
    fn a_plus_rand_greater_than_x(join_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
        let left_expr = binary(
            Arc::new(Column::new("a", 0)),
            Operator::Plus,
            Arc::new(ScalarFunctionExpr::new(
                "rand",
                random(),
                vec![],
                FieldRef::new(Field::new("out", DataType::Float64, false)),
                Arc::new(ConfigOptions::default()),
            )),
            join_schema,
        )?;
        let right_expr = Arc::new(Column::new("x", 1));
        binary(left_expr, Operator::Gt, right_expr, join_schema)
    }

    /// Filter `a@0 > x@1` over the `a_x` intermediate schema (no computation).
    fn a_greater_than_x(join_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
        binary(
            Arc::new(Column::new("a", 0)),
            Operator::Gt,
            Arc::new(Column::new("x", 1)),
            join_schema,
        )
    }

    /// Filter `a@0 + b@1 > x@2 + z@3` over the `a_b_x_z` intermediate schema.
    fn a_plus_b_greater_than_x_plus_z(
        join_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let lhs = binary(
            Arc::new(Column::new("a", 0)),
            Operator::Plus,
            Arc::new(Column::new("b", 1)),
            join_schema,
        )?;
        let rhs = binary(
            Arc::new(Column::new("x", 2)),
            Operator::Plus,
            Arc::new(Column::new("z", 3)),
            join_schema,
        )?;
        binary(lhs, Operator::Gt, rhs, join_schema)
    }
}