// datafusion_physical_expr_adapter/schema_rewriter.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Physical expression schema rewriting utilities: [`PhysicalExprAdapter`],
19//! [`PhysicalExprAdapterFactory`], default implementations,
20//! and [`replace_columns_with_literals`].
21
22use std::borrow::Borrow;
23use std::collections::HashMap;
24use std::hash::Hash;
25use std::sync::Arc;
26
27use arrow::compute::can_cast_types;
28use arrow::datatypes::{DataType, Schema, SchemaRef};
29use datafusion_common::{
30    Result, ScalarValue, exec_err,
31    nested_struct::validate_struct_compatibility,
32    tree_node::{Transformed, TransformedResult, TreeNode},
33};
34use datafusion_functions::core::getfield::GetFieldFunc;
35use datafusion_physical_expr::expressions::CastColumnExpr;
36use datafusion_physical_expr::{
37    ScalarFunctionExpr,
38    expressions::{self, Column},
39};
40use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
41
42/// Replace column references in the given physical expression with literal values.
43///
44/// Some use cases for this include:
45/// - Partition column pruning: When scanning partitioned data, partition column references
46///   can be replaced with their literal values for the specific partition being scanned.
47/// - Constant folding: In some cases, columns that can be proven to be constant
48///   from statistical analysis may be replaced with their literal values to optimize expression evaluation.
49/// - Filling in non-null default values: in a custom [`PhysicalExprAdapter`] implementation,
50///   column references can be replaced with default literal values instead of nulls.
51///
52/// # Arguments
53/// - `expr`: The physical expression in which to replace column references.
54/// - `replacements`: A mapping from column names to their corresponding literal `ScalarValue`s.
55///   Accepts various HashMap types including `HashMap<&str, &ScalarValue>`,
56///   `HashMap<String, ScalarValue>`, `HashMap<String, &ScalarValue>`, etc.
57///
58/// # Returns
59/// - `Result<Arc<dyn PhysicalExpr>>`: The rewritten physical expression with columns replaced by literals.
60pub fn replace_columns_with_literals<K, V>(
61    expr: Arc<dyn PhysicalExpr>,
62    replacements: &HashMap<K, V>,
63) -> Result<Arc<dyn PhysicalExpr>>
64where
65    K: Borrow<str> + Eq + Hash,
66    V: Borrow<ScalarValue>,
67{
68    expr.transform_down(|expr| {
69        if let Some(column) = expr.as_any().downcast_ref::<Column>()
70            && let Some(replacement_value) = replacements.get(column.name())
71        {
72            return Ok(Transformed::yes(expressions::lit(
73                replacement_value.borrow().clone(),
74            )));
75        }
76        Ok(Transformed::no(expr))
77    })
78    .data()
79}
80
81/// Trait for adapting [`PhysicalExpr`] expressions to match a target schema.
82///
83/// This is used in file scans to rewrite expressions so that they can be
84/// evaluated against the physical schema of the file being scanned. It allows
85/// for handling differences between logical and physical schemas, such as type
86/// mismatches or missing columns common in [Schema evolution] scenarios.
87///
88/// [Schema evolution]: https://www.dremio.com/wiki/schema-evolution/
89///
90/// ## Default Implementations
91///
92/// The default implementation [`DefaultPhysicalExprAdapter`]  handles common
93/// cases.
94///
95/// ## Custom Implementations
96///
97/// You can create a custom implementation of this trait to handle specific rewriting logic.
98/// For example, to fill in missing columns with default values instead of nulls:
99///
100/// ```rust
101/// use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory};
102/// use arrow::datatypes::{Schema, Field, DataType, FieldRef, SchemaRef};
103/// use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
104/// use datafusion_common::{Result, ScalarValue, tree_node::{Transformed, TransformedResult, TreeNode}};
105/// use datafusion_physical_expr::expressions::{self, Column};
106/// use std::sync::Arc;
107///
108/// #[derive(Debug)]
109/// pub struct CustomPhysicalExprAdapter {
110///     logical_file_schema: SchemaRef,
111///     physical_file_schema: SchemaRef,
112/// }
113///
114/// impl PhysicalExprAdapter for CustomPhysicalExprAdapter {
115///     fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
116///         expr.transform(|expr| {
117///             if let Some(column) = expr.as_any().downcast_ref::<Column>() {
118///                 // Check if the column exists in the physical schema
119///                 if self.physical_file_schema.index_of(column.name()).is_err() {
120///                     // If the column is missing, fill it with a default value instead of null
121///                     // The default value could be stored in the table schema's column metadata for example.
122///                     let default_value = ScalarValue::Int32(Some(0));
123///                     return Ok(Transformed::yes(expressions::lit(default_value)));
124///                 }
125///             }
126///             // If the column exists, return it as is
127///             Ok(Transformed::no(expr))
128///         }).data()
129///     }
130/// }
131///
132/// #[derive(Debug)]
133/// pub struct CustomPhysicalExprAdapterFactory;
134///
135/// impl PhysicalExprAdapterFactory for CustomPhysicalExprAdapterFactory {
136///     fn create(
137///         &self,
138///         logical_file_schema: SchemaRef,
139///         physical_file_schema: SchemaRef,
140///     ) -> Arc<dyn PhysicalExprAdapter> {
141///         Arc::new(CustomPhysicalExprAdapter {
142///             logical_file_schema,
143///             physical_file_schema,
144///         })
145///     }
146/// }
147/// ```
pub trait PhysicalExprAdapter: Send + Sync + std::fmt::Debug {
    /// Rewrite a physical expression to match the target schema.
    ///
    /// This method should return a transformed expression that matches the
    /// physical schema, e.g. with casts inserted for type mismatches and
    /// references to missing columns replaced.
    ///
    /// The logical and physical file schemas are supplied when the adapter is
    /// constructed (see [`PhysicalExprAdapterFactory::create`]); they are not
    /// passed to this method. Partition column references can be replaced with
    /// their literal values up front via [`replace_columns_with_literals`].
    ///
    /// Arguments:
    /// - `expr`: The physical expression to rewrite, expressed in terms of the
    ///   logical file schema.
    ///
    /// Returns:
    /// - `Arc<dyn PhysicalExpr>`: The rewritten physical expression that can be evaluated against the physical schema.
    ///
    /// See Also:
    /// - [`replace_columns_with_literals`]: for replacing partition column references with their literal values.
    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>>;
}
167
/// Creates instances of [`PhysicalExprAdapter`] for given logical and physical schemas.
///
/// See [`DefaultPhysicalExprAdapterFactory`] for the default implementation.
pub trait PhysicalExprAdapterFactory: Send + Sync + std::fmt::Debug {
    /// Create a new instance of the physical expression adapter.
    ///
    /// Arguments:
    /// - `logical_file_schema`: schema of the table as seen by the query,
    ///   excluding any partition columns.
    /// - `physical_file_schema`: schema of the actual file being scanned.
    fn create(
        &self,
        logical_file_schema: SchemaRef,
        physical_file_schema: SchemaRef,
    ) -> Arc<dyn PhysicalExprAdapter>;
}
179
/// Factory producing [`DefaultPhysicalExprAdapter`] instances, the default
/// [`PhysicalExprAdapter`] implementation.
#[derive(Debug, Clone)]
pub struct DefaultPhysicalExprAdapterFactory;
182
183impl PhysicalExprAdapterFactory for DefaultPhysicalExprAdapterFactory {
184    fn create(
185        &self,
186        logical_file_schema: SchemaRef,
187        physical_file_schema: SchemaRef,
188    ) -> Arc<dyn PhysicalExprAdapter> {
189        Arc::new(DefaultPhysicalExprAdapter {
190            logical_file_schema,
191            physical_file_schema,
192        })
193    }
194}
195
196/// Default implementation of [`PhysicalExprAdapter`] for rewriting physical
197/// expressions to match different schemas.
198///
199/// ## Overview
200///
201///  [`DefaultPhysicalExprAdapter`] rewrites physical expressions to match
202///  different schemas, including:
203///
204/// - **Type casting**: When logical and physical schemas have different types, expressions are
205///   automatically wrapped with cast operations. For example, `lit(ScalarValue::Int32(123)) = int64_column`
206///   gets rewritten to `lit(ScalarValue::Int32(123)) = cast(int64_column, 'Int32')`.
207///   Note that this does not attempt to simplify such expressions - that is done by shared simplifiers.
208///
209/// - **Missing columns**: When a column exists in the logical schema but not in the physical schema,
210///   references to it are replaced with null literals.
211///
212/// - **Struct field access**: Expressions like `struct_column.field_that_is_missing_in_schema` are
213///   rewritten to `null` when the field doesn't exist in the physical schema.
214///
215/// - **Default column values**: Partition column references can be replaced with their literal values
216///   when scanning specific partitions. See [`replace_columns_with_literals`] for more details.
217///
218/// # Example
219///
220/// ```rust
221/// # use datafusion_physical_expr_adapter::{DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory};
222/// # use arrow::datatypes::Schema;
223/// # use std::sync::Arc;
224/// #
225/// # fn example(
226/// #     predicate: std::sync::Arc<dyn datafusion_physical_expr_common::physical_expr::PhysicalExpr>,
227/// #     physical_file_schema: &Schema,
228/// #     logical_file_schema: &Schema,
229/// # ) -> datafusion_common::Result<()> {
230/// let factory = DefaultPhysicalExprAdapterFactory;
231/// let adapter = factory.create(Arc::new(logical_file_schema.clone()), Arc::new(physical_file_schema.clone()));
232/// let adapted_predicate = adapter.rewrite(predicate)?;
233/// # Ok(())
234/// # }
235/// ```
#[derive(Debug, Clone)]
pub struct DefaultPhysicalExprAdapter {
    /// Schema of the table as seen by the query (excludes partition columns).
    logical_file_schema: SchemaRef,
    /// Schema of the actual file on disk being scanned.
    physical_file_schema: SchemaRef,
}
241
impl DefaultPhysicalExprAdapter {
    /// Create a new instance of the default physical expression adapter.
    ///
    /// This adapter rewrites expressions to match the physical schema of the file being scanned,
    /// handling type mismatches by inserting casts and missing nullable columns by
    /// replacing them with typed null literals (see [`DefaultPhysicalExprAdapter`] docs).
    pub fn new(logical_file_schema: SchemaRef, physical_file_schema: SchemaRef) -> Self {
        Self {
            logical_file_schema,
            physical_file_schema,
        }
    }
}
254
255impl PhysicalExprAdapter for DefaultPhysicalExprAdapter {
256    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
257        let rewriter = DefaultPhysicalExprAdapterRewriter {
258            logical_file_schema: &self.logical_file_schema,
259            physical_file_schema: &self.physical_file_schema,
260        };
261        expr.transform(|expr| rewriter.rewrite_expr(Arc::clone(&expr)))
262            .data()
263    }
264}
265
/// Internal rewriter that borrows the adapter's schemas for the duration of a
/// single [`DefaultPhysicalExprAdapter::rewrite`] call.
struct DefaultPhysicalExprAdapterRewriter<'a> {
    logical_file_schema: &'a Schema,
    physical_file_schema: &'a Schema,
}
270
271impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
272    fn rewrite_expr(
273        &self,
274        expr: Arc<dyn PhysicalExpr>,
275    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
276        if let Some(transformed) = self.try_rewrite_struct_field_access(&expr)? {
277            return Ok(Transformed::yes(transformed));
278        }
279
280        if let Some(column) = expr.as_any().downcast_ref::<Column>() {
281            return self.rewrite_column(Arc::clone(&expr), column);
282        }
283
284        Ok(Transformed::no(expr))
285    }
286
287    /// Attempt to rewrite struct field access expressions to return null if the field does not exist in the physical schema.
288    /// Note that this does *not* handle nested struct fields, only top-level struct field access.
289    /// See <https://github.com/apache/datafusion/issues/17114> for more details.
290    fn try_rewrite_struct_field_access(
291        &self,
292        expr: &Arc<dyn PhysicalExpr>,
293    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
294        let get_field_expr =
295            match ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(expr.as_ref()) {
296                Some(expr) => expr,
297                None => return Ok(None),
298            };
299
300        let source_expr = match get_field_expr.args().first() {
301            Some(expr) => expr,
302            None => return Ok(None),
303        };
304
305        let field_name_expr = match get_field_expr.args().get(1) {
306            Some(expr) => expr,
307            None => return Ok(None),
308        };
309
310        let lit = match field_name_expr
311            .as_any()
312            .downcast_ref::<expressions::Literal>()
313        {
314            Some(lit) => lit,
315            None => return Ok(None),
316        };
317
318        let field_name = match lit.value().try_as_str().flatten() {
319            Some(name) => name,
320            None => return Ok(None),
321        };
322
323        let column = match source_expr.as_any().downcast_ref::<Column>() {
324            Some(column) => column,
325            None => return Ok(None),
326        };
327
328        let physical_field =
329            match self.physical_file_schema.field_with_name(column.name()) {
330                Ok(field) => field,
331                Err(_) => return Ok(None),
332            };
333
334        let physical_struct_fields = match physical_field.data_type() {
335            DataType::Struct(fields) => fields,
336            _ => return Ok(None),
337        };
338
339        if physical_struct_fields
340            .iter()
341            .any(|f| f.name() == field_name)
342        {
343            return Ok(None);
344        }
345
346        let logical_field = match self.logical_file_schema.field_with_name(column.name())
347        {
348            Ok(field) => field,
349            Err(_) => return Ok(None),
350        };
351
352        let logical_struct_fields = match logical_field.data_type() {
353            DataType::Struct(fields) => fields,
354            _ => return Ok(None),
355        };
356
357        let logical_struct_field = match logical_struct_fields
358            .iter()
359            .find(|f| f.name() == field_name)
360        {
361            Some(field) => field,
362            None => return Ok(None),
363        };
364
365        let null_value = ScalarValue::Null.cast_to(logical_struct_field.data_type())?;
366        Ok(Some(expressions::lit(null_value)))
367    }
368
369    fn rewrite_column(
370        &self,
371        expr: Arc<dyn PhysicalExpr>,
372        column: &Column,
373    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
374        // Get the logical field for this column if it exists in the logical schema
375        let logical_field = match self.logical_file_schema.field_with_name(column.name())
376        {
377            Ok(field) => field,
378            Err(e) => {
379                // This can be hit if a custom rewrite injected a reference to a column that doesn't exist in the logical schema.
380                // For example, a pre-computed column that is kept only in the physical schema.
381                // If the column exists in the physical schema, we can still use it.
382                if let Ok(physical_field) =
383                    self.physical_file_schema.field_with_name(column.name())
384                {
385                    // If the column exists in the physical schema, we can use it in place of the logical column.
386                    // This is nice to users because if they do a rewrite that results in something like `physical_int32_col = 123u64`
387                    // we'll at least handle the casts for them.
388                    physical_field
389                } else {
390                    // A completely unknown column that doesn't exist in either schema!
391                    // This should probably never be hit unless something upstream broke, but nonetheless it's better
392                    // for us to return a handleable error than to panic / do something unexpected.
393                    return Err(e.into());
394                }
395            }
396        };
397
398        // Check if the column exists in the physical schema
399        let physical_column_index = match self
400            .physical_file_schema
401            .index_of(column.name())
402        {
403            Ok(index) => index,
404            Err(_) => {
405                if !logical_field.is_nullable() {
406                    return exec_err!(
407                        "Non-nullable column '{}' is missing from the physical schema",
408                        column.name()
409                    );
410                }
411                // If the column is missing from the physical schema fill it in with nulls.
412                // For a different behavior, provide a custom `PhysicalExprAdapter` implementation.
413                let null_value = ScalarValue::Null.cast_to(logical_field.data_type())?;
414                return Ok(Transformed::yes(expressions::lit(null_value)));
415            }
416        };
417        let physical_field = self.physical_file_schema.field(physical_column_index);
418
419        let column = match (
420            column.index() == physical_column_index,
421            logical_field.data_type() == physical_field.data_type(),
422        ) {
423            // If the column index matches and the data types match, we can use the column as is
424            (true, true) => return Ok(Transformed::no(expr)),
425            // If the indexes or data types do not match, we need to create a new column expression
426            (true, _) => column.clone(),
427            (false, _) => {
428                Column::new_with_schema(logical_field.name(), self.physical_file_schema)?
429            }
430        };
431
432        if logical_field.data_type() == physical_field.data_type() {
433            // If the data types match, we can use the column as is
434            return Ok(Transformed::yes(Arc::new(column)));
435        }
436
437        // We need to cast the column to the logical data type
438        // TODO: add optimization to move the cast from the column to literal expressions in the case of `col = 123`
439        // since that's much cheaper to evalaute.
440        // See https://github.com/apache/datafusion/issues/15780#issuecomment-2824716928
441        //
442        // For struct types, use validate_struct_compatibility which handles:
443        // - Missing fields in source (filled with nulls)
444        // - Extra fields in source (ignored)
445        // - Recursive validation of nested structs
446        // For non-struct types, use Arrow's can_cast_types
447        match (physical_field.data_type(), logical_field.data_type()) {
448            (DataType::Struct(physical_fields), DataType::Struct(logical_fields)) => {
449                validate_struct_compatibility(physical_fields, logical_fields)?;
450            }
451            _ => {
452                let is_compatible =
453                    can_cast_types(physical_field.data_type(), logical_field.data_type());
454                if !is_compatible {
455                    return exec_err!(
456                        "Cannot cast column '{}' from '{}' (physical data type) to '{}' (logical data type)",
457                        column.name(),
458                        physical_field.data_type(),
459                        logical_field.data_type()
460                    );
461                }
462            }
463        }
464
465        let cast_expr = Arc::new(CastColumnExpr::new(
466            Arc::new(column),
467            Arc::new(physical_field.clone()),
468            Arc::new(logical_field.clone()),
469            None,
470        ));
471
472        Ok(Transformed::yes(cast_expr))
473    }
474}
475
476#[cfg(test)]
477mod tests {
478    use super::*;
479    use arrow::array::{
480        BooleanArray, Int32Array, Int64Array, RecordBatch, RecordBatchOptions,
481        StringArray, StringViewArray, StructArray,
482    };
483    use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
484    use datafusion_common::{Result, ScalarValue, assert_contains, record_batch};
485    use datafusion_expr::Operator;
486    use datafusion_physical_expr::expressions::{Column, Literal, col, lit};
487    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
488    use itertools::Itertools;
489    use std::sync::Arc;
490
    /// Build a (physical, logical) schema pair exercising two cases: a type
    /// mismatch ("a": Int32 in the file vs Int64 logically) and a column that
    /// exists only in the logical schema ("c").
    fn create_test_schema() -> (Schema, Schema) {
        let physical_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Utf8, true),
        ]);

        let logical_schema = Schema::new(vec![
            Field::new("a", DataType::Int64, false), // Different type
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Float64, true), // Missing from physical
        ]);

        (physical_schema, logical_schema)
    }
