1use crate::PhysicalOptimizerRule;
24use arrow::datatypes::{Fields, Schema, SchemaRef};
25use datafusion_common::alias::AliasGenerator;
26use std::collections::HashSet;
27use std::sync::Arc;
28
29use datafusion_common::config::ConfigOptions;
30use datafusion_common::tree_node::{
31 Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
32};
33use datafusion_common::{JoinSide, JoinType, Result};
34use datafusion_physical_expr::expressions::Column;
35use datafusion_physical_expr_common::physical_expr::{PhysicalExpr, is_volatile};
36use datafusion_physical_plan::ExecutionPlan;
37use datafusion_physical_plan::joins::NestedLoopJoinExec;
38use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
39use datafusion_physical_plan::projection::{
40 ProjectionExec, remove_unnecessary_projections,
41};
42
/// Physical optimizer rule that pushes projections down the plan.
///
/// For every [`NestedLoopJoinExec`] the rule tries to move the parts of the
/// join filter that depend on only one join input into a [`ProjectionExec`]
/// placed below the join on that side. Afterwards it walks the plan top-down
/// and applies [`remove_unnecessary_projections`].
#[derive(Default, Debug)]
pub struct ProjectionPushdown {}
51
52impl ProjectionPushdown {
53 #[expect(missing_docs)]
54 pub fn new() -> Self {
55 Self {}
56 }
57}
58
59impl PhysicalOptimizerRule for ProjectionPushdown {
60 fn optimize(
61 &self,
62 plan: Arc<dyn ExecutionPlan>,
63 _config: &ConfigOptions,
64 ) -> Result<Arc<dyn ExecutionPlan>> {
65 let alias_generator = AliasGenerator::new();
66 let plan = plan
67 .transform_up(|plan| {
68 match plan.as_any().downcast_ref::<NestedLoopJoinExec>() {
69 None => Ok(Transformed::no(plan)),
70 Some(hash_join) => try_push_down_join_filter(
71 Arc::clone(&plan),
72 hash_join,
73 &alias_generator,
74 ),
75 }
76 })
77 .map(|t| t.data)?;
78
79 plan.transform_down(remove_unnecessary_projections).data()
80 }
81
82 fn name(&self) -> &str {
83 "ProjectionPushdown"
84 }
85
86 fn schema_check(&self) -> bool {
87 true
88 }
89}
90
91fn try_push_down_join_filter(
95 original_plan: Arc<dyn ExecutionPlan>,
96 join: &NestedLoopJoinExec,
97 alias_generator: &AliasGenerator,
98) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
99 if matches!(join.join_type(), JoinType::LeftMark | JoinType::RightMark) {
101 return Ok(Transformed::no(original_plan));
102 }
103
104 let projections = join.projection();
105 let Some(filter) = join.filter() else {
106 return Ok(Transformed::no(original_plan));
107 };
108
109 let original_lhs_length = join.left().schema().fields().len();
110 let original_rhs_length = join.right().schema().fields().len();
111
112 let lhs_rewrite = try_push_down_projection(
113 Arc::clone(&join.right().schema()),
114 Arc::clone(join.left()),
115 JoinSide::Left,
116 filter.clone(),
117 alias_generator,
118 )?;
119 let rhs_rewrite = try_push_down_projection(
120 Arc::clone(&lhs_rewrite.data.0.schema()),
121 Arc::clone(join.right()),
122 JoinSide::Right,
123 lhs_rewrite.data.1,
124 alias_generator,
125 )?;
126 if !lhs_rewrite.transformed && !rhs_rewrite.transformed {
127 return Ok(Transformed::no(original_plan));
128 }
129
130 let join_filter = minimize_join_filter(
131 Arc::clone(rhs_rewrite.data.1.expression()),
132 rhs_rewrite.data.1.column_indices(),
133 lhs_rewrite.data.0.schema().as_ref(),
134 rhs_rewrite.data.0.schema().as_ref(),
135 );
136
137 let new_lhs_length = lhs_rewrite.data.0.schema().fields.len();
138 let projections = match projections.as_ref() {
139 None => match join.join_type() {
140 JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
141 let mut projections = Vec::new();
143 projections.extend(0..original_lhs_length);
144 projections.extend(new_lhs_length..new_lhs_length + original_rhs_length);
145 projections
146 }
147 JoinType::LeftSemi | JoinType::LeftAnti => {
148 let mut projections = Vec::new();
150 projections.extend(0..original_lhs_length);
151 projections
152 }
153 JoinType::RightSemi | JoinType::RightAnti => {
154 let mut projections = Vec::new();
156 projections.extend(0..original_rhs_length);
157 projections
158 }
159 _ => unreachable!("Unsupported join type"),
160 },
161 Some(projections) => {
162 let rhs_offset = new_lhs_length - original_lhs_length;
163 projections
164 .iter()
165 .map(|idx| {
166 if *idx >= original_lhs_length {
167 idx + rhs_offset
168 } else {
169 *idx
170 }
171 })
172 .collect()
173 }
174 };
175
176 Ok(Transformed::yes(Arc::new(NestedLoopJoinExec::try_new(
177 lhs_rewrite.data.0,
178 rhs_rewrite.data.0,
179 Some(join_filter),
180 join.join_type(),
181 Some(projections),
182 )?)))
183}
184
185fn try_push_down_projection(
187 other_schema: SchemaRef,
188 plan: Arc<dyn ExecutionPlan>,
189 join_side: JoinSide,
190 join_filter: JoinFilter,
191 alias_generator: &AliasGenerator,
192) -> Result<Transformed<(Arc<dyn ExecutionPlan>, JoinFilter)>> {
193 let expr = Arc::clone(join_filter.expression());
194 let original_plan_schema = plan.schema();
195 let mut rewriter = JoinFilterRewriter::new(
196 join_side,
197 original_plan_schema.as_ref(),
198 join_filter.column_indices().to_vec(),
199 alias_generator,
200 );
201 let new_expr = rewriter.rewrite(expr)?;
202
203 if new_expr.transformed {
204 let new_join_side =
205 ProjectionExec::try_new(rewriter.join_side_projections, plan)?;
206 let new_schema = Arc::clone(&new_join_side.schema());
207
208 let (lhs_schema, rhs_schema) = match join_side {
209 JoinSide::Left => (new_schema, other_schema),
210 JoinSide::Right => (other_schema, new_schema),
211 JoinSide::None => unreachable!("Mark join not supported"),
212 };
213 let intermediate_schema = rewriter
214 .intermediate_column_indices
215 .iter()
216 .map(|ci| match ci.side {
217 JoinSide::Left => Arc::clone(&lhs_schema.fields[ci.index]),
218 JoinSide::Right => Arc::clone(&rhs_schema.fields[ci.index]),
219 JoinSide::None => unreachable!("Mark join not supported"),
220 })
221 .collect::<Fields>();
222
223 let join_filter = JoinFilter::new(
224 new_expr.data,
225 rewriter.intermediate_column_indices,
226 Arc::new(Schema::new(intermediate_schema)),
227 );
228 Ok(Transformed::yes((Arc::new(new_join_side), join_filter)))
229 } else {
230 Ok(Transformed::no((plan, join_filter)))
231 }
232}
233
234fn minimize_join_filter(
240 expr: Arc<dyn PhysicalExpr>,
241 old_column_indices: &[ColumnIndex],
242 lhs_schema: &Schema,
243 rhs_schema: &Schema,
244) -> JoinFilter {
245 let mut used_columns = HashSet::new();
246 expr.apply(|expr| {
247 if let Some(col) = expr.as_any().downcast_ref::<Column>() {
248 used_columns.insert(col.index());
249 }
250 Ok(TreeNodeRecursion::Continue)
251 })
252 .expect("Closure cannot fail");
253
254 let new_column_indices = old_column_indices
255 .iter()
256 .enumerate()
257 .filter(|(idx, _)| used_columns.contains(idx))
258 .map(|(_, ci)| ci.clone())
259 .collect::<Vec<_>>();
260 let fields = new_column_indices
261 .iter()
262 .map(|ci| match ci.side {
263 JoinSide::Left => lhs_schema.field(ci.index).clone(),
264 JoinSide::Right => rhs_schema.field(ci.index).clone(),
265 JoinSide::None => unreachable!("Mark join not supported"),
266 })
267 .collect::<Fields>();
268
269 let final_expr = expr
270 .transform_up(|expr| match expr.as_any().downcast_ref::<Column>() {
271 None => Ok(Transformed::no(expr)),
272 Some(column) => {
273 let new_idx = used_columns
274 .iter()
275 .filter(|idx| **idx < column.index())
276 .count();
277 let new_column = Column::new(column.name(), new_idx);
278 Ok(Transformed::yes(
279 Arc::new(new_column) as Arc<dyn PhysicalExpr>
280 ))
281 }
282 })
283 .expect("Closure cannot fail");
284
285 JoinFilter::new(
286 final_expr.data,
287 new_column_indices,
288 Arc::new(Schema::new(fields)),
289 )
290}
291
/// Rewrites a join filter expression so that sub-expressions depending on a
/// single join side are computed by a projection on that side.
///
/// While rewriting, the struct accumulates the projection expressions for the
/// join side and the updated list of intermediate columns for the filter.
struct JoinFilterRewriter<'a> {
    /// The join input that expressions are pushed into.
    join_side: JoinSide,
    /// Schema of the join input identified by `join_side`, before rewriting.
    join_side_schema: &'a Schema,
    /// Projection expressions (with their output names) for the join side.
    /// Starts as one column per field of `join_side_schema`; every pushed-down
    /// expression is appended at the end.
    join_side_projections: Vec<(Arc<dyn PhysicalExpr>, String)>,
    /// Intermediate columns of the join filter. Starts as the filter's
    /// original column indices; every pushed-down expression appends an entry.
    intermediate_column_indices: Vec<ColumnIndex>,
    /// Source of the aliases given to pushed-down expressions.
    alias_generator: &'a AliasGenerator,
}
306
307impl<'a> JoinFilterRewriter<'a> {
308 fn new(
310 join_side: JoinSide,
311 join_side_schema: &'a Schema,
312 column_indices: Vec<ColumnIndex>,
313 alias_generator: &'a AliasGenerator,
314 ) -> Self {
315 let projections = join_side_schema
316 .fields()
317 .iter()
318 .enumerate()
319 .map(|(idx, field)| {
320 (
321 Arc::new(Column::new(field.name(), idx)) as Arc<dyn PhysicalExpr>,
322 field.name().to_string(),
323 )
324 })
325 .collect();
326
327 Self {
328 join_side,
329 join_side_schema,
330 join_side_projections: projections,
331 intermediate_column_indices: column_indices,
332 alias_generator,
333 }
334 }
335
336 fn rewrite(
340 &mut self,
341 expr: Arc<dyn PhysicalExpr>,
342 ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
343 let depends_on_this_side = self.depends_on_join_side(&expr, self.join_side)?;
344 if !depends_on_this_side {
346 return Ok(Transformed::no(expr));
347 }
348
349 let depends_on_other_side =
351 self.depends_on_join_side(&expr, self.join_side.negate())?;
352 if depends_on_other_side || is_volatile(&expr) {
353 return expr.map_children(|expr| self.rewrite(expr));
354 }
355
356 if expr.children().is_empty() {
361 return Ok(Transformed::no(expr));
362 }
363
364 let alias = self.alias_generator.next("join_proj_push_down");
366 let idx = self.create_new_column(alias.clone(), expr)?;
367
368 Ok(Transformed::yes(
369 Arc::new(Column::new(&alias, idx)) as Arc<dyn PhysicalExpr>
370 ))
371 }
372
373 fn create_new_column(
375 &mut self,
376 name: String,
377 expr: Arc<dyn PhysicalExpr>,
378 ) -> Result<usize> {
379 let new_idx = self.join_side_projections.len();
382 let rewritten_expr = expr.transform_up(|expr| {
383 Ok(match expr.as_any().downcast_ref::<Column>() {
384 None => Transformed::no(expr),
385 Some(column) => {
386 let intermediate_column =
387 &self.intermediate_column_indices[column.index()];
388 assert_eq!(intermediate_column.side, self.join_side);
389
390 let join_side_index = intermediate_column.index;
391 let field = self.join_side_schema.field(join_side_index);
392 let new_column = Column::new(field.name(), join_side_index);
393 Transformed::yes(Arc::new(new_column) as Arc<dyn PhysicalExpr>)
394 }
395 })
396 })?;
397 self.join_side_projections.push((rewritten_expr.data, name));
398
399 let new_intermediate_idx = self.intermediate_column_indices.len();
401 let idx = ColumnIndex {
402 index: new_idx,
403 side: self.join_side,
404 };
405 self.intermediate_column_indices.push(idx);
406
407 Ok(new_intermediate_idx)
408 }
409
410 fn depends_on_join_side(
412 &mut self,
413 expr: &Arc<dyn PhysicalExpr>,
414 join_side: JoinSide,
415 ) -> Result<bool> {
416 let mut result = false;
417 expr.apply(|expr| match expr.as_any().downcast_ref::<Column>() {
418 None => Ok(TreeNodeRecursion::Continue),
419 Some(c) => {
420 let column_index = &self.intermediate_column_indices[c.index()];
421 if column_index.side == join_side {
422 result = true;
423 return Ok(TreeNodeRecursion::Stop);
424 }
425 Ok(TreeNodeRecursion::Continue)
426 }
427 })?;
428
429 Ok(result)
430 }
431}
432
// Snapshot tests for the join filter pushdown performed by `ProjectionPushdown`.
// Every test builds a `NestedLoopJoinExec` over two `EmptyExec` inputs via
// `run_test` and compares the indented plan display with an inline snapshot.
#[cfg(test)]
mod test {
    use super::*;
    use arrow::datatypes::{DataType, Field, FieldRef, Schema};
    use datafusion_expr_common::operator::Operator;
    use datafusion_functions::math::random;
    use datafusion_physical_expr::ScalarFunctionExpr;
    use datafusion_physical_expr::expressions::{binary, lit};
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    use datafusion_physical_plan::displayable;
    use datafusion_physical_plan::empty::EmptyExec;
    use insta::assert_snapshot;
    use std::sync::Arc;

