pub trait PhysicalExprAdapter:
Send
+ Sync
+ Debug {
// Required methods
fn rewrite(
&self,
expr: Arc<dyn PhysicalExpr>,
) -> Result<Arc<dyn PhysicalExpr>>;
fn with_partition_values(
&self,
partition_values: Vec<(FieldRef, ScalarValue)>,
) -> Arc<dyn PhysicalExprAdapter>;
}Expand description
Trait for adapting physical expressions to match a target schema.
This is used in file scans to rewrite expressions so that they can be evaluated against the physical schema of the file being scanned. It allows for handling differences between logical and physical schemas, such as type mismatches or missing columns.
§Overview
The PhysicalExprAdapter allows rewriting physical expressions to match different schemas, including:
- Type casting: When logical and physical schemas have different types, expressions are automatically wrapped with cast operations. For example, `lit(ScalarValue::Int32(123)) = int64_column` gets rewritten to `lit(ScalarValue::Int32(123)) = cast(int64_column, 'Int32')`. Note that this does not attempt to simplify such expressions; that is done by shared simplifiers.
- Missing columns: When a column exists in the logical schema but not in the physical schema, references to it are replaced with null literals.
- Struct field access: Expressions like `struct_column.field_that_is_missing_in_schema` are rewritten to `null` when the field doesn't exist in the physical schema.
- Partition columns: Partition column references can be replaced with their literal values when scanning specific partitions.
§Custom Implementations
You can create a custom implementation of this trait to handle specific rewriting logic. For example, to fill in missing columns with default values instead of nulls:
use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory};
use arrow::datatypes::{Schema, Field, DataType, FieldRef, SchemaRef};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_common::{Result, ScalarValue, tree_node::{Transformed, TransformedResult, TreeNode}};
use datafusion_physical_expr::expressions::{self, Column};
use std::sync::Arc;
#[derive(Debug)]
pub struct CustomPhysicalExprAdapter {
logical_file_schema: SchemaRef,
physical_file_schema: SchemaRef,
}
impl PhysicalExprAdapter for CustomPhysicalExprAdapter {
fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
expr.transform(|expr| {
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
// Check if the column exists in the physical schema
if self.physical_file_schema.index_of(column.name()).is_err() {
// If the column is missing, fill it with a default value instead of null
// The default value could be stored in the table schema's column metadata for example.
let default_value = ScalarValue::Int32(Some(0));
return Ok(Transformed::yes(expressions::lit(default_value)));
}
}
// If the column exists, return it as is
Ok(Transformed::no(expr))
}).data()
}
fn with_partition_values(
&self,
partition_values: Vec<(FieldRef, ScalarValue)>,
) -> Arc<dyn PhysicalExprAdapter> {
// For simplicity, this example ignores partition values
Arc::new(CustomPhysicalExprAdapter {
logical_file_schema: self.logical_file_schema.clone(),
physical_file_schema: self.physical_file_schema.clone(),
})
}
}
#[derive(Debug)]
pub struct CustomPhysicalExprAdapterFactory;
impl PhysicalExprAdapterFactory for CustomPhysicalExprAdapterFactory {
fn create(
&self,
logical_file_schema: SchemaRef,
physical_file_schema: SchemaRef,
) -> Arc<dyn PhysicalExprAdapter> {
Arc::new(CustomPhysicalExprAdapter {
logical_file_schema,
physical_file_schema,
})
}
}Required Methods§
fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>>
Rewrite a physical expression to match the target schema.
This method should return a transformed expression that matches the target schema.
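Arguments:
- `expr`: The physical expression to rewrite.

The rewrite also depends on state held by the adapter rather than passed to this method. That state is supplied when the adapter is created (`PhysicalExprAdapterFactory::create`) or via `with_partition_values`:
- `logical_file_schema`: The logical schema of the table being queried, excluding any partition columns.
- `physical_file_schema`: The physical schema of the file being scanned.
- `partition_values`: Optional partition values to use for rewriting partition column references. These are handled as if they were columns appended onto the logical file schema.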
Returns:
Arc<dyn PhysicalExpr>: The rewritten physical expression that can be evaluated against the physical schema.