1use super::expressions::Column;
24use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use super::{
26 DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
27 SendableRecordBatchStream, SortOrderPushdownResult, Statistics,
28};
29use crate::column_rewriter::PhysicalColumnRewriter;
30use crate::execution_plan::CardinalityEffect;
31use crate::filter_pushdown::{
32 ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
33 FilterPushdownPropagation, FilterRemapper, PushedDownPredicate,
34};
35use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef};
36use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr, check_if_same_properties};
37use std::any::Any;
38use std::collections::HashMap;
39use std::pin::Pin;
40use std::sync::Arc;
41use std::task::{Context, Poll};
42
43use arrow::datatypes::SchemaRef;
44use arrow::record_batch::RecordBatch;
45use datafusion_common::config::ConfigOptions;
46use datafusion_common::tree_node::{
47 Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
48};
49use datafusion_common::{DataFusionError, JoinSide, Result, internal_err};
50use datafusion_execution::TaskContext;
51use datafusion_expr::ExpressionPlacement;
52use datafusion_physical_expr::equivalence::ProjectionMapping;
53use datafusion_physical_expr::projection::Projector;
54use datafusion_physical_expr::utils::collect_columns;
55use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, fmt_sql};
56use datafusion_physical_expr_common::sort_expr::{
57 LexOrdering, LexRequirement, PhysicalSortExpr,
58};
59pub use datafusion_physical_expr::projection::{
62 ProjectionExpr, ProjectionExprs, update_expr,
63};
64
65use futures::stream::{Stream, StreamExt};
66use log::trace;
67
/// Execution plan operator that computes a set of output expressions
/// (with aliases) from each batch produced by its single input plan.
#[derive(Debug, Clone)]
pub struct ProjectionExec {
    /// Evaluates the projection expressions against input batches and owns
    /// the output schema.
    projector: Projector,
    /// The child plan supplying the rows to project.
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics, populated per partition during `execute`.
    metrics: ExecutionPlanMetricsSet,
    /// Cached plan properties (equivalences, partitioning, boundedness),
    /// computed once in `try_from_projector`.
    cache: Arc<PlanProperties>,
}
84
impl ProjectionExec {
    /// Create a `ProjectionExec` evaluating `expr` against batches from
    /// `input`.
    ///
    /// Accepts any iterator of items convertible into [`ProjectionExpr`];
    /// fails if an expression cannot be bound to the input schema.
    pub fn try_new<I, E>(expr: I, input: Arc<dyn ExecutionPlan>) -> Result<Self>
    where
        I: IntoIterator<Item = E>,
        E: Into<ProjectionExpr>,
    {
        let input_schema = input.schema();
        let expr_arc = expr.into_iter().map(Into::into).collect::<Arc<_>>();
        let projection = ProjectionExprs::from_expressions(expr_arc);
        let projector = projection.make_projector(&input_schema)?;
        Self::try_from_projector(projector, input)
    }

    /// Build the operator from an already-constructed `Projector`, computing
    /// and caching plan properties for the given input.
    fn try_from_projector(
        projector: Projector,
        input: Arc<dyn ExecutionPlan>,
    ) -> Result<Self> {
        let projection_mapping =
            projector.projection().projection_mapping(&input.schema())?;
        let cache = Self::compute_properties(
            &input,
            &projection_mapping,
            Arc::clone(projector.output_schema()),
        )?;
        Ok(Self {
            projector,
            input,
            metrics: ExecutionPlanMetricsSet::new(),
            cache: Arc::new(cache),
        })
    }

    /// The projection expressions, in output-column order.
    pub fn expr(&self) -> &[ProjectionExpr] {
        self.projector.projection().as_ref()
    }

    /// The projection expressions as a [`ProjectionExprs`] collection.
    pub fn projection_expr(&self) -> &ProjectionExprs {
        self.projector.projection()
    }

