1use super::expressions::Column;
24use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use super::{
26 DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
27 SendableRecordBatchStream, SortOrderPushdownResult, Statistics,
28};
29use crate::column_rewriter::PhysicalColumnRewriter;
30use crate::execution_plan::CardinalityEffect;
31use crate::filter_pushdown::{
32 ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
33 FilterPushdownPropagation, FilterRemapper, PushedDownPredicate,
34};
35use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef};
36use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr, check_if_same_properties};
37use std::collections::HashMap;
38use std::pin::Pin;
39use std::sync::Arc;
40use std::task::{Context, Poll};
41
42use arrow::datatypes::SchemaRef;
43use arrow::record_batch::RecordBatch;
44use datafusion_common::config::ConfigOptions;
45use datafusion_common::tree_node::{
46 Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
47};
48use datafusion_common::{DataFusionError, JoinSide, Result, internal_err};
49use datafusion_execution::TaskContext;
50use datafusion_expr::ExpressionPlacement;
51use datafusion_physical_expr::equivalence::ProjectionMapping;
52use datafusion_physical_expr::projection::Projector;
53use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, fmt_sql};
54use datafusion_physical_expr_common::sort_expr::{
55 LexOrdering, LexRequirement, PhysicalSortExpr,
56};
57pub use datafusion_physical_expr::projection::{
60 ProjectionExpr, ProjectionExprs, update_expr,
61};
62
63use futures::stream::{Stream, StreamExt};
64use log::trace;
65
66#[derive(Debug, Clone)]
71pub struct ProjectionExec {
72 projector: Projector,
75 input: Arc<dyn ExecutionPlan>,
77 metrics: ExecutionPlanMetricsSet,
79 cache: Arc<PlanProperties>,
81}
82
83impl ProjectionExec {
84 pub fn try_new<I, E>(expr: I, input: Arc<dyn ExecutionPlan>) -> Result<Self>
134 where
135 I: IntoIterator<Item = E>,
136 E: Into<ProjectionExpr>,
137 {
138 let input_schema = input.schema();
139 let expr_arc = expr.into_iter().map(Into::into).collect::<Arc<_>>();
140 let projection = ProjectionExprs::from_expressions(expr_arc);
141 let projector = projection.make_projector(&input_schema)?;
142 Self::try_from_projector(projector, input)
143 }
144
145 fn try_from_projector(
146 projector: Projector,
147 input: Arc<dyn ExecutionPlan>,
148 ) -> Result<Self> {
149 let projection_mapping =
151 projector.projection().projection_mapping(&input.schema())?;
152 let cache = Self::compute_properties(
153 &input,
154 &projection_mapping,
155 Arc::clone(projector.output_schema()),
156 )?;
157 Ok(Self {
158 projector,
159 input,
160 metrics: ExecutionPlanMetricsSet::new(),
161 cache: Arc::new(cache),
162 })
163 }
164
165 pub fn expr(&self) -> &[ProjectionExpr] {
167 self.projector.projection().as_ref()
168 }
169
170 pub fn projection_expr(&self) -> &ProjectionExprs {
172 self.projector.projection()
173 }
174
175 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
177 &self.input
178 }
179
180 fn compute_properties(
182 input: &Arc<dyn ExecutionPlan>,
183 projection_mapping: &ProjectionMapping,
184 schema: SchemaRef,
185 ) -> Result<PlanProperties> {
186 let input_eq_properties = input.equivalence_properties();
188 let eq_properties = input_eq_properties.project(projection_mapping, schema);
189 let output_partitioning = input
191 .output_partitioning()
192 .project(projection_mapping, input_eq_properties);
193
194 Ok(PlanProperties::new(
195 eq_properties,
196 output_partitioning,
197 input.pipeline_behavior(),
198 input.boundedness(),
199 ))
200 }
201
202 fn collect_reverse_alias(
205 &self,
206 ) -> Result<datafusion_common::HashMap<Column, Arc<dyn PhysicalExpr>>> {
207 let mut alias_map = datafusion_common::HashMap::new();
208 for projection in self.projection_expr().iter() {
209 let (aliased_index, _output_field) = self
210 .projector
211 .output_schema()
212 .column_with_name(&projection.alias)
213 .ok_or_else(|| {
214 DataFusionError::Internal(format!(
215 "Expr {} with alias {} not found in output schema",
216 projection.expr, projection.alias
217 ))
218 })?;
219 let aliased_col = Column::new(&projection.alias, aliased_index);
220 alias_map.insert(aliased_col, Arc::clone(&projection.expr));
221 }
222 Ok(alias_map)
223 }
224
225 fn with_new_children_and_same_properties(
226 &self,
227 mut children: Vec<Arc<dyn ExecutionPlan>>,
228 ) -> Self {
229 Self {
230 input: children.swap_remove(0),
231 metrics: ExecutionPlanMetricsSet::new(),
232 ..Self::clone(self)
233 }
234 }
235}
236
237impl DisplayAs for ProjectionExec {
238 fn fmt_as(
239 &self,
240 t: DisplayFormatType,
241 f: &mut std::fmt::Formatter,
242 ) -> std::fmt::Result {
243 match t {
244 DisplayFormatType::Default | DisplayFormatType::Verbose => {
245 let expr: Vec<String> = self
246 .projector
247 .projection()
248 .as_ref()
249 .iter()
250 .map(|proj_expr| {
251 let e = proj_expr.expr.to_string();
252 if e != proj_expr.alias {
253 format!("{e} as {}", proj_expr.alias)
254 } else {
255 e
256 }
257 })
258 .collect();
259
260 write!(f, "ProjectionExec: expr=[{}]", expr.join(", "))
261 }
262 DisplayFormatType::TreeRender => {
263 for (i, proj_expr) in self.expr().iter().enumerate() {
264 let expr_sql = fmt_sql(proj_expr.expr.as_ref());
265 if proj_expr.expr.to_string() == proj_expr.alias {
266 writeln!(f, "expr{i}={expr_sql}")?;
267 } else {
268 writeln!(f, "{}={expr_sql}", proj_expr.alias)?;
269 }
270 }
271
272 Ok(())
273 }
274 }
275 }
276}
277
278impl ExecutionPlan for ProjectionExec {
279 fn name(&self) -> &'static str {
280 "ProjectionExec"
281 }
282
283 fn properties(&self) -> &Arc<PlanProperties> {
285 &self.cache
286 }
287
288 fn maintains_input_order(&self) -> Vec<bool> {
289 vec![true]
291 }
292
293 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
294 let all_simple_exprs =
295 self.projector
296 .projection()
297 .as_ref()
298 .iter()
299 .all(|proj_expr| {
300 !matches!(
301 proj_expr.expr.placement(),
302 ExpressionPlacement::KeepInPlace
303 )
304 });
305 vec![!all_simple_exprs]
309 }
310
311 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
312 vec![&self.input]
313 }
314
315 fn with_new_children(
316 self: Arc<Self>,
317 mut children: Vec<Arc<dyn ExecutionPlan>>,
318 ) -> Result<Arc<dyn ExecutionPlan>> {
319 check_if_same_properties!(self, children);
320 ProjectionExec::try_from_projector(
321 self.projector.clone(),
322 children.swap_remove(0),
323 )
324 .map(|p| Arc::new(p) as _)
325 }
326
327 fn execute(
328 &self,
329 partition: usize,
330 context: Arc<TaskContext>,
331 ) -> Result<SendableRecordBatchStream> {
332 trace!(
333 "Start ProjectionExec::execute for partition {} of context session_id {} and task_id {:?}",
334 partition,
335 context.session_id(),
336 context.task_id()
337 );
338
339 let projector = self.projector.with_metrics(&self.metrics, partition);
340 Ok(Box::pin(ProjectionStream::new(
341 projector,
342 self.input.execute(partition, context)?,
343 BaselineMetrics::new(&self.metrics, partition),
344 )?))
345 }
346
347 fn metrics(&self) -> Option<MetricsSet> {
348 Some(self.metrics.clone_inner())
349 }
350
351 fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
352 let input_stats =
353 Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
354 let output_schema = self.schema();
355 Ok(Arc::new(
356 self.projector
357 .projection()
358 .project_statistics(input_stats, &output_schema)?,
359 ))
360 }
361
362 fn supports_limit_pushdown(&self) -> bool {
363 true
364 }
365
366 fn cardinality_effect(&self) -> CardinalityEffect {
367 CardinalityEffect::Equal
368 }
369
370 fn try_swapping_with_projection(
371 &self,
372 projection: &ProjectionExec,
373 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
374 let maybe_unified = try_unifying_projections(projection, self)?;
375 if let Some(new_plan) = maybe_unified {
376 remove_unnecessary_projections(new_plan).data().map(Some)
378 } else {
379 Ok(Some(Arc::new(projection.clone())))
380 }
381 }
382
383 fn gather_filters_for_pushdown(
384 &self,
385 _phase: FilterPushdownPhase,
386 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
387 _config: &ConfigOptions,
388 ) -> Result<FilterDescription> {
389 let invert_alias_map = self.collect_reverse_alias()?;
391 let output_schema = self.schema();
392 let remapper = FilterRemapper::new(output_schema);
393 let mut child_parent_filters = Vec::with_capacity(parent_filters.len());
394
395 for filter in parent_filters {
396 if let Some(reassigned) = remapper.try_remap(&filter)? {
398 let mut rewriter = PhysicalColumnRewriter::new(&invert_alias_map);
400 let rewritten = reassigned.rewrite(&mut rewriter)?.data;
401 child_parent_filters.push(PushedDownPredicate::supported(rewritten));
402 } else {
403 child_parent_filters.push(PushedDownPredicate::unsupported(filter));
404 }
405 }
406
407 Ok(FilterDescription::new().with_child(ChildFilterDescription {
408 parent_filters: child_parent_filters,
409 self_filters: vec![],
410 }))
411 }
412
413 fn handle_child_pushdown_result(
414 &self,
415 _phase: FilterPushdownPhase,
416 child_pushdown_result: ChildPushdownResult,
417 _config: &ConfigOptions,
418 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
419 Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
420 }
421
422 fn try_pushdown_sort(
423 &self,
424 order: &[PhysicalSortExpr],
425 ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
426 let child = self.input();
427 let mut child_order = Vec::new();
428
429 for sort_expr in order {
431 let mut can_pushdown = true;
433 let transformed = Arc::clone(&sort_expr.expr).transform(|expr| {
434 if let Some(col) = expr.downcast_ref::<Column>() {
435 if col.index() >= self.expr().len() {
438 can_pushdown = false;
439 return Ok(Transformed::no(expr));
440 }
441
442 let proj_expr = &self.expr()[col.index()];
443
444 if let Some(child_col) = proj_expr.expr.downcast_ref::<Column>() {
448 Ok(Transformed::yes(Arc::new(child_col.clone()) as _))
450 } else {
451 can_pushdown = false;
453 Ok(Transformed::no(expr))
454 }
455 } else {
456 Ok(Transformed::no(expr))
457 }
458 })?;
459
460 if !can_pushdown {
461 return Ok(SortOrderPushdownResult::Unsupported);
462 }
463
464 child_order.push(PhysicalSortExpr {
465 expr: transformed.data,
466 options: sort_expr.options,
467 });
468 }
469
470 match child.try_pushdown_sort(&child_order)? {
472 SortOrderPushdownResult::Exact { inner } => {
473 let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
474 Ok(SortOrderPushdownResult::Exact { inner: new_exec })
475 }
476 SortOrderPushdownResult::Inexact { inner } => {
477 let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
478 Ok(SortOrderPushdownResult::Inexact { inner: new_exec })
479 }
480 SortOrderPushdownResult::Unsupported => {
481 Ok(SortOrderPushdownResult::Unsupported)
482 }
483 }
484 }
485
486 fn with_preserve_order(
487 &self,
488 preserve_order: bool,
489 ) -> Option<Arc<dyn ExecutionPlan>> {
490 self.input
491 .with_preserve_order(preserve_order)
492 .and_then(|new_input| {
493 Arc::new(self.clone())
494 .with_new_children(vec![new_input])
495 .ok()
496 })
497 }
498}
499
500impl ProjectionStream {
501 fn new(
503 projector: Projector,
504 input: SendableRecordBatchStream,
505 baseline_metrics: BaselineMetrics,
506 ) -> Result<Self> {
507 Ok(Self {
508 projector,
509 input,
510 baseline_metrics,
511 })
512 }
513
514 fn batch_project(&self, batch: &RecordBatch) -> Result<RecordBatch> {
515 let _timer = self.baseline_metrics.elapsed_compute().timer();
517 self.projector.project_batch(batch)
518 }
519}
520
521struct ProjectionStream {
523 projector: Projector,
524 input: SendableRecordBatchStream,
525 baseline_metrics: BaselineMetrics,
526}
527
528impl Stream for ProjectionStream {
529 type Item = Result<RecordBatch>;
530
531 fn poll_next(
532 mut self: Pin<&mut Self>,
533 cx: &mut Context<'_>,
534 ) -> Poll<Option<Self::Item>> {
535 let poll = self.input.poll_next_unpin(cx).map(|x| match x {
536 Some(Ok(batch)) => Some(self.batch_project(&batch)),
537 other => other,
538 });
539
540 self.baseline_metrics.record_poll(poll)
541 }
542
543 fn size_hint(&self) -> (usize, Option<usize>) {
544 self.input.size_hint()
546 }
547}
548
549impl RecordBatchStream for ProjectionStream {
550 fn schema(&self) -> SchemaRef {
552 Arc::clone(self.projector.output_schema())
553 }
554}
555
/// Execution plans that can carry a column projection themselves, so that a
/// `ProjectionExec` above them may be folded into the plan (see
/// `try_embed_projection`).
pub trait EmbeddedProjection: ExecutionPlan + Sized {
    /// Returns a new plan of the same type configured with `projection`.
    ///
    /// Callers in this file pass `Some(indices)` with the column indices to
    /// keep, including `Some(vec![])` for an empty projection.
    /// NOTE(review): the meaning of `None` is not visible here — presumably
    /// "no embedded projection"; confirm against the implementors.
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self>;
}
568
569pub fn try_embed_projection<Exec: EmbeddedProjection + 'static>(
572 projection: &ProjectionExec,
573 execution_plan: &Exec,
574) -> Result<Option<Arc<dyn ExecutionPlan>>> {
575 if projection.expr().is_empty() {
580 let new_execution_plan = Arc::new(execution_plan.with_projection(Some(vec![]))?);
581 return Ok(Some(new_execution_plan));
582 }
583
584 let projection_index = collect_column_indices(projection.expr());
586
587 if projection_index.is_empty() {
588 return Ok(None);
589 };
590
591 let columns_reduced = projection_index.len() < execution_plan.schema().fields().len();
592
593 let new_execution_plan =
594 Arc::new(execution_plan.with_projection(Some(projection_index.to_vec()))?);
595
596 let embed_project_exprs = projection_index
598 .iter()
599 .zip(new_execution_plan.schema().fields())
600 .map(|(index, field)| ProjectionExpr {
601 expr: Arc::new(Column::new(field.name(), *index)) as Arc<dyn PhysicalExpr>,
602 alias: field.name().to_owned(),
603 })
604 .collect::<Vec<_>>();
605
606 let mut new_projection_exprs = Vec::with_capacity(projection.expr().len());
607
608 for proj_expr in projection.expr() {
609 let Some(expr) =
611 update_expr(&proj_expr.expr, embed_project_exprs.as_slice(), false)?
612 else {
613 return Ok(None);
614 };
615 new_projection_exprs.push(ProjectionExpr {
616 expr,
617 alias: proj_expr.alias.clone(),
618 });
619 }
620 let new_projection = Arc::new(ProjectionExec::try_new(
622 new_projection_exprs,
623 Arc::clone(&new_execution_plan) as _,
624 )?);
625 if is_projection_removable(&new_projection) {
626 Ok(Some(new_execution_plan))
628 } else if columns_reduced {
629 Ok(Some(new_projection))
632 } else {
633 Ok(None)
636 }
637}
638
/// Result of pushing a projection below a join, as produced by
/// `try_pushdown_through_join`.
pub struct JoinData {
    /// Projection placed over the join's left child.
    pub projected_left_child: ProjectionExec,
    /// Projection placed over the join's right child.
    pub projected_right_child: ProjectionExec,
    /// Join filter with column indices updated for the projected children,
    /// or `None` when the join had no filter.
    pub join_filter: Option<JoinFilter>,
    /// Join keys rewritten against the projected children.
    pub join_on: JoinOn,
}
645
646pub fn try_pushdown_through_join(
647 projection: &ProjectionExec,
648 join_left: &Arc<dyn ExecutionPlan>,
649 join_right: &Arc<dyn ExecutionPlan>,
650 join_on: JoinOnRef,
651 schema: &SchemaRef,
652 filter: Option<&JoinFilter>,
653) -> Result<Option<JoinData>> {
654 let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
656 return Ok(None);
657 };
658
659 let (far_right_left_col_ind, far_left_right_col_ind) =
660 join_table_borders(join_left.schema().fields().len(), &projection_as_columns);
661
662 if !join_allows_pushdown(
663 &projection_as_columns,
664 schema,
665 far_right_left_col_ind,
666 far_left_right_col_ind,
667 ) {
668 return Ok(None);
669 }
670
671 let new_filter = if let Some(filter) = filter {
672 match update_join_filter(
673 &projection_as_columns[0..=far_right_left_col_ind as _],
674 &projection_as_columns[far_left_right_col_ind as _..],
675 filter,
676 join_left.schema().fields().len(),
677 ) {
678 Some(updated_filter) => Some(updated_filter),
679 None => return Ok(None),
680 }
681 } else {
682 None
683 };
684
685 let Some(new_on) = update_join_on(
686 &projection_as_columns[0..=far_right_left_col_ind as _],
687 &projection_as_columns[far_left_right_col_ind as _..],
688 join_on,
689 join_left.schema().fields().len(),
690 ) else {
691 return Ok(None);
692 };
693
694 let (new_left, new_right) = new_join_children(
695 &projection_as_columns,
696 far_right_left_col_ind,
697 far_left_right_col_ind,
698 join_left,
699 join_right,
700 )?;
701
702 Ok(Some(JoinData {
703 projected_left_child: new_left,
704 projected_right_child: new_right,
705 join_filter: new_filter,
706 join_on: new_on,
707 }))
708}
709
710pub fn remove_unnecessary_projections(
715 plan: Arc<dyn ExecutionPlan>,
716) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
717 let maybe_modified = if let Some(projection) = plan.downcast_ref::<ProjectionExec>() {
718 if is_projection_removable(projection) {
721 return Ok(Transformed::yes(Arc::clone(projection.input())));
722 }
723 projection
725 .input()
726 .try_swapping_with_projection(projection)?
727 } else {
728 return Ok(Transformed::no(plan));
729 };
730 Ok(maybe_modified.map_or_else(|| Transformed::no(plan), Transformed::yes))
731}
732
733fn is_projection_removable(projection: &ProjectionExec) -> bool {
738 let exprs = projection.expr();
739 exprs.iter().enumerate().all(|(idx, proj_expr)| {
740 let Some(col) = proj_expr.expr.downcast_ref::<Column>() else {
741 return false;
742 };
743 col.name() == proj_expr.alias && col.index() == idx
744 }) && exprs.len() == projection.input().schema().fields().len()
745}
746
747pub fn all_alias_free_columns(exprs: &[ProjectionExpr]) -> bool {
750 exprs.iter().all(|proj_expr| {
751 proj_expr
752 .expr
753 .downcast_ref::<Column>()
754 .map(|column| column.name() == proj_expr.alias)
755 .unwrap_or(false)
756 })
757}
758
759pub fn new_projections_for_columns(
763 projection: &[ProjectionExpr],
764 source: &[usize],
765) -> Vec<usize> {
766 projection
767 .iter()
768 .filter_map(|proj_expr| {
769 proj_expr
770 .expr
771 .downcast_ref::<Column>()
772 .map(|expr| source[expr.index()])
773 })
774 .collect()
775}
776
777pub fn make_with_child(
780 projection: &ProjectionExec,
781 child: &Arc<dyn ExecutionPlan>,
782) -> Result<Arc<dyn ExecutionPlan>> {
783 ProjectionExec::try_new(projection.expr().to_vec(), Arc::clone(child))
784 .map(|e| Arc::new(e) as _)
785}
786
787pub fn all_columns(exprs: &[ProjectionExpr]) -> bool {
789 exprs.iter().all(|proj_expr| proj_expr.expr.is::<Column>())
790}
791
792pub fn update_ordering(
795 ordering: LexOrdering,
796 projected_exprs: &[ProjectionExpr],
797) -> Result<Option<LexOrdering>> {
798 let mut updated_exprs = vec![];
799 for mut sort_expr in ordering.into_iter() {
800 let Some(updated_expr) = update_expr(&sort_expr.expr, projected_exprs, false)?
801 else {
802 return Ok(None);
803 };
804 sort_expr.expr = updated_expr;
805 updated_exprs.push(sort_expr);
806 }
807 Ok(LexOrdering::new(updated_exprs))
808}
809
810pub fn update_ordering_requirement(
813 reqs: LexRequirement,
814 projected_exprs: &[ProjectionExpr],
815) -> Result<Option<LexRequirement>> {
816 let mut updated_exprs = vec![];
817 for mut sort_expr in reqs.into_iter() {
818 let Some(updated_expr) = update_expr(&sort_expr.expr, projected_exprs, false)?
819 else {
820 return Ok(None);
821 };
822 sort_expr.expr = updated_expr;
823 updated_exprs.push(sort_expr);
824 }
825 Ok(LexRequirement::new(updated_exprs))
826}
827
828pub fn physical_to_column_exprs(
831 exprs: &[ProjectionExpr],
832) -> Option<Vec<(Column, String)>> {
833 exprs
834 .iter()
835 .map(|proj_expr| {
836 proj_expr
837 .expr
838 .downcast_ref::<Column>()
839 .map(|col| (col.clone(), proj_expr.alias.clone()))
840 })
841 .collect()
842}
843
844pub fn new_join_children(
848 projection_as_columns: &[(Column, String)],
849 far_right_left_col_ind: i32,
850 far_left_right_col_ind: i32,
851 left_child: &Arc<dyn ExecutionPlan>,
852 right_child: &Arc<dyn ExecutionPlan>,
853) -> Result<(ProjectionExec, ProjectionExec)> {
854 let new_left = ProjectionExec::try_new(
855 projection_as_columns[0..=far_right_left_col_ind as _]
856 .iter()
857 .map(|(col, alias)| ProjectionExpr {
858 expr: Arc::new(Column::new(col.name(), col.index())) as _,
859 alias: alias.clone(),
860 }),
861 Arc::clone(left_child),
862 )?;
863 let left_size = left_child.schema().fields().len() as i32;
864 let new_right = ProjectionExec::try_new(
865 projection_as_columns[far_left_right_col_ind as _..]
866 .iter()
867 .map(|(col, alias)| {
868 ProjectionExpr {
869 expr: Arc::new(Column::new(
870 col.name(),
871 (col.index() as i32 - left_size) as _,
874 )) as _,
875 alias: alias.clone(),
876 }
877 }),
878 Arc::clone(right_child),
879 )?;
880
881 Ok((new_left, new_right))
882}
883
884pub fn join_allows_pushdown(
890 projection_as_columns: &[(Column, String)],
891 join_schema: &SchemaRef,
892 far_right_left_col_ind: i32,
893 far_left_right_col_ind: i32,
894) -> bool {
895 projection_as_columns.len() < join_schema.fields().len()
897 && (far_right_left_col_ind + 1 == far_left_right_col_ind)
899 && far_right_left_col_ind >= 0
901 && far_left_right_col_ind < projection_as_columns.len() as i32
902}
903
904pub fn join_table_borders(
910 left_table_column_count: usize,
911 projection_as_columns: &[(Column, String)],
912) -> (i32, i32) {
913 let far_right_left_col_ind = projection_as_columns
914 .iter()
915 .enumerate()
916 .take_while(|(_, (projection_column, _))| {
917 projection_column.index() < left_table_column_count
918 })
919 .last()
920 .map(|(index, _)| index as i32)
921 .unwrap_or(-1);
922
923 let far_left_right_col_ind = projection_as_columns
924 .iter()
925 .enumerate()
926 .rev()
927 .take_while(|(_, (projection_column, _))| {
928 projection_column.index() >= left_table_column_count
929 })
930 .last()
931 .map(|(index, _)| index as i32)
932 .unwrap_or(projection_as_columns.len() as i32);
933
934 (far_right_left_col_ind, far_left_right_col_ind)
935}
936
937pub fn update_join_on(
940 proj_left_exprs: &[(Column, String)],
941 proj_right_exprs: &[(Column, String)],
942 hash_join_on: &[(PhysicalExprRef, PhysicalExprRef)],
943 left_field_size: usize,
944) -> Option<Vec<(PhysicalExprRef, PhysicalExprRef)>> {
945 let (left_idx, right_idx): (Vec<_>, Vec<_>) = hash_join_on
946 .iter()
947 .map(|(left, right)| (left, right))
948 .unzip();
949
950 let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs, 0);
951 let new_right_columns =
952 new_columns_for_join_on(&right_idx, proj_right_exprs, left_field_size);
953
954 match (new_left_columns, new_right_columns) {
955 (Some(left), Some(right)) => Some(left.into_iter().zip(right).collect()),
956 _ => None,
957 }
958}
959
960pub fn update_join_filter(
963 projection_left_exprs: &[(Column, String)],
964 projection_right_exprs: &[(Column, String)],
965 join_filter: &JoinFilter,
966 left_field_size: usize,
967) -> Option<JoinFilter> {
968 let mut new_left_indices = new_indices_for_join_filter(
969 join_filter,
970 JoinSide::Left,
971 projection_left_exprs,
972 0,
973 )
974 .into_iter();
975 let mut new_right_indices = new_indices_for_join_filter(
976 join_filter,
977 JoinSide::Right,
978 projection_right_exprs,
979 left_field_size,
980 )
981 .into_iter();
982
983 (new_right_indices.len() + new_left_indices.len()
985 == join_filter.column_indices().len())
986 .then(|| {
987 JoinFilter::new(
988 Arc::clone(join_filter.expression()),
989 join_filter
990 .column_indices()
991 .iter()
992 .map(|col_idx| ColumnIndex {
993 index: if col_idx.side == JoinSide::Left {
994 new_left_indices.next().unwrap()
995 } else {
996 new_right_indices.next().unwrap()
997 },
998 side: col_idx.side,
999 })
1000 .collect(),
1001 Arc::clone(join_filter.schema()),
1002 )
1003 })
1004}
1005
1006fn try_unifying_projections(
1008 projection: &ProjectionExec,
1009 child: &ProjectionExec,
1010) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1011 let mut projected_exprs = vec![];
1012 let mut column_ref_map: HashMap<Column, usize> = HashMap::new();
1013
1014 projection.expr().iter().for_each(|proj_expr| {
1016 proj_expr
1017 .expr
1018 .apply(|expr| {
1019 Ok({
1020 if let Some(column) = expr.downcast_ref::<Column>() {
1021 *column_ref_map.entry(column.clone()).or_default() += 1;
1022 }
1023 TreeNodeRecursion::Continue
1024 })
1025 })
1026 .unwrap();
1027 });
1028 if column_ref_map.iter().any(|(column, count)| {
1033 *count > 1
1034 && !child.expr()[column.index()]
1035 .expr
1036 .placement()
1037 .should_push_to_leaves()
1038 }) {
1039 return Ok(None);
1040 }
1041 for proj_expr in projection.expr() {
1042 let Some(expr) = update_expr(&proj_expr.expr, child.expr(), true)? else {
1046 return Ok(None);
1047 };
1048 projected_exprs.push(ProjectionExpr {
1049 expr,
1050 alias: proj_expr.alias.clone(),
1051 });
1052 }
1053 ProjectionExec::try_new(projected_exprs, Arc::clone(child.input()))
1054 .map(|e| Some(Arc::new(e) as _))
1055}
1056
1057fn collect_column_indices(exprs: &[ProjectionExpr]) -> Vec<usize> {
1059 let mut seen = std::collections::HashSet::new();
1066 let mut indices = Vec::new();
1067 for proj_expr in exprs {
1068 if let Some(col) = proj_expr.expr.downcast_ref::<Column>() {
1069 if seen.insert(col.index()) {
1071 indices.push(col.index());
1072 }
1073 } else {
1074 proj_expr
1078 .expr
1079 .apply(|expr| {
1080 if let Some(col) = expr.downcast_ref::<Column>()
1081 && seen.insert(col.index())
1082 {
1083 indices.push(col.index());
1084 }
1085 Ok(TreeNodeRecursion::Continue)
1086 })
1087 .expect("closure always returns OK");
1088 }
1089 }
1090 indices
1091}
1092
1093fn new_indices_for_join_filter(
1101 join_filter: &JoinFilter,
1102 join_side: JoinSide,
1103 projection_exprs: &[(Column, String)],
1104 column_index_offset: usize,
1105) -> Vec<usize> {
1106 join_filter
1107 .column_indices()
1108 .iter()
1109 .filter(|col_idx| col_idx.side == join_side)
1110 .filter_map(|col_idx| {
1111 projection_exprs
1112 .iter()
1113 .position(|(col, _)| col_idx.index + column_index_offset == col.index())
1114 })
1115 .collect()
1116}
1117
1118fn new_columns_for_join_on(
1126 hash_join_on: &[&PhysicalExprRef],
1127 projection_exprs: &[(Column, String)],
1128 column_index_offset: usize,
1129) -> Option<Vec<PhysicalExprRef>> {
1130 let new_columns = hash_join_on
1131 .iter()
1132 .filter_map(|on| {
1133 Arc::clone(*on)
1135 .transform(|expr| {
1136 if let Some(column) = expr.downcast_ref::<Column>() {
1137 let new_column = projection_exprs
1139 .iter()
1140 .enumerate()
1141 .find(|(_, (proj_column, _))| {
1142 column.name() == proj_column.name()
1143 && column.index() + column_index_offset
1144 == proj_column.index()
1145 })
1146 .map(|(index, (_, alias))| Column::new(alias, index));
1147 if let Some(new_column) = new_column {
1148 Ok(Transformed::yes(Arc::new(new_column)))
1149 } else {
1150 internal_err!(
1154 "Column {:?} not found in projection expressions",
1155 column
1156 )
1157 }
1158 } else {
1159 Ok(Transformed::no(expr))
1160 }
1161 })
1162 .data()
1163 .ok()
1164 })
1165 .collect::<Vec<_>>();
1166 (new_columns.len() == hash_join_on.len()).then_some(new_columns)
1167}
1168
1169#[cfg(test)]
1170mod tests {
1171 use super::*;
1172
1173 use crate::common::collect;
1174
1175 use crate::filter_pushdown::PushedDown;
1176 use crate::test;
1177 use crate::test::exec::StatisticsExec;
1178
1179 use arrow::datatypes::{DataType, Field, Schema};
1180 use datafusion_common::ScalarValue;
1181 use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};
1182
1183 use datafusion_expr::Operator;
1184 use datafusion_physical_expr::expressions::{
1185 BinaryExpr, Column, DynamicFilterPhysicalExpr, Literal, binary, col, lit,
1186 };
1187
1188 #[test]
1189 fn test_collect_column_indices() -> Result<()> {
1190 let expr = Arc::new(BinaryExpr::new(
1191 Arc::new(Column::new("b", 7)),
1192 Operator::Minus,
1193 Arc::new(BinaryExpr::new(
1194 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1195 Operator::Plus,
1196 Arc::new(Column::new("a", 1)),
1197 )),
1198 ));
1199 let column_indices = collect_column_indices(&[ProjectionExpr {
1200 expr,
1201 alias: "b-(1+a)".to_string(),
1202 }]);
1203 assert_eq!(column_indices, vec![7, 1]);
1205 Ok(())
1206 }
1207
1208 #[test]
1209 fn test_join_table_borders() -> Result<()> {
1210 let projections = vec![
1211 (Column::new("b", 1), "b".to_owned()),
1212 (Column::new("c", 2), "c".to_owned()),
1213 (Column::new("e", 4), "e".to_owned()),
1214 (Column::new("d", 3), "d".to_owned()),
1215 (Column::new("c", 2), "c".to_owned()),
1216 (Column::new("f", 5), "f".to_owned()),
1217 (Column::new("h", 7), "h".to_owned()),
1218 (Column::new("g", 6), "g".to_owned()),
1219 ];
1220 let left_table_column_count = 5;
1221 assert_eq!(
1222 join_table_borders(left_table_column_count, &projections),
1223 (4, 5)
1224 );
1225
1226 let left_table_column_count = 8;
1227 assert_eq!(
1228 join_table_borders(left_table_column_count, &projections),
1229 (7, 8)
1230 );
1231
1232 let left_table_column_count = 1;
1233 assert_eq!(
1234 join_table_borders(left_table_column_count, &projections),
1235 (-1, 0)
1236 );
1237
1238 let projections = vec![
1239 (Column::new("a", 0), "a".to_owned()),
1240 (Column::new("b", 1), "b".to_owned()),
1241 (Column::new("d", 3), "d".to_owned()),
1242 (Column::new("g", 6), "g".to_owned()),
1243 (Column::new("e", 4), "e".to_owned()),
1244 (Column::new("f", 5), "f".to_owned()),
1245 (Column::new("e", 4), "e".to_owned()),
1246 (Column::new("h", 7), "h".to_owned()),
1247 ];
1248 let left_table_column_count = 5;
1249 assert_eq!(
1250 join_table_borders(left_table_column_count, &projections),
1251 (2, 7)
1252 );
1253
1254 let left_table_column_count = 7;
1255 assert_eq!(
1256 join_table_borders(left_table_column_count, &projections),
1257 (6, 7)
1258 );
1259
1260 Ok(())
1261 }
1262
1263 #[tokio::test]
1264 async fn project_no_column() -> Result<()> {
1265 let task_ctx = Arc::new(TaskContext::default());
1266
1267 let exec = test::scan_partitioned(1);
1268 let expected = collect(exec.execute(0, Arc::clone(&task_ctx))?).await?;
1269
1270 let projection = ProjectionExec::try_new(vec![] as Vec<ProjectionExpr>, exec)?;
1271 let stream = projection.execute(0, Arc::clone(&task_ctx))?;
1272 let output = collect(stream).await?;
1273 assert_eq!(output.len(), expected.len());
1274
1275 Ok(())
1276 }
1277
1278 #[tokio::test]
1279 async fn project_old_syntax() {
1280 let exec = test::scan_partitioned(1);
1281 let schema = exec.schema();
1282 let expr = col("i", &schema).unwrap();
1283 ProjectionExec::try_new(
1284 vec![
1285 (expr, "c".to_string()),
1288 ],
1289 exec,
1290 )
1291 .unwrap();
1293 }
1294
#[test]
/// Statistics for a projection that outputs fewer columns than its input has.
///
/// The projected expressions reference input columns at indices 2, 4 and 5,
/// while the projection itself only produces two columns; those indices are
/// therefore only in range for the six-column input schema.
fn test_projection_statistics_uses_input_schema() {
    // Six non-nullable Int32 input columns, "a" through "f".
    let input_fields: Vec<Field> = ["a", "b", "c", "d", "e", "f"]
        .iter()
        .map(|name| Field::new(*name, DataType::Int32, false))
        .collect();
    let input_schema = Schema::new(input_fields);

    // Exact (min, max) bounds per input column, listed in schema order.
    let bounds = [(1, 100), (5, 50), (10, 40), (20, 30), (21, 29), (24, 26)];
    let column_statistics: Vec<ColumnStatistics> = bounds
        .iter()
        .map(|&(lo, hi)| ColumnStatistics {
            min_value: Precision::Exact(ScalarValue::Int32(Some(lo))),
            max_value: Precision::Exact(ScalarValue::Int32(Some(hi))),
            ..Default::default()
        })
        .collect();
    let input_statistics = Statistics {
        num_rows: Precision::Exact(10),
        column_statistics,
        ..Default::default()
    };

    let input = Arc::new(StatisticsExec::new(input_statistics, input_schema));

    // Output column 0: `c` renamed.
    let renamed_c = ProjectionExpr {
        expr: Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>,
        alias: "c_renamed".to_string(),
    };
    // Output column 1: `e + f`, built from the last two input columns.
    let sum_of_e_and_f = ProjectionExpr {
        expr: Arc::new(BinaryExpr::new(
            Arc::new(Column::new("e", 4)),
            Operator::Plus,
            Arc::new(Column::new("f", 5)),
        )) as Arc<dyn PhysicalExpr>,
        alias: "e_plus_f".to_string(),
    };
    let exprs: Vec<ProjectionExpr> = vec![renamed_c, sum_of_e_and_f];

    let projection = ProjectionExec::try_new(exprs, input).unwrap();
    let stats = projection.partition_statistics(None).unwrap();

    assert_eq!(stats.num_rows, Precision::Exact(10));
    assert_eq!(
        stats.column_statistics.len(),
        2,
        "Expected 2 columns in projection statistics"
    );
    let byte_size_is_exact = stats.total_byte_size.is_exact().unwrap_or(false);
    assert!(byte_size_is_exact);
}
1376
#[test]
/// A parent filter on an aliased projection output must be rewritten in terms
/// of the projection's input column before it is offered to the child: with
/// the projection `a AS b`, the filter `b > 5` is pushed as `a > 5`.
fn test_filter_pushdown_with_alias() -> Result<()> {
    let input_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    // The statistics are built from a borrow that ends before the second
    // argument is evaluated, so the schema can be moved rather than cloned.
    let input = Arc::new(StatisticsExec::new(
        Statistics::new_unknown(&input_schema),
        input_schema,
    ));

    // Projection: a AS b
    let projection = ProjectionExec::try_new(
        vec![ProjectionExpr {
            expr: Arc::new(Column::new("a", 0)),
            alias: "b".to_string(),
        }],
        input,
    )?;

    // Parent filter, expressed against the projection's output schema: b > 5
    let filter = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("b", 0)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
    )) as Arc<dyn PhysicalExpr>;

    let description = projection.gather_filters_for_pushdown(
        FilterPushdownPhase::Post,
        vec![filter],
        &ConfigOptions::default(),
    )?;

    // The same predicate, expressed against the input schema: a > 5
    let expected_filter = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 0)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
    )) as Arc<dyn PhysicalExpr>;

    // The projection contributes no filters of its own.
    assert_eq!(description.self_filters(), vec![vec![]]);

    let pushed_filters = &description.parent_filters()[0];
    // Pin the count before indexing so an unexpected extra filter is caught,
    // as the multi-filter tests in this module already do.
    assert_eq!(
        pushed_filters.len(),
        1,
        "exactly one parent filter was supplied"
    );
    assert_eq!(
        pushed_filters[0].predicate.to_string(),
        expected_filter.to_string()
    );
    // Verify the predicate was actually marked as pushed down.
    assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));

    Ok(())
}
1426
#[test]
/// Two aliased outputs (`a AS x`, `b AS y`) with one parent filter on each:
/// both filters are expected to come back rewritten to the input column names
/// and marked as pushed down.
fn test_filter_pushdown_with_multiple_aliases() -> Result<()> {
    let input_schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);
    let column_count = input_schema.fields().len();
    let input = Arc::new(StatisticsExec::new(
        Statistics {
            column_statistics: vec![Default::default(); column_count],
            ..Default::default()
        },
        input_schema.clone(),
    ));

    // Projection: a AS x, b AS y
    let a_as_x = ProjectionExpr {
        expr: Arc::new(Column::new("a", 0)),
        alias: "x".to_string(),
    };
    let b_as_y = ProjectionExpr {
        expr: Arc::new(Column::new("b", 1)),
        alias: "y".to_string(),
    };
    let projection = ProjectionExec::try_new(vec![a_as_x, b_as_y], input)?;

    // Builds `<column> <op> <Int32 literal>` as a physical expression.
    let compare = |column: Column, op: Operator, bound: i32| -> Arc<dyn PhysicalExpr> {
        Arc::new(BinaryExpr::new(
            Arc::new(column),
            op,
            Arc::new(Literal::new(ScalarValue::Int32(Some(bound)))),
        ))
    };

    // Parent filters against the output schema: x > 5, y < 10
    let parent_filters = vec![
        compare(Column::new("x", 0), Operator::Gt, 5),
        compare(Column::new("y", 1), Operator::Lt, 10),
    ];

    let description = projection.gather_filters_for_pushdown(
        FilterPushdownPhase::Post,
        parent_filters,
        &ConfigOptions::default(),
    )?;

    // The same filters against the input schema: a > 5, b < 10
    let expected = [
        compare(Column::new("a", 0), Operator::Gt, 5),
        compare(Column::new("b", 1), Operator::Lt, 10),
    ];

    let pushed_filters = &description.parent_filters()[0];
    assert_eq!(pushed_filters.len(), 2);
    // Predicates first, then pushdown flags, each in filter order.
    for (pushed, want) in pushed_filters.iter().zip(expected.iter()) {
        assert_eq!(pushed.predicate.to_string(), want.to_string());
    }
    for pushed in pushed_filters.iter() {
        assert!(matches!(pushed.discriminant, PushedDown::Yes));
    }

    Ok(())
}
1506
#[test]
/// The projection swaps the two column names (`a AS b`, `b AS a`), so each
/// parent filter has to be resolved through the alias rather than by name:
/// `b@0 > 5` is expected as `a@0 > 5` and `a@1 < 10` as `b@1 < 10`.
fn test_filter_pushdown_with_swapped_aliases() -> Result<()> {
    let fields = vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ];
    let input_schema = Schema::new(fields);
    let unknown_stats = Statistics {
        column_statistics: vec![Default::default(); input_schema.fields().len()],
        ..Default::default()
    };
    let input = Arc::new(StatisticsExec::new(unknown_stats, input_schema.clone()));

    // Projection: a AS b, b AS a
    let projection = ProjectionExec::try_new(
        vec![
            ProjectionExpr {
                expr: Arc::new(Column::new("a", 0)),
                alias: "b".to_string(),
            },
            ProjectionExpr {
                expr: Arc::new(Column::new("b", 1)),
                alias: "a".to_string(),
            },
        ],
        input,
    )?;

    // Output column 0 is named "b": b > 5
    let five = Arc::new(Literal::new(ScalarValue::Int32(Some(5))));
    let output_b_gt_5: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("b", 0)),
        Operator::Gt,
        five,
    ));

    // Output column 1 is named "a": a < 10
    let ten = Arc::new(Literal::new(ScalarValue::Int32(Some(10))));
    let output_a_lt_10: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("a", 1)),
        Operator::Lt,
        ten,
    ));

    let description = projection.gather_filters_for_pushdown(
        FilterPushdownPhase::Post,
        vec![output_b_gt_5, output_a_lt_10],
        &ConfigOptions::default(),
    )?;

    let pushed_filters = &description.parent_filters()[0];
    assert_eq!(pushed_filters.len(), 2);

    // Output "b" maps back to input "a"; output "a" maps back to input "b".
    let expected_predicates = ["a@0 > 5", "b@1 < 10"];

    assert_eq!(
        pushed_filters[0].predicate.to_string(),
        expected_predicates[0]
    );
    assert_eq!(
        pushed_filters[1].predicate.to_string(),
        expected_predicates[1]
    );
    assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
    assert!(matches!(pushed_filters[1].discriminant, PushedDown::Yes));

    Ok(())
}
1572
#[test]
/// One output is renamed (`a AS x`) and the other keeps its name (`b AS b`).
/// Both parent filters are expected to be pushed down, with only the renamed
/// column rewritten: `x@0 > 5` as `a@0 > 5`, and `b@1 < 10` unchanged.
fn test_filter_pushdown_with_mixed_columns() -> Result<()> {
    let input_schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);
    let input = Arc::new(StatisticsExec::new(
        Statistics {
            column_statistics: vec![Default::default(); input_schema.fields().len()],
            ..Default::default()
        },
        input_schema.clone(),
    ));

    // Projection: a AS x, b AS b
    let projected = vec![
        ProjectionExpr {
            expr: Arc::new(Column::new("a", 0)),
            alias: "x".to_string(),
        },
        ProjectionExpr {
            expr: Arc::new(Column::new("b", 1)),
            alias: "b".to_string(),
        },
    ];
    let projection = ProjectionExec::try_new(projected, input)?;

    // Int32 literal as a physical expression.
    let int32 = |value: i32| -> Arc<dyn PhysicalExpr> {
        Arc::new(Literal::new(ScalarValue::Int32(Some(value))))
    };

    // Filter on the renamed column: x > 5
    let on_renamed: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("x", 0)),
        Operator::Gt,
        int32(5),
    ));
    // Filter on the column that kept its name: b < 10
    let on_unrenamed: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("b", 1)),
        Operator::Lt,
        int32(10),
    ));

    let description = projection.gather_filters_for_pushdown(
        FilterPushdownPhase::Post,
        vec![on_renamed, on_unrenamed],
        &ConfigOptions::default(),
    )?;

    let pushed_filters = &description.parent_filters()[0];
    assert_eq!(pushed_filters.len(), 2);

    let (first, second) = (&pushed_filters[0], &pushed_filters[1]);
    assert_eq!(first.predicate.to_string(), "a@0 > 5");
    assert_eq!(second.predicate.to_string(), "b@1 < 10");
    assert!(matches!(first.discriminant, PushedDown::Yes));
    assert!(matches!(second.discriminant, PushedDown::Yes));

    Ok(())
}
1637
#[test]
/// A parent filter on an output column that is a computed expression is
/// expanded with that expression: with the projection `a + 1 AS z`, the filter
/// `z > 10` is expected to be pushed as `a + 1 > 10`.
fn test_filter_pushdown_with_complex_expression() -> Result<()> {
    let input_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    // The statistics literal only borrows the schema to count its fields, so
    // the schema itself can be moved into the exec without a clone.
    let input = Arc::new(StatisticsExec::new(
        Statistics {
            column_statistics: vec![Default::default(); input_schema.fields().len()],
            ..Default::default()
        },
        input_schema,
    ));

    // Projection: a + 1 AS z
    let projection = ProjectionExec::try_new(
        vec![ProjectionExpr {
            expr: Arc::new(BinaryExpr::new(
                Arc::new(Column::new("a", 0)),
                Operator::Plus,
                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
            )),
            alias: "z".to_string(),
        }],
        input,
    )?;

    // Parent filter against the output schema: z > 10
    let filter = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("z", 0)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
    )) as Arc<dyn PhysicalExpr>;

    let description = projection.gather_filters_for_pushdown(
        FilterPushdownPhase::Post,
        vec![filter],
        &ConfigOptions::default(),
    )?;

    let pushed_filters = &description.parent_filters()[0];
    // Pin the count before indexing so an unexpected extra filter is caught,
    // as the multi-filter tests in this module already do.
    assert_eq!(
        pushed_filters.len(),
        1,
        "exactly one parent filter was supplied"
    );
    assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
    // `z` is replaced by the expression it aliases.
    assert_eq!(pushed_filters[0].predicate.to_string(), "a@0 + 1 > 10");

    Ok(())
}
1682
#[test]
/// A parent filter that references a column the projection does not output
/// (`unknown_col@1`, while the projection has a single output column) must be
/// reported as not pushed down, with its predicate left unchanged.
fn test_filter_pushdown_with_unknown_column() -> Result<()> {
    let input_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    // The statistics literal only borrows the schema to count its fields, so
    // the schema itself can be moved into the exec without a clone.
    let input = Arc::new(StatisticsExec::new(
        Statistics {
            column_statistics: vec![Default::default(); input_schema.fields().len()],
            ..Default::default()
        },
        input_schema,
    ));

    // Projection: a AS a
    let projection = ProjectionExec::try_new(
        vec![ProjectionExpr {
            expr: Arc::new(Column::new("a", 0)),
            alias: "a".to_string(),
        }],
        input,
    )?;

    // Parent filter on a column that is not part of the projection's output.
    let filter = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("unknown_col", 1)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
    )) as Arc<dyn PhysicalExpr>;

    let description = projection.gather_filters_for_pushdown(
        FilterPushdownPhase::Post,
        vec![filter],
        &ConfigOptions::default(),
    )?;

    let pushed_filters = &description.parent_filters()[0];
    // The rejected filter is still reported, so pin the count before indexing,
    // as the multi-filter tests in this module already do.
    assert_eq!(
        pushed_filters.len(),
        1,
        "exactly one parent filter was supplied"
    );
    assert!(matches!(pushed_filters[0].discriminant, PushedDown::No));
    // The predicate keeps its original column name and index.
    assert_eq!(
        pushed_filters[0].predicate.to_string(),
        "unknown_col@1 > 5"
    );

    Ok(())
}
1727
#[test]
/// A `DynamicFilterPhysicalExpr` pushed through a projection must reflect later
/// updates to the filter.
///
/// The filter starts as `lit(true)` over the output column `a`. After it is
/// updated to `a > 5`, the predicate that was pushed through the projection
/// `b - 1 AS a` is expected to display as `b@0 - 1 > 5`.
fn test_basic_dyn_filter_projection_pushdown_update_child() -> Result<()> {
    let input_schema =
        Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, false)]));

    let unknown_stats = Statistics {
        column_statistics: vec![Default::default(); input_schema.fields().len()],
        ..Default::default()
    };
    let input = Arc::new(StatisticsExec::new(
        unknown_stats,
        input_schema.as_ref().clone(),
    ));

    // Projection: b - 1 AS a
    let b_minus_one = binary(
        Arc::new(Column::new("b", 0)),
        Operator::Minus,
        lit(1),
        &input_schema,
    )
    .unwrap();
    let projection = ProjectionExec::try_new(
        vec![ProjectionExpr {
            expr: b_minus_one,
            alias: "a".to_string(),
        }],
        input,
    )?;

    // The dynamic filter is defined against the projection's output schema.
    let projected_schema = projection.schema();
    let col_a = col("a", &projected_schema)?;
    let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
        vec![Arc::clone(&col_a)],
        lit(true),
    ));

    // Before any update the filter is just `true`.
    let before_update = dynamic_filter.current()?;
    assert_eq!(before_update.to_string(), "true");

    let dyn_phy_expr: Arc<dyn PhysicalExpr> = Arc::clone(&dynamic_filter) as _;

    let description = projection.gather_filters_for_pushdown(
        FilterPushdownPhase::Post,
        vec![dyn_phy_expr],
        &ConfigOptions::default(),
    )?;

    let pushed = &description.parent_filters()[0][0];

    // The pushed-down predicate, observed before the filter is updated.
    assert_eq!(pushed.predicate.to_string(), "DynamicFilter [ empty ]");

    // Update the filter to `a > 5`, expressed in the projection's output schema.
    dynamic_filter.update(Arc::new(BinaryExpr::new(
        Arc::clone(&col_a),
        Operator::Gt,
        lit(5i32),
    )))?;

    // The filter itself now reports `a > 5`.
    let after_update = dynamic_filter.current()?;
    assert_eq!(after_update.to_string(), "a@0 > 5");

    // The previously pushed predicate shows the update in terms of the input
    // column, because `a` is `b - 1` below the projection.
    assert_eq!(
        pushed.predicate.to_string(),
        "DynamicFilter [ b@0 - 1 > 5 ]"
    );

    Ok(())
}
1803}