505
    // A bare column whose logical type (Int64) differs from its physical type
    // (Int32) must be wrapped in a CastColumnExpr.
    #[test]
    fn test_rewrite_column_with_type_cast() {
        let (physical_schema, logical_schema) = create_test_schema();

        let factory = DefaultPhysicalExprAdapterFactory;
        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
        let column_expr = Arc::new(Column::new("a", 0));

        let result = adapter.rewrite(column_expr).unwrap();

        // Should be wrapped in a cast expression
        assert!(result.as_any().downcast_ref::<CastColumnExpr>().is_some());
    }
519
    // Recursive rewrite of a nested expression: "a" gets a cast inserted and
    // the missing column "c" collapses to a Float64 null literal.
    #[test]
    fn test_rewrite_multi_column_expr_with_type_cast() {
        let (physical_schema, logical_schema) = create_test_schema();
        let factory = DefaultPhysicalExprAdapterFactory;
        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));

        // Create a complex expression: (a + 5) OR (c > 0.0) that tests the recursive case of the rewriter
        let column_a = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
        let column_c = Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>;
        let expr = expressions::BinaryExpr::new(
            Arc::clone(&column_a),
            Operator::Plus,
            Arc::new(expressions::Literal::new(ScalarValue::Int64(Some(5)))),
        );
        let expr = expressions::BinaryExpr::new(
            Arc::new(expr),
            Operator::Or,
            Arc::new(expressions::BinaryExpr::new(
                Arc::clone(&column_c),
                Operator::Gt,
                Arc::new(expressions::Literal::new(ScalarValue::Float64(Some(0.0)))),
            )),
        );

        let result = adapter.rewrite(Arc::new(expr)).unwrap();
        println!("Rewritten expression: {result}");

        let expected = expressions::BinaryExpr::new(
            Arc::new(CastColumnExpr::new(
                Arc::new(Column::new("a", 0)),
                Arc::new(Field::new("a", DataType::Int32, false)),
                Arc::new(Field::new("a", DataType::Int64, false)),
                None,
            )),
            Operator::Plus,
            Arc::new(expressions::Literal::new(ScalarValue::Int64(Some(5)))),
        );
        let expected = Arc::new(expressions::BinaryExpr::new(
            Arc::new(expected),
            Operator::Or,
            Arc::new(expressions::BinaryExpr::new(
                lit(ScalarValue::Float64(None)), // c is missing, so it becomes null
                Operator::Gt,
                Arc::new(expressions::Literal::new(ScalarValue::Float64(Some(0.0)))),
            )),
        )) as Arc<dyn PhysicalExpr>;

        // Compare via Display since PhysicalExpr trees lack structural Eq here.
        assert_eq!(
            result.to_string(),
            expected.to_string(),
            "The rewritten expression did not match the expected output"
        );
    }