    // A filter made of bare column references (`a > x`) leaves the plan as is.
    #[tokio::test]
    async fn no_computation_does_not_project() -> Result<()> {
        let (left_schema, right_schema) = create_simple_schemas();
        let optimized_plan = run_test(
            left_schema,
            right_schema,
            a_x(),
            None,
            a_greater_than_x,
            JoinType::Inner,
        )?;

        assert_snapshot!(optimized_plan, @r"
        NestedLoopJoinExec: join_type=Inner, filter=a@0 > x@1
          EmptyExec
          EmptyExec
        ");
        Ok(())
    }

    // `a + 1 > x + 1`: each sum is computed by a projection on its own side and
    // the join projection hides the two helper columns.
    #[tokio::test]
    async fn simple_push_down() -> Result<()> {
        let (left_schema, right_schema) = create_simple_schemas();
        let optimized_plan = run_test(
            left_schema,
            right_schema,
            a_x(),
            None,
            a_plus_one_greater_than_x_plus_one,
            JoinType::Inner,
        )?;

        assert_snapshot!(optimized_plan, @r"
        NestedLoopJoinExec: join_type=Inner, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[a@0, x@2]
          ProjectionExec: expr=[a@0 as a, a@0 + 1 as join_proj_push_down_1]
            EmptyExec
          ProjectionExec: expr=[x@0 as x, x@0 + 1 as join_proj_push_down_2]
            EmptyExec
        ");
        Ok(())
    }

    // `false AND (a + 1 > x + 1)`.
    // NOTE(review): despite the test name, the recorded snapshot shows both
    // sums being projected below the join while only the `false AND` wrapper
    // stays in the filter - confirm that this is the intended behavior.
    #[tokio::test]
    async fn does_not_push_down_short_circuiting_expressions() -> Result<()> {
        let (left_schema, right_schema) = create_simple_schemas();
        let optimized_plan = run_test(
            left_schema,
            right_schema,
            a_x(),
            None,
            |schema| {
                binary(
                    lit(false),
                    Operator::And,
                    a_plus_one_greater_than_x_plus_one(schema)?,
                    schema,
                )
            },
            JoinType::Inner,
        )?;

        assert_snapshot!(optimized_plan, @r"
        NestedLoopJoinExec: join_type=Inner, filter=false AND join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[a@0, x@2]
          ProjectionExec: expr=[a@0 as a, a@0 + 1 as join_proj_push_down_1]
            EmptyExec
          ProjectionExec: expr=[x@0 as x, x@0 + 1 as join_proj_push_down_2]
            EmptyExec
        ");
        Ok(())
    }

    // `a + rand() > x`: the left operand contains a volatile function, so the
    // plan is left unchanged.
    #[tokio::test]
    async fn does_not_push_down_volatile_functions() -> Result<()> {
        let (left_schema, right_schema) = create_simple_schemas();
        let optimized_plan = run_test(
            left_schema,
            right_schema,
            a_x(),
            None,
            a_plus_rand_greater_than_x,
            JoinType::Inner,
        )?;

        assert_snapshot!(optimized_plan, @r"
        NestedLoopJoinExec: join_type=Inner, filter=a@0 + rand() > x@1
          EmptyExec
          EmptyExec
        ");
        Ok(())
    }

    // `a + b > x + z` over three-column inputs: the original columns of both
    // sides remain in the join output, the helper columns do not.
    #[tokio::test]
    async fn complex_schema_push_down() -> Result<()> {
        let (left_schema, right_schema) = create_complex_schemas();

        let optimized_plan = run_test(
            left_schema,
            right_schema,
            a_b_x_z(),
            None,
            a_plus_b_greater_than_x_plus_z,
            JoinType::Inner,
        )?;

        assert_snapshot!(optimized_plan, @r"
        NestedLoopJoinExec: join_type=Inner, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[a@0, b@1, c@2, x@4, y@5, z@6]
          ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, a@0 + b@1 as join_proj_push_down_1]
            EmptyExec
          ProjectionExec: expr=[x@0 as x, y@1 as y, z@2 as z, x@0 + z@2 as join_proj_push_down_2]
            EmptyExec
        ");
        Ok(())
    }

    // Same filter as above, but the join already carries a projection; the
    // right-side indices are shifted past the helper column added on the left.
    #[tokio::test]
    async fn push_down_with_existing_projections() -> Result<()> {
        let (left_schema, right_schema) = create_complex_schemas();

        let optimized_plan = run_test(
            left_schema,
            right_schema,
            a_b_x_z(),
            // Indices into the join output (a, b, c, x, y, z): selects b, x, z.
            Some(vec![1, 3, 5]),
            a_plus_b_greater_than_x_plus_z,
            JoinType::Inner,
        )?;

        assert_snapshot!(optimized_plan, @r"
        NestedLoopJoinExec: join_type=Inner, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[b@1, x@4, z@6]
          ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c, a@0 + b@1 as join_proj_push_down_1]
            EmptyExec
          ProjectionExec: expr=[x@0 as x, y@1 as y, z@2 as z, x@0 + z@2 as join_proj_push_down_2]
            EmptyExec
        ");
        Ok(())
    }

    // Left semi join: the join projection keeps only the original left column.
    #[tokio::test]
    async fn left_semi_join_projection() -> Result<()> {
        let (left_schema, right_schema) = create_simple_schemas();

        let left_semi_join_plan = run_test(
            left_schema.clone(),
            right_schema.clone(),
            a_x(),
            None,
            a_plus_one_greater_than_x_plus_one,
            JoinType::LeftSemi,
        )?;

        assert_snapshot!(left_semi_join_plan, @r"
        NestedLoopJoinExec: join_type=LeftSemi, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[a@0]
          ProjectionExec: expr=[a@0 as a, a@0 + 1 as join_proj_push_down_1]
            EmptyExec
          ProjectionExec: expr=[x@0 as x, x@0 + 1 as join_proj_push_down_2]
            EmptyExec
        ");
        Ok(())
    }

    // Right semi join: the join projection keeps only the original right column.
    #[tokio::test]
    async fn right_semi_join_projection() -> Result<()> {
        let (left_schema, right_schema) = create_simple_schemas();
        let right_semi_join_plan = run_test(
            left_schema,
            right_schema,
            a_x(),
            None,
            a_plus_one_greater_than_x_plus_one,
            JoinType::RightSemi,
        )?;
        assert_snapshot!(right_semi_join_plan, @r"
        NestedLoopJoinExec: join_type=RightSemi, filter=join_proj_push_down_1@0 > join_proj_push_down_2@1, projection=[x@0]
          ProjectionExec: expr=[a@0 as a, a@0 + 1 as join_proj_push_down_1]
            EmptyExec
          ProjectionExec: expr=[x@0 as x, x@0 + 1 as join_proj_push_down_2]
            EmptyExec
        ");
        Ok(())
    }

    // Builds a `NestedLoopJoinExec` over two `EmptyExec` inputs, runs
    // `ProjectionPushdown` on it and returns the indented plan display.
    //
    // * `column_indices` - intermediate columns of the join filter; they also
    //   define the schema handed to `filter_expr_builder`.
    // * `existing_projections` - optional projection embedded in the join.
    // * `filter_expr_builder` - builds the filter expression against the
    //   intermediate filter schema.
    fn run_test(
        left_schema: Schema,
        right_schema: Schema,
        column_indices: Vec<ColumnIndex>,
        existing_projections: Option<Vec<usize>>,
        filter_expr_builder: impl FnOnce(&Schema) -> Result<Arc<dyn PhysicalExpr>>,
        join_type: JoinType,
    ) -> Result<String> {
        let left = Arc::new(EmptyExec::new(Arc::new(left_schema.clone())));
        let right = Arc::new(EmptyExec::new(Arc::new(right_schema.clone())));

        // Intermediate schema of the filter: one field per column index, taken
        // from the side the index points at.
        let join_fields: Vec<_> = column_indices
            .iter()
            .map(|ci| match ci.side {
                JoinSide::Left => left_schema.field(ci.index).clone(),
                JoinSide::Right => right_schema.field(ci.index).clone(),
                JoinSide::None => unreachable!(),
            })
            .collect();
        let join_schema = Arc::new(Schema::new(join_fields));

        let filter_expr = filter_expr_builder(join_schema.as_ref())?;

        let join_filter = JoinFilter::new(filter_expr, column_indices, join_schema);

        let join = NestedLoopJoinExec::try_new(
            left,
            right,
            Some(join_filter),
            &join_type,
            existing_projections,
        )?;

        let optimizer = ProjectionPushdown::new();
        let optimized_plan = optimizer.optimize(Arc::new(join), &Default::default())?;

        let displayable_plan = displayable(optimized_plan.as_ref()).indent(false);
        Ok(displayable_plan.to_string())
    }

    // Single-column inputs: left `a`, right `x` (both non-nullable Int32).
    fn create_simple_schemas() -> (Schema, Schema) {
        let left_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let right_schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);

        (left_schema, right_schema)
    }

    // Three-column inputs: left `a, b, c`, right `x, y, z` (all non-nullable
    // Int32).
    fn create_complex_schemas() -> (Schema, Schema) {
        let left_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
        ]);

        let right_schema = Schema::new(vec![
            Field::new("x", DataType::Int32, false),
            Field::new("y", DataType::Int32, false),
            Field::new("z", DataType::Int32, false),
        ]);

        (left_schema, right_schema)
    }

    // Filter columns: [left column 0, right column 0].
    fn a_x() -> Vec<ColumnIndex> {
        vec![
            ColumnIndex {
                index: 0,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 0,
                side: JoinSide::Right,
            },
        ]
    }

    // Filter columns: [left column 0, left column 1, right column 0,
    // right column 2].
    fn a_b_x_z() -> Vec<ColumnIndex> {
        vec![
            ColumnIndex {
                index: 0,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 1,
                side: JoinSide::Left,
            },
            ColumnIndex {
                index: 0,
                side: JoinSide::Right,
            },
            ColumnIndex {
                index: 2,
                side: JoinSide::Right,
            },
        ]
    }

    // Builds `a@0 + 1 > x@1 + 1` against the filter schema of `a_x`.
    fn a_plus_one_greater_than_x_plus_one(
        join_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let left_expr = binary(
            Arc::new(Column::new("a", 0)),
            Operator::Plus,
            lit(1),
            join_schema,
        )?;
        let right_expr = binary(
            Arc::new(Column::new("x", 1)),
            Operator::Plus,
            lit(1),
            join_schema,
        )?;
        binary(left_expr, Operator::Gt, right_expr, join_schema)
    }

    // Builds `a@0 + rand() > x@1` against the filter schema of `a_x`.
    fn a_plus_rand_greater_than_x(join_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
        let left_expr = binary(
            Arc::new(Column::new("a", 0)),
            Operator::Plus,
            Arc::new(ScalarFunctionExpr::new(
                "rand",
                random(),
                vec![],
                FieldRef::new(Field::new("out", DataType::Float64, false)),
                Arc::new(ConfigOptions::default()),
            )),
            join_schema,
        )?;
        let right_expr = Arc::new(Column::new("x", 1));
        binary(left_expr, Operator::Gt, right_expr, join_schema)
    }

    // Builds `a@0 > x@1` against the filter schema of `a_x`.
    fn a_greater_than_x(join_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
        binary(
            Arc::new(Column::new("a", 0)),
            Operator::Gt,
            Arc::new(Column::new("x", 1)),
            join_schema,
        )
    }

    // Builds `a@0 + b@1 > x@2 + z@3` against the filter schema of `a_b_x_z`.
    fn a_plus_b_greater_than_x_plus_z(
        join_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let lhs = binary(
            Arc::new(Column::new("a", 0)),
            Operator::Plus,
            Arc::new(Column::new("b", 1)),
            join_schema,
        )?;
        let rhs = binary(
            Arc::new(Column::new("x", 2)),
            Operator::Plus,
            Arc::new(Column::new("z", 3)),
            join_schema,
        )?;
        binary(lhs, Operator::Gt, rhs, join_schema)
    }
}