//! [`ProjectionPushdown`] physical optimization rule: pushes [`ProjectionExec`]
//! nodes down through the plan, toward the data sources, so that unused
//! columns are pruned as early as possible. When a projection reaches a
//! source (or becomes a no-op), it can disappear from the plan entirely.
use std::collections::HashMap;
use std::sync::Arc;
use super::output_requirements::OutputRequirementExec;
use crate::datasource::physical_plan::CsvExec;
use crate::error::Result;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
use crate::physical_plan::joins::{
CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec,
SymmetricHashJoinExec,
};
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::{Distribution, ExecutionPlan, ExecutionPlanProperties};
use arrow_schema::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
use datafusion_common::{internal_err, JoinSide};
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::{
utils::collect_columns, Partitioning, PhysicalExpr, PhysicalExprRef,
PhysicalSortExpr, PhysicalSortRequirement,
};
use datafusion_physical_plan::streaming::StreamingTableExec;
use datafusion_physical_plan::union::UnionExec;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use itertools::Itertools;
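/// This rule tries to push down projections as deep as possible, and removes
/// no-op projections entirely, in order to prune unused columns early.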
#[derive(Default)]
pub struct ProjectionPushdown {}
impl ProjectionPushdown {
/// Creates a new instance of the [`ProjectionPushdown`] rule.
pub fn new() -> Self {
Self {}
}
}
impl PhysicalOptimizerRule for ProjectionPushdown {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_down(remove_unnecessary_projections).data()
}
fn name(&self) -> &str {
"ProjectionPushdown"
}
fn schema_check(&self) -> bool {
true
}
}
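/// Tries to remove a [`ProjectionExec`] node, or to push it below its child
/// operator so that it ends up closer to (or inside) the data source. Used as
/// the transformation callback of the top-down [`ProjectionPushdown`] pass.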
pub fn remove_unnecessary_projections(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
let maybe_modified = if let Some(projection) =
plan.as_any().downcast_ref::<ProjectionExec>()
{
// If the projection is a no-op (it projects all input columns in their
// original order, without renaming), it can simply be removed.
if is_projection_removable(projection) {
return Ok(Transformed::yes(projection.input().clone()));
}
// Dispatch on the child operator to find a rule that can swap the
// projection with (or embed it into) that operator.
let input = projection.input().as_any();
if let Some(csv) = input.downcast_ref::<CsvExec>() {
try_swapping_with_csv(projection, csv)
} else if let Some(memory) = input.downcast_ref::<MemoryExec>() {
try_swapping_with_memory(projection, memory)?
} else if let Some(child_projection) = input.downcast_ref::<ProjectionExec>() {
let maybe_unified = try_unifying_projections(projection, child_projection)?;
return if let Some(new_plan) = maybe_unified {
remove_unnecessary_projections(new_plan)
.data()
.map(Transformed::yes)
} else {
Ok(Transformed::no(plan))
};
} else if let Some(output_req) = input.downcast_ref::<OutputRequirementExec>() {
try_swapping_with_output_req(projection, output_req)?
} else if input.is::<CoalescePartitionsExec>() {
try_swapping_with_coalesce_partitions(projection)?
} else if let Some(filter) = input.downcast_ref::<FilterExec>() {
try_swapping_with_filter(projection, filter)?
} else if let Some(repartition) = input.downcast_ref::<RepartitionExec>() {
try_swapping_with_repartition(projection, repartition)?
} else if let Some(sort) = input.downcast_ref::<SortExec>() {
try_swapping_with_sort(projection, sort)?
} else if let Some(spm) = input.downcast_ref::<SortPreservingMergeExec>() {
try_swapping_with_sort_preserving_merge(projection, spm)?
} else if let Some(union) = input.downcast_ref::<UnionExec>() {
try_pushdown_through_union(projection, union)?
} else if let Some(hash_join) = input.downcast_ref::<HashJoinExec>() {
try_pushdown_through_hash_join(projection, hash_join)?.map_or_else(
|| try_embed_to_hash_join(projection, hash_join),
|e| Ok(Some(e)),
)?
} else if let Some(cross_join) = input.downcast_ref::<CrossJoinExec>() {
try_swapping_with_cross_join(projection, cross_join)?
} else if let Some(nl_join) = input.downcast_ref::<NestedLoopJoinExec>() {
try_swapping_with_nested_loop_join(projection, nl_join)?
} else if let Some(sm_join) = input.downcast_ref::<SortMergeJoinExec>() {
try_swapping_with_sort_merge_join(projection, sm_join)?
} else if let Some(sym_join) = input.downcast_ref::<SymmetricHashJoinExec>() {
try_swapping_with_sym_hash_join(projection, sym_join)?
} else if let Some(ste) = input.downcast_ref::<StreamingTableExec>() {
try_swapping_with_streaming_table(projection, ste)?
} else {
None
}
} else {
return Ok(Transformed::no(plan));
};
Ok(maybe_modified.map_or(Transformed::no(plan), Transformed::yes))
}
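/// Tries to embed the projection into its [`CsvExec`] child by narrowing the
/// file scan's projection. Only applicable when every projected expression is
/// an alias-free column reference.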
fn try_swapping_with_csv(
projection: &ProjectionExec,
csv: &CsvExec,
) -> Option<Arc<dyn ExecutionPlan>> {
all_alias_free_columns(projection.expr()).then(|| {
let mut file_scan = csv.base_config().clone();
let new_projections = new_projections_for_columns(
projection,
&file_scan
.projection
.unwrap_or((0..csv.schema().fields().len()).collect()),
);
file_scan.projection = Some(new_projections);
Arc::new(
CsvExec::builder(file_scan)
.with_has_header(csv.has_header())
.with_delimeter(csv.delimiter())
.with_quote(csv.quote())
.with_escape(csv.escape())
.with_comment(csv.comment())
.with_newlines_in_values(csv.newlines_in_values())
.with_file_compression_type(csv.file_compression_type)
.build(),
) as _
})
}
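/// Tries to embed the projection into its [`MemoryExec`] child by narrowing
/// the memory source's projection indices.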
fn try_swapping_with_memory(
projection: &ProjectionExec,
memory: &MemoryExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
all_alias_free_columns(projection.expr())
.then(|| {
let all_projections = (0..memory.schema().fields().len()).collect();
let new_projections = new_projections_for_columns(
projection,
memory.projection().as_ref().unwrap_or(&all_projections),
);
MemoryExec::try_new(
memory.partitions(),
memory.original_schema(),
Some(new_projections),
)
.map(|e| Arc::new(e) as _)
})
.transpose()
}
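/// Tries to embed the projection into its [`StreamingTableExec`] child,
/// narrowing its projection indices and rewriting its projected output
/// orderings accordingly.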
fn try_swapping_with_streaming_table(
projection: &ProjectionExec,
streaming_table: &StreamingTableExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
if !all_alias_free_columns(projection.expr()) {
return Ok(None);
}
let streaming_table_projections = streaming_table
.projection()
.as_ref()
.map(|i| i.as_ref().to_vec());
let new_projections = new_projections_for_columns(
projection,
&streaming_table_projections
.unwrap_or((0..streaming_table.schema().fields().len()).collect()),
);
let mut lex_orderings = vec![];
for lex_ordering in streaming_table.projected_output_ordering().into_iter() {
let mut orderings = vec![];
for order in lex_ordering {
let Some(new_ordering) = update_expr(&order.expr, projection.expr(), false)?
else {
return Ok(None);
};
orderings.push(PhysicalSortExpr {
expr: new_ordering,
options: order.options,
});
}
lex_orderings.push(orderings);
}
StreamingTableExec::try_new(
streaming_table.partition_schema().clone(),
streaming_table.partitions().clone(),
Some(new_projections.as_ref()),
lex_orderings,
streaming_table.is_infinite(),
streaming_table.limit(),
)
.map(|e| Some(Arc::new(e) as _))
}
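/// Unifies two consecutive projections into one. To avoid duplicating work,
/// the projections are left untouched if the parent references a non-trivial
/// child expression more than once.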
fn try_unifying_projections(
projection: &ProjectionExec,
child: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let mut projected_exprs = vec![];
let mut column_ref_map: HashMap<Column, usize> = HashMap::new();
projection.expr().iter().for_each(|(expr, _)| {
expr.apply(|expr| {
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
*column_ref_map.entry(column.clone()).or_default() += 1;
}
Ok(TreeNodeRecursion::Continue)
})
.unwrap();
});
// Inlining a non-trivial child expression that the parent references more
// than once would duplicate work, so bail out in that case.
if column_ref_map.iter().any(|(column, count)| {
*count > 1 && !is_expr_trivial(&child.expr()[column.index()].0)
}) {
return Ok(None);
}
for (expr, alias) in projection.expr() {
let Some(expr) = update_expr(expr, child.expr(), true)? else {
return Ok(None);
};
projected_exprs.push((expr, alias.clone()));
}
ProjectionExec::try_new(projected_exprs, child.input().clone())
.map(|e| Some(Arc::new(e) as _))
}
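/// Checks if the given expression is trivial, i.e. a plain column reference
/// or a literal, so that evaluating it repeatedly is essentially free.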
fn is_expr_trivial(expr: &Arc<dyn PhysicalExpr>) -> bool {
expr.as_any().downcast_ref::<Column>().is_some()
|| expr.as_any().downcast_ref::<Literal>().is_some()
}
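/// Tries to swap the projection with its [`OutputRequirementExec`] child,
/// rewriting the required ordering and distribution expressions in terms of
/// the projection output. Bails out if the projection does not narrow the
/// schema.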
fn try_swapping_with_output_req(
projection: &ProjectionExec,
output_req: &OutputRequirementExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
if projection.expr().len() >= projection.input().schema().fields().len() {
return Ok(None);
}
let mut updated_sort_reqs = vec![];
if let Some(reqs) = &output_req.required_input_ordering()[0] {
for req in reqs {
let Some(new_expr) = update_expr(&req.expr, projection.expr(), false)? else {
return Ok(None);
};
updated_sort_reqs.push(PhysicalSortRequirement {
expr: new_expr,
options: req.options,
});
}
}
let dist_req = match &output_req.required_input_distribution()[0] {
Distribution::HashPartitioned(exprs) => {
let mut updated_exprs = vec![];
for expr in exprs {
let Some(new_expr) = update_expr(expr, projection.expr(), false)? else {
return Ok(None);
};
updated_exprs.push(new_expr);
}
Distribution::HashPartitioned(updated_exprs)
}
dist => dist.clone(),
};
make_with_child(projection, &output_req.input())
.map(|input| {
OutputRequirementExec::new(
input,
(!updated_sort_reqs.is_empty()).then_some(updated_sort_reqs),
dist_req,
)
})
.map(|e| Some(Arc::new(e) as _))
}
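/// Tries to push the projection below its [`CoalescePartitionsExec`] child,
/// provided the projection narrows the schema.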
fn try_swapping_with_coalesce_partitions(
projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
if projection.expr().len() >= projection.input().schema().fields().len() {
return Ok(None);
}
make_with_child(projection, projection.input().children()[0])
.map(|e| Some(Arc::new(CoalescePartitionsExec::new(e)) as _))
}
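/// Tries to swap the projection with its [`FilterExec`] child, rewriting the
/// filter predicate in terms of the projection output.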
fn try_swapping_with_filter(
projection: &ProjectionExec,
filter: &FilterExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
if projection.expr().len() >= projection.input().schema().fields().len() {
return Ok(None);
}
let Some(new_predicate) = update_expr(filter.predicate(), projection.expr(), false)?
else {
return Ok(None);
};
FilterExec::try_new(new_predicate, make_with_child(projection, filter.input())?)
.and_then(|e| {
let selectivity = filter.default_selectivity();
e.with_default_selectivity(selectivity)
})
.map(|e| Some(Arc::new(e) as _))
}
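/// Tries to swap the projection with its [`RepartitionExec`] child. For hash
/// partitioning, the partitioning expressions are rewritten as well. Only
/// plain column projections are pushed down.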
fn try_swapping_with_repartition(
projection: &ProjectionExec,
repartition: &RepartitionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
if projection.expr().len() >= projection.input().schema().fields().len() {
return Ok(None);
}
if projection.benefits_from_input_partitioning()[0] || !all_columns(projection.expr())
{
return Ok(None);
}
let new_projection = make_with_child(projection, repartition.input())?;
let new_partitioning = match repartition.partitioning() {
Partitioning::Hash(partitions, size) => {
let mut new_partitions = vec![];
for partition in partitions {
let Some(new_partition) =
update_expr(partition, projection.expr(), false)?
else {
return Ok(None);
};
new_partitions.push(new_partition);
}
Partitioning::Hash(new_partitions, *size)
}
others => others.clone(),
};
Ok(Some(Arc::new(RepartitionExec::try_new(
new_projection,
new_partitioning,
)?)))
}
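/// Tries to swap the projection with its [`SortExec`] child, rewriting the
/// sort expressions in terms of the projection output.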
fn try_swapping_with_sort(
projection: &ProjectionExec,
sort: &SortExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
if projection.expr().len() >= projection.input().schema().fields().len() {
return Ok(None);
}
let mut updated_exprs = vec![];
for sort in sort.expr() {
let Some(new_expr) = update_expr(&sort.expr, projection.expr(), false)? else {
return Ok(None);
};
updated_exprs.push(PhysicalSortExpr {
expr: new_expr,
options: sort.options,
});
}
Ok(Some(Arc::new(
SortExec::new(updated_exprs, make_with_child(projection, sort.input())?)
.with_fetch(sort.fetch())
.with_preserve_partitioning(sort.preserve_partitioning()),
)))
}
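/// Tries to swap the projection with its [`SortPreservingMergeExec`] child,
/// rewriting the sort expressions in terms of the projection output.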
fn try_swapping_with_sort_preserving_merge(
projection: &ProjectionExec,
spm: &SortPreservingMergeExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
if projection.expr().len() >= projection.input().schema().fields().len() {
return Ok(None);
}
let mut updated_exprs = vec![];
for sort in spm.expr() {
let Some(updated_expr) = update_expr(&sort.expr, projection.expr(), false)?
else {
return Ok(None);
};
updated_exprs.push(PhysicalSortExpr {
expr: updated_expr,
options: sort.options,
});
}
Ok(Some(Arc::new(
SortPreservingMergeExec::new(
updated_exprs,
make_with_child(projection, spm.input())?,
)
.with_fetch(spm.fetch()),
)))
}
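/// Pushes a copy of the projection below each input of its [`UnionExec`]
/// child, provided the projection narrows the schema.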
fn try_pushdown_through_union(
projection: &ProjectionExec,
union: &UnionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
if projection.expr().len() >= projection.input().schema().fields().len() {
return Ok(None);
}
let new_children = union
.children()
.into_iter()
.map(|child| make_with_child(projection, child))
.collect::<Result<Vec<_>>>()?;
Ok(Some(Arc::new(UnionExec::new(new_children))))
}
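/// Tries to embed the projection into its [`HashJoinExec`] child as the
/// join's built-in projection. If the remaining outer projection becomes a
/// no-op, it is dropped entirely.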
fn try_embed_to_hash_join(
projection: &ProjectionExec,
hash_join: &HashJoinExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let projection_index = collect_column_indices(projection.expr());
if projection_index.is_empty() {
return Ok(None);
}
// If the projection indices are exactly 0..n over the full join schema, the
// projection is a no-op here and there is nothing worth embedding.
if projection_index.len() == projection_index.last().unwrap() + 1
&& projection_index.len() == hash_join.schema().fields().len()
{
return Ok(None);
}
let new_hash_join =
Arc::new(hash_join.with_projection(Some(projection_index.to_vec()))?);
let embed_project_exprs = projection_index
.iter()
.zip(new_hash_join.schema().fields())
.map(|(index, field)| {
(
Arc::new(Column::new(field.name(), *index)) as Arc<dyn PhysicalExpr>,
field.name().to_owned(),
)
})
.collect::<Vec<_>>();
let mut new_projection_exprs = Vec::with_capacity(projection.expr().len());
for (expr, alias) in projection.expr() {
let Some(expr) = update_expr(expr, embed_project_exprs.as_slice(), false)? else {
return Ok(None);
};
new_projection_exprs.push((expr, alias.clone()));
}
let new_projection = Arc::new(ProjectionExec::try_new(
new_projection_exprs,
new_hash_join.clone(),
)?);
if is_projection_removable(&new_projection) {
Ok(Some(new_hash_join))
} else {
Ok(Some(new_projection))
}
}
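/// Collects the sorted, deduplicated indices of all columns referenced by the
/// given projection expressions.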
fn collect_column_indices(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> Vec<usize> {
let mut indices = exprs
.iter()
.flat_map(|(expr, _)| collect_columns(expr))
.map(|x| x.index())
.collect::<std::collections::HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
indices.sort();
indices
}
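/// Tries to push the projection below its [`HashJoinExec`] child (when the
/// join has no built-in projection yet), rewriting the join's `on` pairs and
/// filter and creating narrower projections over both join children.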
fn try_pushdown_through_hash_join(
projection: &ProjectionExec,
hash_join: &HashJoinExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
if hash_join.contain_projection() {
return Ok(None);
}
let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
return Ok(None);
};
let (far_right_left_col_ind, far_left_right_col_ind) = join_table_borders(
hash_join.left().schema().fields().len(),
&projection_as_columns,
);
if !join_allows_pushdown(
&projection_as_columns,
hash_join.schema(),
far_right_left_col_ind,
far_left_right_col_ind,
) {
return Ok(None);
}
let Some(new_on) = update_join_on(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
hash_join.on(),
hash_join.left().schema().fields().len(),
) else {
return Ok(None);
};
let new_filter = if let Some(filter) = hash_join.filter() {
match update_join_filter(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
filter,
hash_join.left().schema().fields().len(),
) {
Some(updated_filter) => Some(updated_filter),
None => return Ok(None),
}
} else {
None
};
let (new_left, new_right) = new_join_children(
projection_as_columns,
far_right_left_col_ind,
far_left_right_col_ind,
hash_join.left(),
hash_join.right(),
)?;
Ok(Some(Arc::new(HashJoinExec::try_new(
Arc::new(new_left),
Arc::new(new_right),
new_on,
new_filter,
hash_join.join_type(),
hash_join.projection.clone(),
*hash_join.partition_mode(),
hash_join.null_equals_null,
)?)))
}
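/// Tries to push the projection below its [`CrossJoinExec`] child, creating
/// narrower projections over both join children.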
fn try_swapping_with_cross_join(
projection: &ProjectionExec,
cross_join: &CrossJoinExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
return Ok(None);
};
let (far_right_left_col_ind, far_left_right_col_ind) = join_table_borders(
cross_join.left().schema().fields().len(),
&projection_as_columns,
);
if !join_allows_pushdown(
&projection_as_columns,
cross_join.schema(),
far_right_left_col_ind,
far_left_right_col_ind,
) {
return Ok(None);
}
let (new_left, new_right) = new_join_children(
projection_as_columns,
far_right_left_col_ind,
far_left_right_col_ind,
cross_join.left(),
cross_join.right(),
)?;
Ok(Some(Arc::new(CrossJoinExec::new(
Arc::new(new_left),
Arc::new(new_right),
))))
}
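/// Tries to push the projection below its [`NestedLoopJoinExec`] child,
/// rewriting the join filter along the way.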
fn try_swapping_with_nested_loop_join(
projection: &ProjectionExec,
nl_join: &NestedLoopJoinExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
return Ok(None);
};
let (far_right_left_col_ind, far_left_right_col_ind) = join_table_borders(
nl_join.left().schema().fields().len(),
&projection_as_columns,
);
if !join_allows_pushdown(
&projection_as_columns,
nl_join.schema(),
far_right_left_col_ind,
far_left_right_col_ind,
) {
return Ok(None);
}
let new_filter = if let Some(filter) = nl_join.filter() {
match update_join_filter(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
filter,
nl_join.left().schema().fields().len(),
) {
Some(updated_filter) => Some(updated_filter),
None => return Ok(None),
}
} else {
None
};
let (new_left, new_right) = new_join_children(
projection_as_columns,
far_right_left_col_ind,
far_left_right_col_ind,
nl_join.left(),
nl_join.right(),
)?;
Ok(Some(Arc::new(NestedLoopJoinExec::try_new(
Arc::new(new_left),
Arc::new(new_right),
new_filter,
nl_join.join_type(),
)?)))
}
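/// Tries to push the projection below its [`SortMergeJoinExec`] child,
/// rewriting the join's `on` pairs along the way.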
fn try_swapping_with_sort_merge_join(
projection: &ProjectionExec,
sm_join: &SortMergeJoinExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
return Ok(None);
};
let (far_right_left_col_ind, far_left_right_col_ind) = join_table_borders(
sm_join.left().schema().fields().len(),
&projection_as_columns,
);
if !join_allows_pushdown(
&projection_as_columns,
sm_join.schema(),
far_right_left_col_ind,
far_left_right_col_ind,
) {
return Ok(None);
}
let Some(new_on) = update_join_on(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
sm_join.on(),
sm_join.left().schema().fields().len(),
) else {
return Ok(None);
};
let (new_left, new_right) = new_join_children(
projection_as_columns,
far_right_left_col_ind,
far_left_right_col_ind,
sm_join.children()[0],
sm_join.children()[1],
)?;
Ok(Some(Arc::new(SortMergeJoinExec::try_new(
Arc::new(new_left),
Arc::new(new_right),
new_on,
sm_join.filter.clone(),
sm_join.join_type,
sm_join.sort_options.clone(),
sm_join.null_equals_null,
)?)))
}
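/// Tries to push the projection below its [`SymmetricHashJoinExec`] child,
/// rewriting the join's `on` pairs and filter along the way.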
fn try_swapping_with_sym_hash_join(
projection: &ProjectionExec,
sym_join: &SymmetricHashJoinExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let Some(projection_as_columns) = physical_to_column_exprs(projection.expr()) else {
return Ok(None);
};
let (far_right_left_col_ind, far_left_right_col_ind) = join_table_borders(
sym_join.left().schema().fields().len(),
&projection_as_columns,
);
if !join_allows_pushdown(
&projection_as_columns,
sym_join.schema(),
far_right_left_col_ind,
far_left_right_col_ind,
) {
return Ok(None);
}
let Some(new_on) = update_join_on(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
sym_join.on(),
sym_join.left().schema().fields().len(),
) else {
return Ok(None);
};
let new_filter = if let Some(filter) = sym_join.filter() {
match update_join_filter(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
filter,
sym_join.left().schema().fields().len(),
) {
Some(updated_filter) => Some(updated_filter),
None => return Ok(None),
}
} else {
None
};
let (new_left, new_right) = new_join_children(
projection_as_columns,
far_right_left_col_ind,
far_left_right_col_ind,
sym_join.left(),
sym_join.right(),
)?;
Ok(Some(Arc::new(SymmetricHashJoinExec::try_new(
Arc::new(new_left),
Arc::new(new_right),
new_on,
new_filter,
sym_join.join_type(),
sym_join.null_equals_null(),
sym_join.right().output_ordering().map(|p| p.to_vec()),
sym_join.left().output_ordering().map(|p| p.to_vec()),
sym_join.partition_mode(),
)?)))
}
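/// Checks whether the projection is a no-op, i.e. it projects all columns of
/// its input schema in their original order without renaming.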
fn is_projection_removable(projection: &ProjectionExec) -> bool {
let exprs = projection.expr();
exprs.iter().enumerate().all(|(idx, (expr, alias))| {
let Some(col) = expr.as_any().downcast_ref::<Column>() else {
return false;
};
col.name() == alias && col.index() == idx
}) && exprs.len() == projection.input().schema().fields().len()
}
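/// Returns `true` when every projected expression is a column reference whose
/// alias equals the column name.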
fn all_alias_free_columns(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> bool {
exprs.iter().all(|(expr, alias)| {
expr.as_any()
.downcast_ref::<Column>()
.map(|column| column.name() == alias)
.unwrap_or(false)
})
}
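/// Composes the projection's column indices with an existing `source`
/// projection, yielding the indices that the source should project directly.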
fn new_projections_for_columns(
projection: &ProjectionExec,
source: &[usize],
) -> Vec<usize> {
projection
.expr()
.iter()
.filter_map(|(expr, _)| {
expr.as_any()
.downcast_ref::<Column>()
.map(|expr| source[expr.index()])
})
.collect()
}
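/// Rewrites `expr` with respect to `projected_exprs`. When `sync_with_child`
/// is `true`, column references are replaced by the corresponding child
/// expressions, as if `expr` were evaluated on the projection's input. When
/// it is `false`, column references are re-indexed to point at the
/// projection's output. Returns `Ok(None)` when the rewrite is impossible.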
fn update_expr(
expr: &Arc<dyn PhysicalExpr>,
projected_exprs: &[(Arc<dyn PhysicalExpr>, String)],
sync_with_child: bool,
) -> Result<Option<Arc<dyn PhysicalExpr>>> {
#[derive(Debug, PartialEq)]
enum RewriteState {
Unchanged,
RewrittenValid,
RewrittenInvalid,
}
let mut state = RewriteState::Unchanged;
let new_expr = expr
.clone()
.transform_up(|expr: Arc<dyn PhysicalExpr>| {
if state == RewriteState::RewrittenInvalid {
return Ok(Transformed::no(expr));
}
let Some(column) = expr.as_any().downcast_ref::<Column>() else {
return Ok(Transformed::no(expr));
};
if sync_with_child {
state = RewriteState::RewrittenValid;
Ok(Transformed::yes(projected_exprs[column.index()].0.clone()))
} else {
state = RewriteState::RewrittenInvalid;
projected_exprs
.iter()
.enumerate()
.find_map(|(index, (projected_expr, alias))| {
projected_expr.as_any().downcast_ref::<Column>().and_then(
|projected_column| {
(column.name().eq(projected_column.name())
&& column.index() == projected_column.index())
.then(|| {
state = RewriteState::RewrittenValid;
Arc::new(Column::new(alias, index)) as _
})
},
)
})
.map_or_else(
|| Ok(Transformed::no(expr)),
|c| Ok(Transformed::yes(c)),
)
}
})
.data();
new_expr.map(|e| (state == RewriteState::RewrittenValid).then_some(e))
}
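/// Rebuilds the projection with the same expressions on top of a new child.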
fn make_with_child(
projection: &ProjectionExec,
child: &Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
ProjectionExec::try_new(projection.expr().to_vec(), child.clone())
.map(|e| Arc::new(e) as _)
}
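/// Returns `true` when every projected expression is a plain column reference
/// (aliases are allowed here).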
fn all_columns(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> bool {
exprs.iter().all(|(expr, _)| expr.as_any().is::<Column>())
}
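/// Downcasts every projected expression to a [`Column`], returning `None` if
/// any expression is not a plain column reference.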
fn physical_to_column_exprs(
exprs: &[(Arc<dyn PhysicalExpr>, String)],
) -> Option<Vec<(Column, String)>> {
exprs
.iter()
.map(|(expr, alias)| {
expr.as_any()
.downcast_ref::<Column>()
.map(|col| (col.clone(), alias.clone()))
})
.collect()
}
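/// Scanning the projection from the left, returns the index of the last
/// leading column that comes from the left join table (`-1` if the first
/// column already comes from the right table). Scanning from the right,
/// returns the index of the first trailing column that comes from the right
/// table (`len` if the last column comes from the left table).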
fn join_table_borders(
left_table_column_count: usize,
projection_as_columns: &[(Column, String)],
) -> (i32, i32) {
let far_right_left_col_ind = projection_as_columns
.iter()
.enumerate()
.take_while(|(_, (projection_column, _))| {
projection_column.index() < left_table_column_count
})
.last()
.map(|(index, _)| index as i32)
.unwrap_or(-1);
let far_left_right_col_ind = projection_as_columns
.iter()
.enumerate()
.rev()
.take_while(|(_, (projection_column, _))| {
projection_column.index() >= left_table_column_count
})
.last()
.map(|(index, _)| index as i32)
.unwrap_or(projection_as_columns.len() as i32);
(far_right_left_col_ind, far_left_right_col_ind)
}
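/// Rewrites both sides of the join `on` pairs in terms of the new left and
/// right projections. Returns `None` if any referenced column cannot be
/// mapped.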
fn update_join_on(
proj_left_exprs: &[(Column, String)],
proj_right_exprs: &[(Column, String)],
hash_join_on: &[(PhysicalExprRef, PhysicalExprRef)],
left_field_size: usize,
) -> Option<Vec<(PhysicalExprRef, PhysicalExprRef)>> {
// The `map` below looks like an identity, but it turns each `&(left, right)`
// item into `(&left, &right)` so that `unzip` can split the pairs.
#[allow(clippy::map_identity)]
let (left_idx, right_idx): (Vec<_>, Vec<_>) = hash_join_on
.iter()
.map(|(left, right)| (left, right))
.unzip();
let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs, 0);
let new_right_columns =
new_columns_for_join_on(&right_idx, proj_right_exprs, left_field_size);
match (new_left_columns, new_right_columns) {
(Some(left), Some(right)) => Some(left.into_iter().zip(right).collect()),
_ => None,
}
}
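/// Re-indexes the columns of the given join-on expressions against
/// `projection_exprs`, offsetting right-side indices by
/// `column_index_offset`. Returns `None` unless every expression can be
/// rewritten.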
fn new_columns_for_join_on(
hash_join_on: &[&PhysicalExprRef],
projection_exprs: &[(Column, String)],
column_index_offset: usize,
) -> Option<Vec<PhysicalExprRef>> {
let new_columns = hash_join_on
.iter()
.filter_map(|on| {
(*on)
.clone()
.transform(|expr| {
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
let new_column = projection_exprs
.iter()
.enumerate()
.find(|(_, (proj_column, _))| {
column.name() == proj_column.name()
&& column.index() + column_index_offset
== proj_column.index()
})
.map(|(index, (_, alias))| Column::new(alias, index));
if let Some(new_column) = new_column {
Ok(Transformed::yes(Arc::new(new_column)))
} else {
internal_err!(
"Column {:?} not found in projection expressions",
column
)
}
} else {
Ok(Transformed::no(expr))
}
})
.data()
.ok()
})
.collect::<Vec<_>>();
(new_columns.len() == hash_join_on.len()).then_some(new_columns)
}
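/// Rewrites the column indices of a join filter in terms of the new left and
/// right projections. Returns `None` if the projection prunes any column the
/// filter needs.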
fn update_join_filter(
projection_left_exprs: &[(Column, String)],
projection_right_exprs: &[(Column, String)],
join_filter: &JoinFilter,
left_field_size: usize,
) -> Option<JoinFilter> {
let mut new_left_indices = new_indices_for_join_filter(
join_filter,
JoinSide::Left,
projection_left_exprs,
0,
)
.into_iter();
let mut new_right_indices = new_indices_for_join_filter(
join_filter,
JoinSide::Right,
projection_right_exprs,
left_field_size,
)
.into_iter();
(new_right_indices.len() + new_left_indices.len()
== join_filter.column_indices().len())
.then(|| {
JoinFilter::new(
join_filter.expression().clone(),
join_filter
.column_indices()
.iter()
.map(|col_idx| ColumnIndex {
index: if col_idx.side == JoinSide::Left {
new_left_indices.next().unwrap()
} else {
new_right_indices.next().unwrap()
},
side: col_idx.side,
})
.collect(),
join_filter.schema().clone(),
)
})
}
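/// For each filter column belonging to `join_side`, finds its new position in
/// the corresponding projection.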
fn new_indices_for_join_filter(
join_filter: &JoinFilter,
join_side: JoinSide,
projection_exprs: &[(Column, String)],
column_index_offset: usize,
) -> Vec<usize> {
join_filter
.column_indices()
.iter()
.filter(|col_idx| col_idx.side == join_side)
.filter_map(|col_idx| {
projection_exprs
.iter()
.position(|(col, _)| col_idx.index + column_index_offset == col.index())
})
.collect()
}
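/// A join benefits from projection pushdown only when the projection narrows
/// the schema and consists of a (possibly renamed and, within each block,
/// reordered) block of left-table columns followed by a block of right-table
/// columns.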
fn join_allows_pushdown(
projection_as_columns: &[(Column, String)],
join_schema: SchemaRef,
far_right_left_col_ind: i32,
far_left_right_col_ind: i32,
) -> bool {
projection_as_columns.len() < join_schema.fields().len()
&& (far_right_left_col_ind + 1 == far_left_right_col_ind)
&& far_right_left_col_ind >= 0
&& far_left_right_col_ind < projection_as_columns.len() as i32
}
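/// Splits the projection into separate projections over the left and right
/// join children, shifting right-side column indices so that they are
/// relative to the right child's schema.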
fn new_join_children(
projection_as_columns: Vec<(Column, String)>,
far_right_left_col_ind: i32,
far_left_right_col_ind: i32,
left_child: &Arc<dyn ExecutionPlan>,
right_child: &Arc<dyn ExecutionPlan>,
) -> Result<(ProjectionExec, ProjectionExec)> {
let new_left = ProjectionExec::try_new(
projection_as_columns[0..=far_right_left_col_ind as _]
.iter()
.map(|(col, alias)| {
(
Arc::new(Column::new(col.name(), col.index())) as _,
alias.clone(),
)
})
.collect_vec(),
left_child.clone(),
)?;
let left_size = left_child.schema().fields().len() as i32;
let new_right = ProjectionExec::try_new(
projection_as_columns[far_left_right_col_ind as _..]
.iter()
.map(|(col, alias)| {
(
Arc::new(Column::new(
col.name(),
(col.index() as i32 - left_size) as _,
)) as _,
alias.clone(),
)
})
.collect_vec(),
right_child.clone(),
)?;
Ok((new_left, new_right))
}
#[cfg(test)]
mod tests {
use super::*;
use std::any::Any;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::physical_plan::FileScanConfig;
use crate::physical_plan::get_plan_string;
use crate::physical_plan::joins::StreamJoinPartitionMode;
use arrow_schema::{DataType, Field, Schema, SortOptions};
use datafusion_common::{JoinType, ScalarValue};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::{
ColumnarValue, Operator, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
};
use datafusion_physical_expr::expressions::{
BinaryExpr, CaseExpr, CastExpr, NegativeExpr,
};
use datafusion_physical_expr::ScalarFunctionExpr;
use datafusion_physical_plan::joins::PartitionMode;
use datafusion_physical_plan::streaming::PartitionStream;
#[derive(Debug)]
struct DummyUDF {
signature: Signature,
}
impl DummyUDF {
fn new() -> Self {
Self {
signature: Signature::variadic_any(Volatility::Immutable),
}
}
}
impl ScalarUDFImpl for DummyUDF {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"dummy_udf"
}
fn signature(&self) -> &Signature {
&self.signature
}
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Int32)
}
fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
unimplemented!("DummyUDF::invoke")
}
}
#[test]
fn test_update_matching_exprs() -> Result<()> {
let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 3)),
Operator::Divide,
Arc::new(Column::new("e", 5)),
)),
Arc::new(CastExpr::new(
Arc::new(Column::new("a", 3)),
DataType::Float32,
None,
)),
Arc::new(NegativeExpr::new(Arc::new(Column::new("f", 4)))),
Arc::new(ScalarFunctionExpr::new(
"scalar_expr",
Arc::new(ScalarUDF::new_from_impl(DummyUDF::new())),
vec![
Arc::new(BinaryExpr::new(
Arc::new(Column::new("b", 1)),
Operator::Divide,
Arc::new(Column::new("c", 0)),
)),
Arc::new(BinaryExpr::new(
Arc::new(Column::new("c", 0)),
Operator::Divide,
Arc::new(Column::new("b", 1)),
)),
],
DataType::Int32,
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 2))),
vec![
(
Arc::new(Column::new("a", 3)) as Arc<dyn PhysicalExpr>,
Arc::new(BinaryExpr::new(
Arc::new(Column::new("d", 2)),
Operator::Plus,
Arc::new(Column::new("e", 5)),
)) as Arc<dyn PhysicalExpr>,
),
(
Arc::new(Column::new("a", 3)) as Arc<dyn PhysicalExpr>,
Arc::new(BinaryExpr::new(
Arc::new(Column::new("e", 5)),
Operator::Plus,
Arc::new(Column::new("d", 2)),
)) as Arc<dyn PhysicalExpr>,
),
],
Some(Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 3)),
Operator::Modulo,
Arc::new(Column::new("e", 5)),
))),
)?),
];
let child: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![
(Arc::new(Column::new("c", 2)), "c".to_owned()),
(Arc::new(Column::new("b", 1)), "b".to_owned()),
(Arc::new(Column::new("d", 3)), "d".to_owned()),
(Arc::new(Column::new("a", 0)), "a".to_owned()),
(Arc::new(Column::new("f", 5)), "f".to_owned()),
(Arc::new(Column::new("e", 4)), "e".to_owned()),
];
let expected_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Divide,
Arc::new(Column::new("e", 4)),
)),
Arc::new(CastExpr::new(
Arc::new(Column::new("a", 0)),
DataType::Float32,
None,
)),
Arc::new(NegativeExpr::new(Arc::new(Column::new("f", 5)))),
Arc::new(ScalarFunctionExpr::new(
"scalar_expr",
Arc::new(ScalarUDF::new_from_impl(DummyUDF::new())),
vec![
Arc::new(BinaryExpr::new(
Arc::new(Column::new("b", 1)),
Operator::Divide,
Arc::new(Column::new("c", 2)),
)),
Arc::new(BinaryExpr::new(
Arc::new(Column::new("c", 2)),
Operator::Divide,
Arc::new(Column::new("b", 1)),
)),
],
DataType::Int32,
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 3))),
vec![
(
Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>,
Arc::new(BinaryExpr::new(
Arc::new(Column::new("d", 3)),
Operator::Plus,
Arc::new(Column::new("e", 4)),
)) as Arc<dyn PhysicalExpr>,
),
(
Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>,
Arc::new(BinaryExpr::new(
Arc::new(Column::new("e", 4)),
Operator::Plus,
Arc::new(Column::new("d", 3)),
)) as Arc<dyn PhysicalExpr>,
),
],
Some(Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Modulo,
Arc::new(Column::new("e", 4)),
))),
)?),
];
for (expr, expected_expr) in exprs.into_iter().zip(expected_exprs.into_iter()) {
assert!(update_expr(&expr, &child, true)?
.unwrap()
.eq(&expected_expr));
}
Ok(())
}
#[test]
fn test_update_projected_exprs() -> Result<()> {
let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 3)),
Operator::Divide,
Arc::new(Column::new("e", 5)),
)),
Arc::new(CastExpr::new(
Arc::new(Column::new("a", 3)),
DataType::Float32,
None,
)),
Arc::new(NegativeExpr::new(Arc::new(Column::new("f", 4)))),
Arc::new(ScalarFunctionExpr::new(
"scalar_expr",
Arc::new(ScalarUDF::new_from_impl(DummyUDF::new())),
vec![
Arc::new(BinaryExpr::new(
Arc::new(Column::new("b", 1)),
Operator::Divide,
Arc::new(Column::new("c", 0)),
)),
Arc::new(BinaryExpr::new(
Arc::new(Column::new("c", 0)),
Operator::Divide,
Arc::new(Column::new("b", 1)),
)),
],
DataType::Int32,
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 2))),
vec![
(
Arc::new(Column::new("a", 3)) as Arc<dyn PhysicalExpr>,
Arc::new(BinaryExpr::new(
Arc::new(Column::new("d", 2)),
Operator::Plus,
Arc::new(Column::new("e", 5)),
)) as Arc<dyn PhysicalExpr>,
),
(
Arc::new(Column::new("a", 3)) as Arc<dyn PhysicalExpr>,
Arc::new(BinaryExpr::new(
Arc::new(Column::new("e", 5)),
Operator::Plus,
Arc::new(Column::new("d", 2)),
)) as Arc<dyn PhysicalExpr>,
),
],
Some(Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 3)),
Operator::Modulo,
Arc::new(Column::new("e", 5)),
))),
)?),
];
let projected_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![
(Arc::new(Column::new("a", 3)), "a".to_owned()),
(Arc::new(Column::new("b", 1)), "b_new".to_owned()),
(Arc::new(Column::new("c", 0)), "c".to_owned()),
(Arc::new(Column::new("d", 2)), "d_new".to_owned()),
(Arc::new(Column::new("e", 5)), "e".to_owned()),
(Arc::new(Column::new("f", 4)), "f_new".to_owned()),
];
let expected_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Divide,
Arc::new(Column::new("e", 4)),
)),
Arc::new(CastExpr::new(
Arc::new(Column::new("a", 0)),
DataType::Float32,
None,
)),
Arc::new(NegativeExpr::new(Arc::new(Column::new("f_new", 5)))),
Arc::new(ScalarFunctionExpr::new(
"scalar_expr",
Arc::new(ScalarUDF::new_from_impl(DummyUDF::new())),
vec![
Arc::new(BinaryExpr::new(
Arc::new(Column::new("b_new", 1)),
Operator::Divide,
Arc::new(Column::new("c", 2)),
)),
Arc::new(BinaryExpr::new(
Arc::new(Column::new("c", 2)),
Operator::Divide,
Arc::new(Column::new("b_new", 1)),
)),
],
DataType::Int32,
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d_new", 3))),
vec![
(
Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>,
Arc::new(BinaryExpr::new(
Arc::new(Column::new("d_new", 3)),
Operator::Plus,
Arc::new(Column::new("e", 4)),
)) as Arc<dyn PhysicalExpr>,
),
(
Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>,
Arc::new(BinaryExpr::new(
Arc::new(Column::new("e", 4)),
Operator::Plus,
Arc::new(Column::new("d_new", 3)),
)) as Arc<dyn PhysicalExpr>,
),
],
Some(Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Modulo,
Arc::new(Column::new("e", 4)),
))),
)?),
];
for (expr, expected_expr) in exprs.into_iter().zip(expected_exprs.into_iter()) {
assert!(update_expr(&expr, &projected_exprs, false)?
.unwrap()
.eq(&expected_expr));
}
Ok(())
}
#[test]
fn test_join_table_borders() -> Result<()> {
let projections = vec![
(Column::new("b", 1), "b".to_owned()),
(Column::new("c", 2), "c".to_owned()),
(Column::new("e", 4), "e".to_owned()),
(Column::new("d", 3), "d".to_owned()),
(Column::new("c", 2), "c".to_owned()),
(Column::new("f", 5), "f".to_owned()),
(Column::new("h", 7), "h".to_owned()),
(Column::new("g", 6), "g".to_owned()),
];
let left_table_column_count = 5;
assert_eq!(
join_table_borders(left_table_column_count, &projections),
(4, 5)
);
let left_table_column_count = 8;
assert_eq!(
join_table_borders(left_table_column_count, &projections),
(7, 8)
);
let left_table_column_count = 1;
assert_eq!(
join_table_borders(left_table_column_count, &projections),
(-1, 0)
);
let projections = vec![
(Column::new("a", 0), "a".to_owned()),
(Column::new("b", 1), "b".to_owned()),
(Column::new("d", 3), "d".to_owned()),
(Column::new("g", 6), "g".to_owned()),
(Column::new("e", 4), "e".to_owned()),
(Column::new("f", 5), "f".to_owned()),
(Column::new("e", 4), "e".to_owned()),
(Column::new("h", 7), "h".to_owned()),
];
let left_table_column_count = 5;
assert_eq!(
join_table_borders(left_table_column_count, &projections),
(2, 7)
);
let left_table_column_count = 7;
assert_eq!(
join_table_borders(left_table_column_count, &projections),
(6, 7)
);
Ok(())
}
fn create_simple_csv_exec() -> Arc<dyn ExecutionPlan> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
Field::new("d", DataType::Int32, true),
Field::new("e", DataType::Int32, true),
]));
Arc::new(
CsvExec::builder(
FileScanConfig::new(
ObjectStoreUrl::parse("test:///").unwrap(),
schema.clone(),
)
.with_file(PartitionedFile::new("x".to_string(), 100))
.with_projection(Some(vec![0, 1, 2, 3, 4])),
)
.with_has_header(false)
.with_delimeter(0)
.with_quote(0)
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
.build(),
)
}
fn create_projecting_csv_exec() -> Arc<dyn ExecutionPlan> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
Field::new("d", DataType::Int32, true),
]));
Arc::new(
CsvExec::builder(
FileScanConfig::new(
ObjectStoreUrl::parse("test:///").unwrap(),
schema.clone(),
)
.with_file(PartitionedFile::new("x".to_string(), 100))
.with_projection(Some(vec![3, 2, 1])),
)
.with_has_header(false)
.with_delimeter(0)
.with_quote(0)
.with_escape(None)
.with_comment(None)
.with_newlines_in_values(false)
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
.build(),
)
}
fn create_projecting_memory_exec() -> Arc<dyn ExecutionPlan> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
Field::new("d", DataType::Int32, true),
Field::new("e", DataType::Int32, true),
]));
Arc::new(MemoryExec::try_new(&[], schema, Some(vec![2, 0, 3, 4])).unwrap())
}
#[test]
fn test_csv_after_projection() -> Result<()> {
let csv = create_projecting_csv_exec();
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
vec![
(Arc::new(Column::new("b", 2)), "b".to_string()),
(Arc::new(Column::new("d", 0)), "d".to_string()),
],
csv.clone(),
)?);
let initial = get_plan_string(&projection);
let expected_initial = [
"ProjectionExec: expr=[b@2 as b, d@0 as d]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[d, c, b], has_header=false",
];
assert_eq!(initial, expected_initial);
let after_optimize =
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
let expected = [
"CsvExec: file_groups={1 group: [[x]]}, projection=[b, d], has_header=false",
];
assert_eq!(get_plan_string(&after_optimize), expected);
Ok(())
}
#[test]
fn test_memory_after_projection() -> Result<()> {
let memory = create_projecting_memory_exec();
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
vec![
(Arc::new(Column::new("d", 2)), "d".to_string()),
(Arc::new(Column::new("e", 3)), "e".to_string()),
(Arc::new(Column::new("a", 1)), "a".to_string()),
],
memory.clone(),
)?);
let initial = get_plan_string(&projection);
let expected_initial = [
"ProjectionExec: expr=[d@2 as d, e@3 as e, a@1 as a]",
" MemoryExec: partitions=0, partition_sizes=[]",
];
assert_eq!(initial, expected_initial);
let after_optimize =
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
let expected = ["MemoryExec: partitions=0, partition_sizes=[]"];
assert_eq!(get_plan_string(&after_optimize), expected);
assert_eq!(
after_optimize
.clone()
.as_any()
.downcast_ref::<MemoryExec>()
.unwrap()
.projection()
.clone()
.unwrap(),
vec![3, 4, 0]
);
Ok(())
}
#[test]
fn test_streaming_table_after_projection() -> Result<()> {
struct DummyStreamPartition {
schema: SchemaRef,
}
impl PartitionStream for DummyStreamPartition {
fn schema(&self) -> &SchemaRef {
&self.schema
}
fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
unreachable!()
}
}
let streaming_table = StreamingTableExec::try_new(
Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
Field::new("d", DataType::Int32, true),
Field::new("e", DataType::Int32, true),
])),
vec![Arc::new(DummyStreamPartition {
schema: Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
Field::new("d", DataType::Int32, true),
Field::new("e", DataType::Int32, true),
])),
}) as _],
Some(&vec![0_usize, 2, 4, 3]),
vec![
vec![
PhysicalSortExpr {
expr: Arc::new(Column::new("e", 2)),
options: SortOptions::default(),
},
PhysicalSortExpr {
expr: Arc::new(Column::new("a", 0)),
options: SortOptions::default(),
},
],
vec![PhysicalSortExpr {
expr: Arc::new(Column::new("d", 3)),
options: SortOptions::default(),
}],
]
.into_iter(),
true,
None,
)?;
let projection = Arc::new(ProjectionExec::try_new(
vec![
(Arc::new(Column::new("d", 3)), "d".to_string()),
(Arc::new(Column::new("e", 2)), "e".to_string()),
(Arc::new(Column::new("a", 0)), "a".to_string()),
],
Arc::new(streaming_table) as _,
)?) as _;
let after_optimize =
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
let result = after_optimize
.as_any()
.downcast_ref::<StreamingTableExec>()
.unwrap();
assert_eq!(
result.partition_schema(),
&Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
Field::new("d", DataType::Int32, true),
Field::new("e", DataType::Int32, true),
]))
);
assert_eq!(
result.projection().clone().unwrap().to_vec(),
vec![3_usize, 4, 0]
);
assert_eq!(
result.projected_schema(),
&Schema::new(vec![
Field::new("d", DataType::Int32, true),
Field::new("e", DataType::Int32, true),
Field::new("a", DataType::Int32, true),
])
);
assert_eq!(
result.projected_output_ordering().into_iter().collect_vec(),
vec![
vec![
PhysicalSortExpr {
expr: Arc::new(Column::new("e", 1)),
options: SortOptions::default(),
},
PhysicalSortExpr {
expr: Arc::new(Column::new("a", 2)),
options: SortOptions::default(),
},
],
vec![PhysicalSortExpr {
expr: Arc::new(Column::new("d", 0)),
options: SortOptions::default(),
}],
]
);
assert!(result.is_infinite());
Ok(())
}
#[test]
fn test_projection_after_projection() -> Result<()> {
let csv = create_simple_csv_exec();
let child_projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
vec![
(Arc::new(Column::new("c", 2)), "c".to_string()),
(Arc::new(Column::new("e", 4)), "new_e".to_string()),
(Arc::new(Column::new("a", 0)), "a".to_string()),
(Arc::new(Column::new("b", 1)), "new_b".to_string()),
],
csv.clone(),
)?);
let top_projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
vec![
(Arc::new(Column::new("new_b", 3)), "new_b".to_string()),
(
Arc::new(BinaryExpr::new(
Arc::new(Column::new("c", 0)),
Operator::Plus,
Arc::new(Column::new("new_e", 1)),
)),
"binary".to_string(),
),
(Arc::new(Column::new("new_b", 3)), "newest_b".to_string()),
],
child_projection.clone(),
)?);
let initial = get_plan_string(&top_projection);
let expected_initial = [
"ProjectionExec: expr=[new_b@3 as new_b, c@0 + new_e@1 as binary, new_b@3 as newest_b]",
" ProjectionExec: expr=[c@2 as c, e@4 as new_e, a@0 as a, b@1 as new_b]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(initial, expected_initial);
let after_optimize =
ProjectionPushdown::new().optimize(top_projection, &ConfigOptions::new())?;
let expected = [
"ProjectionExec: expr=[b@1 as new_b, c@2 + e@4 as binary, b@1 as newest_b]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(get_plan_string(&after_optimize), expected);
Ok(())
}
#[test]
fn test_output_req_after_projection() -> Result<()> {
let csv = create_simple_csv_exec();
let sort_req: Arc<dyn ExecutionPlan> = Arc::new(OutputRequirementExec::new(
csv.clone(),
Some(vec![
PhysicalSortRequirement {
expr: Arc::new(Column::new("b", 1)),
options: Some(SortOptions::default()),
},
PhysicalSortRequirement {
expr: Arc::new(BinaryExpr::new(
Arc::new(Column::new("c", 2)),
Operator::Plus,
Arc::new(Column::new("a", 0)),
)),
options: Some(SortOptions::default()),
},
]),
Distribution::HashPartitioned(vec![
Arc::new(Column::new("a", 0)),
Arc::new(Column::new("b", 1)),
]),
));
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
vec![
(Arc::new(Column::new("c", 2)), "c".to_string()),
(Arc::new(Column::new("a", 0)), "new_a".to_string()),
(Arc::new(Column::new("b", 1)), "b".to_string()),
],
sort_req.clone(),
)?);
let initial = get_plan_string(&projection);
let expected_initial = [
"ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]",
" OutputRequirementExec",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(initial, expected_initial);
let after_optimize =
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
let expected: [&str; 3] = [
"OutputRequirementExec",
" ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(get_plan_string(&after_optimize), expected);
let expected_reqs = vec![
PhysicalSortRequirement {
expr: Arc::new(Column::new("b", 2)),
options: Some(SortOptions::default()),
},
PhysicalSortRequirement {
expr: Arc::new(BinaryExpr::new(
Arc::new(Column::new("c", 0)),
Operator::Plus,
Arc::new(Column::new("new_a", 1)),
)),
options: Some(SortOptions::default()),
},
];
assert_eq!(
after_optimize
.as_any()
.downcast_ref::<OutputRequirementExec>()
.unwrap()
.required_input_ordering()[0]
.clone()
.unwrap(),
expected_reqs
);
let expected_distribution: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new("new_a", 1)),
Arc::new(Column::new("b", 2)),
];
if let Distribution::HashPartitioned(vec) = after_optimize
.as_any()
.downcast_ref::<OutputRequirementExec>()
.unwrap()
.required_input_distribution()[0]
.clone()
{
assert!(vec
.iter()
.zip(expected_distribution)
.all(|(actual, expected)| actual.eq(&expected)));
} else {
panic!("Expected HashPartitioned distribution!");
};
Ok(())
}
#[test]
fn test_coalesce_partitions_after_projection() -> Result<()> {
let csv = create_simple_csv_exec();
let coalesce_partitions: Arc<dyn ExecutionPlan> =
Arc::new(CoalescePartitionsExec::new(csv));
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
vec![
(Arc::new(Column::new("b", 1)), "b".to_string()),
(Arc::new(Column::new("a", 0)), "a_new".to_string()),
(Arc::new(Column::new("d", 3)), "d".to_string()),
],
coalesce_partitions,
)?);
let initial = get_plan_string(&projection);
let expected_initial = [
"ProjectionExec: expr=[b@1 as b, a@0 as a_new, d@3 as d]",
" CoalescePartitionsExec",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
];
assert_eq!(initial, expected_initial);
let after_optimize =
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
let expected = [
"CoalescePartitionsExec",
" ProjectionExec: expr=[b@1 as b, a@0 as a_new, d@3 as d]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
];
assert_eq!(get_plan_string(&after_optimize), expected);
Ok(())
}
#[test]
fn test_filter_after_projection() -> Result<()> {
let csv = create_simple_csv_exec();
let predicate = Arc::new(BinaryExpr::new(
Arc::new(BinaryExpr::new(
Arc::new(Column::new("b", 1)),
Operator::Minus,
Arc::new(Column::new("a", 0)),
)),
Operator::Gt,
Arc::new(BinaryExpr::new(
Arc::new(Column::new("d", 3)),
Operator::Minus,
Arc::new(Column::new("a", 0)),
)),
));
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate, csv)?);
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
vec![
(Arc::new(Column::new("a", 0)), "a_new".to_string()),
(Arc::new(Column::new("b", 1)), "b".to_string()),
(Arc::new(Column::new("d", 3)), "d".to_string()),
],
filter.clone(),
)?);
let initial = get_plan_string(&projection);
let expected_initial = [
"ProjectionExec: expr=[a@0 as a_new, b@1 as b, d@3 as d]",
" FilterExec: b@1 - a@0 > d@3 - a@0",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
];
assert_eq!(initial, expected_initial);
let after_optimize =
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
let expected = [
"FilterExec: b@1 - a_new@0 > d@2 - a_new@0",
" ProjectionExec: expr=[a@0 as a_new, b@1 as b, d@3 as d]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
];
assert_eq!(get_plan_string(&after_optimize), expected);
Ok(())
}
#[test]
fn test_join_after_projection() -> Result<()> {
let left_csv = create_simple_csv_exec();
let right_csv = create_simple_csv_exec();
let join: Arc<dyn ExecutionPlan> = Arc::new(SymmetricHashJoinExec::try_new(
left_csv,
right_csv,
vec![(Arc::new(Column::new("b", 1)), Arc::new(Column::new("c", 2)))],
Some(JoinFilter::new(
Arc::new(BinaryExpr::new(
Arc::new(BinaryExpr::new(
Arc::new(Column::new("b_left_inter", 0)),
Operator::Minus,
Arc::new(BinaryExpr::new(
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
Operator::Plus,
Arc::new(Column::new("a_right_inter", 1)),
)),
)),
Operator::LtEq,
Arc::new(BinaryExpr::new(
Arc::new(Column::new("a_right_inter", 1)),
Operator::Plus,
Arc::new(Column::new("c_left_inter", 2)),
)),
)),
vec![
ColumnIndex {
index: 1,
side: JoinSide::Left,
},
ColumnIndex {
index: 0,
side: JoinSide::Right,
},
ColumnIndex {
index: 2,
side: JoinSide::Left,
},
],
Schema::new(vec![
Field::new("b_left_inter", DataType::Int32, true),
Field::new("a_right_inter", DataType::Int32, true),
Field::new("c_left_inter", DataType::Int32, true),
]),
)),
&JoinType::Inner,
true,
None,
None,
StreamJoinPartitionMode::SinglePartition,
)?);
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
vec![
(Arc::new(Column::new("c", 2)), "c_from_left".to_string()),
(Arc::new(Column::new("b", 1)), "b_from_left".to_string()),
(Arc::new(Column::new("a", 0)), "a_from_left".to_string()),
(Arc::new(Column::new("a", 5)), "a_from_right".to_string()),
(Arc::new(Column::new("c", 7)), "c_from_right".to_string()),
],
join,
)?);
let initial = get_plan_string(&projection);
let expected_initial = [
"ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left, a@5 as a_from_right, c@7 as c_from_right]",
" SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(initial, expected_initial);
let after_optimize =
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
let expected = [
"SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b_from_left@1, c_from_right@1)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2",
" ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
" ProjectionExec: expr=[a@0 as a_from_right, c@2 as c_from_right]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(get_plan_string(&after_optimize), expected);
let expected_filter_col_ind = vec![
ColumnIndex {
index: 1,
side: JoinSide::Left,
},
ColumnIndex {
index: 0,
side: JoinSide::Right,
},
ColumnIndex {
index: 0,
side: JoinSide::Left,
},
];
assert_eq!(
expected_filter_col_ind,
after_optimize
.as_any()
.downcast_ref::<SymmetricHashJoinExec>()
.unwrap()
.filter()
.unwrap()
.column_indices()
);
Ok(())
}
#[test]
fn test_join_after_required_projection() -> Result<()> {
let left_csv = create_simple_csv_exec();
let right_csv = create_simple_csv_exec();
let join: Arc<dyn ExecutionPlan> = Arc::new(SymmetricHashJoinExec::try_new(
left_csv,
right_csv,
vec![(Arc::new(Column::new("b", 1)), Arc::new(Column::new("c", 2)))],
Some(JoinFilter::new(
Arc::new(BinaryExpr::new(
Arc::new(BinaryExpr::new(
Arc::new(Column::new("b_left_inter", 0)),
Operator::Minus,
Arc::new(BinaryExpr::new(
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
Operator::Plus,
Arc::new(Column::new("a_right_inter", 1)),
)),
)),
Operator::LtEq,
Arc::new(BinaryExpr::new(
Arc::new(Column::new("a_right_inter", 1)),
Operator::Plus,
Arc::new(Column::new("c_left_inter", 2)),
)),
)),
vec![
ColumnIndex {
index: 1,
side: JoinSide::Left,
},
ColumnIndex {
index: 0,
side: JoinSide::Right,
},
ColumnIndex {
index: 2,
side: JoinSide::Left,
},
],
Schema::new(vec![
Field::new("b_left_inter", DataType::Int32, true),
Field::new("a_right_inter", DataType::Int32, true),
Field::new("c_left_inter", DataType::Int32, true),
]),
)),
&JoinType::Inner,
true,
None,
None,
StreamJoinPartitionMode::SinglePartition,
)?);
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
vec![
(Arc::new(Column::new("a", 5)), "a".to_string()),
(Arc::new(Column::new("b", 6)), "b".to_string()),
(Arc::new(Column::new("c", 7)), "c".to_string()),
(Arc::new(Column::new("d", 8)), "d".to_string()),
(Arc::new(Column::new("e", 9)), "e".to_string()),
(Arc::new(Column::new("a", 0)), "a".to_string()),
(Arc::new(Column::new("b", 1)), "b".to_string()),
(Arc::new(Column::new("c", 2)), "c".to_string()),
(Arc::new(Column::new("d", 3)), "d".to_string()),
(Arc::new(Column::new("e", 4)), "e".to_string()),
],
join,
)?);
let initial = get_plan_string(&projection);
let expected_initial = [
"ProjectionExec: expr=[a@5 as a, b@6 as b, c@7 as c, d@8 as d, e@9 as e, a@0 as a, b@1 as b, c@2 as c, d@3 as d, e@4 as e]",
" SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(initial, expected_initial);
let after_optimize =
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
let expected = [
"ProjectionExec: expr=[a@5 as a, b@6 as b, c@7 as c, d@8 as d, e@9 as e, a@0 as a, b@1 as b, c@2 as c, d@3 as d, e@4 as e]",
" SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(get_plan_string(&after_optimize), expected);
Ok(())
}
#[test]
fn test_collect_column_indices() -> Result<()> {
let expr = Arc::new(BinaryExpr::new(
Arc::new(Column::new("b", 7)),
Operator::Minus,
Arc::new(BinaryExpr::new(
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
Operator::Plus,
Arc::new(Column::new("a", 1)),
)),
));
let column_indices = collect_column_indices(&[(expr, "b-(1+a)".to_string())]);
assert_eq!(column_indices, vec![1, 7]);
Ok(())
}
#[test]
fn test_hash_join_after_projection() -> Result<()> {
let left_csv = create_simple_csv_exec();
let right_csv = create_simple_csv_exec();
let join: Arc<dyn ExecutionPlan> = Arc::new(HashJoinExec::try_new(
left_csv,
right_csv,
vec![(Arc::new(Column::new("b", 1)), Arc::new(Column::new("c", 2)))],
Some(JoinFilter::new(
Arc::new(BinaryExpr::new(
Arc::new(BinaryExpr::new(
Arc::new(Column::new("b_left_inter", 0)),
Operator::Minus,
Arc::new(BinaryExpr::new(
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
Operator::Plus,
Arc::new(Column::new("a_right_inter", 1)),
)),
)),
Operator::LtEq,
Arc::new(BinaryExpr::new(
Arc::new(Column::new("a_right_inter", 1)),
Operator::Plus,
Arc::new(Column::new("c_left_inter", 2)),
)),
)),
vec![
ColumnIndex {
index: 1,
side: JoinSide::Left,
},
ColumnIndex {
index: 0,
side: JoinSide::Right,
},
ColumnIndex {
index: 2,
side: JoinSide::Left,
},
],
Schema::new(vec![
Field::new("b_left_inter", DataType::Int32, true),
Field::new("a_right_inter", DataType::Int32, true),
Field::new("c_left_inter", DataType::Int32, true),
]),
)),
&JoinType::Inner,
None,
PartitionMode::Auto,
true,
)?);
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
vec![
(Arc::new(Column::new("c", 2)), "c_from_left".to_string()),
(Arc::new(Column::new("b", 1)), "b_from_left".to_string()),
(Arc::new(Column::new("a", 0)), "a_from_left".to_string()),
(Arc::new(Column::new("c", 7)), "c_from_right".to_string()),
],
join.clone(),
)?);
let initial = get_plan_string(&projection);
        let expected_initial = [
            "ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left, c@7 as c_from_right]",
            " HashJoinExec: mode=Auto, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2",
            " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
            " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
        ];
assert_eq!(initial, expected_initial);
let after_optimize =
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
let expected = ["ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left, c@3 as c_from_right]", " HashJoinExec: mode=Auto, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2, projection=[a@0, b@1, c@2, c@7]", " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"];
assert_eq!(get_plan_string(&after_optimize), expected);
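        // If the projection selects exactly the join's (embedded) output columns
        // in order, the `ProjectionExec` is removed entirely.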
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
vec![
(Arc::new(Column::new("a", 0)), "a".to_string()),
(Arc::new(Column::new("b", 1)), "b".to_string()),
(Arc::new(Column::new("c", 2)), "c".to_string()),
(Arc::new(Column::new("c", 7)), "c".to_string()),
],
            join,
)?);
let after_optimize =
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
let expected = ["HashJoinExec: mode=Auto, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2, projection=[a@0, b@1, c@2, c@7]", " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false", " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"];
assert_eq!(get_plan_string(&after_optimize), expected);
Ok(())
}
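    // A projection swaps places with `RepartitionExec`; the hash partitioning
    // expressions must be rewritten in terms of the projection's output schema.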
#[test]
fn test_repartition_after_projection() -> Result<()> {
let csv = create_simple_csv_exec();
let repartition: Arc<dyn ExecutionPlan> = Arc::new(RepartitionExec::try_new(
csv,
Partitioning::Hash(
vec![
Arc::new(Column::new("a", 0)),
Arc::new(Column::new("b", 1)),
Arc::new(Column::new("d", 3)),
],
6,
),
)?);
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
vec![
(Arc::new(Column::new("b", 1)), "b_new".to_string()),
(Arc::new(Column::new("a", 0)), "a".to_string()),
(Arc::new(Column::new("d", 3)), "d_new".to_string()),
],
repartition,
)?);
let initial = get_plan_string(&projection);
let expected_initial = [
"ProjectionExec: expr=[b@1 as b_new, a@0 as a, d@3 as d_new]",
" RepartitionExec: partitioning=Hash([a@0, b@1, d@3], 6), input_partitions=1",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
];
assert_eq!(initial, expected_initial);
let after_optimize =
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
let expected = [
"RepartitionExec: partitioning=Hash([a@1, b_new@0, d_new@2], 6), input_partitions=1",
" ProjectionExec: expr=[b@1 as b_new, a@0 as a, d@3 as d_new]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
];
assert_eq!(get_plan_string(&after_optimize), expected);
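        // The rewritten repartitioning must reference the projection's output
        // columns (`a@1`, `b_new@0`, `d_new@2`), not the CSV source columns.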
assert_eq!(
after_optimize
.as_any()
.downcast_ref::<RepartitionExec>()
.unwrap()
.partitioning()
.clone(),
Partitioning::Hash(
vec![
Arc::new(Column::new("a", 1)),
Arc::new(Column::new("b_new", 0)),
Arc::new(Column::new("d_new", 2)),
],
6,
),
);
Ok(())
}
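    // A projection that only reorders/renames columns swaps places with
    // `SortExec`; the sort expressions are re-indexed against the projection's
    // output.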
#[test]
fn test_sort_after_projection() -> Result<()> {
let csv = create_simple_csv_exec();
        let sort: Arc<dyn ExecutionPlan> = Arc::new(SortExec::new(
vec![
PhysicalSortExpr {
expr: Arc::new(Column::new("b", 1)),
options: SortOptions::default(),
},
PhysicalSortExpr {
expr: Arc::new(BinaryExpr::new(
Arc::new(Column::new("c", 2)),
Operator::Plus,
Arc::new(Column::new("a", 0)),
)),
options: SortOptions::default(),
},
],
            csv,
));
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
vec![
(Arc::new(Column::new("c", 2)), "c".to_string()),
(Arc::new(Column::new("a", 0)), "new_a".to_string()),
(Arc::new(Column::new("b", 1)), "b".to_string()),
],
            sort,
)?);
let initial = get_plan_string(&projection);
let expected_initial = [
"ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]",
" SortExec: expr=[b@1 ASC,c@2 + a@0 ASC], preserve_partitioning=[false]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(initial, expected_initial);
let after_optimize =
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
let expected = [
"SortExec: expr=[b@2 ASC,c@0 + new_a@1 ASC], preserve_partitioning=[false]",
" ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(get_plan_string(&after_optimize), expected);
Ok(())
}
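    // Same as `test_sort_after_projection`, but with `SortPreservingMergeExec`
    // as the operator the projection swaps with.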
#[test]
fn test_sort_preserving_after_projection() -> Result<()> {
let csv = create_simple_csv_exec();
        let spm: Arc<dyn ExecutionPlan> = Arc::new(SortPreservingMergeExec::new(
vec![
PhysicalSortExpr {
expr: Arc::new(Column::new("b", 1)),
options: SortOptions::default(),
},
PhysicalSortExpr {
expr: Arc::new(BinaryExpr::new(
Arc::new(Column::new("c", 2)),
Operator::Plus,
Arc::new(Column::new("a", 0)),
)),
options: SortOptions::default(),
},
],
            csv,
));
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
vec![
(Arc::new(Column::new("c", 2)), "c".to_string()),
(Arc::new(Column::new("a", 0)), "new_a".to_string()),
(Arc::new(Column::new("b", 1)), "b".to_string()),
],
            spm,
)?);
let initial = get_plan_string(&projection);
let expected_initial = [
"ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]",
" SortPreservingMergeExec: [b@1 ASC,c@2 + a@0 ASC]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(initial, expected_initial);
let after_optimize =
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
let expected = [
"SortPreservingMergeExec: [b@2 ASC,c@0 + new_a@1 ASC]",
" ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(get_plan_string(&after_optimize), expected);
Ok(())
}
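    // A projection on top of a `UnionExec` is pushed down to every union input;
    // since all inputs share the same schema, the same `ProjectionExec` is
    // replicated on each branch.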
#[test]
fn test_union_after_projection() -> Result<()> {
let csv = create_simple_csv_exec();
let union: Arc<dyn ExecutionPlan> =
Arc::new(UnionExec::new(vec![csv.clone(), csv.clone(), csv]));
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
vec![
(Arc::new(Column::new("c", 2)), "c".to_string()),
(Arc::new(Column::new("a", 0)), "new_a".to_string()),
(Arc::new(Column::new("b", 1)), "b".to_string()),
],
            union,
)?);
let initial = get_plan_string(&projection);
let expected_initial = [
"ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]",
" UnionExec",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(initial, expected_initial);
let after_optimize =
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
let expected = [
"UnionExec",
" ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
" ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
" ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(get_plan_string(&after_optimize), expected);
Ok(())
}
}