Trait PhysicalExprAdapter

Source
/// Adapts physical expressions so they can be evaluated against the physical
/// schema of the file being scanned (e.g. to cope with type mismatches or
/// missing columns between the logical and physical schemas).
pub trait PhysicalExprAdapter: Send + Sync + Debug {
    /// Rewrites `expr` so that it matches the target schema.
    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>>;

    /// Returns a new adapter configured with `partition_values`, which are
    /// handled as if they were columns appended onto the logical file schema.
    fn with_partition_values(
        &self,
        partition_values: Vec<(FieldRef, ScalarValue)>,
    ) -> Arc<dyn PhysicalExprAdapter>;
}
Expand description

Trait for adapting physical expressions to match a target schema.

This is used in file scans to rewrite expressions so that they can be evaluated against the physical schema of the file being scanned. It allows for handling differences between logical and physical schemas, such as type mismatches or missing columns.

You can create a custom implementation of this trait to handle specific rewriting logic. For example, to fill in missing columns with default values instead of nulls:

use datafusion_physical_expr::schema_rewriter::{PhysicalExprAdapter, PhysicalExprAdapterFactory};
use arrow::datatypes::{Schema, Field, DataType, FieldRef, SchemaRef};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_common::{Result, ScalarValue, tree_node::{Transformed, TransformedResult, TreeNode}};
use datafusion_physical_expr::expressions::{self, Column};
use std::sync::Arc;

/// Example adapter that fills columns missing from the scanned file with a
/// default value instead of `NULL`.
#[derive(Debug)]
pub struct CustomPhysicalExprAdapter {
    /// Schema of the table being queried, excluding partition columns.
    logical_file_schema: SchemaRef,
    /// Schema of the file actually being scanned.
    physical_file_schema: SchemaRef,
}

impl PhysicalExprAdapter for CustomPhysicalExprAdapter {
    /// Replaces every column that is part of the logical file schema but absent
    /// from the physical file schema with a zero value of the column's logical
    /// data type. All other nodes, including columns not in the logical file
    /// schema (such as partition columns), are returned unchanged.
    ///
    /// # Errors
    ///
    /// Returns an error if no zero value can be built for a missing column's
    /// data type, or if the expression tree traversal fails.
    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
        expr.transform(|expr| {
            if let Some(column) = expr.as_any().downcast_ref::<Column>() {
                let missing_from_file =
                    self.physical_file_schema.index_of(column.name()).is_err();
                if missing_from_file {
                    // Only fill columns the logical file schema knows about; a
                    // hard-coded Int32 literal would have the wrong type for
                    // e.g. string or float columns, so derive it from the field.
                    if let Ok(field) = self.logical_file_schema.field_with_name(column.name()) {
                        // The default value could also be stored in the table
                        // schema's column metadata, for example.
                        let default_value = ScalarValue::new_zero(field.data_type())?;
                        return Ok(Transformed::yes(expressions::lit(default_value)));
                    }
                }
            }
            // Columns present in the file (and all non-column nodes) are kept as is.
            Ok(Transformed::no(expr))
        })
        .data()
    }

    /// Returns a copy of this adapter; partition values are ignored.
    fn with_partition_values(
        &self,
        _partition_values: Vec<(FieldRef, ScalarValue)>,
    ) -> Arc<dyn PhysicalExprAdapter> {
        // For simplicity, this example ignores partition values.
        Arc::new(CustomPhysicalExprAdapter {
            logical_file_schema: Arc::clone(&self.logical_file_schema),
            physical_file_schema: Arc::clone(&self.physical_file_schema),
        })
    }
}

/// Factory producing a [`CustomPhysicalExprAdapter`] for a given pair of
/// logical and physical file schemas.
#[derive(Debug)]
pub struct CustomPhysicalExprAdapterFactory;

impl PhysicalExprAdapterFactory for CustomPhysicalExprAdapterFactory {
    /// Bundles both schemas into a fresh [`CustomPhysicalExprAdapter`].
    fn create(
        &self,
        logical_file_schema: SchemaRef,
        physical_file_schema: SchemaRef,
    ) -> Arc<dyn PhysicalExprAdapter> {
        let adapter = CustomPhysicalExprAdapter {
            logical_file_schema,
            physical_file_schema,
        };
        Arc::new(adapter)
    }
}

Required Methods

Source

fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>>

Rewrite a physical expression to match the target schema.

This method should return a transformed expression that matches the target schema.

Arguments:

  • expr: The physical expression to rewrite.
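
The other inputs to the rewrite are not parameters of this method:

  • The logical file schema (the schema of the table being queried, excluding any partition columns) and the physical file schema (the schema of the file being scanned) are given to the adapter when it is created, for example through PhysicalExprAdapterFactory::create.
  • Partition values, used for rewriting partition column references, are supplied through with_partition_values. They are handled as if they were columns appended onto the logical file schema.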

Returns:

  • Result<Arc<dyn PhysicalExpr>>: The rewritten physical expression that can be evaluated against the physical schema, or an error if the expression could not be rewritten.
Source

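fn with_partition_values(&self, partition_values: Vec<(FieldRef, ScalarValue)>) -> Arc<dyn PhysicalExprAdapter>

Return a new adapter configured with the given partition values, for use when rewriting references to partition columns. The values are handled as if they were columns appended onto the logical file schema.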

Implementors