use indexmap::{IndexMap, IndexSet};
use std::collections::HashMap;
use std::sync::Arc;
use datafusion_common::alias::AliasGenerator;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion_common::{Column, DFSchema, Result, qualified_name};
use datafusion_expr::logical_plan::LogicalPlan;
use datafusion_expr::{Expr, ExpressionPlacement, Projection};
use crate::optimizer::ApplyOrder;
use crate::push_down_filter::replace_cols_by_name;
use crate::utils::has_all_column_refs;
use crate::{OptimizerConfig, OptimizerRule};
/// Prefix handed to the alias generator when naming extracted expressions.
/// Aliases whose name starts with this prefix are treated throughout this
/// file as "already extracted" and are not extracted again.
const EXTRACTED_EXPR_PREFIX: &str = "__datafusion_extracted";
/// Returns `true` if any expression in `exprs` contains a node (at any depth)
/// whose placement is [`ExpressionPlacement::MoveTowardsLeafNodes`].
///
/// An error raised while walking an expression is treated as "not found" for
/// that expression.
fn has_extractable_expr(exprs: &[Expr]) -> bool {
    for expr in exprs {
        let found = expr
            .exists(|node| {
                Ok(matches!(
                    node.placement(),
                    ExpressionPlacement::MoveTowardsLeafNodes
                ))
            })
            .unwrap_or(false);
        if found {
            return true;
        }
    }
    false
}
/// Optimizer rule that rewrites Aggregate, Filter, Sort, Limit and Join nodes
/// so that sub-expressions whose placement is `MoveTowardsLeafNodes` are
/// computed by a projection inserted below the node and referenced from the
/// node by a generated `__datafusion_extracted_N` alias.
#[derive(Default, Debug)]
pub struct ExtractLeafExpressions {}
impl ExtractLeafExpressions {
    /// Creates a new instance of the rule.
    pub fn new() -> Self {
        Self::default()
    }
}
impl OptimizerRule for ExtractLeafExpressions {
    /// Name under which this rule is registered.
    fn name(&self) -> &str {
        "extract_leaf_expressions"
    }

    /// Walks the plan top-down (including subqueries) and applies
    /// `extract_from_plan` to every node.
    ///
    /// Returns the plan untouched when
    /// `optimizer.enable_leaf_expression_pushdown` is disabled.
    fn rewrite(
        &self,
        plan: LogicalPlan,
        config: &dyn OptimizerConfig,
    ) -> Result<Transformed<LogicalPlan>> {
        let enabled = config.options().optimizer.enable_leaf_expression_pushdown;
        if !enabled {
            return Ok(Transformed::no(plan));
        }

        let generator = config.alias_generator();
        // Make sure freshly generated aliases cannot clash with ones that
        // are already present in the incoming plan.
        advance_generator_past_existing(&plan, generator)?;

        plan.transform_down_with_subqueries(|node| extract_from_plan(node, generator))
    }
}
/// Advances `alias_generator` past every `__datafusion_extracted_<id>` alias
/// already present in `plan`, so that aliases generated afterwards do not
/// collide with existing ones.
///
/// The scan covers every plan node **including subquery plans**, because the
/// extraction rewrite itself descends into subqueries
/// (`transform_down_with_subqueries`); an alias hidden in a subquery would
/// otherwise be missed and could be generated a second time.
///
/// Alias names that carry the prefix but whose suffix is not `_<number>` are
/// ignored.
///
/// # Errors
/// Propagates any error produced while traversing the plan or its expressions.
fn advance_generator_past_existing(
    plan: &LogicalPlan,
    alias_generator: &AliasGenerator,
) -> Result<()> {
    plan.apply_with_subqueries(|node| {
        for expr in node.expressions() {
            expr.apply(|e| {
                if let Expr::Alias(alias) = e
                    && let Some(id) = alias
                        .name
                        .strip_prefix(EXTRACTED_EXPR_PREFIX)
                        .and_then(|rest| rest.strip_prefix('_'))
                        .and_then(|digits| digits.parse().ok())
                {
                    alias_generator.update_min_id(id);
                }
                Ok(TreeNodeRecursion::Continue)
            })?;
        }
        Ok(TreeNodeRecursion::Continue)
    })
    .map(|_| ())
}
/// Extracts `MoveTowardsLeafNodes` sub-expressions out of a single plan node.
///
/// Only Aggregate, Filter, Sort, Limit and Join nodes are rewritten; every
/// other node is returned unchanged. For a supported node the function:
///
/// 1. rewrites the node's expressions, replacing each extractable
///    sub-expression with a column reference to a generated alias;
/// 2. wraps each input that received extractions in an extraction projection;
/// 3. rebuilds the node on top of the new inputs; and
/// 4. adds a recovery projection when the node's output schema changed.
fn extract_from_plan(
    plan: LogicalPlan,
    alias_generator: &Arc<AliasGenerator>,
) -> Result<Transformed<LogicalPlan>> {
    let supported = matches!(
        &plan,
        LogicalPlan::Aggregate(_)
            | LogicalPlan::Filter(_)
            | LogicalPlan::Sort(_)
            | LogicalPlan::Limit(_)
            | LogicalPlan::Join(_)
    );
    if !supported {
        return Ok(Transformed::no(plan));
    }

    // Owned schema handles, so the extractors do not keep `plan` borrowed
    // while its expressions are being rewritten.
    let input_schemas: Vec<Arc<DFSchema>> = plan
        .inputs()
        .iter()
        .map(|child| Arc::clone(child.schema()))
        .collect();
    if input_schemas.is_empty() || !has_extractable_expr(&plan.expressions()) {
        return Ok(Transformed::no(plan));
    }

    let original_schema = Arc::clone(plan.schema());

    // One extractor and one column set per input, in input order.
    let mut extractors = Vec::with_capacity(input_schemas.len());
    let mut input_column_sets = Vec::with_capacity(input_schemas.len());
    for schema in &input_schemas {
        extractors.push(LeafExpressionExtractor::new(
            schema.as_ref(),
            alias_generator,
        ));
        input_column_sets.push(schema_columns(schema.as_ref()));
    }

    let rewritten = plan
        .map_expressions(|expr| routing_extract(expr, &mut extractors, &input_column_sets))?;
    if !rewritten.transformed {
        return Ok(rewritten);
    }
    let rewritten_plan = rewritten.data;

    // Put an extraction projection on top of every input that had something
    // extracted; inputs without extractions are passed through as they are.
    let mut new_inputs: Vec<LogicalPlan> = Vec::with_capacity(extractors.len());
    for (child, extractor) in rewritten_plan.inputs().into_iter().zip(extractors.iter()) {
        let child_arc = Arc::new(child.clone());
        let replacement = match extractor.build_extraction_projection(&child_arc)? {
            Some(projection) => projection,
            None => Arc::try_unwrap(child_arc).unwrap_or_else(|shared| (*shared).clone()),
        };
        new_inputs.push(replacement);
    }

    let new_plan = rewritten_plan.with_new_exprs(rewritten_plan.expressions(), new_inputs)?;
    build_recovery_projection(original_schema.as_ref(), new_plan).map(Transformed::yes)
}
/// Returns the index of the single input whose column set covers every column
/// referenced by `expr`.
///
/// Returns `None` when no input covers the expression or when more than one
/// does (the owner is then ambiguous).
fn find_owning_input(
    expr: &Expr,
    input_column_sets: &[std::collections::HashSet<Column>],
) -> Option<usize> {
    let mut owners = input_column_sets
        .iter()
        .enumerate()
        .filter(|(_, cols)| has_all_column_refs(expr, cols))
        .map(|(idx, _)| idx);

    let first = owners.next()?;
    match owners.next() {
        // A second match makes the owner ambiguous.
        Some(_) => None,
        None => Some(first),
    }
}
/// Rewrites `expr` top-down, moving each `MoveTowardsLeafNodes`
/// sub-expression into the extractor of the input that owns all of its
/// columns and replacing it with a column reference to the generated alias.
///
/// * Aliases carrying the extracted-expression prefix are left alone and
///   their children are not visited.
/// * Other aliases are kept; traversal continues into the aliased expression.
/// * Plain columns are recorded in the owning extractor's `columns_needed`.
/// * Extractable expressions without a unique owning input stay in place.
fn routing_extract(
    expr: Expr,
    extractors: &mut [LeafExpressionExtractor],
    input_column_sets: &[std::collections::HashSet<Column>],
) -> Result<Transformed<Expr>> {
    expr.transform_down(|node| {
        if let Expr::Alias(alias) = &node {
            if !alias.name.starts_with(EXTRACTED_EXPR_PREFIX) {
                // Ordinary alias: keep it and descend into its child.
                return Ok(Transformed::no(node));
            }
            // Already extracted: skip this subtree entirely.
            return Ok(Transformed {
                data: node,
                transformed: false,
                tnr: TreeNodeRecursion::Jump,
            });
        }

        match node.placement() {
            ExpressionPlacement::MoveTowardsLeafNodes => {
                let Some(owner) = find_owning_input(&node, input_column_sets) else {
                    return Ok(Transformed::no(node));
                };
                extractors[owner].add_extracted(node).map(Transformed::yes)
            }
            ExpressionPlacement::Column => {
                if let Expr::Column(column) = &node
                    && let Some(owner) = find_owning_input(&node, input_column_sets)
                {
                    extractors[owner].columns_needed.insert(column.clone());
                }
                Ok(Transformed::no(node))
            }
            _ => Ok(Transformed::no(node)),
        }
    })
}
/// Collects, for every field of `schema`, both its qualified column and its
/// unqualified column into one set.
fn schema_columns(schema: &DFSchema) -> std::collections::HashSet<Column> {
    let mut columns = std::collections::HashSet::new();
    for (qualifier, field) in schema.iter() {
        columns.insert(Column::new(qualifier.cloned(), field.name()));
        columns.insert(Column::new_unqualified(field.name()));
    }
    columns
}
/// Rewrites extraction pairs and pass-through columns expressed against
/// `from_schema` so that they refer to the positionally corresponding columns
/// of `to_schema`.
///
/// The two schemas are zipped field by field; each qualified name in
/// `from_schema` is mapped to the column at the same position in `to_schema`.
/// Aliases of the pairs are kept unchanged.
///
/// # Errors
/// Returns any error raised while rewriting a pair **or** a column. Column
/// rewrite errors are propagated rather than silently dropping the column,
/// so a failure can never produce a target that is missing a needed column.
fn remap_pairs_and_columns(
    pairs: &[(Expr, String)],
    columns: &IndexSet<Column>,
    from_schema: &DFSchema,
    to_schema: &DFSchema,
) -> Result<ExtractionTarget> {
    // Positional mapping: qualified name in `from_schema` -> column of `to_schema`.
    let mut replace_map = HashMap::new();
    for ((from_q, from_f), (to_q, to_f)) in from_schema.iter().zip(to_schema.iter()) {
        replace_map.insert(
            qualified_name(from_q, from_f.name()),
            Expr::Column(Column::new(to_q.cloned(), to_f.name())),
        );
    }

    let remapped_pairs = pairs
        .iter()
        .map(|(expr, alias)| {
            let rewritten = replace_cols_by_name(expr.clone(), &replace_map)?;
            Ok((rewritten, alias.clone()))
        })
        .collect::<Result<Vec<(Expr, String)>>>()?;

    let remapped_columns = columns
        .iter()
        .map(|col| {
            let rewritten = replace_cols_by_name(Expr::Column(col.clone()), &replace_map)?;
            Ok(match rewritten {
                Expr::Column(mapped) => mapped,
                // The rewrite produced something other than a column:
                // keep the original column.
                _ => col.clone(),
            })
        })
        .collect::<Result<IndexSet<Column>>>()?;

    Ok(ExtractionTarget {
        pairs: remapped_pairs,
        columns: remapped_columns,
    })
}
/// Work routed to one plan input: the expressions to extract there and the
/// columns that must be passed through alongside them.
struct ExtractionTarget {
    // (expression, alias) pairs to be computed by the extraction projection.
pairs: Vec<(Expr, String)>,
    // Pass-through columns, kept in insertion order.
columns: IndexSet<Column>,
}
/// Builds a map from each output column of `projection` (keyed by its flat
/// name) to the un-aliased expression that produces it.
fn build_projection_replace_map(projection: &Projection) -> HashMap<String, Expr> {
    let mut replace_map = HashMap::new();
    let outputs = projection.schema.iter().zip(projection.expr.iter());
    for ((qualifier, field), expr) in outputs {
        let key = Column::from((qualifier, field)).flat_name();
        replace_map.insert(key, expr.clone().unalias());
    }
    replace_map
}
/// Wraps `input` in a projection that restores `original_schema`, if needed.
///
/// * Same field count and identical names/qualifiers: `input` is returned
///   as is.
/// * Same field count but different names/qualifiers: fields are mapped by
///   position; each changed field is selected from `input` and aliased back
///   to its original qualified name.
/// * Different field count: the original columns are selected by name from
///   `input`.
///
/// # Errors
/// Returns an error if the recovery projection cannot be constructed.
fn build_recovery_projection(
    original_schema: &DFSchema,
    input: LogicalPlan,
) -> Result<LogicalPlan> {
    let new_schema = input.schema();

    let proj_exprs: Vec<Expr> =
        if original_schema.fields().len() != new_schema.fields().len() {
            // Extra (or fewer) fields: select the original columns by name.
            original_schema.iter().map(Expr::from).collect()
        } else {
            let unchanged = original_schema.iter().zip(new_schema.iter()).all(
                |((orig_q, orig_f), (new_q, new_f))| {
                    orig_f.name() == new_f.name() && orig_q == new_q
                },
            );
            if unchanged {
                return Ok(input);
            }

            // Equal field counts are guaranteed in this branch, so zipping
            // the two schemas yields an exact positional mapping.
            original_schema
                .iter()
                .zip(new_schema.iter())
                .map(|((orig_q, orig_f), (new_q, new_f))| {
                    if orig_f.name() == new_f.name() && orig_q == new_q {
                        Expr::from((orig_q, orig_f))
                    } else {
                        Expr::Column(Column::from((new_q, new_f)))
                            .alias_qualified(orig_q.cloned(), orig_f.name())
                    }
                })
                .collect()
        };

    let projection = Projection::try_new(proj_exprs, Arc::new(input))?;
    Ok(LogicalPlan::Projection(projection))
}
/// Accumulates the expressions extracted for one plan input, together with
/// the columns the extraction projection has to pass through.
struct LeafExpressionExtractor<'a> {
    // Extracted expression -> generated alias, in insertion order.
