1use crate::PhysicalOptimizerRule;
24use arrow::datatypes::{Fields, Schema, SchemaRef};
25use datafusion_common::alias::AliasGenerator;
26use std::collections::HashSet;
27use std::sync::Arc;
28
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::tree_node::{
31 Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
32};
33use datafusion_common::{JoinSide, JoinType, Result};
34use datafusion_physical_expr::expressions::Column;
35use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
36use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
37use datafusion_physical_plan::joins::NestedLoopJoinExec;
38use datafusion_physical_plan::projection::{
39 remove_unnecessary_projections, ProjectionExec,
40};
41use datafusion_physical_plan::ExecutionPlan;
42
/// Physical optimizer rule that moves computations out of join filters and
/// into projections below the join.
///
/// The rule carries no configuration or state.
#[derive(Debug, Default)]
pub struct ProjectionPushdown {}
51
52impl ProjectionPushdown {
53 #[allow(missing_docs)]
54 pub fn new() -> Self {
55 Self {}
56 }
57}
58
59impl PhysicalOptimizerRule for ProjectionPushdown {
60 fn optimize(
61 &self,
62 plan: Arc<dyn ExecutionPlan>,
63 _config: &ConfigOptions,
64 ) -> Result<Arc<dyn ExecutionPlan>> {
65 let alias_generator = AliasGenerator::new();
66 let plan = plan
67 .transform_up(|plan| {
68 match plan.as_any().downcast_ref::<NestedLoopJoinExec>() {
69 None => Ok(Transformed::no(plan)),
70 Some(hash_join) => try_push_down_join_filter(
71 Arc::clone(&plan),
72 hash_join,
73 &alias_generator,
74 ),
75 }
76 })
77 .map(|t| t.data)?;
78
79 plan.transform_down(remove_unnecessary_projections).data()
80 }
81
82 fn name(&self) -> &str {
83 "ProjectionPushdown"
84 }
85
86 fn schema_check(&self) -> bool {
87 true
88 }
89}
90
91fn try_push_down_join_filter(
95 original_plan: Arc<dyn ExecutionPlan>,
96 join: &NestedLoopJoinExec,
97 alias_generator: &AliasGenerator,
98) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
99 if matches!(join.join_type(), JoinType::LeftMark | JoinType::RightMark) {
101 return Ok(Transformed::no(original_plan));
102 }
103
104 let projections = join.projection();
105 let Some(filter) = join.filter() else {
106 return Ok(Transformed::no(original_plan));
107 };
108
109 let original_lhs_length = join.left().schema().fields().len();
110 let original_rhs_length = join.right().schema().fields().len();
111
112 let lhs_rewrite = try_push_down_projection(
113 Arc::clone(&join.right().schema()),
114 Arc::clone(join.left()),
115 JoinSide::Left,
116 filter.clone(),
117 alias_generator,
118 )?;
119 let rhs_rewrite = try_push_down_projection(
120 Arc::clone(&lhs_rewrite.data.0.schema()),
121 Arc::clone(join.right()),
122 JoinSide::Right,
123 lhs_rewrite.data.1,
124 alias_generator,
125 )?;
126 if !lhs_rewrite.transformed && !rhs_rewrite.transformed {
127 return Ok(Transformed::no(original_plan));
128 }
129
130 let join_filter = minimize_join_filter(
131 Arc::clone(rhs_rewrite.data.1.expression()),
132 rhs_rewrite.data.1.column_indices().to_vec(),
133 lhs_rewrite.data.0.schema().as_ref(),
134 rhs_rewrite.data.0.schema().as_ref(),
135 );
136
137 let new_lhs_length = lhs_rewrite.data.0.schema().fields.len();
138 let projections = match projections {
139 None => match join.join_type() {
140 JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
141 let mut projections = Vec::new();
143 projections.extend(0..original_lhs_length);
144 projections.extend(new_lhs_length..new_lhs_length + original_rhs_length);
145 projections
146 }
147 JoinType::LeftSemi | JoinType::LeftAnti => {
148 let mut projections = Vec::new();
150 projections.extend(0..original_lhs_length);
151 projections
152 }
153 JoinType::RightSemi | JoinType::RightAnti => {
154 let mut projections = Vec::new();
156 projections.extend(0..original_rhs_length);
157 projections
158 }
159 _ => unreachable!("Unsupported join type"),
160 },
161 Some(projections) => {
162 let rhs_offset = new_lhs_length - original_lhs_length;
163 projections
164 .iter()
165 .map(|idx| {
166 if *idx >= original_lhs_length {
167 idx + rhs_offset
168 } else {
169 *idx
170 }
171 })
172 .collect()
173 }
174 };
175
176 Ok(Transformed::yes(Arc::new(NestedLoopJoinExec::try_new(
177 lhs_rewrite.data.0,
178 rhs_rewrite.data.0,
179 Some(join_filter),
180 join.join_type(),
181 Some(projections),
182 )?)))
183}
184
185fn try_push_down_projection(
187 other_schema: SchemaRef,
188 plan: Arc<dyn ExecutionPlan>,
189 join_side: JoinSide,
190 join_filter: JoinFilter,
191 alias_generator: &AliasGenerator,
192) -> Result<Transformed<(Arc<dyn ExecutionPlan>, JoinFilter)>> {
193 let expr = Arc::clone(join_filter.expression());
194 let original_plan_schema = plan.schema();
195 let mut rewriter = JoinFilterRewriter::new(
196 join_side,
197 original_plan_schema.as_ref(),
198 join_filter.column_indices().to_vec(),
199 alias_generator,
200 );
201 let new_expr = rewriter.rewrite(expr)?;
202
203 if new_expr.transformed {
204 let new_join_side =
205 ProjectionExec::try_new(rewriter.join_side_projections, plan)?;
206 let new_schema = Arc::clone(&new_join_side.schema());
207
208 let (lhs_schema, rhs_schema) = match join_side {
209 JoinSide::Left => (new_schema, other_schema),
210 JoinSide::Right => (other_schema, new_schema),
211 JoinSide::None => unreachable!("Mark join not supported"),
212 };
213 let intermediate_schema = rewriter
214 .intermediate_column_indices
215 .iter()
216 .map(|ci| match ci.side {
217 JoinSide::Left => Arc::clone(&lhs_schema.fields[ci.index]),
218 JoinSide::Right => Arc::clone(&rhs_schema.fields[ci.index]),
219 JoinSide::None => unreachable!("Mark join not supported"),
220 })
221 .collect::<Fields>();
222
223 let join_filter = JoinFilter::new(
224 new_expr.data,
225 rewriter.intermediate_column_indices,
226 Arc::new(Schema::new(intermediate_schema)),
227 );
228 Ok(Transformed::yes((Arc::new(new_join_side), join_filter)))
229 } else {
230 Ok(Transformed::no((plan, join_filter)))
231 }
232}
233
234fn minimize_join_filter(
240 expr: Arc<dyn PhysicalExpr>,
241 old_column_indices: Vec<ColumnIndex>,
242 lhs_schema: &Schema,
243 rhs_schema: &Schema,
244) -> JoinFilter {
245 let mut used_columns = HashSet::new();
246 expr.apply(|expr| {
247 if let Some(col) = expr.as_any().downcast_ref::<Column>() {
248 used_columns.insert(col.index());
249 }
250 Ok(TreeNodeRecursion::Continue)
251 })
252 .expect("Closure cannot fail");
253
254 let new_column_indices = old_column_indices
255 .iter()
256 .enumerate()
257 .filter(|(idx, _)| used_columns.contains(idx))
258 .map(|(_, ci)| ci.clone())
259 .collect::<Vec<_>>();
260 let fields = new_column_indices
261 .iter()
262 .map(|ci| match ci.side {
263 JoinSide::Left => lhs_schema.field(ci.index).clone(),
264 JoinSide::Right => rhs_schema.field(ci.index).clone(),
265 JoinSide::None => unreachable!("Mark join not supported"),
266 })
267 .collect::<Fields>();
268
269 let final_expr = expr
270 .transform_up(|expr| match expr.as_any().downcast_ref::<Column>() {
271 None => Ok(Transformed::no(expr)),
272 Some(column) => {
273 let new_idx = used_columns
274 .iter()
275 .filter(|idx| **idx < column.index())
276 .count();
277 let new_column = Column::new(column.name(), new_idx);
278 Ok(Transformed::yes(
279 Arc::new(new_column) as Arc<dyn PhysicalExpr>
280 ))
281 }
282 })
283 .expect("Closure cannot fail");
284
285 JoinFilter::new(
286 final_expr.data,
287 new_column_indices,
288 Arc::new(Schema::new(fields)),
289 )
290}
291
/// Rewrites a join filter expression for one side of a join, collecting the
/// sub-expressions that can be computed by a projection on that side.
struct JoinFilterRewriter<'a> {
    /// The join side expressions are pushed down to.
    join_side: JoinSide,
    /// Schema of the input on `join_side`; used to look up field names when
    /// building projection columns.
    join_side_schema: &'a Schema,
    /// Projection expressions and their output names for the input on
    /// `join_side`. Starts with one pass-through column per schema field; one
    /// entry is appended for every pushed-down expression.
    join_side_projections: Vec<(Arc<dyn PhysicalExpr>, String)>,
    /// Maps each column position used in the filter expression to a side and
    /// an index on that side. One entry is appended for every pushed-down
    /// expression.
    intermediate_column_indices: Vec<ColumnIndex>,
    /// Generates the names of newly projected columns.
    alias_generator: &'a AliasGenerator,
}
306
307impl<'a> JoinFilterRewriter<'a> {
308 fn new(
310 join_side: JoinSide,
311 join_side_schema: &'a Schema,
312 column_indices: Vec<ColumnIndex>,
313 alias_generator: &'a AliasGenerator,
314 ) -> Self {
315 let projections = join_side_schema
316 .fields()
317 .iter()
318 .enumerate()
319 .map(|(idx, field)| {
320 (
321 Arc::new(Column::new(field.name(), idx)) as Arc<dyn PhysicalExpr>,
322 field.name().to_string(),
323 )
324 })
325 .collect();
326
327 Self {
328 join_side,
329 join_side_schema,
330 join_side_projections: projections,
331 intermediate_column_indices: column_indices,
332 alias_generator,
333 }
334 }
335
336 fn rewrite(
340 &mut self,
341 expr: Arc<dyn PhysicalExpr>,
342 ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
343 let depends_on_this_side = self.depends_on_join_side(&expr, self.join_side)?;
344 if !depends_on_this_side {
346 return Ok(Transformed::no(expr));
347 }
348
349 let depends_on_other_side =
351 self.depends_on_join_side(&expr, self.join_side.negate())?;
352 let is_volatile = is_volatile_expression_tree(expr.as_ref());
353 if depends_on_other_side || is_volatile {
354 return expr.map_children(|expr| self.rewrite(expr));
355 }
356
357 if expr.children().is_empty() {
362 return Ok(Transformed::no(expr));
363 }
364
365 let alias = self.alias_generator.next("join_proj_push_down");
367 let idx = self.create_new_column(alias.clone(), expr)?;
368
369 Ok(Transformed::yes(
370 Arc::new(Column::new(&alias, idx)) as Arc<dyn PhysicalExpr>
371 ))
372 }
373
374 fn create_new_column(
376 &mut self,
377 name: String,
378 expr: Arc<dyn PhysicalExpr>,
379 ) -> Result<usize> {
380 let new_idx = self.join_side_projections.len();
383 let rewritten_expr = expr.transform_up(|expr| {
384 Ok(match expr.as_any().downcast_ref::<Column>() {
385 None => Transformed::no(expr),
386 Some(column) => {
387 let intermediate_column =
388 &self.intermediate_column_indices[column.index()];
389 assert_eq!(intermediate_column.side, self.join_side);
390
391 let join_side_index = intermediate_column.index;
392 let field = self.join_side_schema.field(join_side_index);
393 let new_column = Column::new(field.name(), join_side_index);
394 Transformed::yes(Arc::new(new_column) as Arc<dyn PhysicalExpr>)
395 }
396 })
397 })?;
398 self.join_side_projections.push((rewritten_expr.data, name));
399
400 let new_intermediate_idx = self.intermediate_column_indices.len();
402 let idx = ColumnIndex {
403 index: new_idx,
404 side: self.join_side,
405 };
406 self.intermediate_column_indices.push(idx);
407
408 Ok(new_intermediate_idx)
409 }
410
411 fn depends_on_join_side(
413 &mut self,
414 expr: &Arc<dyn PhysicalExpr>,
415 join_side: JoinSide,
416 ) -> Result<bool> {
417 let mut result = false;
418 expr.apply(|expr| match expr.as_any().downcast_ref::<Column>() {
419 None => Ok(TreeNodeRecursion::Continue),
420 Some(c) => {
421 let column_index = &self.intermediate_column_indices[c.index()];
422 if column_index.side == join_side {
423 result = true;
424 return Ok(TreeNodeRecursion::Stop);
425 }
426 Ok(TreeNodeRecursion::Continue)
427 }
428 })?;
429
430 Ok(result)
431 }
432}
433
434fn is_volatile_expression_tree(expr: &dyn PhysicalExpr) -> bool {
435 if expr.is_volatile_node() {
436 return true;
437 }
438
439 expr.children()
440 .iter()
441 .map(|expr| is_volatile_expression_tree(expr.as_ref()))
442 .reduce(|lhs, rhs| lhs || rhs)
443 .unwrap_or(false)
444}
445
#[cfg(test)]
mod test {
    use super::*;
    use arrow::datatypes::{DataType, Field, FieldRef, Schema};
    use datafusion_expr_common::operator::Operator;
    use datafusion_functions::math::random;
    use datafusion_physical_expr::expressions::{binary, lit};
    use datafusion_physical_expr::ScalarFunctionExpr;
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    use datafusion_physical_plan::displayable;
    use datafusion_physical_plan::empty::EmptyExec;
    use insta::assert_snapshot;
    use std::sync::Arc;