    /// The input (child) plan.
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Derive plan properties: equivalence properties and output
    /// partitioning are projected through `projection_mapping`; pipeline
    /// behavior and boundedness are inherited from the input unchanged.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        projection_mapping: &ProjectionMapping,
        schema: SchemaRef,
    ) -> Result<PlanProperties> {
        let input_eq_properties = input.equivalence_properties();
        let eq_properties = input_eq_properties.project(projection_mapping, schema);
        let output_partitioning = input
            .output_partitioning()
            .project(projection_mapping, input_eq_properties);

        Ok(PlanProperties::new(
            eq_properties,
            output_partitioning,
            input.pipeline_behavior(),
            input.boundedness(),
        ))
    }

    /// Map each output column (alias name plus its index in the output
    /// schema) back to the input-side expression that produces it. Used by
    /// filter pushdown to rewrite parent filters in input-schema terms.
    ///
    /// Returns an internal error if an alias is missing from the output
    /// schema, which would indicate an inconsistent projector.
    fn collect_reverse_alias(
        &self,
    ) -> Result<datafusion_common::HashMap<Column, Arc<dyn PhysicalExpr>>> {
        let mut alias_map = datafusion_common::HashMap::new();
        for projection in self.projection_expr().iter() {
            let (aliased_index, _output_field) = self
                .projector
                .output_schema()
                .column_with_name(&projection.alias)
                .ok_or_else(|| {
                    DataFusionError::Internal(format!(
                        "Expr {} with alias {} not found in output schema",
                        projection.expr, projection.alias
                    ))
                })?;
            let aliased_col = Column::new(&projection.alias, aliased_index);
            alias_map.insert(aliased_col, Arc::clone(&projection.expr));
        }
        Ok(alias_map)
    }

    /// Clone this node with a replacement child and fresh metrics, reusing
    /// the cached properties — the caller must guarantee the new child has
    /// the same plan properties as the old one.
    fn with_new_children_and_same_properties(
        &self,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Self {
        Self {
            input: children.swap_remove(0),
            metrics: ExecutionPlanMetricsSet::new(),
            ..Self::clone(self)
        }
    }
}
238
239impl DisplayAs for ProjectionExec {
240 fn fmt_as(
241 &self,
242 t: DisplayFormatType,
243 f: &mut std::fmt::Formatter,
244 ) -> std::fmt::Result {
245 match t {
246 DisplayFormatType::Default | DisplayFormatType::Verbose => {
247 let expr: Vec<String> = self
248 .projector
249 .projection()
250 .as_ref()
251 .iter()
252 .map(|proj_expr| {
253 let e = proj_expr.expr.to_string();
254 if e != proj_expr.alias {
255 format!("{e} as {}", proj_expr.alias)
256 } else {
257 e
258 }
259 })
260 .collect();
261
262 write!(f, "ProjectionExec: expr=[{}]", expr.join(", "))
263 }
264 DisplayFormatType::TreeRender => {
265 for (i, proj_expr) in self.expr().iter().enumerate() {
266 let expr_sql = fmt_sql(proj_expr.expr.as_ref());
267 if proj_expr.expr.to_string() == proj_expr.alias {
268 writeln!(f, "expr{i}={expr_sql}")?;
269 } else {
270 writeln!(f, "{}={expr_sql}", proj_expr.alias)?;
271 }
272 }
273
274 Ok(())
275 }
276 }
277 }
278}
279
impl ExecutionPlan for ProjectionExec {
    fn name(&self) -> &'static str {
        "ProjectionExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    /// Projection is row-order preserving: batches pass through 1:1.
    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        // NOTE(review): `all_simple_exprs` is true when no expression has
        // `ExpressionPlacement::KeepInPlace`; in that case the operator
        // reports it does NOT benefit from extra input partitioning.
        // Confirm against `ExpressionPlacement` docs that this matches the
        // intended meaning of `KeepInPlace`.
        let all_simple_exprs =
            self.projector
                .projection()
                .as_ref()
                .iter()
                .all(|proj_expr| {
                    !matches!(
                        proj_expr.expr.placement(),
                        ExpressionPlacement::KeepInPlace
                    )
                });
        vec![!all_simple_exprs]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Rebuild this node over a new child, reusing the existing projector.
    /// The `check_if_same_properties!` macro handles the fast path where the
    /// replacement child has unchanged properties.
    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        check_if_same_properties!(self, children);
        ProjectionExec::try_from_projector(
            self.projector.clone(),
            children.swap_remove(0),
        )
        .map(|p| Arc::new(p) as _)
    }

    /// Execute one partition: wrap the child's stream in a
    /// `ProjectionStream` using a projector wired to this partition's
    /// metrics.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start ProjectionExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );

        let projector = self.projector.with_metrics(&self.metrics, partition);
        Ok(Box::pin(ProjectionStream::new(
            projector,
            self.input.execute(partition, context)?,
            BaselineMetrics::new(&self.metrics, partition),
        )?))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    /// Project the input's statistics through the projection expressions
    /// onto the output schema.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        let input_stats = self.input.partition_statistics(partition)?;
        let output_schema = self.schema();
        self.projector
            .projection()
            .project_statistics(input_stats, &output_schema)
    }

    /// A limit above this projection can be pushed below it, since row
    /// count is unchanged.
    fn supports_limit_pushdown(&self) -> bool {
        true
    }

    /// Projection emits exactly one output row per input row.
    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::Equal
    }

    /// Called with a parent `ProjectionExec` sitting above this node: try to
    /// merge the two projections into one (then clean up any now-redundant
    /// projections); otherwise return the parent unchanged.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        let maybe_unified = try_unifying_projections(projection, self)?;
        if let Some(new_plan) = maybe_unified {
            remove_unnecessary_projections(new_plan).data().map(Some)
        } else {
            Ok(Some(Arc::new(projection.clone())))
        }
    }

    /// Rewrite parent filters into the input schema so they can be pushed
    /// below this projection: first re-anchor column indices against the
    /// output schema via `FilterRemapper`, then replace aliased output
    /// columns with the underlying input expressions. Filters the remapper
    /// cannot handle are reported as unsupported and stay above.
    fn gather_filters_for_pushdown(
        &self,
        _phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        let invert_alias_map = self.collect_reverse_alias()?;
        let output_schema = self.schema();
        let remapper = FilterRemapper::new(output_schema);
        let mut child_parent_filters = Vec::with_capacity(parent_filters.len());

        for filter in parent_filters {
            if let Some(reassigned) = remapper.try_remap(&filter)? {
                let mut rewriter = PhysicalColumnRewriter::new(&invert_alias_map);
                let rewritten = reassigned.rewrite(&mut rewriter)?.data;
                child_parent_filters.push(PushedDownPredicate::supported(rewritten));
            } else {
                child_parent_filters.push(PushedDownPredicate::unsupported(filter));
            }
        }

        Ok(FilterDescription::new().with_child(ChildFilterDescription {
            parent_filters: child_parent_filters,
            self_filters: vec![],
        }))
    }

    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
    }

    /// Push a requested sort order below the projection. Only supported
    /// when every sort expression resolves, column by column, to a plain
    /// column pass-through in this projection (no computed expressions);
    /// otherwise returns `Unsupported`.
    fn try_pushdown_sort(
        &self,
        order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
        let child = self.input();
        let mut child_order = Vec::new();

        for sort_expr in order {
            let mut can_pushdown = true;
            let transformed = Arc::clone(&sort_expr.expr).transform(|expr| {
                if let Some(col) = expr.as_any().downcast_ref::<Column>() {
                    // Defensive: a column index beyond the projection list
                    // cannot be translated.
                    if col.index() >= self.expr().len() {
                        can_pushdown = false;
                        return Ok(Transformed::no(expr));
                    }

                    let proj_expr = &self.expr()[col.index()];

                    // Only a bare input column can be substituted; any
                    // computed expression blocks the pushdown.
                    if let Some(child_col) =
                        proj_expr.expr.as_any().downcast_ref::<Column>()
                    {
                        Ok(Transformed::yes(Arc::new(child_col.clone()) as _))
                    } else {
                        can_pushdown = false;
                        Ok(Transformed::no(expr))
                    }
                } else {
                    Ok(Transformed::no(expr))
                }
            })?;

            if !can_pushdown {
                return Ok(SortOrderPushdownResult::Unsupported);
            }

            child_order.push(PhysicalSortExpr {
                expr: transformed.data,
                options: sort_expr.options,
            });
        }

        // Recurse into the child with the translated order and re-wrap the
        // result under a new projection node, preserving exact/inexactness.
        match child.try_pushdown_sort(&child_order)? {
            SortOrderPushdownResult::Exact { inner } => {
                let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
                Ok(SortOrderPushdownResult::Exact { inner: new_exec })
            }
            SortOrderPushdownResult::Inexact { inner } => {
                let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
                Ok(SortOrderPushdownResult::Inexact { inner: new_exec })
            }
            SortOrderPushdownResult::Unsupported => {
                Ok(SortOrderPushdownResult::Unsupported)
            }
        }
    }

    /// Delegate order preservation to the input; if the input produces an
    /// order-preserving variant, rebuild this projection over it.
    fn with_preserve_order(
        &self,
        preserve_order: bool,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        self.input
            .with_preserve_order(preserve_order)
            .and_then(|new_input| {
                Arc::new(self.clone())
                    .with_new_children(vec![new_input])
                    .ok()
            })
    }
}
504
impl ProjectionStream {
    /// Wrap `input` with a projecting stream that records metrics.
    fn new(
        projector: Projector,
        input: SendableRecordBatchStream,
        baseline_metrics: BaselineMetrics,
    ) -> Result<Self> {
        Ok(Self {
            projector,
            input,
            baseline_metrics,
        })
    }