extracted: IndexMap<Expr, String>,
    // Columns that must remain available above the extraction projection.
columns_needed: IndexSet<Column>,
    // Schema of the input this extractor targets; forwarded as the target
    // schema when the extraction projection is built.
input_schema: &'a DFSchema,
    // Source of unique alias names.
alias_generator: &'a Arc<AliasGenerator>,
}
impl<'a> LeafExpressionExtractor<'a> {
    /// Creates an empty extractor for an input with the given schema.
    fn new(input_schema: &'a DFSchema, alias_generator: &'a Arc<AliasGenerator>) -> Self {
        let extracted = IndexMap::new();
        let columns_needed = IndexSet::new();
        Self {
            extracted,
            columns_needed,
            input_schema,
            alias_generator,
        }
    }

    /// Registers `expr` for extraction and returns an unqualified column
    /// reference to its alias.
    ///
    /// An expression registered before reuses its existing alias. Otherwise
    /// the columns it references are recorded as needed and a fresh alias is
    /// generated.
    fn add_extracted(&mut self, expr: Expr) -> Result<Expr> {
        let alias = match self.extracted.get(&expr) {
            Some(existing) => existing.clone(),
            None => {
                self.columns_needed
                    .extend(expr.column_refs().into_iter().cloned());
                let fresh = self.alias_generator.next(EXTRACTED_EXPR_PREFIX);
                self.extracted.insert(expr, fresh.clone());
                fresh
            }
        };
        Ok(Expr::Column(Column::new_unqualified(alias)))
    }