573
    // Struct columns whose field types cannot be cast (Binary -> Int32) must
    // produce an error rather than a silent bad cast.
    #[test]
    fn test_rewrite_struct_column_incompatible() {
        let physical_schema = Schema::new(vec![Field::new(
            "data",
            DataType::Struct(vec![Field::new("field1", DataType::Binary, true)].into()),
            true,
        )]);

        let logical_schema = Schema::new(vec![Field::new(
            "data",
            DataType::Struct(vec![Field::new("field1", DataType::Int32, true)].into()),
            true,
        )]);

        let factory = DefaultPhysicalExprAdapterFactory;
        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
        let column_expr = Arc::new(Column::new("data", 0));

        let error_msg = adapter.rewrite(column_expr).unwrap_err().to_string();
        // validate_struct_compatibility provides more specific error about which field can't be cast
        assert_contains!(
            error_msg,
            "Cannot cast struct field 'field1' from type Binary to type Int32"
        );
    }
599
    // Struct columns with castable field types (Int32 -> Int64, Utf8 ->
    // Utf8View) are wrapped in a single struct-level CastColumnExpr.
    #[test]
    fn test_rewrite_struct_compatible_cast() {
        let physical_schema = Schema::new(vec![Field::new(
            "data",
            DataType::Struct(
                vec![
                    Field::new("id", DataType::Int32, false),
                    Field::new("name", DataType::Utf8, true),
                ]
                .into(),
            ),
            false,
        )]);

        let logical_schema = Schema::new(vec![Field::new(
            "data",
            DataType::Struct(
                vec![
                    Field::new("id", DataType::Int64, false),
                    Field::new("name", DataType::Utf8View, true),
                ]
                .into(),
            ),
            false,
        )]);

        let factory = DefaultPhysicalExprAdapterFactory;
        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
        let column_expr = Arc::new(Column::new("data", 0));

        let result = adapter.rewrite(column_expr).unwrap();

        let expected = Arc::new(CastColumnExpr::new(
            Arc::new(Column::new("data", 0)),
            Arc::new(Field::new(
                "data",
                DataType::Struct(
                    vec![
                        Field::new("id", DataType::Int32, false),
                        Field::new("name", DataType::Utf8, true),
                    ]
                    .into(),
                ),
                false,
            )),
            Arc::new(Field::new(
                "data",
                DataType::Struct(
                    vec![
                        Field::new("id", DataType::Int64, false),
                        Field::new("name", DataType::Utf8View, true),
                    ]
                    .into(),
                ),
                false,
            )),
            None,
        )) as Arc<dyn PhysicalExpr>;

        assert_eq!(result.to_string(), expected.to_string());
    }
