1use super::expressions::{Column, Literal};
24use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use super::{
26 DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
27 SendableRecordBatchStream, SortOrderPushdownResult, Statistics,
28};
29use crate::execution_plan::CardinalityEffect;
30use crate::filter_pushdown::{
31 ChildPushdownResult, FilterDescription, FilterPushdownPhase,
32 FilterPushdownPropagation,
33};
34use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef};
35use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr};
36use std::any::Any;
37use std::collections::HashMap;
38use std::pin::Pin;
39use std::sync::Arc;
40use std::task::{Context, Poll};
41
42use arrow::datatypes::SchemaRef;
43use arrow::record_batch::RecordBatch;
44use datafusion_common::config::ConfigOptions;
45use datafusion_common::tree_node::{
46 Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
47};
48use datafusion_common::{JoinSide, Result, internal_err};
49use datafusion_execution::TaskContext;
50use datafusion_physical_expr::equivalence::ProjectionMapping;
51use datafusion_physical_expr::projection::Projector;
52use datafusion_physical_expr::utils::collect_columns;
53use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, fmt_sql};
54use datafusion_physical_expr_common::sort_expr::{
55 LexOrdering, LexRequirement, PhysicalSortExpr,
56};
57pub use datafusion_physical_expr::projection::{
60 ProjectionExpr, ProjectionExprs, update_expr,
61};
62
63use futures::stream::{Stream, StreamExt};
64use log::trace;
65
/// Physical operator that evaluates a list of projection expressions against every
/// batch produced by its input (e.g. `SELECT a, b + 1 AS c`).
#[derive(Debug, Clone)]
pub struct ProjectionExec {
    /// Evaluates the projection expressions and exposes the output schema.
    projector: Projector,
    /// Plan whose batches are projected.
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics shared by all partitions of this operator.
    metrics: ExecutionPlanMetricsSet,
    /// Plan properties (equivalences, partitioning, ...) computed once in `try_new`.
    cache: PlanProperties,
}
82
83impl ProjectionExec {
84 pub fn try_new<I, E>(expr: I, input: Arc<dyn ExecutionPlan>) -> Result<Self>
134 where
135 I: IntoIterator<Item = E>,
136 E: Into<ProjectionExpr>,
137 {
138 let input_schema = input.schema();
139 let expr_vec = expr.into_iter().map(Into::into).collect::<Vec<_>>();
141 let projection = ProjectionExprs::new(expr_vec);
142 let projector = projection.make_projector(&input_schema)?;
143
144 let projection_mapping = projection.projection_mapping(&input_schema)?;
146 let cache = Self::compute_properties(
147 &input,
148 &projection_mapping,
149 Arc::clone(projector.output_schema()),
150 )?;
151 Ok(Self {
152 projector,
153 input,
154 metrics: ExecutionPlanMetricsSet::new(),
155 cache,
156 })
157 }
158
159 pub fn expr(&self) -> &[ProjectionExpr] {
161 self.projector.projection().as_ref()
162 }
163
164 pub fn projection_expr(&self) -> &ProjectionExprs {
166 self.projector.projection()
167 }
168
169 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
171 &self.input
172 }
173
174 fn compute_properties(
176 input: &Arc<dyn ExecutionPlan>,
177 projection_mapping: &ProjectionMapping,
178 schema: SchemaRef,
179 ) -> Result<PlanProperties> {
180 let input_eq_properties = input.equivalence_properties();
182 let eq_properties = input_eq_properties.project(projection_mapping, schema);
183 let output_partitioning = input
185 .output_partitioning()
186 .project(projection_mapping, input_eq_properties);
187
188 Ok(PlanProperties::new(
189 eq_properties,
190 output_partitioning,
191 input.pipeline_behavior(),
192 input.boundedness(),
193 ))
194 }
195}
196
197impl DisplayAs for ProjectionExec {
198 fn fmt_as(
199 &self,
200 t: DisplayFormatType,
201 f: &mut std::fmt::Formatter,
202 ) -> std::fmt::Result {
203 match t {
204 DisplayFormatType::Default | DisplayFormatType::Verbose => {
205 let expr: Vec<String> = self
206 .projector
207 .projection()
208 .as_ref()
209 .iter()
210 .map(|proj_expr| {
211 let e = proj_expr.expr.to_string();
212 if e != proj_expr.alias {
213 format!("{e} as {}", proj_expr.alias)
214 } else {
215 e
216 }
217 })
218 .collect();
219
220 write!(f, "ProjectionExec: expr=[{}]", expr.join(", "))
221 }
222 DisplayFormatType::TreeRender => {
223 for (i, proj_expr) in self.expr().iter().enumerate() {
224 let expr_sql = fmt_sql(proj_expr.expr.as_ref());
225 if proj_expr.expr.to_string() == proj_expr.alias {
226 writeln!(f, "expr{i}={expr_sql}")?;
227 } else {
228 writeln!(f, "{}={expr_sql}", proj_expr.alias)?;
229 }
230 }
231
232 Ok(())
233 }
234 }
235 }
236}
237
/// [`ExecutionPlan`] implementation for [`ProjectionExec`].
impl ExecutionPlan for ProjectionExec {
    fn name(&self) -> &'static str {
        "ProjectionExec"
    }

    /// Returns `self` as [`Any`] so callers can downcast to `ProjectionExec`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the plan properties computed once in `try_new`.
    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    /// The single input's row order is preserved.
    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }

    /// Reports whether repartitioning the input could help this operator.
    ///
    /// If every expression is a bare column or a literal, the projection only reorders /
    /// renames columns and extra input parallelism buys nothing, so `false` is reported.
    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        let all_simple_exprs =
            self.projector
                .projection()
                .as_ref()
                .iter()
                .all(|proj_expr| {
                    proj_expr.expr.as_any().is::<Column>()
                        || proj_expr.expr.as_any().is::<Literal>()
                });
        vec![!all_simple_exprs]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Rebuilds this projection (same expressions) on top of a new input.
    ///
    /// NOTE(review): `swap_remove(0)` panics if `children` is empty; callers are expected
    /// to pass exactly one child.
    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        ProjectionExec::try_new(
            self.projector.projection().clone(),
            children.swap_remove(0),
        )
        .map(|p| Arc::new(p) as _)
    }

    /// Executes `partition` of the input and wraps it in a `ProjectionStream` that applies
    /// the projector to each batch.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start ProjectionExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );

        // Per-partition projector that records its own metrics into `self.metrics`.
        let projector = self.projector.with_metrics(&self.metrics, partition);
        Ok(Box::pin(ProjectionStream::new(
            projector,
            self.input.execute(partition, context)?,
            BaselineMetrics::new(&self.metrics, partition),
        )?))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    /// Whole-plan statistics (delegates to `partition_statistics(None)`).
    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    /// Projects the input's statistics for `partition` onto this operator's output schema.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        let input_stats = self.input.partition_statistics(partition)?;
        let output_schema = self.schema();
        self.projector
            .projection()
            .project_statistics(input_stats, &output_schema)
    }

    /// Returns `true`, allowing limits to be pushed through this operator.
    fn supports_limit_pushdown(&self) -> bool {
        true
    }

    /// Output row count equals input row count.
    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::Equal
    }

    /// Tries to merge a parent `projection` with this projection.
    ///
    /// On success, the merged plan is passed through `remove_unnecessary_projections` so
    /// that longer chains of projections can collapse further. Otherwise the parent
    /// projection is returned unchanged (cloned into a new `Arc`).
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        let maybe_unified = try_unifying_projections(projection, self)?;
        if let Some(new_plan) = maybe_unified {
            remove_unnecessary_projections(new_plan).data().map(Some)
        } else {
            Ok(Some(Arc::new(projection.clone())))
        }
    }

    /// Delegates to `FilterDescription::from_children` with the parent filters unchanged.
    ///
    /// NOTE(review): filters are not rewritten through aliases here, so filters on aliased
    /// output columns are presumably not handled — confirm before relying on it.
    fn gather_filters_for_pushdown(
        &self,
        _phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        FilterDescription::from_children(parent_filters, &self.children())
    }

    /// Propagates the child's pushdown result via `FilterPushdownPropagation::if_all`.
    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
    }

    /// Tries to push a sort order below this projection.
    ///
    /// Every column in each sort expression refers to an output position of this
    /// projection. It is replaced by the input column found at that position. If any
    /// referenced position is out of range, or holds a computed (non-column) expression,
    /// the sort cannot be expressed against the input and `Unsupported` is returned.
    /// Otherwise the rewritten order is offered to the child. On `Exact` / `Inexact`, this
    /// projection is re-applied on top of the child's new plan.
    fn try_pushdown_sort(
        &self,
        order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
        let child = self.input();
        let mut child_order = Vec::new();

        for sort_expr in order {
            // Cleared by the closure below when a column cannot be mapped to the input.
            let mut can_pushdown = true;
            let transformed = Arc::clone(&sort_expr.expr).transform(|expr| {
                if let Some(col) = expr.as_any().downcast_ref::<Column>() {
                    // Should not happen for a valid plan, but fail gracefully.
                    if col.index() >= self.expr().len() {
                        can_pushdown = false;
                        return Ok(Transformed::no(expr));
                    }

                    let proj_expr = &self.expr()[col.index()];

                    // Only plain column projections have an input-side equivalent.
                    if let Some(child_col) =
                        proj_expr.expr.as_any().downcast_ref::<Column>()
                    {
                        Ok(Transformed::yes(Arc::new(child_col.clone()) as _))
                    } else {
                        // Computed value: nothing in the input to sort by.
                        can_pushdown = false;
                        Ok(Transformed::no(expr))
                    }
                } else {
                    Ok(Transformed::no(expr))
                }
            })?;

            if !can_pushdown {
                return Ok(SortOrderPushdownResult::Unsupported);
            }

            child_order.push(PhysicalSortExpr {
                expr: transformed.data,
                options: sort_expr.options,
            });
        }

        match child.try_pushdown_sort(&child_order)? {
            SortOrderPushdownResult::Exact { inner } => {
                let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
                Ok(SortOrderPushdownResult::Exact { inner: new_exec })
            }
            SortOrderPushdownResult::Inexact { inner } => {
                let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
                Ok(SortOrderPushdownResult::Inexact { inner: new_exec })
            }
            SortOrderPushdownResult::Unsupported => {
                Ok(SortOrderPushdownResult::Unsupported)
            }
        }
    }
}
431
432impl ProjectionStream {
433 fn new(
435 projector: Projector,
436 input: SendableRecordBatchStream,
437 baseline_metrics: BaselineMetrics,
438 ) -> Result<Self> {
439 Ok(Self {
440 projector,
441 input,
442 baseline_metrics,
443 })
444 }
445
446 fn batch_project(&self, batch: &RecordBatch) -> Result<RecordBatch> {
447 let _timer = self.baseline_metrics.elapsed_compute().timer();
449 self.projector.project_batch(batch)
450 }
451}
452
/// Stream that applies a [`Projector`] to every batch of an input stream.
struct ProjectionStream {
    /// Evaluates the projection for each batch and provides the output schema.
    projector: Projector,
    /// Upstream batches to project.
    input: SendableRecordBatchStream,
    /// Records compute time and poll results for this partition.
    baseline_metrics: BaselineMetrics,
}
459
460impl Stream for ProjectionStream {
461 type Item = Result<RecordBatch>;
462
463 fn poll_next(
464 mut self: Pin<&mut Self>,
465 cx: &mut Context<'_>,
466 ) -> Poll<Option<Self::Item>> {
467 let poll = self.input.poll_next_unpin(cx).map(|x| match x {
468 Some(Ok(batch)) => Some(self.batch_project(&batch)),
469 other => other,
470 });
471
472 self.baseline_metrics.record_poll(poll)
473 }
474
475 fn size_hint(&self) -> (usize, Option<usize>) {
476 self.input.size_hint()
478 }
479}
480
481impl RecordBatchStream for ProjectionStream {
482 fn schema(&self) -> SchemaRef {
484 Arc::clone(self.projector.output_schema())
485 }
486}
487
/// An [`ExecutionPlan`] that can absorb a column-index projection into itself
/// (used by `try_embed_projection`).
pub trait EmbeddedProjection: ExecutionPlan + Sized {
    /// Returns a copy of `self` whose output is restricted to the given input column
    /// indices. `None` presumably means "no projection" — confirm with implementors.
    fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self>;
}
491
492pub fn try_embed_projection<Exec: EmbeddedProjection + 'static>(
495 projection: &ProjectionExec,
496 execution_plan: &Exec,
497) -> Result<Option<Arc<dyn ExecutionPlan>>> {
498 let projection_index = collect_column_indices(projection.expr());
500
501 if projection_index.is_empty() {
502 return Ok(None);
503 };
504
505 if projection_index.len() == projection_index.last().unwrap() + 1
508 && projection_index.len() == execution_plan.schema().fields().len()
509 {
510 return Ok(None);
511 }
512
513 let new_execution_plan =
514 Arc::new(execution_plan.with_projection(Some(projection_index.to_vec()))?);
515
516 let embed_project_exprs = projection_index
518 .iter()
519 .zip(new_execution_plan.schema().fields())
520 .map(|(index, field)| ProjectionExpr {
521 expr: Arc::new(Column::new(field.name(), *index)) as Arc<dyn PhysicalExpr>,
522 alias: field.name().to_owned(),
523 })
524 .collect::<Vec<_>>();
525
526 let mut new_projection_exprs = Vec::with_capacity(projection.expr().len());
527
528 for proj_expr in projection.expr() {
529 let Some(expr) =
531 update_expr(&proj_expr.expr, embed_project_exprs.as_slice(), false)?
532 else {
533 return Ok(None);
534 };
535 new_projection_exprs.push(ProjectionExpr {
536 expr,
537 alias: proj_expr.alias.clone(),
538 });
539 }
540 let new_projection = Arc::new(ProjectionExec::try_new(
542 new_projection_exprs,
543 Arc::clone(&new_execution_plan) as _,
544 )?);
545 if is_projection_removable(&new_projection) {
546 Ok(Some(new_execution_plan))
547 } else {
548 Ok(Some(new_projection))
549 }
550}
551
/// Result of pushing a projection below a join: one projection per join input, plus the
/// join filter and `ON` conditions rewritten against those projected inputs.
pub struct JoinData {
    /// Projection to place on top of the join's left input.
    pub projected_left_child: ProjectionExec,
    /// Projection to place on top of the join's right input (indices rebased onto it).
    pub projected_right_child: ProjectionExec,
    /// The join filter with remapped column indices, if the join had a filter.
    pub join_filter: Option<JoinFilter>,
    /// Equi-join conditions rewritten to reference the projected columns.
    pub join_on: JoinOn,
}
558
559pub fn try_pushdown_through_join(
560 projection: &ProjectionExec,
561 join_left: &Arc<dyn ExecutionPlan>,
562 join_right: &Arc<dyn ExecutionPlan>,
563 join_on: JoinOnRef,
564 schema: &SchemaRef,
565 filter: Option<&JoinFilter>,
566) -> Result<Option<JoinData>> {
567 let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
569 return Ok(None);
570 };
571
572 let (far_right_left_col_ind, far_left_right_col_ind) =
573 join_table_borders(join_left.schema().fields().len(), &projection_as_columns);
574
575 if !join_allows_pushdown(
576 &projection_as_columns,
577 schema,
578 far_right_left_col_ind,
579 far_left_right_col_ind,
580 ) {
581 return Ok(None);
582 }
583
584 let new_filter = if let Some(filter) = filter {
585 match update_join_filter(
586 &projection_as_columns[0..=far_right_left_col_ind as _],
587 &projection_as_columns[far_left_right_col_ind as _..],
588 filter,
589 join_left.schema().fields().len(),
590 ) {
591 Some(updated_filter) => Some(updated_filter),
592 None => return Ok(None),
593 }
594 } else {
595 None
596 };
597
598 let Some(new_on) = update_join_on(
599 &projection_as_columns[0..=far_right_left_col_ind as _],
600 &projection_as_columns[far_left_right_col_ind as _..],
601 join_on,
602 join_left.schema().fields().len(),
603 ) else {
604 return Ok(None);
605 };
606
607 let (new_left, new_right) = new_join_children(
608 &projection_as_columns,
609 far_right_left_col_ind,
610 far_left_right_col_ind,
611 join_left,
612 join_right,
613 )?;
614
615 Ok(Some(JoinData {
616 projected_left_child: new_left,
617 projected_right_child: new_right,
618 join_filter: new_filter,
619 join_on: new_on,
620 }))
621}
622
623pub fn remove_unnecessary_projections(
628 plan: Arc<dyn ExecutionPlan>,
629) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
630 let maybe_modified =
631 if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
632 if is_projection_removable(projection) {
635 return Ok(Transformed::yes(Arc::clone(projection.input())));
636 }
637 projection
639 .input()
640 .try_swapping_with_projection(projection)?
641 } else {
642 return Ok(Transformed::no(plan));
643 };
644 Ok(maybe_modified.map_or_else(|| Transformed::no(plan), Transformed::yes))
645}
646
647fn is_projection_removable(projection: &ProjectionExec) -> bool {
652 let exprs = projection.expr();
653 exprs.iter().enumerate().all(|(idx, proj_expr)| {
654 let Some(col) = proj_expr.expr.as_any().downcast_ref::<Column>() else {
655 return false;
656 };
657 col.name() == proj_expr.alias && col.index() == idx
658 }) && exprs.len() == projection.input().schema().fields().len()
659}
660
661pub fn all_alias_free_columns(exprs: &[ProjectionExpr]) -> bool {
664 exprs.iter().all(|proj_expr| {
665 proj_expr
666 .expr
667 .as_any()
668 .downcast_ref::<Column>()
669 .map(|column| column.name() == proj_expr.alias)
670 .unwrap_or(false)
671 })
672}
673
674pub fn new_projections_for_columns(
678 projection: &[ProjectionExpr],
679 source: &[usize],
680) -> Vec<usize> {
681 projection
682 .iter()
683 .filter_map(|proj_expr| {
684 proj_expr
685 .expr
686 .as_any()
687 .downcast_ref::<Column>()
688 .map(|expr| source[expr.index()])
689 })
690 .collect()
691}
692
693pub fn make_with_child(
696 projection: &ProjectionExec,
697 child: &Arc<dyn ExecutionPlan>,
698) -> Result<Arc<dyn ExecutionPlan>> {
699 ProjectionExec::try_new(projection.expr().to_vec(), Arc::clone(child))
700 .map(|e| Arc::new(e) as _)
701}
702
703pub fn all_columns(exprs: &[ProjectionExpr]) -> bool {
705 exprs
706 .iter()
707 .all(|proj_expr| proj_expr.expr.as_any().is::<Column>())
708}
709
710pub fn update_ordering(
713 ordering: LexOrdering,
714 projected_exprs: &[ProjectionExpr],
715) -> Result<Option<LexOrdering>> {
716 let mut updated_exprs = vec![];
717 for mut sort_expr in ordering.into_iter() {
718 let Some(updated_expr) = update_expr(&sort_expr.expr, projected_exprs, false)?
719 else {
720 return Ok(None);
721 };
722 sort_expr.expr = updated_expr;
723 updated_exprs.push(sort_expr);
724 }
725 Ok(LexOrdering::new(updated_exprs))
726}
727
728pub fn update_ordering_requirement(
731 reqs: LexRequirement,
732 projected_exprs: &[ProjectionExpr],
733) -> Result<Option<LexRequirement>> {
734 let mut updated_exprs = vec![];
735 for mut sort_expr in reqs.into_iter() {
736 let Some(updated_expr) = update_expr(&sort_expr.expr, projected_exprs, false)?
737 else {
738 return Ok(None);
739 };
740 sort_expr.expr = updated_expr;
741 updated_exprs.push(sort_expr);
742 }
743 Ok(LexRequirement::new(updated_exprs))
744}
745
746pub fn physical_to_column_exprs(
749 exprs: &[ProjectionExpr],
750) -> Option<Vec<(Column, String)>> {
751 exprs
752 .iter()
753 .map(|proj_expr| {
754 proj_expr
755 .expr
756 .as_any()
757 .downcast_ref::<Column>()
758 .map(|col| (col.clone(), proj_expr.alias.clone()))
759 })
760 .collect()
761}
762
763pub fn new_join_children(
767 projection_as_columns: &[(Column, String)],
768 far_right_left_col_ind: i32,
769 far_left_right_col_ind: i32,
770 left_child: &Arc<dyn ExecutionPlan>,
771 right_child: &Arc<dyn ExecutionPlan>,
772) -> Result<(ProjectionExec, ProjectionExec)> {
773 let new_left = ProjectionExec::try_new(
774 projection_as_columns[0..=far_right_left_col_ind as _]
775 .iter()
776 .map(|(col, alias)| ProjectionExpr {
777 expr: Arc::new(Column::new(col.name(), col.index())) as _,
778 alias: alias.clone(),
779 }),
780 Arc::clone(left_child),
781 )?;
782 let left_size = left_child.schema().fields().len() as i32;
783 let new_right = ProjectionExec::try_new(
784 projection_as_columns[far_left_right_col_ind as _..]
785 .iter()
786 .map(|(col, alias)| {
787 ProjectionExpr {
788 expr: Arc::new(Column::new(
789 col.name(),
790 (col.index() as i32 - left_size) as _,
793 )) as _,
794 alias: alias.clone(),
795 }
796 }),
797 Arc::clone(right_child),
798 )?;
799
800 Ok((new_left, new_right))
801}
802
803pub fn join_allows_pushdown(
809 projection_as_columns: &[(Column, String)],
810 join_schema: &SchemaRef,
811 far_right_left_col_ind: i32,
812 far_left_right_col_ind: i32,
813) -> bool {
814 projection_as_columns.len() < join_schema.fields().len()
816 && (far_right_left_col_ind + 1 == far_left_right_col_ind)
818 && far_right_left_col_ind >= 0
820 && far_left_right_col_ind < projection_as_columns.len() as i32
821}
822
823pub fn join_table_borders(
829 left_table_column_count: usize,
830 projection_as_columns: &[(Column, String)],
831) -> (i32, i32) {
832 let far_right_left_col_ind = projection_as_columns
833 .iter()
834 .enumerate()
835 .take_while(|(_, (projection_column, _))| {
836 projection_column.index() < left_table_column_count
837 })
838 .last()
839 .map(|(index, _)| index as i32)
840 .unwrap_or(-1);
841
842 let far_left_right_col_ind = projection_as_columns
843 .iter()
844 .enumerate()
845 .rev()
846 .take_while(|(_, (projection_column, _))| {
847 projection_column.index() >= left_table_column_count
848 })
849 .last()
850 .map(|(index, _)| index as i32)
851 .unwrap_or(projection_as_columns.len() as i32);
852
853 (far_right_left_col_ind, far_left_right_col_ind)
854}
855
856pub fn update_join_on(
859 proj_left_exprs: &[(Column, String)],
860 proj_right_exprs: &[(Column, String)],
861 hash_join_on: &[(PhysicalExprRef, PhysicalExprRef)],
862 left_field_size: usize,
863) -> Option<Vec<(PhysicalExprRef, PhysicalExprRef)>> {
864 let (left_idx, right_idx): (Vec<_>, Vec<_>) = hash_join_on
865 .iter()
866 .map(|(left, right)| (left, right))
867 .unzip();
868
869 let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs, 0);
870 let new_right_columns =
871 new_columns_for_join_on(&right_idx, proj_right_exprs, left_field_size);
872
873 match (new_left_columns, new_right_columns) {
874 (Some(left), Some(right)) => Some(left.into_iter().zip(right).collect()),
875 _ => None,
876 }
877}
878
879pub fn update_join_filter(
882 projection_left_exprs: &[(Column, String)],
883 projection_right_exprs: &[(Column, String)],
884 join_filter: &JoinFilter,
885 left_field_size: usize,
886) -> Option<JoinFilter> {
887 let mut new_left_indices = new_indices_for_join_filter(
888 join_filter,
889 JoinSide::Left,
890 projection_left_exprs,
891 0,
892 )
893 .into_iter();
894 let mut new_right_indices = new_indices_for_join_filter(
895 join_filter,
896 JoinSide::Right,
897 projection_right_exprs,
898 left_field_size,
899 )
900 .into_iter();
901
902 (new_right_indices.len() + new_left_indices.len()
904 == join_filter.column_indices().len())
905 .then(|| {
906 JoinFilter::new(
907 Arc::clone(join_filter.expression()),
908 join_filter
909 .column_indices()
910 .iter()
911 .map(|col_idx| ColumnIndex {
912 index: if col_idx.side == JoinSide::Left {
913 new_left_indices.next().unwrap()
914 } else {
915 new_right_indices.next().unwrap()
916 },
917 side: col_idx.side,
918 })
919 .collect(),
920 Arc::clone(join_filter.schema()),
921 )
922 })
923}
924
925fn try_unifying_projections(
927 projection: &ProjectionExec,
928 child: &ProjectionExec,
929) -> Result<Option<Arc<dyn ExecutionPlan>>> {
930 let mut projected_exprs = vec![];
931 let mut column_ref_map: HashMap<Column, usize> = HashMap::new();
932
933 projection.expr().iter().for_each(|proj_expr| {
935 proj_expr
936 .expr
937 .apply(|expr| {
938 Ok({
939 if let Some(column) = expr.as_any().downcast_ref::<Column>() {
940 *column_ref_map.entry(column.clone()).or_default() += 1;
941 }
942 TreeNodeRecursion::Continue
943 })
944 })
945 .unwrap();
946 });
947 if column_ref_map.iter().any(|(column, count)| {
952 *count > 1 && !is_expr_trivial(&Arc::clone(&child.expr()[column.index()].expr))
953 }) {
954 return Ok(None);
955 }
956 for proj_expr in projection.expr() {
957 let Some(expr) = update_expr(&proj_expr.expr, child.expr(), true)? else {
961 return Ok(None);
962 };
963 projected_exprs.push(ProjectionExpr {
964 expr,
965 alias: proj_expr.alias.clone(),
966 });
967 }
968 ProjectionExec::try_new(projected_exprs, Arc::clone(child.input()))
969 .map(|e| Some(Arc::new(e) as _))
970}
971
972fn collect_column_indices(exprs: &[ProjectionExpr]) -> Vec<usize> {
974 let mut indices = exprs
976 .iter()
977 .flat_map(|proj_expr| collect_columns(&proj_expr.expr))
978 .map(|x| x.index())
979 .collect::<std::collections::HashSet<_>>()
980 .into_iter()
981 .collect::<Vec<_>>();
982 indices.sort();
983 indices
984}
985
986fn new_indices_for_join_filter(
994 join_filter: &JoinFilter,
995 join_side: JoinSide,
996 projection_exprs: &[(Column, String)],
997 column_index_offset: usize,
998) -> Vec<usize> {
999 join_filter
1000 .column_indices()
1001 .iter()
1002 .filter(|col_idx| col_idx.side == join_side)
1003 .filter_map(|col_idx| {
1004 projection_exprs
1005 .iter()
1006 .position(|(col, _)| col_idx.index + column_index_offset == col.index())
1007 })
1008 .collect()
1009}
1010
1011fn new_columns_for_join_on(
1019 hash_join_on: &[&PhysicalExprRef],
1020 projection_exprs: &[(Column, String)],
1021 column_index_offset: usize,
1022) -> Option<Vec<PhysicalExprRef>> {
1023 let new_columns = hash_join_on
1024 .iter()
1025 .filter_map(|on| {
1026 Arc::clone(*on)
1028 .transform(|expr| {
1029 if let Some(column) = expr.as_any().downcast_ref::<Column>() {
1030 let new_column = projection_exprs
1032 .iter()
1033 .enumerate()
1034 .find(|(_, (proj_column, _))| {
1035 column.name() == proj_column.name()
1036 && column.index() + column_index_offset
1037 == proj_column.index()
1038 })
1039 .map(|(index, (_, alias))| Column::new(alias, index));
1040 if let Some(new_column) = new_column {
1041 Ok(Transformed::yes(Arc::new(new_column)))
1042 } else {
1043 internal_err!(
1047 "Column {:?} not found in projection expressions",
1048 column
1049 )
1050 }
1051 } else {
1052 Ok(Transformed::no(expr))
1053 }
1054 })
1055 .data()
1056 .ok()
1057 })
1058 .collect::<Vec<_>>();
1059 (new_columns.len() == hash_join_on.len()).then_some(new_columns)
1060}
1061
1062fn is_expr_trivial(expr: &Arc<dyn PhysicalExpr>) -> bool {
1065 expr.as_any().downcast_ref::<Column>().is_some()
1066 || expr.as_any().downcast_ref::<Literal>().is_some()
1067}
1068
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use crate::common::collect;

    use crate::test;
    use crate::test::exec::StatisticsExec;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::ScalarValue;
    use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};

    use datafusion_expr::Operator;
    use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal, col};

    /// Column indices nested inside `b@7 - (1 + a@1)` are collected in ascending order.
    #[test]
    fn test_collect_column_indices() -> Result<()> {
        let expr = Arc::new(BinaryExpr::new(
            Arc::new(Column::new("b", 7)),
            Operator::Minus,
            Arc::new(BinaryExpr::new(
                Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
                Operator::Plus,
                Arc::new(Column::new("a", 1)),
            )),
        ));
        let column_indices = collect_column_indices(&[ProjectionExpr {
            expr,
            alias: "b-(1+a)".to_string(),
        }]);
        assert_eq!(column_indices, vec![1, 7]);
        Ok(())
    }

    /// Border detection for several left-input widths, including the cases where the
    /// leading left run is empty (`-1`) or the trailing right run is empty (`len`).
    #[test]
    fn test_join_table_borders() -> Result<()> {
        let projections = vec![
            (Column::new("b", 1), "b".to_owned()),
            (Column::new("c", 2), "c".to_owned()),
            (Column::new("e", 4), "e".to_owned()),
            (Column::new("d", 3), "d".to_owned()),
            (Column::new("c", 2), "c".to_owned()),
            (Column::new("f", 5), "f".to_owned()),
            (Column::new("h", 7), "h".to_owned()),
            (Column::new("g", 6), "g".to_owned()),
        ];
        // Columns 0..5 are left: entries 0..=4 are left, 5.. are right.
        let left_table_column_count = 5;
        assert_eq!(
            join_table_borders(left_table_column_count, &projections),
            (4, 5)
        );

        // Every column is left: no trailing right run.
        let left_table_column_count = 8;
        assert_eq!(
            join_table_borders(left_table_column_count, &projections),
            (7, 8)
        );

        // First entry is already right: no leading left run.
        let left_table_column_count = 1;
        assert_eq!(
            join_table_borders(left_table_column_count, &projections),
            (-1, 0)
        );

        let projections = vec![
            (Column::new("a", 0), "a".to_owned()),
            (Column::new("b", 1), "b".to_owned()),
            (Column::new("d", 3), "d".to_owned()),
            (Column::new("g", 6), "g".to_owned()),
            (Column::new("e", 4), "e".to_owned()),
            (Column::new("f", 5), "f".to_owned()),
            (Column::new("e", 4), "e".to_owned()),
            (Column::new("h", 7), "h".to_owned()),
        ];
        // Interleaved sides: the runs do not meet (2 + 1 != 7).
        let left_table_column_count = 5;
        assert_eq!(
            join_table_borders(left_table_column_count, &projections),
            (2, 7)
        );

        let left_table_column_count = 7;
        assert_eq!(
            join_table_borders(left_table_column_count, &projections),
            (6, 7)
        );

        Ok(())
    }

    /// A projection with no expressions still yields one output batch per input batch.
    #[tokio::test]
    async fn project_no_column() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());

        let exec = test::scan_partitioned(1);
        let expected = collect(exec.execute(0, Arc::clone(&task_ctx))?).await?;

        let projection = ProjectionExec::try_new(vec![] as Vec<ProjectionExpr>, exec)?;
        let stream = projection.execute(0, Arc::clone(&task_ctx))?;
        let output = collect(stream).await?;
        assert_eq!(output.len(), expected.len());

        Ok(())
    }

    /// `try_new` accepts `(expr, alias)` tuples through `Into<ProjectionExpr>`.
    #[tokio::test]
    async fn project_old_syntax() {
        let exec = test::scan_partitioned(1);
        let schema = exec.schema();
        let expr = col("i", &schema).unwrap();
        ProjectionExec::try_new(
            vec![
                (expr, "c".to_string()),
            ],
            exec,
        )
        .unwrap();
    }

    /// Statistics for a projection with a renamed column (`c`) and a computed column
    /// (`e + f`) over a six-column input.
    ///
    /// Checks that the row count carries over, that there is one column-statistics entry
    /// per output expression, and that `total_byte_size` is reported as exact.
    #[test]
    fn test_projection_statistics_uses_input_schema() {
        let input_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
            Field::new("c", DataType::Int32, false),
            Field::new("d", DataType::Int32, false),
            Field::new("e", DataType::Int32, false),
            Field::new("f", DataType::Int32, false),
        ]);

        let input_statistics = Statistics {
            num_rows: Precision::Exact(10),
            column_statistics: vec![
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(5))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(50))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(10))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(40))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(20))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(30))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(21))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(29))),
                    ..Default::default()
                },
                ColumnStatistics {
                    min_value: Precision::Exact(ScalarValue::Int32(Some(24))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(26))),
                    ..Default::default()
                },
            ],
            ..Default::default()
        };

        let input = Arc::new(StatisticsExec::new(input_statistics, input_schema));

        // Output column 0 reads input column 2; output column 1 combines input columns 4
        // and 5.
        let exprs: Vec<ProjectionExpr> = vec![
            ProjectionExpr {
                expr: Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>,
                alias: "c_renamed".to_string(),
            },
            ProjectionExpr {
                expr: Arc::new(BinaryExpr::new(
                    Arc::new(Column::new("e", 4)),
                    Operator::Plus,
                    Arc::new(Column::new("f", 5)),
                )) as Arc<dyn PhysicalExpr>,
                alias: "e_plus_f".to_string(),
            },
        ];

        let projection = ProjectionExec::try_new(exprs, input).unwrap();

        let stats = projection.partition_statistics(None).unwrap();

        assert_eq!(stats.num_rows, Precision::Exact(10));
        assert_eq!(
            stats.column_statistics.len(),
            2,
            "Expected 2 columns in projection statistics"
        );
        assert!(stats.total_byte_size.is_exact().unwrap_or(false));
    }
}