    /// Builds the extraction projection over `input`, or returns `Ok(None)`
    /// when nothing has been extracted.
    fn build_extraction_projection(
        &self,
        input: &Arc<LogicalPlan>,
    ) -> Result<Option<LogicalPlan>> {
        if self.extracted.is_empty() {
            return Ok(None);
        }

        let mut pairs: Vec<(Expr, String)> = Vec::with_capacity(self.extracted.len());
        for (expr, alias) in &self.extracted {
            pairs.push((expr.clone(), alias.clone()));
        }

        build_extraction_projection_impl(
            &pairs,
            &self.columns_needed,
            input,
            self.input_schema,
        )
        .map(|projection| Some(LogicalPlan::Projection(projection)))
    }
}
fn build_extraction_projection_impl(
extracted_exprs: &[(Expr, String)],
columns_needed: &IndexSet<Column>,
target: &Arc<LogicalPlan>,
target_schema: &DFSchema,
) -> Result<Projection> {
if let LogicalPlan::Projection(existing) = target.as_ref() {
let mut proj_exprs = existing.expr.clone();
let existing_extractions: IndexMap<Expr, String> = existing
.expr
.iter()
.filter_map(|e| {
if let Expr::Alias(alias) = e
&& alias.name.starts_with(EXTRACTED_EXPR_PREFIX)
{
return Some((*alias.expr.clone(), alias.name.clone()));
}
None
})
.collect();
let replace_map = build_projection_replace_map(existing);
for (expr, alias) in extracted_exprs {
let resolved = replace_cols_by_name(expr.clone().alias(alias), &replace_map)?;
let resolved_inner = if let Expr::Alias(a) = &resolved {
a.expr.as_ref()
} else {
&resolved
};
if let Some(existing_alias) = existing_extractions.get(resolved_inner) {
if existing_alias != alias {
proj_exprs.push(resolved);
}
} else {
proj_exprs.push(resolved);
}
}
let existing_cols: IndexSet<Column> = existing
.expr
.iter()
.filter_map(|e| {
if let Expr::Column(c) = e {
Some(c.clone())
} else {
None
}
})
.collect();
let input_schema = existing.input.schema();
for col in columns_needed {
let col_expr = Expr::Column(col.clone());
let resolved = replace_cols_by_name(col_expr, &replace_map)?;
if let Expr::Column(resolved_col) = &resolved
&& !existing_cols.contains(resolved_col)
&& input_schema.has_column(resolved_col)
{
proj_exprs.push(Expr::Column(resolved_col.clone()));
}
}
Projection::try_new(proj_exprs, Arc::clone(&existing.input))
} else {
let mut proj_exprs = Vec::new();
for (expr, alias) in extracted_exprs {
proj_exprs.push(expr.clone().alias(alias));
}
for (qualifier, field) in target_schema.iter() {
proj_exprs.push(Expr::from((qualifier, field)));
}
Projection::try_new(proj_exprs, Arc::clone(target))
}
}
/// Optimizer rule that, applied top-down to projection nodes, splits
/// extractable expressions into an extraction projection and tries to push
/// that projection further down towards the plan's inputs.
#[derive(Default, Debug)]
pub struct PushDownLeafProjections {}
impl PushDownLeafProjections {
    /// Creates a new instance of the rule.
    pub fn new() -> Self {
        Self::default()
    }
}
impl OptimizerRule for PushDownLeafProjections {
fn name(&self) -> &str {
"push_down_leaf_projections"
}
fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::TopDown)
}
fn rewrite(
&self,
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
if !config.options().optimizer.enable_leaf_expression_pushdown {
return Ok(Transformed::no(plan));
}
let alias_generator = config.alias_generator();
match try_push_input(&plan, alias_generator)? {
Some(new_plan) => Ok(Transformed::yes(new_plan)),
None => Ok(Transformed::no(plan)),
}
}
}
/// Runs `split_and_push_projection` when `input` is a projection; returns
/// `Ok(None)` for any other node type.
fn try_push_input(
    input: &LogicalPlan,
    alias_generator: &Arc<AliasGenerator>,
) -> Result<Option<LogicalPlan>> {
    match input {
        LogicalPlan::Projection(proj) => split_and_push_projection(proj, alias_generator),
        _ => Ok(None),
    }
}
/// Splits a projection into an extraction part (expressions that should move
/// towards the leaves) and a recovery part (what remains on top), then tries
/// to push the extraction part below the projection's input.
///
/// Returns `Ok(None)` when there is nothing to extract, or when the
/// extraction could not be pushed anywhere and no new extraction was made.
/// Otherwise returns the rewritten plan, wrapped in a recovery projection
/// when one is required.
fn split_and_push_projection(
proj: &Projection,
alias_generator: &Arc<AliasGenerator>,
) -> Result<Option<LogicalPlan>> {
// Fast exit: no pre-existing extracted aliases and nothing extractable.
let has_existing_extracted = proj.expr.iter().any(|e| {
matches!(e, Expr::Alias(alias) if alias.name.starts_with(EXTRACTED_EXPR_PREFIX))
});
if !has_existing_extracted && !has_extractable_expr(&proj.expr) {
return Ok(None);
}
let input = &proj.input;
let input_schema = input.schema();
// A projection has a single input, so one extractor / one column set.
let mut extractors = vec![LeafExpressionExtractor::new(
input_schema.as_ref(),
alias_generator,
)];
let input_column_sets = vec![schema_columns(input_schema.as_ref())];
let original_schema = proj.schema.as_ref();
// One recovery expression is pushed per original projection expression.
let mut recovery_exprs: Vec<Expr> = Vec::with_capacity(proj.expr.len());
let mut needs_recovery = false;
let mut has_new_extractions = false;
// Number of expressions that are either pre-existing extraction aliases
// or plain columns.
let mut proj_exprs_captured: usize = 0;
let mut standalone_columns: IndexSet<Column> = IndexSet::new();
for (expr, (qualifier, field)) in proj.expr.iter().zip(original_schema.iter()) {
if let Expr::Alias(alias) = expr
&& alias.name.starts_with(EXTRACTED_EXPR_PREFIX)
{
// Pre-existing extraction: register it under its existing alias. The
// whole Alias expression is used as the map key here; the wrapper is
// stripped again when `extraction_pairs` is built below.
let alias_name = alias.name.clone();
for col_ref in alias.expr.column_refs() {
extractors[0].columns_needed.insert(col_ref.clone());
}
extractors[0]
.extracted
.insert(expr.clone(), alias_name.clone());
recovery_exprs.push(Expr::Column(Column::new_unqualified(&alias_name)));
proj_exprs_captured += 1;
} else if let Expr::Column(col) = expr {
// Plain column: passes through unchanged.
extractors[0].columns_needed.insert(col.clone());
standalone_columns.insert(col.clone());
recovery_exprs.push(expr.clone());
proj_exprs_captured += 1;
} else {
// Any other expression: extract its leaf-bound sub-expressions.
let transformed =
routing_extract(expr.clone(), &mut extractors, &input_column_sets)?;
if transformed.transformed {
has_new_extractions = true;
}
let transformed_expr = transformed.data;
let original_name = field.name();
// Re-alias when the rewritten expression would no longer produce the
// original output field name.
let needs_alias = if let Expr::Column(col) = &transformed_expr {
col.name.as_str() != original_name
} else {
let expr_name = transformed_expr.schema_name().to_string();
original_name != &expr_name
};
let recovery_expr = if needs_alias {
needs_recovery = true;
transformed_expr
.clone()
.alias_qualified(qualifier.cloned(), original_name)
} else {
transformed_expr.clone()
};
// Note: plain columns are handled by the branch above, so `expr` is never
// an `Expr::Column` here and this condition always holds.
if transformed.transformed || !matches!(expr, Expr::Column(_)) {
needs_recovery = true;
}
recovery_exprs.push(recovery_expr);
}
}
let extractor = &extractors[0];
// Strip the Alias wrapper from pre-existing entries so every pair is
// (inner expression, alias name).
let extraction_pairs: Vec<(Expr, String)> = extractor
.extracted
.iter()
.map(|(e, a)| match e {
Expr::Alias(alias) => (*alias.expr.clone(), a.clone()),
_ => (e.clone(), a.clone()),
})
.collect();
let columns_needed = &extractor.columns_needed;
if extraction_pairs.is_empty() {
return Ok(None);
}
// A needed column that is not a plain column of this projection also
// forces a recovery projection.
if columns_needed
.iter()
.any(|c| !standalone_columns.contains(c))
{
needs_recovery = true;
}
let proj_input = Arc::clone(&proj.input);
let pushed = push_extraction_pairs(
&extraction_pairs,
columns_needed,
proj,
&proj_input,
alias_generator,
proj_exprs_captured,
)?;
let base_plan = match pushed {
Some(plan) => plan,
None => {
// Could not push further. If nothing new was extracted either, report
// "no change"; otherwise place the extraction projection directly
// above the input.
if !has_new_extractions {
return Ok(None);
}
let input_arc = Arc::clone(input);
let extraction = build_extraction_projection_impl(
&extraction_pairs,
columns_needed,
&input_arc,
input_schema.as_ref(),
)?;
LogicalPlan::Projection(extraction)
}
};
if needs_recovery {
let recovery = LogicalPlan::Projection(Projection::try_new(
recovery_exprs,
Arc::new(base_plan),
)?);
Ok(Some(recovery))
} else {
Ok(Some(base_plan))
}
}
fn is_pure_extraction_projection(plan: &LogicalPlan) -> bool {
let LogicalPlan::Projection(proj) = plan else {
return false;
};
let mut has_extraction = false;
for expr in &proj.expr {
match expr {
Expr::Alias(alias) if alias.name.starts_with(EXTRACTED_EXPR_PREFIX) => {
has_extraction = true;
}
Expr::Column(_) => {}
_ => return false,
}
}
has_extraction
}
/// Pushes extraction pairs one level below the projection `proj`.
///
/// When the projection's input is itself a projection and every expression
/// of `proj` was captured (`proj_exprs_captured == proj.expr.len()`), the
/// pairs are merged into that inner projection; a merged projection that is
/// a pure extraction projection is then pushed further if possible. In every
/// other case the pairs are routed into the inputs of the input node.
fn push_extraction_pairs(
    pairs: &[(Expr, String)],
    columns_needed: &IndexSet<Column>,
    proj: &Projection,
    proj_input: &Arc<LogicalPlan>,
    alias_generator: &Arc<AliasGenerator>,
    proj_exprs_captured: usize,
) -> Result<Option<LogicalPlan>> {
    let input_is_projection = matches!(proj_input.as_ref(), LogicalPlan::Projection(_));
    let can_merge = input_is_projection && proj_exprs_captured == proj.expr.len();
    if !can_merge {
        return try_push_into_inputs(
            pairs,
            columns_needed,
            proj_input.as_ref(),
            alias_generator,
        );
    }

    let merged = build_extraction_projection_impl(
        pairs,
        columns_needed,
        proj_input,
        proj_input.schema().as_ref(),
    )?;
    let merged_plan = LogicalPlan::Projection(merged);

    if is_pure_extraction_projection(&merged_plan) {
        if let Some(pushed) = try_push_input(&merged_plan, alias_generator)? {
            return Ok(Some(pushed));
        }
    }
    Ok(Some(merged_plan))
}
/// Distributes extraction pairs and pass-through columns over the inputs of
/// `node`.
///
/// * For a `Union`, every input receives all pairs and columns, remapped
///   from the union's schema to that input's schema.
/// * For any other node, each pair and each column goes to the single input
///   that owns all of its columns; if any of them has no unique owner the
///   function returns `Ok(None)`.
///
/// Also returns `Ok(None)` when no input ends up with any pair.
fn route_to_inputs(
    pairs: &[(Expr, String)],
    columns: &IndexSet<Column>,
    node: &LogicalPlan,
    input_column_sets: &[std::collections::HashSet<Column>],
    input_schemas: &[Arc<DFSchema>],
) -> Result<Option<Vec<ExtractionTarget>>> {
    let per_input: Vec<ExtractionTarget> = if let LogicalPlan::Union(_) = node {
        let union_schema = node.schema();
        input_schemas
            .iter()
            .map(|input_schema| {
                remap_pairs_and_columns(pairs, columns, union_schema, input_schema)
            })
            .collect::<Result<_>>()?
    } else {
        let mut routed: Vec<ExtractionTarget> = input_schemas
            .iter()
            .map(|_| ExtractionTarget {
                pairs: vec![],
                columns: IndexSet::new(),
            })
            .collect();

        for (expr, alias) in pairs {
            let Some(owner) = find_owning_input(expr, input_column_sets) else {
                return Ok(None);
            };
            routed[owner].pairs.push((expr.clone(), alias.clone()));
        }

        for col in columns {
            let as_expr = Expr::Column(col.clone());
            let Some(owner) = find_owning_input(&as_expr, input_column_sets) else {
                return Ok(None);
            };
            routed[owner].columns.insert(col.clone());
        }

        routed
    };

    let nothing_routed = per_input.iter().all(|target| target.pairs.is_empty());
    if nothing_routed {
        return Ok(None);
    }
    Ok(Some(per_input))
}
/// Tries to push extraction pairs through `node` into its inputs.
///
/// Each input that receives pairs gets an extraction projection (itself
/// pushed further down when possible) and `node` is rebuilt on the new
/// inputs. For a `SubqueryAlias`, pairs and columns are first remapped from
/// the alias schema to the schema of its input.
///
/// Returns `Ok(None)` when `node` has no inputs, when routing fails, or when
/// any requested alias is missing from an extraction projection's schema or
/// from the rebuilt node's output schema.
fn try_push_into_inputs(
    pairs: &[(Expr, String)],
    columns_needed: &IndexSet<Column>,
    node: &LogicalPlan,
    alias_generator: &Arc<AliasGenerator>,
) -> Result<Option<LogicalPlan>> {
    let inputs = node.inputs();
    if inputs.is_empty() {
        return Ok(None);
    }

    let remapped = match node {
        LogicalPlan::SubqueryAlias(sa) => {
            remap_pairs_and_columns(pairs, columns_needed, &sa.schema, sa.input.schema())?
        }
        _ => ExtractionTarget {
            pairs: pairs.to_vec(),
            columns: columns_needed.clone(),
        },
    };
    let pairs: &[(Expr, String)] = &remapped.pairs;
    let columns_needed = &remapped.columns;

    let mut input_schemas: Vec<Arc<DFSchema>> = Vec::with_capacity(inputs.len());
    let mut input_column_sets = Vec::with_capacity(inputs.len());
    for child in &inputs {
        let schema = Arc::clone(child.schema());
        input_column_sets.push(schema_columns(&schema));
        input_schemas.push(schema);
    }

    let Some(per_input) = route_to_inputs(
        pairs,
        columns_needed,
        node,
        &input_column_sets,
        &input_schemas,
    )?
    else {
        return Ok(None);
    };

    let mut new_inputs: Vec<LogicalPlan> = Vec::with_capacity(inputs.len());
    for (child, target) in inputs.into_iter().zip(per_input.iter()) {
        if target.pairs.is_empty() {
            new_inputs.push(child.clone());
            continue;
        }

        let child_arc = Arc::new(child.clone());
        let proj = build_extraction_projection_impl(
            &target.pairs,
            &target.columns,
            &child_arc,
            child.schema().as_ref(),
        )?;

        // Every requested alias has to be produced by the projection.
        let all_aliases_present = target.pairs.iter().all(|(_, alias)| {
            proj.schema
                .fields()
                .iter()
                .any(|field| field.name() == alias)
        });
        if !all_aliases_present {
            return Ok(None);
        }

        let proj_plan = LogicalPlan::Projection(proj);
        let pushed = try_push_input(&proj_plan, alias_generator)?;
        new_inputs.push(pushed.unwrap_or(proj_plan));
    }

    let new_node = node.with_new_exprs(node.expressions(), new_inputs)?;

    // The rebuilt node must expose every extracted alias in its output.
    let output_schema = new_node.schema();
    let aliases_visible = pairs.iter().all(|(_, alias)| {
        output_schema
            .fields()
            .iter()
            .any(|field| field.name() == alias)
    });
    if !aliases_visible {
        return Ok(None);
    }
    Ok(Some(new_node))
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::optimize_projections::OptimizeProjections;
use crate::test::udfs::PlacementTestUDF;
use crate::test::*;
use crate::{Optimizer, OptimizerContext};
use datafusion_common::Result;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{Expr, ExpressionPlacement};
use datafusion_expr::{
ScalarUDF, col, lit, logical_plan::builder::LogicalPlanBuilder,
};
/// Builds a call to the placement test UDF, configured with
/// `ExpressionPlacement::MoveTowardsLeafNodes`, with arguments
/// `(expr, lit(name))`.
fn leaf_udf(expr: Expr, name: &str) -> Expr {
    let udf_impl = PlacementTestUDF::new()
        .with_placement(ExpressionPlacement::MoveTowardsLeafNodes);
    let udf = Arc::new(ScalarUDF::new_from_impl(udf_impl));
    let args = vec![expr, lit(name)];
    Expr::ScalarFunction(ScalarFunction::new_udf(udf, args))
}
/// Optimizes `plan` with four cumulative rule sets (projection pruning only,
/// plus extraction, plus pushdown, plus a final projection pruning) and
/// renders the resulting plans as one report.
///
/// A stage whose plan text equals the previous stage's is rendered as a
/// short "(same as …)" marker instead of the repeated plan.
fn format_optimization_stages(plan: &LogicalPlan) -> Result<String> {
    // Runs a single optimizer pass with exactly the given rules.
    let run = |rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>| -> Result<String> {
        let optimizer = Optimizer::with_rules(rules);
        let ctx = OptimizerContext::new().with_max_passes(1);
        let optimized = optimizer.optimize(plan.clone(), &ctx, |_, _| {})?;
        Ok(optimized.to_string())
    };

    let original = run(vec![Arc::new(OptimizeProjections::new())])?;
    let after_extract = run(vec![
        Arc::new(OptimizeProjections::new()),
        Arc::new(ExtractLeafExpressions::new()),
    ])?;
    let after_pushdown = run(vec![
        Arc::new(OptimizeProjections::new()),
        Arc::new(ExtractLeafExpressions::new()),
        Arc::new(PushDownLeafProjections::new()),
    ])?;
    let optimized = run(vec![
        Arc::new(OptimizeProjections::new()),
        Arc::new(ExtractLeafExpressions::new()),
        Arc::new(PushDownLeafProjections::new()),
        Arc::new(OptimizeProjections::new()),
    ])?;

    // (header, this stage, previous stage, marker used when unchanged)
    let sections = [
        (
            "\n\n## After Extraction\n",
            after_extract.as_str(),
            original.as_str(),
            "(same as original)",
        ),
        (
            "\n\n## After Pushdown\n",
            after_pushdown.as_str(),
            after_extract.as_str(),
            "(same as after extraction)",
        ),
        (
            "\n\n## Optimized\n",
            optimized.as_str(),
            after_pushdown.as_str(),
            "(same as after pushdown)",
        ),
    ];

    let mut out = format!("## Original Plan\n{original}");
    for (header, current, previous, unchanged_marker) in sections {
        out.push_str(header);
        out.push_str(if current == previous {
            unchanged_marker
        } else {
            current
        });
    }
    Ok(out)
}
// Renders all optimization stages of `$plan` via `format_optimization_stages`
// and compares the text against the inline insta snapshot `$expected`.
// Uses `?`, so it must be invoked inside a function returning `Result`; the
// macro itself evaluates to `Ok(())`.
macro_rules! assert_stages {
($plan:expr, @ $expected:literal $(,)?) => {{
let result = format_optimization_stages(&$plan)?;
insta::assert_snapshot!(result, @ $expected);
Ok::<(), datafusion_common::DataFusionError>(())
}};
}
/// A leaf UDF inside a filter predicate is extracted into a projection below
/// the filter; the final stage prunes the pass-through columns no longer used.
#[test]
fn test_extract_from_filter() -> Result<()> {
let table_scan = test_table_scan_with_struct()?;
let plan = LogicalPlanBuilder::from(table_scan.clone())
.filter(leaf_udf(col("user"), "status").eq(lit("active")))?
.select(vec![
table_scan
.schema()
.index_of_column_by_name(None, "id")
.unwrap(),
])?
.build()?;
assert_stages!(plan, @r#"
## Original Plan
Projection: test.id
Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
TableScan: test projection=[id, user]
## After Extraction
Projection: test.id
Projection: test.id, test.user
Filter: __datafusion_extracted_1 = Utf8("active")
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
TableScan: test projection=[id, user]
## After Pushdown
(same as after extraction)
## Optimized
Projection: test.id
Filter: __datafusion_extracted_1 = Utf8("active")
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id
TableScan: test projection=[id, user]
"#)
}
/// A filter that only compares a plain column is left unchanged by every
/// stage.
#[test]
fn test_no_extraction_for_column() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.filter(col("a").eq(lit(1)))?
.build()?;
assert_stages!(plan, @"
## Original Plan
Filter: test.a = Int32(1)
TableScan: test projection=[a, b, c]
## After Extraction
(same as original)
## After Pushdown
(same as after extraction)
## Optimized
(same as after pushdown)
")
}
/// A leaf UDF in a projection is untouched by the extraction rule, split into
/// extraction + recovery projections by the pushdown rule, and merged back
/// by the final projection optimization.
#[test]
fn test_extract_from_projection() -> Result<()> {
let table_scan = test_table_scan_with_struct()?;
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![leaf_udf(col("user"), "name")])?
.build()?;
assert_stages!(plan, @r#"
## Original Plan
Projection: leaf_udf(test.user, Utf8("name"))
TableScan: test projection=[user]
## After Extraction
(same as original)
## After Pushdown
Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
TableScan: test projection=[user]
## Optimized
Projection: leaf_udf(test.user, Utf8("name"))
TableScan: test projection=[user]
"#)
}
/// Only the leaf UDF sub-expression is extracted; the surrounding
/// `IS NOT NULL` and the user alias stay in the upper projection.
#[test]
fn test_extract_from_projection_with_subexpression() -> Result<()> {
let table_scan = test_table_scan_with_struct()?;
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![
leaf_udf(col("user"), "name")
.is_not_null()
.alias("has_name"),
])?
.build()?;
assert_stages!(plan, @r#"
## Original Plan
Projection: leaf_udf(test.user, Utf8("name")) IS NOT NULL AS has_name
TableScan: test projection=[user]
## After Extraction
(same as original)
## After Pushdown
Projection: __datafusion_extracted_1 IS NOT NULL AS has_name
Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
TableScan: test projection=[user]
## Optimized
Projection: leaf_udf(test.user, Utf8("name")) IS NOT NULL AS has_name
TableScan: test projection=[user]
"#)
}
/// A projection of plain columns is not changed by the extraction or pushdown
/// stages.
#[test]
fn test_projection_no_extraction_for_column() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("a"), col("b")])?
.build()?;
assert_stages!(plan, @"
## Original Plan
TableScan: test projection=[a, b]
## After Extraction
(same as original)
## After Pushdown
(same as after extraction)
## Optimized
(same as after pushdown)
")
}
/// The same leaf UDF used twice in one predicate is extracted once and both
/// uses refer to the same generated alias.
#[test]
fn test_filter_with_deduplication() -> Result<()> {
let table_scan = test_table_scan_with_struct()?;
let field_access = leaf_udf(col("user"), "name");
let plan = LogicalPlanBuilder::from(table_scan)
.filter(
field_access
.clone()
.is_not_null()
.and(field_access.is_null()),
)?
.build()?;
assert_stages!(plan, @r#"
## Original Plan
Filter: leaf_udf(test.user, Utf8("name")) IS NOT NULL AND leaf_udf(test.user, Utf8("name")) IS NULL
TableScan: test projection=[id, user]
## After Extraction
Projection: test.id, test.user
Filter: __datafusion_extracted_1 IS NOT NULL AND __datafusion_extracted_1 IS NULL
Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.id, test.user
TableScan: test projection=[id, user]
## After Pushdown
(same as after extraction)
## Optimized
(same as after pushdown)
"#)
}
/// A filter directly above the table scan still gets its leaf UDF extracted
/// into a projection, with a recovery projection restoring the schema.
#[test]
fn test_already_leaf_expression_in_filter() -> Result<()> {
let table_scan = test_table_scan_with_struct()?;
let plan = LogicalPlanBuilder::from(table_scan)
.filter(leaf_udf(col("user"), "name").eq(lit("test")))?
.build()?;
assert_stages!(plan, @r#"
## Original Plan
Filter: leaf_udf(test.user, Utf8("name")) = Utf8("test")
TableScan: test projection=[id, user]
## After Extraction
Projection: test.id, test.user
Filter: __datafusion_extracted_1 = Utf8("test")
Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.id, test.user
TableScan: test projection=[id, user]
## After Pushdown
(same as after extraction)
## Optimized
(same as after pushdown)
"#)
}
/// A leaf UDF used as a GROUP BY key is extracted below the aggregate, and a
/// recovery projection re-aliases the key to its original output name.
#[test]
fn test_extract_from_aggregate_group_by() -> Result<()> {
use datafusion_expr::test::function_stub::count;
let table_scan = test_table_scan_with_struct()?;
let plan = LogicalPlanBuilder::from(table_scan)
.aggregate(vec![leaf_udf(col("user"), "status")], vec![count(lit(1))])?
.build()?;
assert_stages!(plan, @r#"
## Original Plan
Aggregate: groupBy=[[leaf_udf(test.user, Utf8("status"))]], aggr=[[COUNT(Int32(1))]]
TableScan: test projection=[user]
## After Extraction
Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("status")), COUNT(Int32(1))
Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.user
TableScan: test projection=[user]
## After Pushdown
(same as after extraction)
## Optimized
Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("status")), COUNT(Int32(1))
Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1
TableScan: test projection=[user]
"#)
}
/// A leaf UDF used as an aggregate function argument is extracted below the
/// aggregate, and the aggregate output is re-aliased to its original name.
#[test]
fn test_extract_from_aggregate_args() -> Result<()> {
use datafusion_expr::test::function_stub::count;
let table_scan = test_table_scan_with_struct()?;
let plan = LogicalPlanBuilder::from(table_scan)
.aggregate(
vec![col("user")],
vec![count(leaf_udf(col("user"), "value"))],
)?
.build()?;
assert_stages!(plan, @r#"
## Original Plan
Aggregate: groupBy=[[test.user]], aggr=[[COUNT(leaf_udf(test.user, Utf8("value")))]]
TableScan: test projection=[user]
## After Extraction
Projection: test.user, COUNT(__datafusion_extracted_1) AS COUNT(leaf_udf(test.user,Utf8("value")))
Aggregate: groupBy=[[test.user]], aggr=[[COUNT(__datafusion_extracted_1)]]
Projection: leaf_udf(test.user, Utf8("value")) AS __datafusion_extracted_1, test.user
TableScan: test projection=[user]
## After Pushdown
(same as after extraction)
## Optimized
(same as after pushdown)
"#)
}
/// A projection above a filter, each using a different leaf UDF call: the
/// projection's extraction is pushed through the filter and merged into the
/// filter's extraction projection.
#[test]
fn test_projection_with_filter_combined() -> Result<()> {
let table_scan = test_table_scan_with_struct()?;
let plan = LogicalPlanBuilder::from(table_scan)
.filter(leaf_udf(col("user"), "status").eq(lit("active")))?
.project(vec![leaf_udf(col("user"), "name")])?
.build()?;
assert_stages!(plan, @r#"
## Original Plan
Projection: leaf_udf(test.user, Utf8("name"))
Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
TableScan: test projection=[user]
## After Extraction
Projection: leaf_udf(test.user, Utf8("name"))
Projection: test.user
Filter: __datafusion_extracted_1 = Utf8("active")
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.user
TableScan: test projection=[user]
## After Pushdown
Projection: __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name"))
Filter: __datafusion_extracted_1 = Utf8("active")
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
TableScan: test projection=[user]
## Optimized
Projection: __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name"))
Filter: __datafusion_extracted_1 = Utf8("active")
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
TableScan: test projection=[user]
"#)
}
/// A user-provided alias on a leaf UDF survives the split: the recovery
/// projection re-applies `username` to the extracted column.
#[test]
fn test_projection_preserves_alias() -> Result<()> {
let table_scan = test_table_scan_with_struct()?;
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![leaf_udf(col("user"), "name").alias("username")])?
.build()?;
assert_stages!(plan, @r#"
## Original Plan
Projection: leaf_udf(test.user, Utf8("name")) AS username
TableScan: test projection=[user]
## After Extraction
(same as original)
## After Pushdown
Projection: __datafusion_extracted_1 AS username
Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
TableScan: test projection=[user]
## Optimized
Projection: leaf_udf(test.user, Utf8("name")) AS username
TableScan: test projection=[user]
"#)
}
/// The projection keeps a plain column and adds a leaf UDF call that differs
/// from the filter's; both extractions end up in one projection below the
/// filter.
#[test]
fn test_projection_different_field_from_filter() -> Result<()> {
let table_scan = test_table_scan_with_struct()?;
let plan = LogicalPlanBuilder::from(table_scan)
.filter(leaf_udf(col("user"), "value").gt(lit(150)))?
.project(vec![col("user"), leaf_udf(col("user"), "label")])?
.build()?;
assert_stages!(plan, @r#"
## Original Plan
Projection: test.user, leaf_udf(test.user, Utf8("label"))
Filter: leaf_udf(test.user, Utf8("value")) > Int32(150)
TableScan: test projection=[user]
## After Extraction
Projection: test.user, leaf_udf(test.user, Utf8("label"))
Projection: test.user
Filter: __datafusion_extracted_1 > Int32(150)
Projection: leaf_udf(test.user, Utf8("value")) AS __datafusion_extracted_1, test.user
TableScan: test projection=[user]
## After Pushdown
Projection: test.user, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("label"))
Filter: __datafusion_extracted_1 > Int32(150)
Projection: leaf_udf(test.user, Utf8("value")) AS __datafusion_extracted_1, test.user, leaf_udf(test.user, Utf8("label")) AS __datafusion_extracted_2
TableScan: test projection=[user]
## Optimized
(same as after pushdown)
"#)
}
/// The same leaf UDF projected twice (once with an alias) is extracted once;
/// both outputs of the recovery projection refer to the same extracted column.
#[test]
fn test_projection_deduplication() -> Result<()> {
let table_scan = test_table_scan_with_struct()?;
let field = leaf_udf(col("user"), "name");
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![field.clone(), field.clone().alias("name2")])?
.build()?;
assert_stages!(plan, @r#"
## Original Plan
Projection: leaf_udf(test.user, Utf8("name")), leaf_udf(test.user, Utf8("name")) AS name2
TableScan: test projection=[user]
## After Extraction
(same as original)
## After Pushdown
Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_1 AS name2
Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
TableScan: test projection=[user]
## Optimized
Projection: leaf_udf(test.user, Utf8("name")), leaf_udf(test.user, Utf8("name")) AS name2
TableScan: test projection=[user]
"#)
}
/// A leaf expression projected above a `Sort`.
///
/// In the snapshot the extraction projection ends up below the `Sort` node
/// after pushdown, and the optimized plan keeps that shape.
#[test]
fn test_extract_through_sort() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let ordering = vec![col("user").sort(true, true)];
    let outputs = vec![leaf_udf(col("user"), "name")];
    let plan = LogicalPlanBuilder::from(scan)
        .sort(ordering)?
        .project(outputs)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Projection: leaf_udf(test.user, Utf8("name"))
Sort: test.user ASC NULLS FIRST
TableScan: test projection=[user]
## After Extraction
(same as original)
## After Pushdown
Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
Sort: test.user ASC NULLS FIRST
Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
TableScan: test projection=[user]
## Optimized
(same as after pushdown)
"#)
}
/// A leaf expression projected above a `Limit`.
///
/// In the snapshot the extraction projection ends up below the `Limit`, and
/// the optimized plan no longer carries the `test.user` passthrough column.
#[test]
fn test_extract_through_limit() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let outputs = vec![leaf_udf(col("user"), "name")];
    let plan = LogicalPlanBuilder::from(scan)
        .limit(0, Some(10))?
        .project(outputs)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Projection: leaf_udf(test.user, Utf8("name"))
Limit: skip=0, fetch=10
TableScan: test projection=[user]
## After Extraction
(same as original)
## After Pushdown
Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
Limit: skip=0, fetch=10
Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
TableScan: test projection=[user]
## Optimized
Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
Limit: skip=0, fetch=10
Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
TableScan: test projection=[user]
"#)
}
/// A leaf expression used as the argument of an aliased aggregate.
///
/// The snapshot records the `COUNT` argument being replaced by the extracted
/// column while the `cnt` alias stays on the aggregate expression.
#[test]
fn test_extract_from_aliased_aggregate() -> Result<()> {
    use datafusion_expr::test::function_stub::count;

    let scan = test_table_scan_with_struct()?;
    let group_by = vec![col("user")];
    let aggregates = vec![count(leaf_udf(col("user"), "value")).alias("cnt")];
    let plan = LogicalPlanBuilder::from(scan)
        .aggregate(group_by, aggregates)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Aggregate: groupBy=[[test.user]], aggr=[[COUNT(leaf_udf(test.user, Utf8("value"))) AS cnt]]
TableScan: test projection=[user]
## After Extraction
Aggregate: groupBy=[[test.user]], aggr=[[COUNT(__datafusion_extracted_1) AS cnt]]
Projection: leaf_udf(test.user, Utf8("value")) AS __datafusion_extracted_1, test.user
TableScan: test projection=[user]
## After Pushdown
(same as after extraction)
## Optimized
(same as after pushdown)
"#)
}
/// An aggregate over plain columns only.
///
/// The snapshot records the plan as unchanged at every stage.
#[test]
fn test_aggregate_no_extraction() -> Result<()> {
    use datafusion_expr::test::function_stub::count;

    let scan = test_table_scan()?;
    let group_by = vec![col("a")];
    let aggregates = vec![count(col("b"))];
    let plan = LogicalPlanBuilder::from(scan)
        .aggregate(group_by, aggregates)?
        .build()?;
    assert_stages!(plan, @"
## Original Plan
Aggregate: groupBy=[[test.a]], aggr=[[COUNT(test.b)]]
TableScan: test projection=[a, b]
## After Extraction
(same as original)
## After Pushdown
(same as after extraction)
## Optimized
(same as after pushdown)
")
}
/// A projection whose alias already starts with `__datafusion_extracted`.
///
/// The snapshot records the plan as unchanged at every stage.
#[test]
fn test_skip_extracted_projection() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let manual_extraction =
        leaf_udf(col("user"), "name").alias("__datafusion_extracted_manual");
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![manual_extraction, col("user")])?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_manual, test.user
TableScan: test projection=[user]
## After Extraction
(same as original)
## After Pushdown
(same as after extraction)
## Optimized
(same as after pushdown)
"#)
}
/// Two stacked filters, each applying a leaf expression to the same column.
///
/// After extraction each filter has its own projection; the snapshot records
/// both expressions merged into one projection above the scan after pushdown.
#[test]
fn test_merge_into_existing_extracted_projection() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let inner_predicate = leaf_udf(col("user"), "status").eq(lit("active"));
    let outer_predicate = leaf_udf(col("user"), "name").is_not_null();
    let plan = LogicalPlanBuilder::from(scan)
        .filter(inner_predicate)?
        .filter(outer_predicate)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Filter: leaf_udf(test.user, Utf8("name")) IS NOT NULL
Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
TableScan: test projection=[id, user]
## After Extraction
Projection: test.id, test.user
Filter: __datafusion_extracted_1 IS NOT NULL
Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.id, test.user
Projection: test.id, test.user
Filter: __datafusion_extracted_2 = Utf8("active")
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.id, test.user
TableScan: test projection=[id, user]
## After Pushdown
Projection: test.id, test.user
Filter: __datafusion_extracted_1 IS NOT NULL
Filter: __datafusion_extracted_2 = Utf8("active")
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.id, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
TableScan: test projection=[id, user]
## Optimized
Projection: test.id, test.user
Filter: __datafusion_extracted_1 IS NOT NULL
Projection: test.id, test.user, __datafusion_extracted_1
Filter: __datafusion_extracted_2 = Utf8("active")
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.id, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
TableScan: test projection=[id, user]
"#)
}
/// A leaf expression projected above a column-only projection.
///
/// NOTE(review): the recorded original plan shows only one `Projection`, so
/// the passthrough is presumably folded away before the first stage is
/// printed — confirm against `assert_stages!`.
#[test]
fn test_extract_through_passthrough_projection() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let passthrough = vec![col("user")];
    let outputs = vec![leaf_udf(col("user"), "name")];
    let plan = LogicalPlanBuilder::from(scan)
        .project(passthrough)?
        .project(outputs)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Projection: leaf_udf(test.user, Utf8("name"))