    /// A filter that only compares plain columns (`a > x`) leaves the plan
    /// unchanged.
    #[tokio::test]
    async fn no_computation_does_not_project() -> Result<()> {
        let (left_schema, right_schema) = create_simple_schemas();
        let optimized_plan = run_test(
            left_schema,
            right_schema,
            a_x(),
            None,
            a_greater_than_x,
            JoinType::Inner,
        )?;

        assert_snapshot!(optimized_plan, @r"
        NestedLoopJoinExec: join_type=Inner, filter=a@0 > x@1
          EmptyExec
          EmptyExec
        ");
        Ok(())
    }

    /// `a + 1 > x + 1`: each addition is moved into a projection on its own
    /// side, and the join projects the original columns only.
    #[tokio::test]
    async fn simple_push_down() -> Result<()> {
        let (left_schema, right_schema) = create_simple_schemas();
        let optimized_plan = run_test(
            left_schema,
            right_schema,
            a_x(),
            None,
            a_plus_one_greater_than_x_plus_one,
            JoinType::Inner,
        )?;

        assert_snapshot!(optimized_plan, @r"
        NestedLoopJoinExec: join_type=Inner, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[a@0, x@2]
          ProjectionExec: expr=[a@0 as a, a@0 + 1 as join_proj_push_down_1]
            EmptyExec
          ProjectionExec: expr=[x@0 as x, x@0 + 1 as join_proj_push_down_2]
            EmptyExec
        ");
        Ok(())
    }

    /// Filter of the form `false AND (a + 1 > x + 1)`.
    ///
    /// NOTE(review): despite the test name, the pinned snapshot shows both
    /// `+ 1` sub-expressions pushed below the join. Confirm that this is the
    /// intended behavior for short-circuiting operators.
    #[tokio::test]
    async fn does_not_push_down_short_circuiting_expressions() -> Result<()> {
        let (left_schema, right_schema) = create_simple_schemas();
        let optimized_plan = run_test(
            left_schema,
            right_schema,
            a_x(),
            None,
            |schema| {
                binary(
                    lit(false),
                    Operator::And,
                    a_plus_one_greater_than_x_plus_one(schema)?,
                    schema,
                )
            },
            JoinType::Inner,
        )?;

        assert_snapshot!(optimized_plan, @r"
        NestedLoopJoinExec: join_type=Inner, filter=false AND join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[a@0, x@2]
          ProjectionExec: expr=[a@0 as a, a@0 + 1 as join_proj_push_down_1]
            EmptyExec
          ProjectionExec: expr=[x@0 as x, x@0 + 1 as join_proj_push_down_2]
            EmptyExec
        ");
        Ok(())
    }

    /// `a + rand() > x`: the addition contains `rand()`, and the plan is left
    /// unchanged.
    #[tokio::test]
    async fn does_not_push_down_volatile_functions() -> Result<()> {
        let (left_schema, right_schema) = create_simple_schemas();
        let optimized_plan = run_test(
            left_schema,
            right_schema,
            a_x(),
            None,
            a_plus_rand_greater_than_x,
            JoinType::Inner,
        )?;

        assert_snapshot!(optimized_plan, @r"
        NestedLoopJoinExec: join_type=Inner, filter=a@0 + rand() > x@1
          EmptyExec
          EmptyExec
        ");
        Ok(())
    }

    /// Three-column inputs with `a + b > x + z`: the new columns are appended
    /// after the original ones, and the join projection skips them.
    #[tokio::test]
    async fn complex_schema_push_down() -> Result<()> {
        let (left_schema, right_schema) = create_complex_schemas();

        let optimized_plan = run_test(
            left_schema,
            right_schema,
            a_b_x_z(),
            None,
            a_plus_b_greater_than_x_plus_z,
            JoinType::Inner,
        )?;

        assert_snapshot!(optimized_plan, @r"
        NestedLoopJoinExec: join_type=Inner, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[a@0, b@1, c@2, x@4, y@5, z@6]
          ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, a@0 + b@1 as join_proj_push_down_1]
            EmptyExec
          ProjectionExec: expr=[x@0 as x, y@1 as y, z@2 as z, x@0 + z@2 as join_proj_push_down_2]
            EmptyExec
        ");
        Ok(())
    }

    /// A join that already projects `[1, 3, 5]` (`b`, `x`, `z`): right-side
    /// indices are shifted by the one column added to the left input.
    #[tokio::test]
    async fn push_down_with_existing_projections() -> Result<()> {
        let (left_schema, right_schema) = create_complex_schemas();

        let optimized_plan = run_test(
            left_schema,
            right_schema,
            a_b_x_z(),
            Some(vec![1, 3, 5]),
            a_plus_b_greater_than_x_plus_z,
            JoinType::Inner,
        )?;

        assert_snapshot!(optimized_plan, @r"
        NestedLoopJoinExec: join_type=Inner, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[b@1, x@4, z@6]
          ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, a@0 + b@1 as join_proj_push_down_1]
            EmptyExec
          ProjectionExec: expr=[x@0 as x, y@1 as y, z@2 as z, x@0 + z@2 as join_proj_push_down_2]
            EmptyExec
        ");
        Ok(())
    }

    /// Left semi join: the join projects only the original left column.
    #[tokio::test]
    async fn left_semi_join_projection() -> Result<()> {
        let (left_schema, right_schema) = create_simple_schemas();

        let left_semi_join_plan = run_test(
            left_schema.clone(),
            right_schema.clone(),
            a_x(),
            None,
            a_plus_one_greater_than_x_plus_one,
            JoinType::LeftSemi,
        )?;

        assert_snapshot!(left_semi_join_plan, @r"
        NestedLoopJoinExec: join_type=LeftSemi, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[a@0]
          ProjectionExec: expr=[a@0 as a, a@0 + 1 as join_proj_push_down_1]
            EmptyExec
          ProjectionExec: expr=[x@0 as x, x@0 + 1 as join_proj_push_down_2]
            EmptyExec
        ");
        Ok(())
    }

    /// Right semi join: the join projects only the original right column.
    #[tokio::test]
    async fn right_semi_join_projection() -> Result<()> {
        let (left_schema, right_schema) = create_simple_schemas();
        let right_semi_join_plan = run_test(
            left_schema,
            right_schema,
            a_x(),
            None,
            a_plus_one_greater_than_x_plus_one,
            JoinType::RightSemi,
        )?;
        assert_snapshot!(right_semi_join_plan, @r"
        NestedLoopJoinExec: join_type=RightSemi, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[x@0]
          ProjectionExec: expr=[a@0 as a, a@0 + 1 as join_proj_push_down_1]
            EmptyExec
          ProjectionExec: expr=[x@0 as x, x@0 + 1 as join_proj_push_down_2]
            EmptyExec
        ");
        Ok(())
    }

    /// Builds a `NestedLoopJoinExec` over two `EmptyExec` inputs, runs
    /// `ProjectionPushdown` on it and returns the indented plan as a string.
    ///
    /// * `column_indices` - intermediate column indices of the join filter;
    ///   they also determine the filter's intermediate schema.
    /// * `existing_projections` - projection passed to the join, if any.
    /// * `filter_expr_builder` - builds the filter expression from the
    ///   intermediate schema.
    fn run_test(
        left_schema: Schema,
        right_schema: Schema,
        column_indices: Vec<ColumnIndex>,
        existing_projections: Option<Vec<usize>>,
        filter_expr_builder: impl FnOnce(&Schema) -> Result<Arc<dyn PhysicalExpr>>,
        join_type: JoinType,
    ) -> Result<String> {
        let left = Arc::new(EmptyExec::new(Arc::new(left_schema.clone())));
        let right = Arc::new(EmptyExec::new(Arc::new(right_schema.clone())));

        // The filter's intermediate schema holds one field per column index,
        // taken from the side the index refers to.
        let join_fields: Vec<_> = column_indices
            .iter()
            .map(|ci| match ci.side {
                JoinSide::Left => left_schema.field(ci.index).clone(),
                JoinSide::Right => right_schema.field(ci.index).clone(),
                JoinSide::None => unreachable!(),
            })
            .collect();
        let join_schema = Arc::new(Schema::new(join_fields));

        let filter_expr = filter_expr_builder(join_schema.as_ref())?;

        let join_filter = JoinFilter::new(filter_expr, column_indices, join_schema);

        let join = NestedLoopJoinExec::try_new(
            left,
            right,
            Some(join_filter),
            &join_type,
            existing_projections,
        )?;

        let optimizer = ProjectionPushdown::new();
        let optimized_plan = optimizer.optimize(Arc::new(join), &Default::default())?;

        let displayable_plan = displayable(optimized_plan.as_ref()).indent(false);
        Ok(displayable_plan.to_string())
    }

    /// Left schema `(a: Int32)` and right schema `(x: Int32)`.
    fn create_simple_schemas() -> (Schema, Schema) {
        let left_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let right_schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);

        (left_schema, right_schema)
    }

    /// Left schema `(a, b, c)` and right schema `(x, y, z)`, all `Int32`.
    fn create_complex_schemas() -> (Schema, Schema) {
        let left_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]);

        let right_schema = Schema::new(vec![
            Field::new("x", DataType::Int32, false),
            Field::new("y", DataType::Int32, false),
            Field::new("z", DataType::Int32, false),
        ]);

        (left_schema, right_schema)
    }

    /// Column indices for a filter over left column 0 and right column 0.
    fn a_x() -> Vec<ColumnIndex> {
        vec![
            ColumnIndex {
                index: 0,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 0,
                side: JoinSide::Right,
            },
        ]
    }

    /// Column indices for a filter over left columns 0 and 1 and right
    /// columns 0 and 2.
    fn a_b_x_z() -> Vec<ColumnIndex> {
        vec![
            ColumnIndex {
                index: 0,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 1,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 0,
                side: JoinSide::Right,
            },
            ColumnIndex {
                index: 2,
                side: JoinSide::Right,
            },
        ]
    }

    /// Builds `a@0 + 1 > x@1 + 1`.
    fn a_plus_one_greater_than_x_plus_one(
        join_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let left_expr = binary(
            Arc::new(Column::new("a", 0)),
            Operator::Plus,
            lit(1),
            join_schema,
        )?;
        let right_expr = binary(
            Arc::new(Column::new("x", 1)),
            Operator::Plus,
            lit(1),
            join_schema,
        )?;
        binary(left_expr, Operator::Gt, right_expr, join_schema)
    }

    /// Builds `a@0 + rand() > x@1`.
    fn a_plus_rand_greater_than_x(join_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
        let left_expr = binary(
            Arc::new(Column::new("a", 0)),
            Operator::Plus,
            Arc::new(ScalarFunctionExpr::new(
                "rand",
                random(),
                vec![],
                FieldRef::new(Field::new("out", DataType::Float64, false)),
                Arc::new(ConfigOptions::default()),
            )),
            join_schema,
        )?;
        let right_expr = Arc::new(Column::new("x", 1));
        binary(left_expr, Operator::Gt, right_expr, join_schema)
    }

    /// Builds `a@0 > x@1`.
    fn a_greater_than_x(join_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
        binary(
            Arc::new(Column::new("a", 0)),
            Operator::Gt,
            Arc::new(Column::new("x", 1)),
            join_schema,
        )
    }

    /// Builds `a@0 + b@1 > x@2 + z@3`.
    fn a_plus_b_greater_than_x_plus_z(
        join_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let lhs = binary(
            Arc::new(Column::new("a", 0)),
            Operator::Plus,
            Arc::new(Column::new("b", 1)),
            join_schema,
        )?;
        let rhs = binary(
            Arc::new(Column::new("x", 2)),
            Operator::Plus,
            Arc::new(Column::new("z", 3)),
            join_schema,
        )?;
        binary(lhs, Operator::Gt, rhs, join_schema)
    }
}