661
    // A nullable column present only in the logical schema is replaced by a
    // null literal of its logical type.
    #[test]
    fn test_rewrite_missing_column() -> Result<()> {
        let (physical_schema, logical_schema) = create_test_schema();

        let factory = DefaultPhysicalExprAdapterFactory;
        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
        let column_expr = Arc::new(Column::new("c", 2));

        let result = adapter.rewrite(column_expr)?;

        // Should be replaced with a literal null
        if let Some(literal) = result.as_any().downcast_ref::<expressions::Literal>() {
            assert_eq!(*literal.value(), ScalarValue::Float64(None));
        } else {
            panic!("Expected literal expression");
        }

        Ok(())
    }
681
    // A NON-nullable column missing from the file cannot be null-filled and
    // must be reported as an error.
    #[test]
    fn test_rewrite_missing_column_non_nullable_error() {
        let physical_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let logical_schema = Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Utf8, false), // Missing and non-nullable
        ]);

        let factory = DefaultPhysicalExprAdapterFactory;
        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
        let column_expr = Arc::new(Column::new("b", 1));

        let error_msg = adapter.rewrite(column_expr).unwrap_err().to_string();
        assert_contains!(error_msg, "Non-nullable column 'b' is missing");
    }
697
    // A nullable missing column becomes a Utf8 null literal.
    #[test]
    fn test_rewrite_missing_column_nullable() {
        let physical_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let logical_schema = Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Utf8, true), // Missing but nullable
        ]);

        let factory = DefaultPhysicalExprAdapterFactory;
        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
        let column_expr = Arc::new(Column::new("b", 1));

        let result = adapter.rewrite(column_expr).unwrap();

        let expected =
            Arc::new(Literal::new(ScalarValue::Utf8(None))) as Arc<dyn PhysicalExpr>;

        assert_eq!(result.to_string(), expected.to_string());
    }