TableScan: test projection=[user]
## After Extraction
(same as original)
## After Pushdown
Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
TableScan: test projection=[user]
## Optimized
Projection: leaf_udf(test.user, Utf8("name"))
TableScan: test projection=[user]
"#)
}
/// A projection of an aliased column and a bare column only.
///
/// The snapshot records the plan as unchanged at every stage.
#[test]
fn test_projection_early_return_no_extraction() -> Result<()> {
    let scan = test_table_scan()?;
    let outputs = vec![col("a").alias("x"), col("b")];
    let plan = LogicalPlanBuilder::from(scan).project(outputs)?.build()?;
    assert_stages!(plan, @"
## Original Plan
Projection: test.a AS x, test.b
TableScan: test projection=[a, b]
## After Extraction
(same as original)
## After Pushdown
(same as after extraction)
## Optimized
(same as after pushdown)
")
}
/// A projection of an aliased arithmetic expression (`a + b`).
///
/// The snapshot records the plan as unchanged at every stage.
#[test]
fn test_projection_with_arithmetic_no_extraction() -> Result<()> {
    let scan = test_table_scan()?;
    let total = (col("a") + col("b")).alias("sum");
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![total])?
        .build()?;
    assert_stages!(plan, @"
## Original Plan
Projection: test.a + test.b AS sum
TableScan: test projection=[a, b]
## After Extraction
(same as original)
## After Pushdown
(same as after extraction)
## Optimized
(same as after pushdown)
")
}
/// An aggregate grouped by a leaf expression, sitting above a filter that
/// uses another leaf expression on the same column.
///
/// The snapshot records both extracted expressions sharing one projection
/// above the scan after pushdown.
#[test]
fn test_aggregate_merge_into_extracted_projection() -> Result<()> {
    use datafusion_expr::test::function_stub::count;

    let scan = test_table_scan_with_struct()?;
    let predicate = leaf_udf(col("user"), "status").eq(lit("active"));
    let group_by = vec![leaf_udf(col("user"), "name")];
    let aggregates = vec![count(lit(1))];
    let plan = LogicalPlanBuilder::from(scan)
        .filter(predicate)?
        .aggregate(group_by, aggregates)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Aggregate: groupBy=[[leaf_udf(test.user, Utf8("name"))]], aggr=[[COUNT(Int32(1))]]
Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
TableScan: test projection=[user]
## After Extraction
Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name")), COUNT(Int32(1))
Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
Projection: test.user
Filter: __datafusion_extracted_2 = Utf8("active")
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.user
TableScan: test projection=[user]
## After Pushdown
Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name")), COUNT(Int32(1))
Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
Filter: __datafusion_extracted_2 = Utf8("active")
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
TableScan: test projection=[user]
## Optimized
Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name")), COUNT(Int32(1))
Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
Projection: __datafusion_extracted_1
Filter: __datafusion_extracted_2 = Utf8("active")
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
TableScan: test projection=[user]
"#)
}
/// A projection above an aggregate applies a leaf expression to the group key.
///
/// In the snapshot the extraction projection sits directly above the
/// `Aggregate` after pushdown, and the optimized plan matches the original.
#[test]
fn test_projection_with_leaf_expr_above_aggregate() -> Result<()> {
    use datafusion_expr::test::function_stub::count;

    let scan = test_table_scan_with_struct()?;
    let has_name = leaf_udf(col("user"), "name")
        .is_not_null()
        .alias("has_name");
    let plan = LogicalPlanBuilder::from(scan)
        .aggregate(vec![col("user")], vec![count(lit(1))])?
        .project(vec![has_name, col("COUNT(Int32(1))")])?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Projection: leaf_udf(test.user, Utf8("name")) IS NOT NULL AS has_name, COUNT(Int32(1))
Aggregate: groupBy=[[test.user]], aggr=[[COUNT(Int32(1))]]
TableScan: test projection=[user]
## After Extraction
(same as original)
## After Pushdown
Projection: __datafusion_extracted_1 IS NOT NULL AS has_name, COUNT(Int32(1))
Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user, COUNT(Int32(1))
Aggregate: groupBy=[[test.user]], aggr=[[COUNT(Int32(1))]]
TableScan: test projection=[user]
## Optimized
Projection: leaf_udf(test.user, Utf8("name")) IS NOT NULL AS has_name, COUNT(Int32(1))
Aggregate: groupBy=[[test.user]], aggr=[[COUNT(Int32(1))]]
TableScan: test projection=[user]
"#)
}
/// Two stacked filters whose leaf expressions read different columns.
///
/// The snapshot records both extracted expressions merged into one projection
/// above the scan after pushdown.
#[test]
fn test_merge_with_new_columns() -> Result<()> {
    let scan = test_table_scan()?;
    let inner_predicate = leaf_udf(col("a"), "x").eq(lit(1));
    let outer_predicate = leaf_udf(col("b"), "y").eq(lit(2));
    let plan = LogicalPlanBuilder::from(scan)
        .filter(inner_predicate)?
        .filter(outer_predicate)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Filter: leaf_udf(test.b, Utf8("y")) = Int32(2)
Filter: leaf_udf(test.a, Utf8("x")) = Int32(1)
TableScan: test projection=[a, b, c]
## After Extraction
Projection: test.a, test.b, test.c
Filter: __datafusion_extracted_1 = Int32(2)
Projection: leaf_udf(test.b, Utf8("y")) AS __datafusion_extracted_1, test.a, test.b, test.c
Projection: test.a, test.b, test.c
Filter: __datafusion_extracted_2 = Int32(1)
Projection: leaf_udf(test.a, Utf8("x")) AS __datafusion_extracted_2, test.a, test.b, test.c
TableScan: test projection=[a, b, c]
## After Pushdown
Projection: test.a, test.b, test.c
Filter: __datafusion_extracted_1 = Int32(2)
Filter: __datafusion_extracted_2 = Int32(1)
Projection: leaf_udf(test.a, Utf8("x")) AS __datafusion_extracted_2, test.a, test.b, test.c, leaf_udf(test.b, Utf8("y")) AS __datafusion_extracted_1
TableScan: test projection=[a, b, c]
## Optimized
Projection: test.a, test.b, test.c
Filter: __datafusion_extracted_1 = Int32(2)
Projection: test.a, test.b, test.c, __datafusion_extracted_1
Filter: __datafusion_extracted_2 = Int32(1)
Projection: leaf_udf(test.a, Utf8("x")) AS __datafusion_extracted_2, test.a, test.b, test.c, leaf_udf(test.b, Utf8("y")) AS __datafusion_extracted_1
TableScan: test projection=[a, b, c]
"#)
}
/// Builds a table scan called `name` over the schema produced by
/// `test_table_scan_with_struct_fields()`, with no projection applied.
fn test_table_scan_with_struct_named(name: &str) -> Result<LogicalPlan> {
    use arrow::datatypes::Schema;

    let struct_schema = Schema::new(test_table_scan_with_struct_fields());
    let builder =
        datafusion_expr::logical_plan::table_scan(Some(name), &struct_schema, None)?;
    builder.build()
}
/// Leaf expressions used as the equi-join keys on both sides of a join.
///
/// The snapshot records one extraction projection under each join input.
#[test]
fn test_extract_from_join_on() -> Result<()> {
    use datafusion_expr::JoinType;

    let left = test_table_scan_with_struct()?;
    let right = test_table_scan_with_struct_named("right")?;
    let join_keys = (
        vec![leaf_udf(col("user"), "id")],
        vec![leaf_udf(col("user"), "id")],
    );
    let plan = LogicalPlanBuilder::from(left)
        .join_with_expr_keys(right, JoinType::Inner, join_keys, None)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Inner Join: leaf_udf(test.user, Utf8("id")) = leaf_udf(right.user, Utf8("id"))
TableScan: test projection=[id, user]
TableScan: right projection=[id, user]
## After Extraction
Projection: test.id, test.user, right.id, right.user
Inner Join: __datafusion_extracted_1 = __datafusion_extracted_2
Projection: leaf_udf(test.user, Utf8("id")) AS __datafusion_extracted_1, test.id, test.user
TableScan: test projection=[id, user]
Projection: leaf_udf(right.user, Utf8("id")) AS __datafusion_extracted_2, right.id, right.user
TableScan: right projection=[id, user]
## After Pushdown
(same as after extraction)
## Optimized
(same as after pushdown)
"#)
}
/// A join filter containing a leaf expression over a left-side column.
///
/// The snapshot records an extraction projection under the left input only.
#[test]
fn test_extract_from_join_filter() -> Result<()> {
    use datafusion_expr::JoinType;

    let left = test_table_scan_with_struct()?;
    let right = test_table_scan_with_struct_named("right")?;
    let on_exprs = vec![
        col("test.user").eq(col("right.user")),
        leaf_udf(col("test.user"), "status").eq(lit("active")),
    ];
    let plan = LogicalPlanBuilder::from(left)
        .join_on(right, JoinType::Inner, on_exprs)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Inner Join: Filter: test.user = right.user AND leaf_udf(test.user, Utf8("status")) = Utf8("active")
TableScan: test projection=[id, user]
TableScan: right projection=[id, user]
## After Extraction
Projection: test.id, test.user, right.id, right.user
Inner Join: Filter: test.user = right.user AND __datafusion_extracted_1 = Utf8("active")
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
TableScan: test projection=[id, user]
TableScan: right projection=[id, user]
## After Pushdown
(same as after extraction)
## Optimized
(same as after pushdown)
"#)
}
/// A join filter with one leaf expression per side of the join.
///
/// The snapshot records a separate extraction projection under each input.
#[test]
fn test_extract_from_join_both_sides() -> Result<()> {
    use datafusion_expr::JoinType;

    let left = test_table_scan_with_struct()?;
    let right = test_table_scan_with_struct_named("right")?;
    let on_exprs = vec![
        col("test.user").eq(col("right.user")),
        leaf_udf(col("test.user"), "status").eq(lit("active")),
        leaf_udf(col("right.user"), "role").eq(lit("admin")),
    ];
    let plan = LogicalPlanBuilder::from(left)
        .join_on(right, JoinType::Inner, on_exprs)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Inner Join: Filter: test.user = right.user AND leaf_udf(test.user, Utf8("status")) = Utf8("active") AND leaf_udf(right.user, Utf8("role")) = Utf8("admin")
TableScan: test projection=[id, user]
TableScan: right projection=[id, user]
## After Extraction
Projection: test.id, test.user, right.id, right.user
Inner Join: Filter: test.user = right.user AND __datafusion_extracted_1 = Utf8("active") AND __datafusion_extracted_2 = Utf8("admin")
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
TableScan: test projection=[id, user]
Projection: leaf_udf(right.user, Utf8("role")) AS __datafusion_extracted_2, right.id, right.user
TableScan: right projection=[id, user]
## After Pushdown
(same as after extraction)
## Optimized
(same as after pushdown)
"#)
}
/// A join keyed on plain columns only.
///
/// The snapshot records the plan as unchanged at every stage.
#[test]
fn test_extract_from_join_no_extraction() -> Result<()> {
    use datafusion_expr::JoinType;

    let left = test_table_scan()?;
    let right = test_table_scan_with_name("right")?;
    let join_keys = (vec!["a"], vec!["a"]);
    let plan = LogicalPlanBuilder::from(left)
        .join(right, JoinType::Inner, join_keys, None)?
        .build()?;
    assert_stages!(plan, @"
## Original Plan
Inner Join: test.a = right.a
TableScan: test projection=[a, b, c]
TableScan: right projection=[a, b, c]
## After Extraction
(same as original)
## After Pushdown
(same as after extraction)
## Optimized
(same as after pushdown)
")
}
/// A filter with a leaf expression sits above a join keyed on leaf expressions.
///
/// The snapshot records the filter's extracted expression joining the left
/// input's existing extraction projection after pushdown.
#[test]
fn test_extract_from_filter_above_join() -> Result<()> {
    use datafusion_expr::JoinType;

    let left = test_table_scan_with_struct()?;
    let right = test_table_scan_with_struct_named("right")?;
    let join_keys = (
        vec![leaf_udf(col("user"), "id")],
        vec![leaf_udf(col("user"), "id")],
    );
    let predicate = leaf_udf(col("test.user"), "status").eq(lit("active"));
    let plan = LogicalPlanBuilder::from(left)
        .join_with_expr_keys(right, JoinType::Inner, join_keys, None)?
        .filter(predicate)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
Inner Join: leaf_udf(test.user, Utf8("id")) = leaf_udf(right.user, Utf8("id"))
TableScan: test projection=[id, user]
TableScan: right projection=[id, user]
## After Extraction
Projection: test.id, test.user, right.id, right.user
Filter: __datafusion_extracted_1 = Utf8("active")
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user, right.id, right.user
Projection: test.id, test.user, right.id, right.user
Inner Join: __datafusion_extracted_2 = __datafusion_extracted_3
Projection: leaf_udf(test.user, Utf8("id")) AS __datafusion_extracted_2, test.id, test.user
TableScan: test projection=[id, user]
Projection: leaf_udf(right.user, Utf8("id")) AS __datafusion_extracted_3, right.id, right.user
TableScan: right projection=[id, user]
## After Pushdown
Projection: test.id, test.user, right.id, right.user
Filter: __datafusion_extracted_1 = Utf8("active")
Inner Join: __datafusion_extracted_2 = __datafusion_extracted_3
Projection: leaf_udf(test.user, Utf8("id")) AS __datafusion_extracted_2, test.id, test.user, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1
TableScan: test projection=[id, user]
Projection: leaf_udf(right.user, Utf8("id")) AS __datafusion_extracted_3, right.id, right.user
TableScan: right projection=[id, user]
## Optimized
Projection: test.id, test.user, right.id, right.user
Filter: __datafusion_extracted_1 = Utf8("active")
Projection: test.id, test.user, __datafusion_extracted_1, right.id, right.user
Inner Join: __datafusion_extracted_2 = __datafusion_extracted_3
Projection: leaf_udf(test.user, Utf8("id")) AS __datafusion_extracted_2, test.id, test.user, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1
TableScan: test projection=[id, user]
Projection: leaf_udf(right.user, Utf8("id")) AS __datafusion_extracted_3, right.id, right.user
TableScan: right projection=[id, user]
"#)
}
/// A projection above a join applies one leaf expression per join side.
///
/// The snapshot records each expression extracted under the input that owns
/// its column after pushdown.
#[test]
fn test_extract_projection_above_join() -> Result<()> {
    use datafusion_expr::JoinType;

    let left = test_table_scan_with_struct()?;
    let right = test_table_scan_with_struct_named("right")?;
    let join_keys = (vec!["id"], vec!["id"]);
    let outputs = vec![
        leaf_udf(col("test.user"), "status"),
        leaf_udf(col("right.user"), "role"),
    ];
    let plan = LogicalPlanBuilder::from(left)
        .join(right, JoinType::Inner, join_keys, None)?
        .project(outputs)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Projection: leaf_udf(test.user, Utf8("status")), leaf_udf(right.user, Utf8("role"))
Inner Join: test.id = right.id
TableScan: test projection=[id, user]
TableScan: right projection=[id, user]
## After Extraction
(same as original)
## After Pushdown
Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("status")), __datafusion_extracted_2 AS leaf_udf(right.user,Utf8("role"))
Inner Join: test.id = right.id
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
TableScan: test projection=[id, user]
Projection: leaf_udf(right.user, Utf8("role")) AS __datafusion_extracted_2, right.id, right.user
TableScan: right projection=[id, user]
## Optimized
Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("status")), __datafusion_extracted_2 AS leaf_udf(right.user,Utf8("role"))
Inner Join: test.id = right.id
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id
TableScan: test projection=[id, user]
Projection: leaf_udf(right.user, Utf8("role")) AS __datafusion_extracted_2, right.id
TableScan: right projection=[id, user]
"#)
}
/// A join filter with a leaf expression over a right-qualified column.
///
/// The snapshot records an extraction projection under the right input only.
#[test]
fn test_extract_from_join_qualified_right_side() -> Result<()> {
    use datafusion_expr::JoinType;

    let left = test_table_scan_with_struct()?;
    let right = test_table_scan_with_struct_named("right")?;
    let on_exprs = vec![
        col("test.id").eq(col("right.id")),
        leaf_udf(col("right.user"), "status").eq(lit("active")),
    ];
    let plan = LogicalPlanBuilder::from(left)
        .join_on(right, JoinType::Inner, on_exprs)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Inner Join: Filter: test.id = right.id AND leaf_udf(right.user, Utf8("status")) = Utf8("active")
TableScan: test projection=[id, user]
TableScan: right projection=[id, user]
## After Extraction
Projection: test.id, test.user, right.id, right.user
Inner Join: Filter: test.id = right.id AND __datafusion_extracted_1 = Utf8("active")
TableScan: test projection=[id, user]
Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_1, right.id, right.user
TableScan: right projection=[id, user]
## After Pushdown
(same as after extraction)
## Optimized
(same as after pushdown)
"#)
}
/// `find_owning_input` with column sets that both contain an unqualified
/// `user` alongside their own qualified `user`.
///
/// Asserts that the unqualified column yields `None`, while each qualified
/// column yields the index of the set that contains it.
#[test]
fn test_find_owning_input_ambiguous_unqualified_column() {
    use std::collections::HashSet;

    let left_cols: HashSet<Column> = HashSet::from([
        Column::new(Some("test"), "user"),
        Column::new_unqualified("user"),
    ]);
    let right_cols: HashSet<Column> = HashSet::from([
        Column::new(Some("right"), "user"),
        Column::new_unqualified("user"),
    ]);
    let input_column_sets = vec![left_cols, right_cols];

    // (column to look up, expected owning input index)
    let cases = [
        (Column::new_unqualified("user"), None),
        (Column::new(Some("right"), "user"), Some(1)),
        (Column::new(Some("test"), "user"), Some(0)),
    ];
    for (column, expected) in cases {
        let expr = Expr::Column(column);
        assert_eq!(find_owning_input(&expr, &input_column_sets), expected);
    }
}
/// A filter above a join compares a leaf expression from each join side.
///
/// After extraction both expressions share one projection above the join; the
/// snapshot records them split across the two join inputs after pushdown.
#[test]
fn test_extract_from_join_cross_input_expression() -> Result<()> {
    use datafusion_expr::JoinType;

    let left = test_table_scan_with_struct()?;
    let right = test_table_scan_with_struct_named("right")?;
    let on_exprs = vec![col("test.id").eq(col("right.id"))];
    let predicate = leaf_udf(col("test.user"), "status")
        .eq(leaf_udf(col("right.user"), "status"));
    let plan = LogicalPlanBuilder::from(left)
        .join_on(right, JoinType::Inner, on_exprs)?
        .filter(predicate)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Filter: leaf_udf(test.user, Utf8("status")) = leaf_udf(right.user, Utf8("status"))
Inner Join: Filter: test.id = right.id
TableScan: test projection=[id, user]
TableScan: right projection=[id, user]
## After Extraction
Projection: test.id, test.user, right.id, right.user
Filter: __datafusion_extracted_1 = __datafusion_extracted_2
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_2, test.id, test.user, right.id, right.user
Inner Join: Filter: test.id = right.id
TableScan: test projection=[id, user]
TableScan: right projection=[id, user]
## After Pushdown
Projection: test.id, test.user, right.id, right.user
Filter: __datafusion_extracted_1 = __datafusion_extracted_2
Inner Join: Filter: test.id = right.id
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
TableScan: test projection=[id, user]
Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_2, right.id, right.user
TableScan: right projection=[id, user]
## Optimized
(same as after pushdown)
"#)
}
/// A leaf expression over a renamed column (`user AS x`), projected above a
/// filter on that renamed column.
///
/// The snapshot records the extracted expression written against `test.user`
/// inside the renaming projection after pushdown.
#[test]
fn test_extract_through_filter_with_column_rename() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let renamed = vec![col("user").alias("x")];
    let predicate = col("x").is_not_null();
    let outputs = vec![leaf_udf(col("x"), "a")];
    let plan = LogicalPlanBuilder::from(scan)
        .project(renamed)?
        .filter(predicate)?
        .project(outputs)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Projection: leaf_udf(x, Utf8("a"))
