use super::expressions::Column;
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{
DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
SendableRecordBatchStream, SortOrderPushdownResult, Statistics,
};
use crate::column_rewriter::PhysicalColumnRewriter;
use crate::execution_plan::CardinalityEffect;
use crate::filter_pushdown::{
ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation, FilterRemapper, PushedDownPredicate,
};
use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef};
use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr, check_if_same_properties};
use std::any::Any;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
use datafusion_common::{DataFusionError, JoinSide, Result, internal_err};
use datafusion_execution::TaskContext;
use datafusion_expr::ExpressionPlacement;
use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::projection::Projector;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, fmt_sql};
use datafusion_physical_expr_common::sort_expr::{
LexOrdering, LexRequirement, PhysicalSortExpr,
};
pub use datafusion_physical_expr::projection::{
ProjectionExpr, ProjectionExprs, update_expr,
};
use futures::stream::{Stream, StreamExt};
use log::trace;
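/// Execution plan that evaluates its projection expressions against each input
/// batch and emits batches with the projected output schema.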
#[derive(Debug, Clone)]
pub struct ProjectionExec {
projector: Projector,
input: Arc<dyn ExecutionPlan>,
metrics: ExecutionPlanMetricsSet,
cache: Arc<PlanProperties>,
}
impl ProjectionExec {
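/// Creates a projection over `input` from anything that converts into
/// [`ProjectionExpr`]s (e.g. `ProjectionExpr` values or `(expr, alias)` pairs).
///
/// A minimal usage sketch; the surrounding setup and import paths are
/// illustrative only, hence the `ignore`:
///
/// ```ignore
/// // Project the input's column `a` (at index 0) under the alias `a_renamed`.
/// let projection = ProjectionExec::try_new(
///     vec![ProjectionExpr {
///         expr: Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>,
///         alias: "a_renamed".to_string(),
///     }],
///     input, // any Arc<dyn ExecutionPlan>
/// )?;
/// ```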
pub fn try_new<I, E>(expr: I, input: Arc<dyn ExecutionPlan>) -> Result<Self>
where
I: IntoIterator<Item = E>,
E: Into<ProjectionExpr>,
{
let input_schema = input.schema();
let expr_arc = expr.into_iter().map(Into::into).collect::<Arc<_>>();
let projection = ProjectionExprs::from_expressions(expr_arc);
let projector = projection.make_projector(&input_schema)?;
Self::try_from_projector(projector, input)
}
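/// Builds a [`ProjectionExec`] from an already-constructed [`Projector`],
/// computing the cached plan properties from the input plan.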
fn try_from_projector(
projector: Projector,
input: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
let projection_mapping =
projector.projection().projection_mapping(&input.schema())?;
let cache = Self::compute_properties(
&input,
&projection_mapping,
Arc::clone(projector.output_schema()),
)?;
Ok(Self {
projector,
input,
metrics: ExecutionPlanMetricsSet::new(),
cache: Arc::new(cache),
})
}
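/// The projection expressions, one per output column.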
pub fn expr(&self) -> &[ProjectionExpr] {
self.projector.projection().as_ref()
}
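/// The underlying [`ProjectionExprs`] collection.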
pub fn projection_expr(&self) -> &ProjectionExprs {
self.projector.projection()
}
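/// The input plan.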
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
}
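/// Computes the cached [`PlanProperties`] (equivalence properties, output
/// partitioning, pipeline behavior and boundedness) by projecting the input's
/// properties through the projection mapping.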
fn compute_properties(
input: &Arc<dyn ExecutionPlan>,
projection_mapping: &ProjectionMapping,
schema: SchemaRef,
) -> Result<PlanProperties> {
let input_eq_properties = input.equivalence_properties();
let eq_properties = input_eq_properties.project(projection_mapping, schema);
let output_partitioning = input
.output_partitioning()
.project(projection_mapping, input_eq_properties);
Ok(PlanProperties::new(
eq_properties,
output_partitioning,
input.pipeline_behavior(),
input.boundedness(),
))
}
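/// Builds a map from each output (aliased) column back to the input expression
/// that produces it, so that parent filters can be rewritten in terms of the
/// input schema before being pushed down.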
fn collect_reverse_alias(
&self,
) -> Result<datafusion_common::HashMap<Column, Arc<dyn PhysicalExpr>>> {
let mut alias_map = datafusion_common::HashMap::new();
for projection in self.projection_expr().iter() {
let (aliased_index, _output_field) = self
.projector
.output_schema()
.column_with_name(&projection.alias)
.ok_or_else(|| {
DataFusionError::Internal(format!(
"Expr {} with alias {} not found in output schema",
projection.expr, projection.alias
))
})?;
let aliased_col = Column::new(&projection.alias, aliased_index);
alias_map.insert(aliased_col, Arc::clone(&projection.expr));
}
Ok(alias_map)
}
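/// Replaces the child plan while reusing the existing projector and cached
/// properties; intended for the case where the new child is known to have the
/// same properties as the old one.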
fn with_new_children_and_same_properties(
&self,
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Self {
Self {
input: children.swap_remove(0),
metrics: ExecutionPlanMetricsSet::new(),
..Self::clone(self)
}
}
}
impl DisplayAs for ProjectionExec {
fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
let expr: Vec<String> = self
.projector
.projection()
.as_ref()
.iter()
.map(|proj_expr| {
let e = proj_expr.expr.to_string();
if e != proj_expr.alias {
format!("{e} as {}", proj_expr.alias)
} else {
e
}
})
.collect();
write!(f, "ProjectionExec: expr=[{}]", expr.join(", "))
}
DisplayFormatType::TreeRender => {
for (i, proj_expr) in self.expr().iter().enumerate() {
let expr_sql = fmt_sql(proj_expr.expr.as_ref());
if proj_expr.expr.to_string() == proj_expr.alias {
writeln!(f, "expr{i}={expr_sql}")?;
} else {
writeln!(f, "{}={expr_sql}", proj_expr.alias)?;
}
}
Ok(())
}
}
}
}
impl ExecutionPlan for ProjectionExec {
fn name(&self) -> &'static str {
"ProjectionExec"
}
fn as_any(&self) -> &dyn Any {
self
}
fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
fn maintains_input_order(&self) -> Vec<bool> {
vec![true]
}
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
let all_simple_exprs =
self.projector
.projection()
.as_ref()
.iter()
.all(|proj_expr| {
!matches!(
proj_expr.expr.placement(),
ExpressionPlacement::KeepInPlace
)
});
vec![!all_simple_exprs]
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}
fn with_new_children(
self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
check_if_same_properties!(self, children);
ProjectionExec::try_from_projector(
self.projector.clone(),
children.swap_remove(0),
)
.map(|p| Arc::new(p) as _)
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
trace!(
"Start ProjectionExec::execute for partition {} of context session_id {} and task_id {:?}",
partition,
context.session_id(),
context.task_id()
);
let projector = self.projector.with_metrics(&self.metrics, partition);
Ok(Box::pin(ProjectionStream::new(
projector,
self.input.execute(partition, context)?,
BaselineMetrics::new(&self.metrics, partition),
)?))
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
let input_stats = self.input.partition_statistics(partition)?;
let output_schema = self.schema();
self.projector
.projection()
.project_statistics(input_stats, &output_schema)
}
fn supports_limit_pushdown(&self) -> bool {
true
}
fn cardinality_effect(&self) -> CardinalityEffect {
CardinalityEffect::Equal
}
fn try_swapping_with_projection(
&self,
projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let maybe_unified = try_unifying_projections(projection, self)?;
if let Some(new_plan) = maybe_unified {
remove_unnecessary_projections(new_plan).data().map(Some)
} else {
Ok(Some(Arc::new(projection.clone())))
}
}
fn gather_filters_for_pushdown(
&self,
_phase: FilterPushdownPhase,
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
_config: &ConfigOptions,
) -> Result<FilterDescription> {
let invert_alias_map = self.collect_reverse_alias()?;
let output_schema = self.schema();
let remapper = FilterRemapper::new(output_schema);
let mut child_parent_filters = Vec::with_capacity(parent_filters.len());
for filter in parent_filters {
if let Some(reassigned) = remapper.try_remap(&filter)? {
let mut rewriter = PhysicalColumnRewriter::new(&invert_alias_map);
let rewritten = reassigned.rewrite(&mut rewriter)?.data;
child_parent_filters.push(PushedDownPredicate::supported(rewritten));
} else {
child_parent_filters.push(PushedDownPredicate::unsupported(filter));
}
}
Ok(FilterDescription::new().with_child(ChildFilterDescription {
parent_filters: child_parent_filters,
self_filters: vec![],
}))
}
fn handle_child_pushdown_result(
&self,
_phase: FilterPushdownPhase,
child_pushdown_result: ChildPushdownResult,
_config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
}
fn try_pushdown_sort(
&self,
order: &[PhysicalSortExpr],
) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
let child = self.input();
let mut child_order = Vec::new();
for sort_expr in order {
let mut can_pushdown = true;
let transformed = Arc::clone(&sort_expr.expr).transform(|expr| {
if let Some(col) = expr.as_any().downcast_ref::<Column>() {
if col.index() >= self.expr().len() {
can_pushdown = false;
return Ok(Transformed::no(expr));
}
let proj_expr = &self.expr()[col.index()];
if let Some(child_col) =
proj_expr.expr.as_any().downcast_ref::<Column>()
{
Ok(Transformed::yes(Arc::new(child_col.clone()) as _))
} else {
can_pushdown = false;
Ok(Transformed::no(expr))
}
} else {
Ok(Transformed::no(expr))
}
})?;
if !can_pushdown {
return Ok(SortOrderPushdownResult::Unsupported);
}
child_order.push(PhysicalSortExpr {
expr: transformed.data,
options: sort_expr.options,
});
}
match child.try_pushdown_sort(&child_order)? {
SortOrderPushdownResult::Exact { inner } => {
let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
Ok(SortOrderPushdownResult::Exact { inner: new_exec })
}
SortOrderPushdownResult::Inexact { inner } => {
let new_exec = Arc::new(self.clone()).with_new_children(vec![inner])?;
Ok(SortOrderPushdownResult::Inexact { inner: new_exec })
}
SortOrderPushdownResult::Unsupported => {
Ok(SortOrderPushdownResult::Unsupported)
}
}
}
fn with_preserve_order(
&self,
preserve_order: bool,
) -> Option<Arc<dyn ExecutionPlan>> {
self.input
.with_preserve_order(preserve_order)
.and_then(|new_input| {
Arc::new(self.clone())
.with_new_children(vec![new_input])
.ok()
})
}
}
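/// Stream that applies the projection to each [`RecordBatch`] produced by the
/// input stream.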
struct ProjectionStream {
projector: Projector,
input: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
}
impl ProjectionStream {
fn new(
projector: Projector,
input: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
) -> Result<Self> {
Ok(Self {
projector,
input,
baseline_metrics,
})
}
fn batch_project(&self, batch: &RecordBatch) -> Result<RecordBatch> {
let _timer = self.baseline_metrics.elapsed_compute().timer();
self.projector.project_batch(batch)
}
}
impl Stream for ProjectionStream {
type Item = Result<RecordBatch>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let poll = self.input.poll_next_unpin(cx).map(|x| match x {
Some(Ok(batch)) => Some(self.batch_project(&batch)),
other => other,
});
self.baseline_metrics.record_poll(poll)
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.input.size_hint()
}
}
impl RecordBatchStream for ProjectionStream {
fn schema(&self) -> SchemaRef {
Arc::clone(self.projector.output_schema())
}
}
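/// Trait for execution plans that can absorb a projection (given as input
/// column indices) into their own output, for example filters and joins.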
pub trait EmbeddedProjection: ExecutionPlan + Sized {
fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self>;
}
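/// Tries to push `projection` as a set of column indices into `execution_plan`.
/// Returns `Ok(None)` if the projection references no columns that can be
/// expressed as indices, if it would not actually narrow the plan's output, or
/// if a remaining expression cannot be rewritten. If the projection left on top
/// of the rewritten plan is a no-op, it is dropped entirely.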
pub fn try_embed_projection<Exec: EmbeddedProjection + 'static>(
projection: &ProjectionExec,
execution_plan: &Exec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
if projection.expr().is_empty() {
let new_execution_plan = Arc::new(execution_plan.with_projection(Some(vec![]))?);
return Ok(Some(new_execution_plan));
}
let projection_index = collect_column_indices(projection.expr());
if projection_index.is_empty() {
return Ok(None);
};
if projection_index.len() == projection_index.last().unwrap() + 1
&& projection_index.len() == execution_plan.schema().fields().len()
{
return Ok(None);
}
let new_execution_plan =
Arc::new(execution_plan.with_projection(Some(projection_index.to_vec()))?);
let embed_project_exprs = projection_index
.iter()
.zip(new_execution_plan.schema().fields())
.map(|(index, field)| ProjectionExpr {
expr: Arc::new(Column::new(field.name(), *index)) as Arc<dyn PhysicalExpr>,
alias: field.name().to_owned(),
})
.collect::<Vec<_>>();
let mut new_projection_exprs = Vec::with_capacity(projection.expr().len());
for proj_expr in projection.expr() {
let Some(expr) =
update_expr(&proj_expr.expr, embed_project_exprs.as_slice(), false)?
else {
return Ok(None);
};
new_projection_exprs.push(ProjectionExpr {
expr,
alias: proj_expr.alias.clone(),
});
}
let new_projection = Arc::new(ProjectionExec::try_new(
new_projection_exprs,
Arc::clone(&new_execution_plan) as _,
)?);
if is_projection_removable(&new_projection) {
Ok(Some(new_execution_plan))
} else {
Ok(Some(new_projection))
}
}
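/// The result of pushing a projection down through a join: new projections for
/// each join child plus the updated join filter and equi-join conditions.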
pub struct JoinData {
pub projected_left_child: ProjectionExec,
pub projected_right_child: ProjectionExec,
pub join_filter: Option<JoinFilter>,
pub join_on: JoinOn,
}
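/// Tries to push `projection` below a join. Succeeds only when every projected
/// expression is a plain column, the projection narrows the join schema, and
/// all left-table columns appear before all right-table columns; returns
/// `Ok(None)` otherwise.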
pub fn try_pushdown_through_join(
projection: &ProjectionExec,
join_left: &Arc<dyn ExecutionPlan>,
join_right: &Arc<dyn ExecutionPlan>,
join_on: JoinOnRef,
schema: &SchemaRef,
filter: Option<&JoinFilter>,
) -> Result<Option<JoinData>> {
let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
return Ok(None);
};
let (far_right_left_col_ind, far_left_right_col_ind) =
join_table_borders(join_left.schema().fields().len(), &projection_as_columns);
if !join_allows_pushdown(
&projection_as_columns,
schema,
far_right_left_col_ind,
far_left_right_col_ind,
) {
return Ok(None);
}
let new_filter = if let Some(filter) = filter {
match update_join_filter(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
filter,
join_left.schema().fields().len(),
) {
Some(updated_filter) => Some(updated_filter),
None => return Ok(None),
}
} else {
None
};
let Some(new_on) = update_join_on(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
join_on,
join_left.schema().fields().len(),
) else {
return Ok(None);
};
let (new_left, new_right) = new_join_children(
&projection_as_columns,
far_right_left_col_ind,
far_left_right_col_ind,
join_left,
join_right,
)?;
Ok(Some(JoinData {
projected_left_child: new_left,
projected_right_child: new_right,
join_filter: new_filter,
join_on: new_on,
}))
}
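/// Removes a [`ProjectionExec`] from the plan when it is a no-op, or tries to
/// swap it with its input so it can be pushed further down or eliminated.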
pub fn remove_unnecessary_projections(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
let maybe_modified =
if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
if is_projection_removable(projection) {
return Ok(Transformed::yes(Arc::clone(projection.input())));
}
projection
.input()
.try_swapping_with_projection(projection)?
} else {
return Ok(Transformed::no(plan));
};
Ok(maybe_modified.map_or_else(|| Transformed::no(plan), Transformed::yes))
}
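/// Returns true if the projection is the identity: every expression is a bare
/// column with its original name at its original index, and no input columns
/// are dropped.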
fn is_projection_removable(projection: &ProjectionExec) -> bool {
let exprs = projection.expr();
exprs.iter().enumerate().all(|(idx, proj_expr)| {
let Some(col) = proj_expr.expr.as_any().downcast_ref::<Column>() else {
return false;
};
col.name() == proj_expr.alias && col.index() == idx
}) && exprs.len() == projection.input().schema().fields().len()
}
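/// Returns true if every projection expression is a bare column whose alias
/// matches the column name.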
pub fn all_alias_free_columns(exprs: &[ProjectionExpr]) -> bool {
exprs.iter().all(|proj_expr| {
proj_expr
.expr
.as_any()
.downcast_ref::<Column>()
.map(|column| column.name() == proj_expr.alias)
.unwrap_or(false)
})
}
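/// Maps each projected column through `source`, yielding the indices the
/// projection refers to in the underlying source; non-column expressions are
/// skipped.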
pub fn new_projections_for_columns(
projection: &[ProjectionExpr],
source: &[usize],
) -> Vec<usize> {
projection
.iter()
.filter_map(|proj_expr| {
proj_expr
.expr
.as_any()
.downcast_ref::<Column>()
.map(|expr| source[expr.index()])
})
.collect()
}
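/// Rebuilds the given projection on top of a new child plan.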
pub fn make_with_child(
projection: &ProjectionExec,
child: &Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
ProjectionExec::try_new(projection.expr().to_vec(), Arc::clone(child))
.map(|e| Arc::new(e) as _)
}
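/// Returns true if every projection expression is a bare column reference.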
pub fn all_columns(exprs: &[ProjectionExpr]) -> bool {
exprs
.iter()
.all(|proj_expr| proj_expr.expr.as_any().is::<Column>())
}
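/// Rewrites each sort expression in `ordering` according to `projected_exprs`
/// via [`update_expr`]; returns `Ok(None)` if any expression cannot be
/// rewritten.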
pub fn update_ordering(
ordering: LexOrdering,
projected_exprs: &[ProjectionExpr],
) -> Result<Option<LexOrdering>> {
let mut updated_exprs = vec![];
for mut sort_expr in ordering.into_iter() {
let Some(updated_expr) = update_expr(&sort_expr.expr, projected_exprs, false)?
else {
return Ok(None);
};
sort_expr.expr = updated_expr;
updated_exprs.push(sort_expr);
}
Ok(LexOrdering::new(updated_exprs))
}
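/// Like [`update_ordering`], but for a [`LexRequirement`]; returns `Ok(None)`
/// if any requirement expression cannot be rewritten.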
pub fn update_ordering_requirement(
reqs: LexRequirement,
projected_exprs: &[ProjectionExpr],
) -> Result<Option<LexRequirement>> {
let mut updated_exprs = vec![];
for mut sort_expr in reqs.into_iter() {
let Some(updated_expr) = update_expr(&sort_expr.expr, projected_exprs, false)?
else {
return Ok(None);
};
sort_expr.expr = updated_expr;
updated_exprs.push(sort_expr);
}
Ok(LexRequirement::new(updated_exprs))
}
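/// Downcasts every projection expression to a [`Column`], returning the
/// `(column, alias)` pairs, or `None` if any expression is not a bare column.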
pub fn physical_to_column_exprs(
exprs: &[ProjectionExpr],
) -> Option<Vec<(Column, String)>> {
exprs
.iter()
.map(|proj_expr| {
proj_expr
.expr
.as_any()
.downcast_ref::<Column>()
.map(|col| (col.clone(), proj_expr.alias.clone()))
})
.collect()
}
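/// Builds new projections for the left and right join children from the split
/// column list, shifting right-side column indices into the right child's
/// schema.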
pub fn new_join_children(
projection_as_columns: &[(Column, String)],
far_right_left_col_ind: i32,
far_left_right_col_ind: i32,
left_child: &Arc<dyn ExecutionPlan>,
right_child: &Arc<dyn ExecutionPlan>,
) -> Result<(ProjectionExec, ProjectionExec)> {
let new_left = ProjectionExec::try_new(
projection_as_columns[0..=far_right_left_col_ind as _]
.iter()
.map(|(col, alias)| ProjectionExpr {
expr: Arc::new(Column::new(col.name(), col.index())) as _,
alias: alias.clone(),
}),
Arc::clone(left_child),
)?;
let left_size = left_child.schema().fields().len() as i32;
let new_right = ProjectionExec::try_new(
projection_as_columns[far_left_right_col_ind as _..]
.iter()
.map(|(col, alias)| {
ProjectionExpr {
expr: Arc::new(Column::new(
col.name(),
(col.index() as i32 - left_size) as _,
)) as _,
alias: alias.clone(),
}
}),
Arc::clone(right_child),
)?;
Ok((new_left, new_right))
}
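/// Checks whether the projection can be pushed below the join: it must narrow
/// the join schema and reference every left-table column before any
/// right-table column, with at least one column from each side and no gap
/// between the two groups.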
pub fn join_allows_pushdown(
projection_as_columns: &[(Column, String)],
join_schema: &SchemaRef,
far_right_left_col_ind: i32,
far_left_right_col_ind: i32,
) -> bool {
projection_as_columns.len() < join_schema.fields().len()
&& (far_right_left_col_ind + 1 == far_left_right_col_ind)
&& far_right_left_col_ind >= 0
&& far_left_right_col_ind < projection_as_columns.len() as i32
}
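/// Returns `(i, j)` where `i` is the index of the last projection column in the
/// leading run of left-table columns (`-1` if there is none) and `j` is the
/// index of the first projection column in the trailing run of right-table
/// columns (`projection_as_columns.len()` if there is none).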
pub fn join_table_borders(
left_table_column_count: usize,
projection_as_columns: &[(Column, String)],
) -> (i32, i32) {
let far_right_left_col_ind = projection_as_columns
.iter()
.enumerate()
.take_while(|(_, (projection_column, _))| {
projection_column.index() < left_table_column_count
})
.last()
.map(|(index, _)| index as i32)
.unwrap_or(-1);
let far_left_right_col_ind = projection_as_columns
.iter()
.enumerate()
.rev()
.take_while(|(_, (projection_column, _))| {
projection_column.index() >= left_table_column_count
})
.last()
.map(|(index, _)| index as i32)
.unwrap_or(projection_as_columns.len() as i32);
(far_right_left_col_ind, far_left_right_col_ind)
}
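/// Rewrites the equi-join `on` expressions to refer to the new child
/// projections; returns `None` if any referenced column is missing from the
/// projections.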
pub fn update_join_on(
proj_left_exprs: &[(Column, String)],
proj_right_exprs: &[(Column, String)],
hash_join_on: &[(PhysicalExprRef, PhysicalExprRef)],
left_field_size: usize,
) -> Option<Vec<(PhysicalExprRef, PhysicalExprRef)>> {
let (left_idx, right_idx): (Vec<_>, Vec<_>) = hash_join_on
.iter()
.map(|(left, right)| (left, right))
.unzip();
let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs, 0);
let new_right_columns =
new_columns_for_join_on(&right_idx, proj_right_exprs, left_field_size);
match (new_left_columns, new_right_columns) {
(Some(left), Some(right)) => Some(left.into_iter().zip(right).collect()),
_ => None,
}
}
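/// Rewrites the join filter's column indices to point into the new child
/// projections; returns `None` if any filter column is not covered by the
/// projections.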
pub fn update_join_filter(
projection_left_exprs: &[(Column, String)],
projection_right_exprs: &[(Column, String)],
join_filter: &JoinFilter,
left_field_size: usize,
) -> Option<JoinFilter> {
let mut new_left_indices = new_indices_for_join_filter(
join_filter,
JoinSide::Left,
projection_left_exprs,
0,
)
.into_iter();
let mut new_right_indices = new_indices_for_join_filter(
join_filter,
JoinSide::Right,
projection_right_exprs,
left_field_size,
)
.into_iter();
(new_right_indices.len() + new_left_indices.len()
== join_filter.column_indices().len())
.then(|| {
JoinFilter::new(
Arc::clone(join_filter.expression()),
join_filter
.column_indices()
.iter()
.map(|col_idx| ColumnIndex {
index: if col_idx.side == JoinSide::Left {
new_left_indices.next().unwrap()
} else {
new_right_indices.next().unwrap()
},
side: col_idx.side,
})
.collect(),
Arc::clone(join_filter.schema()),
)
})
}
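/// Tries to collapse `projection` and its child projection into a single
/// [`ProjectionExec`]. Returns `Ok(None)` when a child expression referenced
/// more than once is not suitable for inlining (per its placement), or when an
/// expression cannot be rewritten against the child's input.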
fn try_unifying_projections(
projection: &ProjectionExec,
child: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let mut projected_exprs = vec![];
let mut column_ref_map: HashMap<Column, usize> = HashMap::new();
projection.expr().iter().for_each(|proj_expr| {
proj_expr
.expr
.apply(|expr| {
Ok({
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
*column_ref_map.entry(column.clone()).or_default() += 1;
}
TreeNodeRecursion::Continue
})
})
.unwrap();
});
if column_ref_map.iter().any(|(column, count)| {
*count > 1
&& !child.expr()[column.index()]
.expr
.placement()
.should_push_to_leaves()
}) {
return Ok(None);
}
for proj_expr in projection.expr() {
let Some(expr) = update_expr(&proj_expr.expr, child.expr(), true)? else {
return Ok(None);
};
projected_exprs.push(ProjectionExpr {
expr,
alias: proj_expr.alias.clone(),
});
}
ProjectionExec::try_new(projected_exprs, Arc::clone(child.input()))
.map(|e| Some(Arc::new(e) as _))
}
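/// Collects the distinct input column indices referenced by the projection
/// expressions, in ascending order.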
fn collect_column_indices(exprs: &[ProjectionExpr]) -> Vec<usize> {
let mut indices = exprs
.iter()
.flat_map(|proj_expr| collect_columns(&proj_expr.expr))
.map(|x| x.index())
.collect::<std::collections::HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
indices.sort();
indices
}
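/// For each join-filter column on `join_side`, finds its position within the
/// corresponding child projection (offset by `column_index_offset` for the
/// right side). Columns not found in the projection are dropped; the caller
/// detects this by comparing lengths.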
fn new_indices_for_join_filter(
join_filter: &JoinFilter,
join_side: JoinSide,
projection_exprs: &[(Column, String)],
column_index_offset: usize,
) -> Vec<usize> {
join_filter
.column_indices()
.iter()
.filter(|col_idx| col_idx.side == join_side)
.filter_map(|col_idx| {
projection_exprs
.iter()
.position(|(col, _)| col_idx.index + column_index_offset == col.index())
})
.collect()
}
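/// Rewrites each equi-join key to reference the matching column of the child
/// projection; returns `None` if any key column cannot be located.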
fn new_columns_for_join_on(
hash_join_on: &[&PhysicalExprRef],
projection_exprs: &[(Column, String)],
column_index_offset: usize,
) -> Option<Vec<PhysicalExprRef>> {
let new_columns = hash_join_on
.iter()
.filter_map(|on| {
Arc::clone(*on)
.transform(|expr| {
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
let new_column = projection_exprs
.iter()
.enumerate()
.find(|(_, (proj_column, _))| {
column.name() == proj_column.name()
&& column.index() + column_index_offset
== proj_column.index()
})
.map(|(index, (_, alias))| Column::new(alias, index));
if let Some(new_column) = new_column {
Ok(Transformed::yes(Arc::new(new_column)))
} else {
internal_err!(
"Column {:?} not found in projection expressions",
column
)
}
} else {
Ok(Transformed::no(expr))
}
})
.data()
.ok()
})
.collect::<Vec<_>>();
(new_columns.len() == hash_join_on.len()).then_some(new_columns)
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use crate::common::collect;
use crate::filter_pushdown::PushedDown;
use crate::test;
use crate::test::exec::StatisticsExec;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::ScalarValue;
use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{
BinaryExpr, Column, DynamicFilterPhysicalExpr, Literal, binary, col, lit,
};
#[test]
fn test_collect_column_indices() -> Result<()> {
let expr = Arc::new(BinaryExpr::new(
Arc::new(Column::new("b", 7)),
Operator::Minus,
Arc::new(BinaryExpr::new(
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
Operator::Plus,
Arc::new(Column::new("a", 1)),
)),
));
let column_indices = collect_column_indices(&[ProjectionExpr {
expr,
alias: "b-(1+a)".to_string(),
}]);
assert_eq!(column_indices, vec![1, 7]);
Ok(())
}
#[test]
fn test_join_table_borders() -> Result<()> {
let projections = vec![
(Column::new("b", 1), "b".to_owned()),
(Column::new("c", 2), "c".to_owned()),
(Column::new("e", 4), "e".to_owned()),
(Column::new("d", 3), "d".to_owned()),
(Column::new("c", 2), "c".to_owned()),
(Column::new("f", 5), "f".to_owned()),
(Column::new("h", 7), "h".to_owned()),
(Column::new("g", 6), "g".to_owned()),
];
let left_table_column_count = 5;
assert_eq!(
join_table_borders(left_table_column_count, &projections),
(4, 5)
);
let left_table_column_count = 8;
assert_eq!(
join_table_borders(left_table_column_count, &projections),
(7, 8)
);
let left_table_column_count = 1;
assert_eq!(
join_table_borders(left_table_column_count, &projections),
(-1, 0)
);
let projections = vec![
(Column::new("a", 0), "a".to_owned()),
(Column::new("b", 1), "b".to_owned()),
(Column::new("d", 3), "d".to_owned()),
(Column::new("g", 6), "g".to_owned()),
(Column::new("e", 4), "e".to_owned()),
(Column::new("f", 5), "f".to_owned()),
(Column::new("e", 4), "e".to_owned()),
(Column::new("h", 7), "h".to_owned()),
];
let left_table_column_count = 5;
assert_eq!(
join_table_borders(left_table_column_count, &projections),
(2, 7)
);
let left_table_column_count = 7;
assert_eq!(
join_table_borders(left_table_column_count, &projections),
(6, 7)
);
Ok(())
}
#[tokio::test]
async fn project_no_column() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
let exec = test::scan_partitioned(1);
let expected = collect(exec.execute(0, Arc::clone(&task_ctx))?).await?;
let projection = ProjectionExec::try_new(vec![] as Vec<ProjectionExpr>, exec)?;
let stream = projection.execute(0, Arc::clone(&task_ctx))?;
let output = collect(stream).await?;
assert_eq!(output.len(), expected.len());
Ok(())
}
#[tokio::test]
async fn project_old_syntax() {
let exec = test::scan_partitioned(1);
let schema = exec.schema();
let expr = col("i", &schema).unwrap();
ProjectionExec::try_new(vec![(expr, "c".to_string())], exec).unwrap();
}
#[test]
fn test_projection_statistics_uses_input_schema() {
let input_schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Int32, false),
Field::new("d", DataType::Int32, false),
Field::new("e", DataType::Int32, false),
Field::new("f", DataType::Int32, false),
]);
let input_statistics = Statistics {
num_rows: Precision::Exact(10),
column_statistics: vec![
ColumnStatistics {
min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Exact(ScalarValue::Int32(Some(5))),
max_value: Precision::Exact(ScalarValue::Int32(Some(50))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Exact(ScalarValue::Int32(Some(10))),
max_value: Precision::Exact(ScalarValue::Int32(Some(40))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Exact(ScalarValue::Int32(Some(20))),
max_value: Precision::Exact(ScalarValue::Int32(Some(30))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Exact(ScalarValue::Int32(Some(21))),
max_value: Precision::Exact(ScalarValue::Int32(Some(29))),
..Default::default()
},
ColumnStatistics {
min_value: Precision::Exact(ScalarValue::Int32(Some(24))),
max_value: Precision::Exact(ScalarValue::Int32(Some(26))),
..Default::default()
},
],
..Default::default()
};
let input = Arc::new(StatisticsExec::new(input_statistics, input_schema));
let exprs: Vec<ProjectionExpr> = vec![
ProjectionExpr {
expr: Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>,
alias: "c_renamed".to_string(),
},
ProjectionExpr {
expr: Arc::new(BinaryExpr::new(
Arc::new(Column::new("e", 4)),
Operator::Plus,
Arc::new(Column::new("f", 5)),
)) as Arc<dyn PhysicalExpr>,
alias: "e_plus_f".to_string(),
},
];
let projection = ProjectionExec::try_new(exprs, input).unwrap();
let stats = projection.partition_statistics(None).unwrap();
assert_eq!(stats.num_rows, Precision::Exact(10));
assert_eq!(
stats.column_statistics.len(),
2,
"Expected 2 columns in projection statistics"
);
assert!(stats.total_byte_size.is_exact().unwrap_or(false));
}
#[test]
fn test_filter_pushdown_with_alias() -> Result<()> {
let input_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let input = Arc::new(StatisticsExec::new(
Statistics::new_unknown(&input_schema),
input_schema.clone(),
));
let projection = ProjectionExec::try_new(
vec![ProjectionExpr {
expr: Arc::new(Column::new("a", 0)),
alias: "b".to_string(),
}],
input,
)?;
let filter = Arc::new(BinaryExpr::new(
Arc::new(Column::new("b", 0)),
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
)) as Arc<dyn PhysicalExpr>;
let description = projection.gather_filters_for_pushdown(
FilterPushdownPhase::Post,
vec![filter],
&ConfigOptions::default(),
)?;
let expected_filter = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
)) as Arc<dyn PhysicalExpr>;
assert_eq!(description.self_filters(), vec![vec![]]);
let pushed_filters = &description.parent_filters()[0];
assert_eq!(
format!("{}", pushed_filters[0].predicate),
format!("{}", expected_filter)
);
assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
Ok(())
}
#[test]
fn test_filter_pushdown_with_multiple_aliases() -> Result<()> {
let input_schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);
let input = Arc::new(StatisticsExec::new(
Statistics {
column_statistics: vec![Default::default(); input_schema.fields().len()],
..Default::default()
},
input_schema.clone(),
));
let projection = ProjectionExec::try_new(
vec![
ProjectionExpr {
expr: Arc::new(Column::new("a", 0)),
alias: "x".to_string(),
},
ProjectionExpr {
expr: Arc::new(Column::new("b", 1)),
alias: "y".to_string(),
},
],
input,
)?;
let filter1 = Arc::new(BinaryExpr::new(
Arc::new(Column::new("x", 0)),
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
)) as Arc<dyn PhysicalExpr>;
let filter2 = Arc::new(BinaryExpr::new(
Arc::new(Column::new("y", 1)),
Operator::Lt,
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
)) as Arc<dyn PhysicalExpr>;
let description = projection.gather_filters_for_pushdown(
FilterPushdownPhase::Post,
vec![filter1, filter2],
&ConfigOptions::default(),
)?;
let expected_filter1 = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
)) as Arc<dyn PhysicalExpr>;
let expected_filter2 = Arc::new(BinaryExpr::new(
Arc::new(Column::new("b", 1)),
Operator::Lt,
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
)) as Arc<dyn PhysicalExpr>;
let pushed_filters = &description.parent_filters()[0];
assert_eq!(pushed_filters.len(), 2);
assert_eq!(
format!("{}", pushed_filters[0].predicate),
format!("{}", expected_filter1)
);
assert_eq!(
format!("{}", pushed_filters[1].predicate),
format!("{}", expected_filter2)
);
assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
assert!(matches!(pushed_filters[1].discriminant, PushedDown::Yes));
Ok(())
}
#[test]
fn test_filter_pushdown_with_swapped_aliases() -> Result<()> {
let input_schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);
let input = Arc::new(StatisticsExec::new(
Statistics {
column_statistics: vec![Default::default(); input_schema.fields().len()],
..Default::default()
},
input_schema.clone(),
));
let projection = ProjectionExec::try_new(
vec![
ProjectionExpr {
expr: Arc::new(Column::new("a", 0)),
alias: "b".to_string(),
},
ProjectionExpr {
expr: Arc::new(Column::new("b", 1)),
alias: "a".to_string(),
},
],
input,
)?;
let filter1 = Arc::new(BinaryExpr::new(
Arc::new(Column::new("b", 0)),
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
)) as Arc<dyn PhysicalExpr>;
let filter2 = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 1)),
Operator::Lt,
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
)) as Arc<dyn PhysicalExpr>;
let description = projection.gather_filters_for_pushdown(
FilterPushdownPhase::Post,
vec![filter1, filter2],
&ConfigOptions::default(),
)?;
let pushed_filters = &description.parent_filters()[0];
assert_eq!(pushed_filters.len(), 2);
let expected_filter1 = "a@0 > 5";
let expected_filter2 = "b@1 < 10";
assert_eq!(format!("{}", pushed_filters[0].predicate), expected_filter1);
assert_eq!(format!("{}", pushed_filters[1].predicate), expected_filter2);
assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
assert!(matches!(pushed_filters[1].discriminant, PushedDown::Yes));
Ok(())
}
#[test]
fn test_filter_pushdown_with_mixed_columns() -> Result<()> {
let input_schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]);
let input = Arc::new(StatisticsExec::new(
Statistics {
column_statistics: vec![Default::default(); input_schema.fields().len()],
..Default::default()
},
input_schema.clone(),
));
let projection = ProjectionExec::try_new(
vec![
ProjectionExpr {
expr: Arc::new(Column::new("a", 0)),
alias: "x".to_string(),
},
ProjectionExpr {
expr: Arc::new(Column::new("b", 1)),
alias: "b".to_string(),
},
],
input,
)?;
let filter1 = Arc::new(BinaryExpr::new(
Arc::new(Column::new("x", 0)),
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
)) as Arc<dyn PhysicalExpr>;
let filter2 = Arc::new(BinaryExpr::new(
Arc::new(Column::new("b", 1)),
Operator::Lt,
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
)) as Arc<dyn PhysicalExpr>;
let description = projection.gather_filters_for_pushdown(
FilterPushdownPhase::Post,
vec![filter1, filter2],
&ConfigOptions::default(),
)?;
let pushed_filters = &description.parent_filters()[0];
assert_eq!(pushed_filters.len(), 2);
let expected_filter1 = "a@0 > 5";
let expected_filter2 = "b@1 < 10";
assert_eq!(format!("{}", pushed_filters[0].predicate), expected_filter1);
assert_eq!(format!("{}", pushed_filters[1].predicate), expected_filter2);
assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
assert!(matches!(pushed_filters[1].discriminant, PushedDown::Yes));
Ok(())
}
#[test]
fn test_filter_pushdown_with_complex_expression() -> Result<()> {
let input_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let input = Arc::new(StatisticsExec::new(
Statistics {
column_statistics: vec![Default::default(); input_schema.fields().len()],
..Default::default()
},
input_schema.clone(),
));
let projection = ProjectionExec::try_new(
vec![ProjectionExpr {
expr: Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Plus,
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
)),
alias: "z".to_string(),
}],
input,
)?;
let filter = Arc::new(BinaryExpr::new(
Arc::new(Column::new("z", 0)),
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
)) as Arc<dyn PhysicalExpr>;
let description = projection.gather_filters_for_pushdown(
FilterPushdownPhase::Post,
vec![filter],
&ConfigOptions::default(),
)?;
let pushed_filters = &description.parent_filters()[0];
assert!(matches!(pushed_filters[0].discriminant, PushedDown::Yes));
assert_eq!(format!("{}", pushed_filters[0].predicate), "a@0 + 1 > 10");
Ok(())
}
#[test]
fn test_filter_pushdown_with_unknown_column() -> Result<()> {
let input_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let input = Arc::new(StatisticsExec::new(
Statistics {
column_statistics: vec![Default::default(); input_schema.fields().len()],
..Default::default()
},
input_schema.clone(),
));
let projection = ProjectionExec::try_new(
vec![ProjectionExpr {
expr: Arc::new(Column::new("a", 0)),
alias: "a".to_string(),
}],
input,
)?;
let filter = Arc::new(BinaryExpr::new(
Arc::new(Column::new("unknown_col", 1)),
Operator::Gt,
Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
)) as Arc<dyn PhysicalExpr>;
let description = projection.gather_filters_for_pushdown(
FilterPushdownPhase::Post,
vec![filter],
&ConfigOptions::default(),
)?;
let pushed_filters = &description.parent_filters()[0];
assert!(matches!(pushed_filters[0].discriminant, PushedDown::No));
assert_eq!(
format!("{}", pushed_filters[0].predicate),
"unknown_col@1 > 5"
);
Ok(())
}
#[test]
fn test_basic_dyn_filter_projection_pushdown_update_child() -> Result<()> {
let input_schema =
Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, false)]));
let input = Arc::new(StatisticsExec::new(
Statistics {
column_statistics: vec![Default::default(); input_schema.fields().len()],
..Default::default()
},
input_schema.as_ref().clone(),
));
let projection = ProjectionExec::try_new(
vec![ProjectionExpr {
expr: binary(
Arc::new(Column::new("b", 0)),
Operator::Minus,
lit(1),
&input_schema,
)
.unwrap(),
alias: "a".to_string(),
}],
input,
)?;
let projected_schema = projection.schema();
let col_a = col("a", &projected_schema)?;
let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
vec![Arc::clone(&col_a)],
lit(true),
));
let current = dynamic_filter.current()?;
assert_eq!(format!("{current}"), "true");
let dyn_phy_expr: Arc<dyn PhysicalExpr> = Arc::clone(&dynamic_filter) as _;
let description = projection.gather_filters_for_pushdown(
FilterPushdownPhase::Post,
vec![dyn_phy_expr],
&ConfigOptions::default(),
)?;
let pushed_filters = &description.parent_filters()[0][0];
assert_eq!(
format!("{}", pushed_filters.predicate),
"DynamicFilter [ empty ]"
);
let new_expr =
Arc::new(BinaryExpr::new(Arc::clone(&col_a), Operator::Gt, lit(5i32)));
dynamic_filter.update(new_expr)?;
let current = dynamic_filter.current()?;
assert_eq!(format!("{current}"), "a@0 > 5");
assert_eq!(
format!("{}", pushed_filters.predicate),
"DynamicFilter [ b@0 - 1 > 5 ]"
);
Ok(())
}
}