717
    // A mapped column reference is replaced by its literal value.
    #[test]
    fn test_replace_columns_with_literals() -> Result<()> {
        let partition_value = ScalarValue::Utf8(Some("test_value".to_string()));
        let replacements = HashMap::from([("partition_col", &partition_value)]);

        let column_expr =
            Arc::new(Column::new("partition_col", 0)) as Arc<dyn PhysicalExpr>;
        let result = replace_columns_with_literals(column_expr, &replacements)?;

        // Should be replaced with the partition value
        let literal = result
            .as_any()
            .downcast_ref::<expressions::Literal>()
            .expect("Expected literal expression");
        assert_eq!(*literal.value(), partition_value);

        Ok(())
    }
736
    // Columns absent from the replacement map are left untouched.
    #[test]
    fn test_replace_columns_with_literals_no_match() -> Result<()> {
        let value = ScalarValue::Utf8(Some("test_value".to_string()));
        let replacements = HashMap::from([("other_col", &value)]);

        let column_expr =
            Arc::new(Column::new("partition_col", 0)) as Arc<dyn PhysicalExpr>;
        let result = replace_columns_with_literals(column_expr, &replacements)?;

        assert!(result.as_any().downcast_ref::<Column>().is_some());
        Ok(())
    }
749
750    #[test]
751    fn test_replace_columns_with_literals_nested_expr() -> Result<()> {
752        let value_a = ScalarValue::Int64(Some(10));
753        let value_b = ScalarValue::Int64(Some(20));
754        let replacements = HashMap::from([("a", &value_a), ("b", &value_b)]);
755
756        let expr = Arc::new(expressions::BinaryExpr::new(
757            Arc::new(Column::new("a", 0)),
758            Operator::Plus,
759            Arc::new(Column::new("b", 1)),
760        )) as Arc<dyn PhysicalExpr>;
761
762        let result = replace_columns_with_literals(expr, &replacements)?;
763        assert_eq!(result.to_string(), "10 + 20");
764
765        Ok(())
766    }
767
768    #[test]
769    fn test_rewrite_no_change_needed() -> Result<()> {
770        let (physical_schema, logical_schema) = create_test_schema();
771
772        let factory = DefaultPhysicalExprAdapterFactory;
773        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
774        let column_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;
775
776        let result = adapter.rewrite(Arc::clone(&column_expr))?;
777
778        // Should be the same expression (no transformation needed)
779        // We compare the underlying pointer through the trait object
780        assert!(std::ptr::eq(
781            column_expr.as_ref() as *const dyn PhysicalExpr,
782            result.as_ref() as *const dyn PhysicalExpr
783        ));
784
785        Ok(())
786    }
787
788    #[test]
789    fn test_non_nullable_missing_column_error() {
790        let physical_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
791        let logical_schema = Schema::new(vec![
792            Field::new("a", DataType::Int32, false),
793            Field::new("b", DataType::Utf8, false), // Non-nullable missing column
794        ]);
795
796        let factory = DefaultPhysicalExprAdapterFactory;
797        let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
798        let column_expr = Arc::new(Column::new("b", 1));
799
800        let result = adapter.rewrite(column_expr);
801        assert!(result.is_err());
802        assert_contains!(
803            result.unwrap_err().to_string(),
804            "Non-nullable column 'b' is missing from the physical schema"
805        );
806    }
807
808    /// Helper function to project expressions onto a RecordBatch
809    fn batch_project(
810        expr: Vec<Arc<dyn PhysicalExpr>>,
811        batch: &RecordBatch,
812        schema: SchemaRef,
813    ) -> Result<RecordBatch> {
814        let arrays = expr
815            .iter()
816            .map(|expr| {
817                expr.evaluate(batch)
818                    .and_then(|v| v.into_array(batch.num_rows()))
819            })
820            .collect::<Result<Vec<_>>>()?;
821
822        if arrays.is_empty() {
823            let options =
824                RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
825            RecordBatch::try_new_with_options(Arc::clone(&schema), arrays, &options)
826                .map_err(Into::into)
827        } else {
828            RecordBatch::try_new(Arc::clone(&schema), arrays).map_err(Into::into)
829        }
830    }
831
832    /// Example showing how we can use the `DefaultPhysicalExprAdapter` to adapt RecordBatches during a scan
833    /// to apply projections, type conversions and handling of missing columns all at once.
834    #[test]
835    fn test_adapt_batches() {
836        let physical_batch = record_batch!(
837            ("a", Int32, vec![Some(1), None, Some(3)]),
838            ("extra", Utf8, vec![Some("x"), Some("y"), None])
839        )
840        .unwrap();
841
842        let physical_schema = physical_batch.schema();
843
844        let logical_schema = Arc::new(Schema::new(vec![
845            Field::new("a", DataType::Int64, true), // Different type
846            Field::new("b", DataType::Utf8, true),  // Missing from physical
847        ]));
848
849        let projection = vec![
850            col("b", &logical_schema).unwrap(),
851            col("a", &logical_schema).unwrap(),
852        ];
853
854        let factory = DefaultPhysicalExprAdapterFactory;
855        let adapter =
856            factory.create(Arc::clone(&logical_schema), Arc::clone(&physical_schema));
857
858        let adapted_projection = projection
859            .into_iter()
860            .map(|expr| adapter.rewrite(expr).unwrap())
861            .collect_vec();
862
863        let adapted_schema = Arc::new(Schema::new(
864            adapted_projection
865                .iter()
866                .map(|expr| expr.return_field(&physical_schema).unwrap())
867                .collect_vec(),
868        ));
869
870        let res = batch_project(
871            adapted_projection,
872            &physical_batch,
873            Arc::clone(&adapted_schema),
874        )
875        .unwrap();
876
877        assert_eq!(res.num_columns(), 2);
878        assert_eq!(res.column(0).data_type(), &DataType::Utf8);
879        assert_eq!(res.column(1).data_type(), &DataType::Int64);
880        assert_eq!(
881            res.column(0)
882                .as_any()
883                .downcast_ref::<arrow::array::StringArray>()
884                .unwrap()
885                .iter()
886                .collect_vec(),
887            vec![None, None, None]
888        );
889        assert_eq!(
890            res.column(1)
891                .as_any()
892                .downcast_ref::<arrow::array::Int64Array>()
893                .unwrap()
894                .iter()
895                .collect_vec(),
896            vec![Some(1), None, Some(3)]
897        );
898    }
899
900    /// Test that struct columns are properly adapted including:
901    /// - Type casting of subfields (Int32 -> Int64, Utf8 -> Utf8View)
902    /// - Missing fields in logical schema are filled with nulls
903    #[test]
904    fn test_adapt_struct_batches() {
905        // Physical struct: {id: Int32, name: Utf8}
906        let physical_struct_fields: Fields = vec![
907            Field::new("id", DataType::Int32, false),
908            Field::new("name", DataType::Utf8, true),
909        ]
910        .into();
911
912        let struct_array = StructArray::new(
913            physical_struct_fields.clone(),
914            vec![
915                Arc::new(Int32Array::from(vec![1, 2, 3])) as _,
916                Arc::new(StringArray::from(vec![
917                    Some("alice"),
918                    None,
919                    Some("charlie"),
920                ])) as _,
921            ],
922            None,
923        );
924
925        let physical_schema = Arc::new(Schema::new(vec![Field::new(
926            "data",
927            DataType::Struct(physical_struct_fields),
928            false,
929        )]));
930
931        let physical_batch = RecordBatch::try_new(
932            Arc::clone(&physical_schema),
933            vec![Arc::new(struct_array)],
934        )
935        .unwrap();
936
937        // Logical struct: {id: Int64, name: Utf8View, extra: Boolean}
938        // - id: cast from Int32 to Int64
939        // - name: cast from Utf8 to Utf8View
940        // - extra: missing from physical, should be filled with nulls
941        let logical_struct_fields: Fields = vec![
942            Field::new("id", DataType::Int64, false),
943            Field::new("name", DataType::Utf8View, true),
944            Field::new("extra", DataType::Boolean, true), // New field, not in physical
945        ]
946        .into();
947
948        let logical_schema = Arc::new(Schema::new(vec![Field::new(
949            "data",
950            DataType::Struct(logical_struct_fields),
951            false,
952        )]));
953
954        let projection = vec![col("data", &logical_schema).unwrap()];
955
956        let factory = DefaultPhysicalExprAdapterFactory;
957        let adapter =
958            factory.create(Arc::clone(&logical_schema), Arc::clone(&physical_schema));
959
960        let adapted_projection = projection
961            .into_iter()
962            .map(|expr| adapter.rewrite(expr).unwrap())
963            .collect_vec();
964
965        let adapted_schema = Arc::new(Schema::new(
966            adapted_projection
967                .iter()
968                .map(|expr| expr.return_field(&physical_schema).unwrap())
969                .collect_vec(),
970        ));
971
972        let res = batch_project(
973            adapted_projection,
974            &physical_batch,
975            Arc::clone(&adapted_schema),
976        )
977        .unwrap();
978
979        assert_eq!(res.num_columns(), 1);
980
981        let result_struct = res
982            .column(0)
983            .as_any()
984            .downcast_ref::<StructArray>()
985            .unwrap();
986
987        // Verify id field is cast to Int64
988        let id_col = result_struct.column_by_name("id").unwrap();
989        assert_eq!(id_col.data_type(), &DataType::Int64);
990        let id_values = id_col.as_any().downcast_ref::<Int64Array>().unwrap();
991        assert_eq!(
992            id_values.iter().collect_vec(),
993            vec![Some(1), Some(2), Some(3)]
994        );
995
996        // Verify name field is cast to Utf8View
997        let name_col = result_struct.column_by_name("name").unwrap();
998        assert_eq!(name_col.data_type(), &DataType::Utf8View);
999        let name_values = name_col.as_any().downcast_ref::<StringViewArray>().unwrap();
1000        assert_eq!(
1001            name_values.iter().collect_vec(),
1002            vec![Some("alice"), None, Some("charlie")]
1003        );
1004
1005        // Verify extra field (missing from physical) is filled with nulls
1006        let extra_col = result_struct.column_by_name("extra").unwrap();
1007        assert_eq!(extra_col.data_type(), &DataType::Boolean);
1008        let extra_values = extra_col.as_any().downcast_ref::<BooleanArray>().unwrap();
1009        assert_eq!(extra_values.iter().collect_vec(), vec![None, None, None]);
1010    }
1011
1012    #[test]
1013    fn test_try_rewrite_struct_field_access() {
1014        // Test the core logic of try_rewrite_struct_field_access
1015        let physical_schema = Schema::new(vec![Field::new(
1016            "struct_col",
1017            DataType::Struct(
1018                vec![Field::new("existing_field", DataType::Int32, true)].into(),
1019            ),
1020            true,
1021        )]);
1022
1023        let logical_schema = Schema::new(vec![Field::new(
1024            "struct_col",
1025            DataType::Struct(
1026                vec![
1027                    Field::new("existing_field", DataType::Int32, true),
1028                    Field::new("missing_field", DataType::Utf8, true),
1029                ]
1030                .into(),
1031            ),
1032            true,
1033        )]);
1034
1035        let rewriter = DefaultPhysicalExprAdapterRewriter {
1036            logical_file_schema: &logical_schema,
1037            physical_file_schema: &physical_schema,
1038        };
1039
1040        // Test that when a field exists in physical schema, it returns None
1041        let column = Arc::new(Column::new("struct_col", 0)) as Arc<dyn PhysicalExpr>;
1042        let result = rewriter.try_rewrite_struct_field_access(&column).unwrap();
1043        assert!(result.is_none());
1044
1045        // The actual test for the get_field expression would require creating a proper ScalarFunctionExpr
1046        // with ScalarUDF, which is complex to set up in a unit test. The integration tests in
1047        // datafusion/core/tests/parquet/schema_adapter.rs provide better coverage for this functionality.
1048    }
1049}