Filter: x IS NOT NULL
Projection: test.user AS x
TableScan: test projection=[user]
## After Extraction
(same as original)
## After Pushdown
Projection: __datafusion_extracted_1 AS leaf_udf(x,Utf8("a"))
Filter: x IS NOT NULL
Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1, test.user
TableScan: test projection=[user]
## Optimized
Projection: __datafusion_extracted_1 AS leaf_udf(x,Utf8("a"))
Filter: x IS NOT NULL
Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1
TableScan: test projection=[user]
"#)
}
/// As the renamed-column case, but the leaf expression is wrapped in
/// `IS NOT NULL`.
///
/// The snapshot records only the inner leaf call being extracted; the
/// `IS NOT NULL` stays in the top projection.
#[test]
fn test_extract_partial_through_filter_with_column_rename() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let renamed = vec![col("user").alias("x")];
    let predicate = col("x").is_not_null();
    let outputs = vec![leaf_udf(col("x"), "a").is_not_null()];
    let plan = LogicalPlanBuilder::from(scan)
        .project(renamed)?
        .filter(predicate)?
        .project(outputs)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Projection: leaf_udf(x, Utf8("a")) IS NOT NULL
Filter: x IS NOT NULL
Projection: test.user AS x
TableScan: test projection=[user]
## After Extraction
(same as original)
## After Pushdown
Projection: __datafusion_extracted_1 IS NOT NULL AS leaf_udf(x,Utf8("a")) IS NOT NULL
Filter: x IS NOT NULL
Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1, test.user
TableScan: test projection=[user]
## Optimized
Projection: __datafusion_extracted_1 IS NOT NULL AS leaf_udf(x,Utf8("a")) IS NOT NULL
Filter: x IS NOT NULL
Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1
TableScan: test projection=[user]
"#)
}
/// A filter with a leaf expression directly above a renaming projection.
///
/// The snapshot records the extracted expression placed inside the renaming
/// projection already at the extraction stage.
#[test]
fn test_extract_from_filter_above_renaming_projection() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let renamed = vec![col("user").alias("x")];
    let predicate = leaf_udf(col("x"), "a").eq(lit("active"));
    let plan = LogicalPlanBuilder::from(scan)
        .project(renamed)?
        .filter(predicate)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Filter: leaf_udf(x, Utf8("a")) = Utf8("active")