    /// Project a single batch, charging the elapsed time to the
    /// `elapsed_compute` metric via the scoped timer.
    fn batch_project(&self, batch: &RecordBatch) -> Result<RecordBatch> {
        let _timer = self.baseline_metrics.elapsed_compute().timer();
        self.projector.project_batch(batch)
    }
}
525
/// Stream adapter that applies the projection to every batch from `input`.
struct ProjectionStream {
    // Evaluates the projection expressions for each batch.
    projector: Projector,
    // Upstream batch source.
    input: SendableRecordBatchStream,
    // Metrics (compute time, output rows) for this partition.
    baseline_metrics: BaselineMetrics,
}
532
impl Stream for ProjectionStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Project each ready input batch; errors, `Pending`, and
        // end-of-stream pass through untouched.
        let poll = self.input.poll_next_unpin(cx).map(|x| match x {
            Some(Ok(batch)) => Some(self.batch_project(&batch)),
            other => other,
        });

        // Fold the poll result into the baseline metrics (output rows etc.)
        // and return it unchanged.
        self.baseline_metrics.record_poll(poll)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Projection is 1:1 on batches, so the input's hint applies as-is.
        self.input.size_hint()
    }
}
553
impl RecordBatchStream for ProjectionStream {
    /// The output schema is the projector's schema, not the input's.
    fn schema(&self) -> SchemaRef {
        Arc::clone(self.projector.output_schema())
    }
}
560
/// Trait for execution plans that can absorb a column-index projection
/// directly (avoiding a separate `ProjectionExec` on top of them).
pub trait EmbeddedProjection: ExecutionPlan + Sized {
    /// Return a copy of this plan with the given projection of input column
    /// indices applied.
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self>;
}
573
/// Try to embed `projection` into `execution_plan` (a plan supporting
/// [`EmbeddedProjection`]), so the columns are pruned at the source instead
/// of by a separate operator. Returns `Ok(None)` when embedding would not
/// help; otherwise returns the new plan, possibly still topped by a slimmer
/// `ProjectionExec` when expressions or ordering remain to be computed.
pub fn try_embed_projection<Exec: EmbeddedProjection + 'static>(
    projection: &ProjectionExec,
    execution_plan: &Exec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    // An empty projection (zero output columns) embeds trivially.
    if projection.expr().is_empty() {
        let new_execution_plan = Arc::new(execution_plan.with_projection(Some(vec![]))?);
        return Ok(Some(new_execution_plan));
    }

    let projection_index = collect_column_indices(projection.expr());

    // No column references at all (e.g. only literals): nothing to prune.
    if projection_index.is_empty() {
        return Ok(None);
    };

    // Indices are sorted and deduplicated, so len == last + 1 means they are
    // exactly 0..len; if that also covers every field, the projection keeps
    // the full schema and embedding gains nothing.
    if projection_index.len() == projection_index.last().unwrap() + 1
        && projection_index.len() == execution_plan.schema().fields().len()
    {
        return Ok(None);
    }

    let new_execution_plan =
        Arc::new(execution_plan.with_projection(Some(projection_index.to_vec()))?);

    // Describe the embedded plan's (narrowed) output as column expressions
    // keyed by the original indices, for rewriting the outer projection.
    let embed_project_exprs = projection_index
        .iter()
        .zip(new_execution_plan.schema().fields())
        .map(|(index, field)| ProjectionExpr {
            expr: Arc::new(Column::new(field.name(), *index)) as Arc<dyn PhysicalExpr>,
            alias: field.name().to_owned(),
        })
        .collect::<Vec<_>>();

    let mut new_projection_exprs = Vec::with_capacity(projection.expr().len());

    for proj_expr in projection.expr() {
        // Re-anchor each projection expression onto the narrowed schema;
        // bail out entirely if any expression cannot be rewritten.
        let Some(expr) =
            update_expr(&proj_expr.expr, embed_project_exprs.as_slice(), false)?
        else {
            return Ok(None);
        };
        new_projection_exprs.push(ProjectionExpr {
            expr,
            alias: proj_expr.alias.clone(),
        });
    }
    // Keep the outer projection only if it still does real work.
    let new_projection = Arc::new(ProjectionExec::try_new(
        new_projection_exprs,
        Arc::clone(&new_execution_plan) as _,
    )?);
    if is_projection_removable(&new_projection) {
        Ok(Some(new_execution_plan))
    } else {
        Ok(Some(new_projection))
    }
}
642
/// Result of pushing a projection down through a join: new projected
/// children plus the join condition and filter rewritten in their terms.
pub struct JoinData {
    pub projected_left_child: ProjectionExec,
    pub projected_right_child: ProjectionExec,
    pub join_filter: Option<JoinFilter>,
    pub join_on: JoinOn,
}
649
/// Try to push `projection` below a join with children `join_left` /
/// `join_right`. Succeeds only when every projected expression is a bare
/// column and the projection splits cleanly into a left-side prefix followed
/// by a right-side suffix (see [`join_allows_pushdown`]); the join keys and
/// filter are then rewritten against the new, narrower children. Returns
/// `Ok(None)` whenever the pushdown is not possible.
pub fn try_pushdown_through_join(
    projection: &ProjectionExec,
    join_left: &Arc<dyn ExecutionPlan>,
    join_right: &Arc<dyn ExecutionPlan>,
    join_on: JoinOnRef,
    schema: &SchemaRef,
    filter: Option<&JoinFilter>,
) -> Result<Option<JoinData>> {
    // Pushdown requires every projected expression to be a plain column.
    let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
        return Ok(None);
    };

    // Index of the last left-side column and the first right-side column in
    // the projection list (relative to the join's combined schema).
    let (far_right_left_col_ind, far_left_right_col_ind) =
        join_table_borders(join_left.schema().fields().len(), &projection_as_columns);

    if !join_allows_pushdown(
        &projection_as_columns,
        schema,
        far_right_left_col_ind,
        far_left_right_col_ind,
    ) {
        return Ok(None);
    }

    // Rewrite the join filter's column indices against the split projection;
    // abandon the pushdown if any referenced column was projected away.
    let new_filter = if let Some(filter) = filter {
        match update_join_filter(
            &projection_as_columns[0..=far_right_left_col_ind as _],
            &projection_as_columns[far_left_right_col_ind as _..],
            filter,
            join_left.schema().fields().len(),
        ) {
            Some(updated_filter) => Some(updated_filter),
            None => return Ok(None),
        }
    } else {
        None
    };

    // Same for the equi-join keys.
    let Some(new_on) = update_join_on(
        &projection_as_columns[0..=far_right_left_col_ind as _],
        &projection_as_columns[far_left_right_col_ind as _..],
        join_on,
        join_left.schema().fields().len(),
    ) else {
        return Ok(None);
    };

    // Build the projected replacement children.
    let (new_left, new_right) = new_join_children(
        &projection_as_columns,
        far_right_left_col_ind,
        far_left_right_col_ind,
        join_left,
        join_right,
    )?;

    Ok(Some(JoinData {
        projected_left_child: new_left,
        projected_right_child: new_right,
        join_filter: new_filter,
        join_on: new_on,
    }))
}
713
/// Optimizer transform: drop a `ProjectionExec` that is an identity over its
/// input schema, or let its input try to absorb/swap it via
/// `try_swapping_with_projection`. Plans other than `ProjectionExec` are
/// returned unchanged.
pub fn remove_unnecessary_projections(
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    let maybe_modified =
        if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
            // Identity projection: replace the node with its input.
            if is_projection_removable(projection) {
                return Ok(Transformed::yes(Arc::clone(projection.input())));
            }
            projection
                .input()
                .try_swapping_with_projection(projection)?
        } else {
            return Ok(Transformed::no(plan));
        };
    Ok(maybe_modified.map_or_else(|| Transformed::no(plan), Transformed::yes))
}
737
738fn is_projection_removable(projection: &ProjectionExec) -> bool {
743 let exprs = projection.expr();
744 exprs.iter().enumerate().all(|(idx, proj_expr)| {
745 let Some(col) = proj_expr.expr.as_any().downcast_ref::<Column>() else {
746 return false;
747 };
748 col.name() == proj_expr.alias && col.index() == idx
749 }) && exprs.len() == projection.input().schema().fields().len()
750}
751
752pub fn all_alias_free_columns(exprs: &[ProjectionExpr]) -> bool {
755 exprs.iter().all(|proj_expr| {
756 proj_expr
757 .expr
758 .as_any()
759 .downcast_ref::<Column>()
760 .map(|column| column.name() == proj_expr.alias)
761 .unwrap_or(false)
762 })
763}
764
765pub fn new_projections_for_columns(
769 projection: &[ProjectionExpr],
770 source: &[usize],
771) -> Vec<usize> {
772 projection
773 .iter()
774 .filter_map(|proj_expr| {
775 proj_expr
776 .expr
777 .as_any()
778 .downcast_ref::<Column>()
779 .map(|expr| source[expr.index()])
780 })
781 .collect()
782}
783
784pub fn make_with_child(
787 projection: &ProjectionExec,
788 child: &Arc<dyn ExecutionPlan>,
789) -> Result<Arc<dyn ExecutionPlan>> {
790 ProjectionExec::try_new(projection.expr().to_vec(), Arc::clone(child))
791 .map(|e| Arc::new(e) as _)
792}
793
794pub fn all_columns(exprs: &[ProjectionExpr]) -> bool {
796 exprs
797 .iter()
798 .all(|proj_expr| proj_expr.expr.as_any().is::<Column>())
799}
800
801pub fn update_ordering(
804 ordering: LexOrdering,
805 projected_exprs: &[ProjectionExpr],
806) -> Result<Option<LexOrdering>> {
807 let mut updated_exprs = vec![];
808 for mut sort_expr in ordering.into_iter() {
809 let Some(updated_expr) = update_expr(&sort_expr.expr, projected_exprs, false)?
810 else {
811 return Ok(None);
812 };
813 sort_expr.expr = updated_expr;
814 updated_exprs.push(sort_expr);
815 }
816 Ok(LexOrdering::new(updated_exprs))
817}
818
819pub fn update_ordering_requirement(
822 reqs: LexRequirement,
823 projected_exprs: &[ProjectionExpr],
824) -> Result<Option<LexRequirement>> {
825 let mut updated_exprs = vec![];
826 for mut sort_expr in reqs.into_iter() {
827 let Some(updated_expr) = update_expr(&sort_expr.expr, projected_exprs, false)?
828 else {
829 return Ok(None);
830 };
831 sort_expr.expr = updated_expr;
832 updated_exprs.push(sort_expr);
833 }
834 Ok(LexRequirement::new(updated_exprs))
835}
836
837pub fn physical_to_column_exprs(
840 exprs: &[ProjectionExpr],
841) -> Option<Vec<(Column, String)>> {
842 exprs
843 .iter()
844 .map(|proj_expr| {
845 proj_expr
846 .expr
847 .as_any()
848 .downcast_ref::<Column>()
849 .map(|col| (col.clone(), proj_expr.alias.clone()))
850 })
851 .collect()
852}
853
/// Build the two replacement children for a join after projection pushdown:
/// the left child gets the projection columns up to and including
/// `far_right_left_col_ind`, the right child gets those from
/// `far_left_right_col_ind` onward with indices shifted back by the left
/// schema width (right-side columns were indexed into the combined join
/// schema).
pub fn new_join_children(
    projection_as_columns: &[(Column, String)],
    far_right_left_col_ind: i32,
    far_left_right_col_ind: i32,
    left_child: &Arc<dyn ExecutionPlan>,
    right_child: &Arc<dyn ExecutionPlan>,
) -> Result<(ProjectionExec, ProjectionExec)> {
    let new_left = ProjectionExec::try_new(
        projection_as_columns[0..=far_right_left_col_ind as _]
            .iter()
            .map(|(col, alias)| ProjectionExpr {
                expr: Arc::new(Column::new(col.name(), col.index())) as _,
                alias: alias.clone(),
            }),
        Arc::clone(left_child),
    )?;
    let left_size = left_child.schema().fields().len() as i32;
    let new_right = ProjectionExec::try_new(
        projection_as_columns[far_left_right_col_ind as _..]
            .iter()
            .map(|(col, alias)| {
                ProjectionExpr {
                    // Shift from join-schema index back to right-child index.
                    expr: Arc::new(Column::new(
                        col.name(),
                        (col.index() as i32 - left_size) as _,
                    )) as _,
                    alias: alias.clone(),
                }
            }),
        Arc::clone(right_child),
    )?;

    Ok((new_left, new_right))
}
893
894pub fn join_allows_pushdown(
900 projection_as_columns: &[(Column, String)],
901 join_schema: &SchemaRef,
902 far_right_left_col_ind: i32,
903 far_left_right_col_ind: i32,
904) -> bool {
905 projection_as_columns.len() < join_schema.fields().len()
907 && (far_right_left_col_ind + 1 == far_left_right_col_ind)
909 && far_right_left_col_ind >= 0
911 && far_left_right_col_ind < projection_as_columns.len() as i32
912}
913
914pub fn join_table_borders(
920 left_table_column_count: usize,
921 projection_as_columns: &[(Column, String)],
922) -> (i32, i32) {
923 let far_right_left_col_ind = projection_as_columns
924 .iter()
925 .enumerate()
926 .take_while(|(_, (projection_column, _))| {
927 projection_column.index() < left_table_column_count
928 })
929 .last()
930 .map(|(index, _)| index as i32)
931 .unwrap_or(-1);
932
933 let far_left_right_col_ind = projection_as_columns
934 .iter()
935 .enumerate()
936 .rev()
937 .take_while(|(_, (projection_column, _))| {
938 projection_column.index() >= left_table_column_count
939 })
940 .last()
941 .map(|(index, _)| index as i32)
942 .unwrap_or(projection_as_columns.len() as i32);
943
944 (far_right_left_col_ind, far_left_right_col_ind)
945}
946
947pub fn update_join_on(
950 proj_left_exprs: &[(Column, String)],
951 proj_right_exprs: &[(Column, String)],
952 hash_join_on: &[(PhysicalExprRef, PhysicalExprRef)],
953 left_field_size: usize,
954) -> Option<Vec<(PhysicalExprRef, PhysicalExprRef)>> {
955 let (left_idx, right_idx): (Vec<_>, Vec<_>) = hash_join_on
956 .iter()
957 .map(|(left, right)| (left, right))
958 .unzip();
959
960 let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs, 0);
961 let new_right_columns =
962 new_columns_for_join_on(&right_idx, proj_right_exprs, left_field_size);
963
964 match (new_left_columns, new_right_columns) {
965 (Some(left), Some(right)) => Some(left.into_iter().zip(right).collect()),
966 _ => None,
967 }
968}
969
/// Rewrite a join filter's column indices after both children have been
/// replaced by projections. Returns `None` when some filter column is no
/// longer produced by the corresponding child projection (the index counts
/// then don't add up).
pub fn update_join_filter(
    projection_left_exprs: &[(Column, String)],
    projection_right_exprs: &[(Column, String)],
    join_filter: &JoinFilter,
    left_field_size: usize,
) -> Option<JoinFilter> {
    // New positions (into the left projection) of the filter's left-side
    // columns, in the order those columns appear in the filter.
    let mut new_left_indices = new_indices_for_join_filter(
        join_filter,
        JoinSide::Left,
        projection_left_exprs,
        0,
    )
    .into_iter();
    // Same for the right side; right columns are offset by the left width.
    let mut new_right_indices = new_indices_for_join_filter(
        join_filter,
        JoinSide::Right,
        projection_right_exprs,
        left_field_size,
    )
    .into_iter();

    // Every original filter column must have been found on its side;
    // otherwise the counts fall short and we return None.
    (new_right_indices.len() + new_left_indices.len()
        == join_filter.column_indices().len())
    .then(|| {
        JoinFilter::new(
            Arc::clone(join_filter.expression()),
            join_filter
                .column_indices()
                .iter()
                // Splice the new indices back in the filter's original
                // order, consuming each side's iterator as that side
                // occurs. `unwrap` is safe: lengths were checked above.
                .map(|col_idx| ColumnIndex {
                    index: if col_idx.side == JoinSide::Left {
                        new_left_indices.next().unwrap()
                    } else {
                        new_right_indices.next().unwrap()
                    },
                    side: col_idx.side,
                })
                .collect(),
            Arc::clone(join_filter.schema()),
        )
    })
}
1015
/// Try to merge two stacked projections (`projection` directly above
/// `child`) into a single `ProjectionExec` over the child's input.
///
/// Returns `Ok(None)` when merging is declined: either a child output
/// column is referenced more than once by the parent while the child
/// expression producing it should not be duplicated (per its placement),
/// or some parent expression cannot be rewritten via `update_expr`.
fn try_unifying_projections(
    projection: &ProjectionExec,
    child: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    let mut projected_exprs = vec![];
    let mut column_ref_map: HashMap<Column, usize> = HashMap::new();

    // Count how many times the parent references each child output column.
    projection.expr().iter().for_each(|proj_expr| {
        proj_expr
            .expr
            .apply(|expr| {
                Ok({
                    if let Some(column) = expr.as_any().downcast_ref::<Column>() {
                        *column_ref_map.entry(column.clone()).or_default() += 1;
                    }
                    TreeNodeRecursion::Continue
                })
            })
            .unwrap();
    });
    // Merging would inline the child expression once per reference; refuse
    // when a multiply-referenced expression is not meant to be duplicated.
    if column_ref_map.iter().any(|(column, count)| {
        *count > 1
            && !child.expr()[column.index()]
                .expr
                .placement()
                .should_push_to_leaves()
    }) {
        return Ok(None);
    }
    // Substitute the child's expressions into each parent expression.
    for proj_expr in projection.expr() {
        let Some(expr) = update_expr(&proj_expr.expr, child.expr(), true)? else {
            return Ok(None);
        };
        projected_exprs.push(ProjectionExpr {
            expr,
            alias: proj_expr.alias.clone(),
        });
    }
    ProjectionExec::try_new(projected_exprs, Arc::clone(child.input()))
        .map(|e| Some(Arc::new(e) as _))
}
1066
1067fn collect_column_indices(exprs: &[ProjectionExpr]) -> Vec<usize> {
1069 let mut indices = exprs
1071 .iter()
1072 .flat_map(|proj_expr| collect_columns(&proj_expr.expr))
1073 .map(|x| x.index())
1074 .collect::<std::collections::HashSet<_>>()
1075 .into_iter()
1076 .collect::<Vec<_>>();
1077 indices.sort();
1078 indices
1079}
1080
1081fn new_indices_for_join_filter(
1089 join_filter: &JoinFilter,
1090 join_side: JoinSide,
1091 projection_exprs: &[(Column, String)],
1092 column_index_offset: usize,
1093) -> Vec<usize> {
1094 join_filter
1095 .column_indices()
1096 .iter()
1097 .filter(|col_idx| col_idx.side == join_side)
1098 .filter_map(|col_idx| {
1099 projection_exprs
1100 .iter()
1101 .position(|(col, _)| col_idx.index + column_index_offset == col.index())
1102 })
1103 .collect()
1104}
1105
/// Rewrite join-key expressions in terms of a child projection: every
/// `Column` inside an expression must match a projection column by name and
/// offset-adjusted index (right-side columns carry the left schema width as
/// `column_index_offset`); it is replaced by a column addressing the
/// projection's output (alias name, position index). Returns `None` if any
/// key expression could not be fully rewritten.
fn new_columns_for_join_on(
    hash_join_on: &[&PhysicalExprRef],
    projection_exprs: &[(Column, String)],
    column_index_offset: usize,
) -> Option<Vec<PhysicalExprRef>> {
    let new_columns = hash_join_on
        .iter()
        .filter_map(|on| {
            Arc::clone(*on)
                .transform(|expr| {
                    if let Some(column) = expr.as_any().downcast_ref::<Column>() {
                        // Find the projection column matching this key
                        // column; the replacement addresses the projection
                        // output by alias and position.
                        let new_column = projection_exprs
                            .iter()
                            .enumerate()
                            .find(|(_, (proj_column, _))| {
                                column.name() == proj_column.name()
                                    && column.index() + column_index_offset
                                        == proj_column.index()
                            })
                            .map(|(index, (_, alias))| Column::new(alias, index));
                        if let Some(new_column) = new_column {
                            Ok(Transformed::yes(Arc::new(new_column)))
                        } else {
                            // Missing column: raise an internal error, which
                            // `.ok()` below converts into dropping this key.
                            internal_err!(
                                "Column {:?} not found in projection expressions",
                                column
                            )
                        }
                    } else {
                        Ok(Transformed::no(expr))
                    }
                })
                .data()
                .ok()
        })
        .collect::<Vec<_>>();
    // All keys must survive the rewrite, or the whole mapping fails.
    (new_columns.len() == hash_join_on.len()).then_some(new_columns)
}
1156
1157#[cfg(test)]
1158mod tests {
1159 use super::*;
1160 use std::sync::Arc;
1161
1162 use crate::common::collect;
1163
1164 use crate::filter_pushdown::PushedDown;
1165 use crate::test;
1166 use crate::test::exec::StatisticsExec;
1167
1168 use arrow::datatypes::{DataType, Field, Schema};
1169 use datafusion_common::ScalarValue;
1170 use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};
1171
1172 use datafusion_expr::Operator;
1173 use datafusion_physical_expr::expressions::{
1174 BinaryExpr, Column, DynamicFilterPhysicalExpr, Literal, binary, col, lit,
1175 };
1176
1177 #[test]
1178 fn test_collect_column_indices() -> Result<()> {
1179 let expr = Arc::new(BinaryExpr::new(
1180 Arc::new(Column::new("b", 7)),
1181 Operator::Minus,
1182 Arc::new(BinaryExpr::new(
1183 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1184 Operator::Plus,
1185 Arc::new(Column::new("a", 1)),
1186 )),
1187 ));
1188 let column_indices = collect_column_indices(&[ProjectionExpr {
1189 expr,
1190 alias: "b-(1+a)".to_string(),
1191 }]);
1192 assert_eq!(column_indices, vec![1, 7]);
1193 Ok(())
1194 }
1195
1196 #[test]
1197 fn test_join_table_borders() -> Result<()> {
1198 let projections = vec![
1199 (Column::new("b", 1), "b".to_owned()),
1200 (Column::new("c", 2), "c".to_owned()),
1201 (Column::new("e", 4), "e".to_owned()),
1202 (Column::new("d", 3), "d".to_owned()),
1203 (Column::new("c", 2), "c".to_owned()),
1204 (Column::new("f", 5), "f".to_owned()),
1205 (Column::new("h", 7), "h".to_owned()),
1206 (Column::new("g", 6), "g".to_owned()),
1207 ];
1208 let left_table_column_count = 5;
1209 assert_eq!(
1210 join_table_borders(left_table_column_count, &projections),
1211 (4, 5)
1212 );
1213
1214 let left_table_column_count = 8;
1215 assert_eq!(
1216 join_table_borders(left_table_column_count, &projections),
1217 (7, 8)
1218 );
1219
1220 let left_table_column_count = 1;
1221 assert_eq!(
1222 join_table_borders(left_table_column_count, &projections),
1223 (-1, 0)
1224 );
1225
1226 let projections = vec![
1227 (Column::new("a", 0), "a".to_owned()),
1228 (Column::new("b", 1), "b".to_owned()),
1229 (Column::new("d", 3), "d".to_owned()),
1230 (Column::new("g", 6), "g".to_owned()),
1231 (Column::new("e", 4), "e".to_owned()),
1232 (Column::new("f", 5), "f".to_owned()),
1233 (Column::new("e", 4), "e".to_owned()),
1234 (Column::new("h", 7), "h".to_owned()),
1235 ];
1236 let left_table_column_count = 5;
1237 assert_eq!(
1238 join_table_borders(left_table_column_count, &projections),
1239 (2, 7)
1240 );
1241
1242 let left_table_column_count = 7;
1243 assert_eq!(
1244 join_table_borders(left_table_column_count, &projections),
1245 (6, 7)
1246 );
1247
1248 Ok(())
1249 }
1250
1251 #[tokio::test]
1252 async fn project_no_column() -> Result<()> {
1253 let task_ctx = Arc::new(TaskContext::default());
1254
1255 let exec = test::scan_partitioned(1);
1256 let expected = collect(exec.execute(0, Arc::clone(&task_ctx))?).await?;
1257
1258 let projection = ProjectionExec::try_new(vec![] as Vec<ProjectionExpr>, exec)?;
1259 let stream = projection.execute(0, Arc::clone(&task_ctx))?;
1260 let output = collect(stream).await?;
1261 assert_eq!(output.len(), expected.len());
1262
1263 Ok(())
1264 }
1265
1266 #[tokio::test]
1267 async fn project_old_syntax() {
1268 let exec = test::scan_partitioned(1);
1269 let schema = exec.schema();
1270 let expr = col("i", &schema).unwrap();
1271 ProjectionExec::try_new(
1272 vec![
1273 (expr, "c".to_string()),
1276 ],
1277 exec,
1278 )
1279 .unwrap();
1281 }
1282
1283 #[test]
1284 fn test_projection_statistics_uses_input_schema() {
1285 let input_schema = Schema::new(vec![
1286 Field::new("a", DataType::Int32, false),
1287 Field::new("b", DataType::Int32, false),
1288 Field::new("c", DataType::Int32, false),
1289 Field::new("d", DataType::Int32, false),
1290 Field::new("e", DataType::Int32, false),
1291 Field::new("f", DataType::Int32, false),
1292 ]);
1293
1294 let input_statistics = Statistics {
1295 num_rows: Precision::Exact(10),
1296 column_statistics: vec![
1297 ColumnStatistics {
1298 min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
1299 max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
1300 ..Default::default()
1301 },
1302 ColumnStatistics {
1303 min_value: Precision::Exact(ScalarValue::Int32(Some(5))),
1304 max_value: Precision::Exact(ScalarValue::Int32(Some(50))),
1305 ..Default::default()
1306 },
1307 ColumnStatistics {
1308 min_value: Precision::Exact(ScalarValue::Int32(Some(10))),
1309 max_value: Precision::Exact(ScalarValue::Int32(Some(40))),
1310 ..Default::default()
1311 },
1312 ColumnStatistics {
1313 min_value: Precision::Exact(ScalarValue::Int32(Some(20))),
1314 max_value: Precision::Exact(ScalarValue::Int32(Some(30))),
1315 ..Default::default()
1316 },
1317 ColumnStatistics {
1318 min_value: Precision::Exact(ScalarValue::Int32(Some(21))),
1319 max_value: Precision::Exact(ScalarValue::Int32(Some(29))),
1320 ..Default::default()
1321 },
1322 ColumnStatistics {
1323 min_value: Precision::Exact(ScalarValue::Int32(Some(24))),
1324 max_value: Precision::Exact(ScalarValue::Int32(Some(26))),
1325 ..Default::default()
1326 },
1327 ],
1328 ..Default::default()
1329 };
1330
1331 let input = Arc::new(StatisticsExec::new(input_statistics, input_schema));
1332
1333 let exprs: Vec<ProjectionExpr> = vec![
1338 ProjectionExpr {
1339 expr: Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>,
1340 alias: "c_renamed".to_string(),
1341 },
1342 ProjectionExpr {
1343 expr: Arc::new(BinaryExpr::new(
1344 Arc::new(Column::new("e", 4)),
1345 Operator::Plus,
1346 Arc::new(Column::new("f", 5)),
1347 )) as Arc<dyn PhysicalExpr>,
1348 alias: "e_plus_f".to_string(),
1349 },
1350 ];
1351
1352 let projection = ProjectionExec::try_new(exprs, input).unwrap();
1353
1354 let stats = projection.partition_statistics(None).unwrap();
1355
1356 assert_eq!(stats.num_rows, Precision::Exact(10));
1357 assert_eq!(
1358 stats.column_statistics.len(),
1359 2,
1360 "Expected 2 columns in projection statistics"
1361 );
1362 assert!(stats.total_byte_size.is_exact().unwrap_or(false));
1363 }
1364
1365 #[test]
1366 fn test_filter_pushdown_with_alias() -> Result<()> {
1367 let input_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1368 let input = Arc::new(StatisticsExec::new(
1369 Statistics::new_unknown(&input_schema),
1370 input_schema.clone(),
1371 ));
1372
1373 let projection = ProjectionExec::try_new(
1375 vec![ProjectionExpr {
1376 expr: Arc::new(Column::new("a", 0)),
1377 alias: "b".to_string(),
1378 }],
1379 input,
1380 )?;
1381
1382 let filter = Arc::new(BinaryExpr::new(
1384 Arc::new(Column::new("b", 0)),
1385 Operator::Gt,
1386 Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1387 )) as Arc<dyn PhysicalExpr>;
1388
1389 let description = projection.gather_filters_for_pushdown(
1390 FilterPushdownPhase::Post,
1391 vec![filter],
1392 &ConfigOptions::default(),
1393 )?;
1394
1395 let expected_filter = Arc::new(BinaryExpr::new(
1398 Arc::new(Column::new("a", 0)),
1399 Operator::Gt,
1400 Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1401 )) as Arc<dyn PhysicalExpr>;
1402
1403 assert_eq!(description.self_filters(), vec![vec![]]);
1404 let pushed_filters = &description.parent_filters()[0];
1405 assert_eq!(
1406 format!("{}", pushed_filters[0].predicate),
1407 format!("{}", expected_filter)
1408 );
1409 assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
1411
1412 Ok(())
1413 }
1414
1415 #[test]
1416 fn test_filter_pushdown_with_multiple_aliases() -> Result<()> {
1417 let input_schema = Schema::new(vec![
1418 Field::new("a", DataType::Int32, false),
1419 Field::new("b", DataType::Int32, false),
1420 ]);
1421 let input = Arc::new(StatisticsExec::new(
1422 Statistics {
1423 column_statistics: vec![Default::default(); input_schema.fields().len()],
1424 ..Default::default()
1425 },
1426 input_schema.clone(),
1427 ));
1428
1429 let projection = ProjectionExec::try_new(
1431 vec![
1432 ProjectionExpr {
1433 expr: Arc::new(Column::new("a", 0)),
1434 alias: "x".to_string(),
1435 },
1436 ProjectionExpr {
1437 expr: Arc::new(Column::new("b", 1)),
1438 alias: "y".to_string(),
1439 },
1440 ],
1441 input,
1442 )?;
1443
1444 let filter1 = Arc::new(BinaryExpr::new(
1446 Arc::new(Column::new("x", 0)),
1447 Operator::Gt,
1448 Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1449 )) as Arc<dyn PhysicalExpr>;
1450
1451 let filter2 = Arc::new(BinaryExpr::new(
1453 Arc::new(Column::new("y", 1)),
1454 Operator::Lt,
1455 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1456 )) as Arc<dyn PhysicalExpr>;
1457
1458 let description = projection.gather_filters_for_pushdown(
1459 FilterPushdownPhase::Post,
1460 vec![filter1, filter2],
1461 &ConfigOptions::default(),
1462 )?;
1463
1464 let expected_filter1 = Arc::new(BinaryExpr::new(
1466 Arc::new(Column::new("a", 0)),
1467 Operator::Gt,
1468 Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1469 )) as Arc<dyn PhysicalExpr>;
1470
1471 let expected_filter2 = Arc::new(BinaryExpr::new(
1472 Arc::new(Column::new("b", 1)),
1473 Operator::Lt,
1474 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1475 )) as Arc<dyn PhysicalExpr>;
1476
1477 let pushed_filters = &description.parent_filters()[0];
1478 assert_eq!(pushed_filters.len(), 2);
1479 assert_eq!(
1481 format!("{}", pushed_filters[0].predicate),
1482 format!("{}", expected_filter1)
1483 );
1484 assert_eq!(
1485 format!("{}", pushed_filters[1].predicate),
1486 format!("{}", expected_filter2)
1487 );
1488 assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
1490 assert!(matches!(pushed_filters[1].discriminant, PushedDown::Yes));
1491
1492 Ok(())
1493 }
1494
1495 #[test]
1496 fn test_filter_pushdown_with_swapped_aliases() -> Result<()> {
1497 let input_schema = Schema::new(vec![
1498 Field::new("a", DataType::Int32, false),
1499 Field::new("b", DataType::Int32, false),
1500 ]);
1501 let input = Arc::new(StatisticsExec::new(
1502 Statistics {
1503 column_statistics: vec![Default::default(); input_schema.fields().len()],
1504 ..Default::default()
1505 },
1506 input_schema.clone(),
1507 ));
1508
1509 let projection = ProjectionExec::try_new(
1511 vec![
1512 ProjectionExpr {
1513 expr: Arc::new(Column::new("a", 0)),
1514 alias: "b".to_string(),
1515 },
1516 ProjectionExpr {
1517 expr: Arc::new(Column::new("b", 1)),
1518 alias: "a".to_string(),
1519 },
1520 ],
1521 input,
1522 )?;
1523
1524 let filter1 = Arc::new(BinaryExpr::new(
1526 Arc::new(Column::new("b", 0)),
1527 Operator::Gt,
1528 Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1529 )) as Arc<dyn PhysicalExpr>;
1530
1531 let filter2 = Arc::new(BinaryExpr::new(
1533 Arc::new(Column::new("a", 1)),
1534 Operator::Lt,
1535 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1536 )) as Arc<dyn PhysicalExpr>;
1537
1538 let description = projection.gather_filters_for_pushdown(
1539 FilterPushdownPhase::Post,
1540 vec![filter1, filter2],
1541 &ConfigOptions::default(),
1542 )?;
1543
1544 let pushed_filters = &description.parent_filters()[0];
1545 assert_eq!(pushed_filters.len(), 2);
1546
1547 let expected_filter1 = "a@0 > 5";
1549 let expected_filter2 = "b@1 < 10";
1551
1552 assert_eq!(format!("{}", pushed_filters[0].predicate), expected_filter1);
1553 assert_eq!(format!("{}", pushed_filters[1].predicate), expected_filter2);
1554 assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
1556 assert!(matches!(pushed_filters[1].discriminant, PushedDown::Yes));
1557
1558 Ok(())
1559 }
1560
1561 #[test]
1562 fn test_filter_pushdown_with_mixed_columns() -> Result<()> {
1563 let input_schema = Schema::new(vec![
1564 Field::new("a", DataType::Int32, false),
1565 Field::new("b", DataType::Int32, false),
1566 ]);
1567 let input = Arc::new(StatisticsExec::new(
1568 Statistics {
1569 column_statistics: vec![Default::default(); input_schema.fields().len()],
1570 ..Default::default()
1571 },
1572 input_schema.clone(),
1573 ));
1574
1575 let projection = ProjectionExec::try_new(
1577 vec![
1578 ProjectionExpr {
1579 expr: Arc::new(Column::new("a", 0)),
1580 alias: "x".to_string(),
1581 },
1582 ProjectionExpr {
1583 expr: Arc::new(Column::new("b", 1)),
1584 alias: "b".to_string(),
1585 },
1586 ],
1587 input,
1588 )?;
1589
1590 let filter1 = Arc::new(BinaryExpr::new(
1592 Arc::new(Column::new("x", 0)),
1593 Operator::Gt,
1594 Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1595 )) as Arc<dyn PhysicalExpr>;
1596
1597 let filter2 = Arc::new(BinaryExpr::new(
1599 Arc::new(Column::new("b", 1)),
1600 Operator::Lt,
1601 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1602 )) as Arc<dyn PhysicalExpr>;
1603
1604 let description = projection.gather_filters_for_pushdown(
1605 FilterPushdownPhase::Post,
1606 vec![filter1, filter2],
1607 &ConfigOptions::default(),
1608 )?;
1609
1610 let pushed_filters = &description.parent_filters()[0];
1611 assert_eq!(pushed_filters.len(), 2);
1612 let expected_filter1 = "a@0 > 5";
1614 let expected_filter2 = "b@1 < 10";
1616
1617 assert_eq!(format!("{}", pushed_filters[0].predicate), expected_filter1);
1618 assert_eq!(format!("{}", pushed_filters[1].predicate), expected_filter2);
1619 assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
1621 assert!(matches!(pushed_filters[1].discriminant, PushedDown::Yes));
1622
1623 Ok(())
1624 }
1625
1626 #[test]
1627 fn test_filter_pushdown_with_complex_expression() -> Result<()> {
1628 let input_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1629 let input = Arc::new(StatisticsExec::new(
1630 Statistics {
1631 column_statistics: vec![Default::default(); input_schema.fields().len()],
1632 ..Default::default()
1633 },
1634 input_schema.clone(),
1635 ));
1636
1637 let projection = ProjectionExec::try_new(
1639 vec![ProjectionExpr {
1640 expr: Arc::new(BinaryExpr::new(
1641 Arc::new(Column::new("a", 0)),
1642 Operator::Plus,
1643 Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1644 )),
1645 alias: "z".to_string(),
1646 }],
1647 input,
1648 )?;
1649
1650 let filter = Arc::new(BinaryExpr::new(
1652 Arc::new(Column::new("z", 0)),
1653 Operator::Gt,
1654 Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1655 )) as Arc<dyn PhysicalExpr>;
1656
1657 let description = projection.gather_filters_for_pushdown(
1658 FilterPushdownPhase::Post,
1659 vec![filter],
1660 &ConfigOptions::default(),
1661 )?;
1662
1663 let pushed_filters = &description.parent_filters()[0];
1665 assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
1666 assert_eq!(format!("{}", pushed_filters[0].predicate), "a@0 + 1 > 10");
1667
1668 Ok(())
1669 }
1670
1671 #[test]
1672 fn test_filter_pushdown_with_unknown_column() -> Result<()> {
1673 let input_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1674 let input = Arc::new(StatisticsExec::new(
1675 Statistics {
1676 column_statistics: vec![Default::default(); input_schema.fields().len()],
1677 ..Default::default()
1678 },
1679 input_schema.clone(),
1680 ));
1681
1682 let projection = ProjectionExec::try_new(
1684 vec![ProjectionExpr {
1685 expr: Arc::new(Column::new("a", 0)),
1686 alias: "a".to_string(),
1687 }],
1688 input,
1689 )?;
1690
1691 let filter = Arc::new(BinaryExpr::new(
1694 Arc::new(Column::new("unknown_col", 1)),
1695 Operator::Gt,
1696 Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1697 )) as Arc<dyn PhysicalExpr>;
1698
1699 let description = projection.gather_filters_for_pushdown(
1700 FilterPushdownPhase::Post,
1701 vec![filter],
1702 &ConfigOptions::default(),
1703 )?;
1704
1705 let pushed_filters = &description.parent_filters()[0];
1706 assert!(matches!(pushed_filters[0].discriminant, PushedDown::No));
1707 assert_eq!(
1709 format!("{}", pushed_filters[0].predicate),
1710 "unknown_col@1 > 5"
1711 );
1712
1713 Ok(())
1714 }
1715
1716 #[test]
1720 fn test_basic_dyn_filter_projection_pushdown_update_child() -> Result<()> {
1721 let input_schema =
1722 Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, false)]));
1723
1724 let input = Arc::new(StatisticsExec::new(
1725 Statistics {
1726 column_statistics: vec![Default::default(); input_schema.fields().len()],
1727 ..Default::default()
1728 },
1729 input_schema.as_ref().clone(),
1730 ));
1731
1732 let projection = ProjectionExec::try_new(
1734 vec![ProjectionExpr {
1735 expr: binary(
1736 Arc::new(Column::new("b", 0)),
1737 Operator::Minus,
1738 lit(1),
1739 &input_schema,
1740 )
1741 .unwrap(),
1742 alias: "a".to_string(),
1743 }],
1744 input,
1745 )?;
1746
1747 let projected_schema = projection.schema();
1749 let col_a = col("a", &projected_schema)?;
1750 let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
1751 vec![Arc::clone(&col_a)],
1752 lit(true),
1753 ));
1754 let current = dynamic_filter.current()?;
1756 assert_eq!(format!("{current}"), "true");
1757
1758 let dyn_phy_expr: Arc<dyn PhysicalExpr> = Arc::clone(&dynamic_filter) as _;
1759
1760 let description = projection.gather_filters_for_pushdown(
1761 FilterPushdownPhase::Post,
1762 vec![dyn_phy_expr],
1763 &ConfigOptions::default(),
1764 )?;
1765
1766 let pushed_filters = &description.parent_filters()[0][0];
1767
1768 assert_eq!(
1770 format!("{}", pushed_filters.predicate),
1771 "DynamicFilter [ empty ]"
1772 );
1773
1774 let new_expr =
1776 Arc::new(BinaryExpr::new(Arc::clone(&col_a), Operator::Gt, lit(5i32)));
1777 dynamic_filter.update(new_expr)?;
1778
1779 let current = dynamic_filter.current()?;
1781 assert_eq!(format!("{current}"), "a@0 > 5");
1782
1783 assert_eq!(
1785 format!("{}", pushed_filters.predicate),
1786 "DynamicFilter [ b@0 - 1 > 5 ]"
1787 );
1788
1789 Ok(())
1790 }
1791}