Projection: test.user AS x
TableScan: test projection=[user]
## After Extraction
Projection: x
Filter: __datafusion_extracted_1 = Utf8("active")
Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1, test.user
TableScan: test projection=[user]
## After Pushdown
(same as after extraction)
## Optimized
Projection: x
Filter: __datafusion_extracted_1 = Utf8("active")
Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1
TableScan: test projection=[user]
"#)
}
/// A leaf expression over an alias-qualified column, above a `SubqueryAlias`.
///
/// The snapshot records the extraction projection below the alias, with the
/// column qualifier rewritten from `sub` to `test`.
#[test]
fn test_extract_through_subquery_alias() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let outputs = vec![leaf_udf(col("sub.user"), "name")];
    let plan = LogicalPlanBuilder::from(scan)
        .alias("sub")?
        .project(outputs)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Projection: leaf_udf(sub.user, Utf8("name"))
SubqueryAlias: sub
TableScan: test projection=[user]
## After Extraction
(same as original)
## After Pushdown
Projection: __datafusion_extracted_1 AS leaf_udf(sub.user,Utf8("name"))
SubqueryAlias: sub
Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
TableScan: test projection=[user]
## Optimized
Projection: __datafusion_extracted_1 AS leaf_udf(sub.user,Utf8("name"))
SubqueryAlias: sub
Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
TableScan: test projection=[user]
"#)
}
/// A filter and a projection above a `SubqueryAlias`, each with a leaf
/// expression over the aliased column.
///
/// The snapshot records both extracted expressions sharing one projection
/// below the alias after pushdown.
#[test]
fn test_extract_through_subquery_alias_with_filter() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let predicate = leaf_udf(col("sub.user"), "status").eq(lit("active"));
    let outputs = vec![leaf_udf(col("sub.user"), "name")];
    let plan = LogicalPlanBuilder::from(scan)
        .alias("sub")?
        .filter(predicate)?
        .project(outputs)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Projection: leaf_udf(sub.user, Utf8("name"))
Filter: leaf_udf(sub.user, Utf8("status")) = Utf8("active")
SubqueryAlias: sub
TableScan: test projection=[user]
## After Extraction
Projection: leaf_udf(sub.user, Utf8("name"))
Projection: sub.user
Filter: __datafusion_extracted_1 = Utf8("active")
Projection: leaf_udf(sub.user, Utf8("status")) AS __datafusion_extracted_1, sub.user
SubqueryAlias: sub
TableScan: test projection=[user]
## After Pushdown
Projection: __datafusion_extracted_2 AS leaf_udf(sub.user,Utf8("name"))
Filter: __datafusion_extracted_1 = Utf8("active")
SubqueryAlias: sub
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, test.user
TableScan: test projection=[user]
## Optimized
Projection: __datafusion_extracted_2 AS leaf_udf(sub.user,Utf8("name"))
Filter: __datafusion_extracted_1 = Utf8("active")
SubqueryAlias: sub
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
TableScan: test projection=[user]
"#)
}
/// A leaf expression above two nested `SubqueryAlias` nodes.
///
/// The snapshot records the extraction projection below both aliases after
/// pushdown.
#[test]
fn test_extract_through_nested_subquery_alias() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let outputs = vec![leaf_udf(col("outer_sub.user"), "name")];
    let plan = LogicalPlanBuilder::from(scan)
        .alias("inner_sub")?
        .alias("outer_sub")?
        .project(outputs)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Projection: leaf_udf(outer_sub.user, Utf8("name"))
SubqueryAlias: outer_sub
SubqueryAlias: inner_sub
TableScan: test projection=[user]
## After Extraction
(same as original)
## After Pushdown
Projection: __datafusion_extracted_1 AS leaf_udf(outer_sub.user,Utf8("name"))
SubqueryAlias: outer_sub
SubqueryAlias: inner_sub
Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
TableScan: test projection=[user]
## Optimized
Projection: __datafusion_extracted_1 AS leaf_udf(outer_sub.user,Utf8("name"))
SubqueryAlias: outer_sub
SubqueryAlias: inner_sub
Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
TableScan: test projection=[user]
"#)
}
/// Plain columns projected through a `SubqueryAlias`.
///
/// The snapshot records the plan as unchanged at every stage.
/// NOTE(review): the recorded original plan shows no `Projection` above the
/// alias, so it is presumably removed before the first stage is printed —
/// confirm against `assert_stages!`.
#[test]
fn test_subquery_alias_no_extraction() -> Result<()> {
    let scan = test_table_scan()?;
    let outputs = vec![col("sub.a"), col("sub.b")];
    let plan = LogicalPlanBuilder::from(scan)
        .alias("sub")?
        .project(outputs)?
        .build()?;
    assert_stages!(plan, @"
## Original Plan
SubqueryAlias: sub
TableScan: test projection=[a, b]
## After Extraction
(same as original)
## After Pushdown
(same as after extraction)
## Optimized
(same as after pushdown)
")
}
/// Two UDF instances that differ only by `with_id` produce expressions with
/// the same `schema_name` but which compare unequal.
///
/// The snapshot records the two calls extracted under separate aliases
/// (`__datafusion_extracted_1` and `__datafusion_extracted_2`).
#[test]
fn test_different_udfs_same_schema_name_not_deduplicated() -> Result<()> {
    let first_udf = Arc::new(ScalarUDF::new_from_impl(
        PlacementTestUDF::new()
            .with_placement(ExpressionPlacement::MoveTowardsLeafNodes)
            .with_id(1),
    ));
    let second_udf = Arc::new(ScalarUDF::new_from_impl(
        PlacementTestUDF::new()
            .with_placement(ExpressionPlacement::MoveTowardsLeafNodes)
            .with_id(2),
    ));

    let expr_a = Expr::ScalarFunction(ScalarFunction::new_udf(
        first_udf,
        vec![col("user"), lit("field")],
    ));
    let expr_b = Expr::ScalarFunction(ScalarFunction::new_udf(
        second_udf,
        vec![col("user"), lit("field")],
    ));

    // Preconditions: same display name, different expression identity.
    assert_eq!(
        expr_a.schema_name().to_string(),
        expr_b.schema_name().to_string(),
        "Both expressions should have the same schema_name"
    );
    assert_ne!(
        expr_a, expr_b,
        "Expressions should NOT be equal (different UDF instances)"
    );

    let table_scan = test_table_scan_with_struct()?;
    // Look up the `id` column index up front so the scan can be moved into the builder.
    let id_index = table_scan
        .schema()
        .index_of_column_by_name(None, "id")
        .unwrap();
    let predicate = expr_a
        .clone()
        .eq(lit("a"))
        .and(expr_b.clone().eq(lit("b")));
    let plan = LogicalPlanBuilder::from(table_scan)
        .filter(predicate)?
        .select(vec![id_index])?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Projection: test.id
Filter: leaf_udf(test.user, Utf8("field")) = Utf8("a") AND leaf_udf(test.user, Utf8("field")) = Utf8("b")
TableScan: test projection=[id, user]
## After Extraction
Projection: test.id
Projection: test.id, test.user
Filter: __datafusion_extracted_1 = Utf8("a") AND __datafusion_extracted_2 = Utf8("b")
Projection: leaf_udf(test.user, Utf8("field")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("field")) AS __datafusion_extracted_2, test.id, test.user
TableScan: test projection=[id, user]
## After Pushdown
(same as after extraction)
## Optimized
Projection: test.id
Filter: __datafusion_extracted_1 = Utf8("a") AND __datafusion_extracted_2 = Utf8("b")
Projection: leaf_udf(test.user, Utf8("field")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("field")) AS __datafusion_extracted_2, test.id
TableScan: test projection=[id, user]
"#)
}
/// A projection with a leaf expression above a filter whose predicate also
/// uses a leaf expression.
///
/// The snapshot records the projection's expression joining the filter's
/// extraction projection above the scan after pushdown.
#[test]
fn test_extraction_pushdown_through_filter_with_extracted_predicate() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let predicate = leaf_udf(col("user"), "status").eq(lit("active"));
    let outputs = vec![col("id"), leaf_udf(col("user"), "name")];
    let plan = LogicalPlanBuilder::from(scan)
        .filter(predicate)?
        .project(outputs)?
        .build()?;
    assert_stages!(plan, @r#"
## Original Plan
Projection: test.id, leaf_udf(test.user, Utf8("name"))
Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
TableScan: test projection=[id, user]
## After Extraction
Projection: test.id, leaf_udf(test.user, Utf8("name"))
Projection: test.id, test.user
Filter: __datafusion_extracted_1 = Utf8("active")
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
TableScan: test projection=[id, user]
## After Pushdown
Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name"))
Filter: __datafusion_extracted_1 = Utf8("active")
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
TableScan: test projection=[id, user]
## Optimized
Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name"))
Filter: __datafusion_extracted_1 = Utf8("active")
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
TableScan: test projection=[id, user]
"#)
}
/// The identical leaf expression is used both in the filter predicate and in
/// the projection above it.
///
/// The expected stages record that the two occurrences are aliased
/// independently: `__datafusion_extracted_1` for the filter and
/// `__datafusion_extracted_2` for the projection, both computed in the
/// projection beneath the filter.
#[test]
fn test_extraction_pushdown_same_expr_in_filter_and_projection() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let status = leaf_udf(col("user"), "status");
    // The predicate takes a copy; the original is moved into the projection.
    let predicate = status.clone().gt(lit(5));
    let output_exprs = vec![col("id"), status];

    let plan = LogicalPlanBuilder::from(scan)
        .filter(predicate)?
        .project(output_exprs)?
        .build()?;

    assert_stages!(plan, @r#"
## Original Plan
Projection: test.id, leaf_udf(test.user, Utf8("status"))
Filter: leaf_udf(test.user, Utf8("status")) > Int32(5)
TableScan: test projection=[id, user]
## After Extraction
Projection: test.id, leaf_udf(test.user, Utf8("status"))
Projection: test.id, test.user
Filter: __datafusion_extracted_1 > Int32(5)
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
TableScan: test projection=[id, user]
## After Pushdown
Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("status"))
Filter: __datafusion_extracted_1 > Int32(5)
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2
TableScan: test projection=[id, user]
## Optimized
Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("status"))
Filter: __datafusion_extracted_1 > Int32(5)
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2
TableScan: test projection=[id, user]
"#)
}
/// Left join whose join filter and whose parent projection both contain leaf
/// expressions.
///
/// Per the expected stages: the join-filter expression over `right.user` is
/// extracted into a projection on the right input; pushdown then places each
/// projection expression on the join input it reads from
/// (`__datafusion_extracted_2` on `test`, `__datafusion_extracted_3` on
/// `right`).
#[test]
fn test_left_join_with_filter_and_projection_extraction() -> Result<()> {
    use datafusion_expr::JoinType;

    let left_scan = test_table_scan_with_struct()?;
    let right_scan = test_table_scan_with_struct_named("right")?;

    // Equality on `id` plus a leaf-expression predicate over the right side.
    let join_conditions = vec![
        col("test.id").eq(col("right.id")),
        leaf_udf(col("right.user"), "status").gt(lit(5)),
    ];
    // One leaf expression per join side, alongside a plain column.
    let output_exprs = vec![
        col("test.id"),
        leaf_udf(col("test.user"), "name"),
        leaf_udf(col("right.user"), "status"),
    ];

    let plan = LogicalPlanBuilder::from(left_scan)
        .join_on(right_scan, JoinType::Left, join_conditions)?
        .project(output_exprs)?
        .build()?;

    assert_stages!(plan, @r#"
## Original Plan
Projection: test.id, leaf_udf(test.user, Utf8("name")), leaf_udf(right.user, Utf8("status"))
Left Join: Filter: test.id = right.id AND leaf_udf(right.user, Utf8("status")) > Int32(5)
TableScan: test projection=[id, user]
TableScan: right projection=[id, user]
## After Extraction
Projection: test.id, leaf_udf(test.user, Utf8("name")), leaf_udf(right.user, Utf8("status"))
Projection: test.id, test.user, right.id, right.user
Left Join: Filter: test.id = right.id AND __datafusion_extracted_1 > Int32(5)
TableScan: test projection=[id, user]
Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_1, right.id, right.user
TableScan: right projection=[id, user]
## After Pushdown
Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_3 AS leaf_udf(right.user,Utf8("status"))
Left Join: Filter: test.id = right.id AND __datafusion_extracted_1 > Int32(5)
Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, test.id, test.user
TableScan: test projection=[id, user]
Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_1, right.id, right.user, leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_3
TableScan: right projection=[id, user]
## Optimized
Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_3 AS leaf_udf(right.user,Utf8("status"))
Left Join: Filter: test.id = right.id AND __datafusion_extracted_1 > Int32(5)
Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, test.id
TableScan: test projection=[id, user]
Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_1, right.id, leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_3
TableScan: right projection=[id, user]
"#)
}
/// A projection with two leaf expressions sits above a filter that already
/// uses one of them.
///
/// Per the expected stages: after pushdown both projection expressions are
/// computed in the projection beneath the filter (as
/// `__datafusion_extracted_2` and `__datafusion_extracted_3`), next to the
/// filter's own `__datafusion_extracted_1`.
#[test]
fn test_pure_extraction_proj_push_through_filter() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let predicate = leaf_udf(col("user"), "status").gt(lit(5));
    let output_exprs = vec![
        col("id"),
        leaf_udf(col("user"), "name"),
        leaf_udf(col("user"), "status"),
    ];

    let filtered = LogicalPlanBuilder::from(scan).filter(predicate)?;
    let plan = filtered.project(output_exprs)?.build()?;

    assert_stages!(plan, @r#"
## Original Plan
Projection: test.id, leaf_udf(test.user, Utf8("name")), leaf_udf(test.user, Utf8("status"))
Filter: leaf_udf(test.user, Utf8("status")) > Int32(5)
TableScan: test projection=[id, user]
## After Extraction
Projection: test.id, leaf_udf(test.user, Utf8("name")), leaf_udf(test.user, Utf8("status"))
Projection: test.id, test.user
Filter: __datafusion_extracted_1 > Int32(5)
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
TableScan: test projection=[id, user]
## After Pushdown
Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_3 AS leaf_udf(test.user,Utf8("status"))
Filter: __datafusion_extracted_1 > Int32(5)
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_3
TableScan: test projection=[id, user]
## Optimized
Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_3 AS leaf_udf(test.user,Utf8("status"))
Filter: __datafusion_extracted_1 > Int32(5)
Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_3
TableScan: test projection=[id, user]
"#)
}
/// Runs only `PushDownLeafProjections`, limited to a single optimizer pass,
/// on a hand-built plan: a projection that is already aliased as
/// `__datafusion_extracted_1` stacked on a plain `user, id` projection.
///
/// The expected output shows the aliased expression appended to the inner
/// projection, with the outer projection referring to it by alias.
#[test]
fn test_merge_extraction_into_projection_with_column_ref_inflation() -> Result<()> {
    let scan = test_table_scan_with_struct()?;

    // Inner projection of bare columns, then the pre-aliased extraction on top.
    let extracted_alias = format!("{EXTRACTED_EXPR_PREFIX}_1");
    let plan = LogicalPlanBuilder::from(
        LogicalPlanBuilder::from(scan)
            .project(vec![col("user"), col("id")])?
            .build()?,
    )
    .project(vec![
        leaf_udf(col("user"), "status").alias(extracted_alias),
        col("id"),
    ])?
    .build()?;

    let optimizer =
        Optimizer::with_rules(vec![Arc::new(PushDownLeafProjections::new())]);
    let single_pass_ctx = OptimizerContext::new().with_max_passes(1);
    // The observer callback is intentionally a no-op.
    let optimized = optimizer.optimize(plan, &single_pass_ctx, |_, _| {})?;

    insta::assert_snapshot!(optimized.to_string(), @r#"
Projection: __datafusion_extracted_1, test.id
Projection: test.user, test.id, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1
TableScan: test
"#);
    Ok(())
}
}