datafusion_datasource/schema_adapter.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`SchemaAdapter`] and [`SchemaAdapterFactory`] to adapt file-level record batches to a table schema.
//!
//! These adapters translate the `RecordBatch`es read from the physical file
//! format into the form DataFusion expects. For instance, a schema can be
//! stored externally to a Parquet file, mapping Parquet logical types to
//! Arrow types.

use arrow::{
    array::{new_null_array, ArrayRef, RecordBatch, RecordBatchOptions},
    compute::can_cast_types,
    datatypes::{DataType, Field, Schema, SchemaRef},
};
use datafusion_common::{
    nested_struct::{cast_column, validate_struct_compatibility},
    plan_err, ColumnStatistics,
};
use std::{fmt::Debug, sync::Arc};

/// Function used by [`SchemaMapping`] to adapt a column from the file schema to
/// the table schema.
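///
/// For example, the default adapter wraps [`cast_column`] (which also handles
/// nested structs) in an `Arc`; a minimal sketch:
///
/// ```ignore
/// let cast_fn: Arc<CastColumnFn> =
///     Arc::new(|array: &ArrayRef, field: &Field| cast_column(array, field));
/// ```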
pub type CastColumnFn =
    dyn Fn(&ArrayRef, &Field) -> datafusion_common::Result<ArrayRef> + Send + Sync;

/// Factory for creating [`SchemaAdapter`]
///
/// This interface provides a way to implement custom schema adaptation logic
/// for DataSourceExec (for example, to fill missing columns with a default
/// value other than null).
///
/// Most users should use [`DefaultSchemaAdapterFactory`]. See that struct for
/// more details and examples.
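///
/// # Example: custom factory
///
/// A minimal sketch of a custom factory (assuming a hypothetical
/// `MySchemaAdapter` type that implements [`SchemaAdapter`]):
///
/// ```ignore
/// #[derive(Debug)]
/// struct MySchemaAdapterFactory;
///
/// impl SchemaAdapterFactory for MySchemaAdapterFactory {
///     fn create(
///         &self,
///         projected_table_schema: SchemaRef,
///         _table_schema: SchemaRef,
///     ) -> Box<dyn SchemaAdapter> {
///         Box::new(MySchemaAdapter { projected_table_schema })
///     }
/// }
/// ```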
pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
    /// Create a [`SchemaAdapter`]
    ///
    /// Arguments:
    ///
    /// * `projected_table_schema`: The schema for the table, projected to
    ///   include only the fields being output (projected) by this mapping.
    ///
    /// * `table_schema`: The entire table schema for the table
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter>;

    /// Create a [`SchemaAdapter`] using only the projected table schema.
    ///
    /// This is a convenience method for cases where the table schema and the
    /// projected table schema are the same.
    fn create_with_projected_schema(
        &self,
        projected_table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        self.create(Arc::clone(&projected_table_schema), projected_table_schema)
    }
}

/// Creates [`SchemaMapper`]s to map file-level [`RecordBatch`]es to a table
/// schema, which may have a schema obtained from merging multiple file-level
/// schemas.
///
/// This is useful for implementing schema evolution in partitioned datasets.
///
/// See [`DefaultSchemaAdapterFactory`] for more details and examples.
pub trait SchemaAdapter: Send + Sync {
    /// Map a column index in the table schema to a column index in a particular
    /// file schema
    ///
    /// This is used while reading a file to push down projections by mapping
    /// projected column indexes from the table schema to the file schema
    ///
    /// Panics if index is not in range for the table schema
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;

    /// Creates a mapping for casting columns from the file schema to the table
    /// schema.
    ///
    /// This is used after reading a record batch. The returned [`SchemaMapper`]:
    ///
    /// 1. Maps columns to the expected column indexes
    /// 2. Handles missing values (e.g. fills nulls or a default value) for
    ///    columns in the table schema not in the file schema
    /// 3. Handles different types: if the column in the file schema has a
    ///    different type than `table_schema`, the mapper will resolve this
    ///    difference (e.g. by casting to the appropriate type)
    ///
    /// Returns:
    /// * a [`SchemaMapper`]
    /// * an ordered list of columns to project from the file
    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
}

/// Maps columns from a specific file schema to the table schema.
///
/// See [`DefaultSchemaAdapterFactory`] for more details and examples.
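///
/// For example (a sketch; `mapper` is assumed to have been returned by
/// [`SchemaAdapter::map_schema`]):
///
/// ```ignore
/// // Adapt each batch read from the file to the table schema
/// let table_batch = mapper.map_batch(file_batch)?;
/// // Adapt the file-level column statistics the same way
/// let table_stats = mapper.map_column_statistics(&file_col_statistics)?;
/// ```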
pub trait SchemaMapper: Debug + Send + Sync {
    /// Adapts a `RecordBatch` to match the `table_schema`
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;

    /// Adapts file-level column `Statistics` to match the `table_schema`
    fn map_column_statistics(
        &self,
        file_col_statistics: &[ColumnStatistics],
    ) -> datafusion_common::Result<Vec<ColumnStatistics>>;
}

/// Default [`SchemaAdapterFactory`] for mapping schemas.
///
/// This can be used to adapt file-level record batches to a table schema and
/// implement schema evolution.
///
/// Given an input file schema and a table schema, this factory returns
/// [`SchemaAdapter`]s that return [`SchemaMapper`]s that:
///
/// 1. Reorder columns
/// 2. Cast columns to the correct type
/// 3. Fill missing columns with nulls
///
/// # Errors:
///
/// * If a column in the table schema is non-nullable but is not present in the
///   file schema (i.e. it is missing), the returned mapper tries to fill it
///   with nulls, resulting in a schema error.
///
/// # Illustration of Schema Mapping
///
/// ```text
/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
///  ┌───────┐   ┌───────┐ │                  ┌───────┐   ┌───────┐   ┌───────┐ │
/// ││  1.0  │   │ "foo" │                   ││ NULL  │   │ "foo" │   │ "1.0" │
///  ├───────┤   ├───────┤ │ Schema mapping   ├───────┤   ├───────┤   ├───────┤ │
/// ││  2.0  │   │ "bar" │                   ││  NULL │   │ "bar" │   │ "2.0" │
///  └───────┘   └───────┘ │────────────────▶ └───────┘   └───────┘   └───────┘ │
/// │                                        │
///  column "c"  column "b"│                  column "a"  column "b"  column "c"│
/// │ Float64       Utf8                     │  Int32        Utf8        Utf8
///  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘                  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
///     Input Record Batch                         Output Record Batch
///
///     Schema {                                   Schema {
///      "c": Float64,                              "a": Int32,
///      "b": Utf8,                                 "b": Utf8,
///     }                                           "c": Utf8,
///                                                }
/// ```
///
/// # Example of using the `DefaultSchemaAdapterFactory` to map [`RecordBatch`]s
///
/// Note `SchemaMapping` also supports mapping partial batches, which is used as
/// part of predicate pushdown.
///
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};
/// # use datafusion_common::record_batch;
/// // Table has fields "a", "b" and "c"
/// let table_schema = Schema::new(vec![
///     Field::new("a", DataType::Int32, true),
///     Field::new("b", DataType::Utf8, true),
///     Field::new("c", DataType::Utf8, true),
/// ]);
///
/// // create an adapter to map the table schema to the file schema
/// let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
///
/// // The file schema has fields "c" and "b" but "b" is stored as a 'Float64'
/// // instead of 'Utf8'
/// let file_schema = Schema::new(vec![
///    Field::new("c", DataType::Utf8, true),
///    Field::new("b", DataType::Float64, true),
/// ]);
///
/// // Get a mapping from the file schema to the table schema
/// let (mapper, _indices) = adapter.map_schema(&file_schema).unwrap();
///
/// let file_batch = record_batch!(
///     ("c", Utf8, vec!["foo", "bar"]),
///     ("b", Float64, vec![1.0, 2.0])
/// ).unwrap();
///
/// let mapped_batch = mapper.map_batch(file_batch).unwrap();
///
/// // the mapped batch has the correct schema and the "b" column has been cast to Utf8
/// let expected_batch = record_batch!(
///    ("a", Int32, vec![None, None]),  // missing column filled with nulls
///    ("b", Utf8, vec!["1.0", "2.0"]), // b was cast to string and order was changed
///    ("c", Utf8, vec!["foo", "bar"])
/// ).unwrap();
/// assert_eq!(mapped_batch, expected_batch);
/// ```
#[derive(Clone, Debug, Default)]
pub struct DefaultSchemaAdapterFactory;

impl DefaultSchemaAdapterFactory {
    /// Create a new factory for mapping batches from a file schema to a table
    /// schema.
    ///
    /// This is a convenience for [`DefaultSchemaAdapterFactory::create`] with
    /// the same schema for both the projected table schema and the table
    /// schema.
    pub fn from_schema(table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
        Self.create(Arc::clone(&table_schema), table_schema)
    }
}

impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        _table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        Box::new(DefaultSchemaAdapter {
            projected_table_schema,
        })
    }
}

/// This [`SchemaAdapter`] operates on the projected table schema. See
/// [`SchemaMapping`] for more details.
#[derive(Clone, Debug)]
pub(crate) struct DefaultSchemaAdapter {
    /// The schema for the table, projected to include only the fields being
    /// output (projected) by the associated ParquetSource
    projected_table_schema: SchemaRef,
}

/// Checks if a file field can be cast to a table field
///
/// Returns Ok(true) if casting is possible, or an error explaining why casting
/// is not possible.
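///
/// For example (a sketch mirroring the unit tests below):
///
/// ```ignore
/// // Int32 -> Float64 is a supported cast, so this returns Ok(true)
/// let file_field = Field::new("col", DataType::Int32, true);
/// let table_field = Field::new("col", DataType::Float64, true);
/// assert!(can_cast_field(&file_field, &table_field)?);
/// ```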
pub(crate) fn can_cast_field(
    file_field: &Field,
    table_field: &Field,
) -> datafusion_common::Result<bool> {
    match (file_field.data_type(), table_field.data_type()) {
        (DataType::Struct(source_fields), DataType::Struct(target_fields)) => {
            validate_struct_compatibility(source_fields, target_fields)
        }
        _ => {
            if can_cast_types(file_field.data_type(), table_field.data_type()) {
                Ok(true)
            } else {
                plan_err!(
                    "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
                    file_field.name(),
                    file_field.data_type(),
                    table_field.data_type()
                )
            }
        }
    }
}

impl SchemaAdapter for DefaultSchemaAdapter {
    /// Map a column index in the table schema to a column index in a particular
    /// file schema
    ///
    /// Panics if index is not in range for the table schema
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        let field = self.projected_table_schema.field(index);
        Some(file_schema.fields.find(field.name())?.0)
    }

    /// Creates a `SchemaMapping` for casting or mapping the columns from the
    /// file schema to the table schema.
    ///
    /// If the provided `file_schema` contains columns of a different type to
    /// the expected `table_schema`, the method will attempt to cast the array
    /// data from the file schema to the table schema where possible.
    ///
    /// Returns a [`SchemaMapping`] that can be applied to the output batch
    /// along with an ordered list of columns to project from the file
    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        let (field_mappings, projection) = create_field_mapping(
            file_schema,
            &self.projected_table_schema,
            can_cast_field,
        )?;

        Ok((
            Arc::new(SchemaMapping::new(
                Arc::clone(&self.projected_table_schema),
                field_mappings,
                Arc::new(|array: &ArrayRef, field: &Field| cast_column(array, field)),
            )),
            projection,
        ))
    }
}

/// Helper function that creates field mappings between file schema and table schema
///
/// Maps columns from the file schema to their corresponding positions in the table schema,
/// applying type compatibility checking via the provided predicate function.
///
/// Returns field mappings (for column reordering) and a projection (for field selection).
pub(crate) fn create_field_mapping<F>(
    file_schema: &Schema,
    projected_table_schema: &SchemaRef,
    can_map_field: F,
) -> datafusion_common::Result<(Vec<Option<usize>>, Vec<usize>)>
where
    F: Fn(&Field, &Field) -> datafusion_common::Result<bool>,
{
    let mut projection = Vec::with_capacity(file_schema.fields().len());
    let mut field_mappings = vec![None; projected_table_schema.fields().len()];

    for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
        if let Some((table_idx, table_field)) =
            projected_table_schema.fields().find(file_field.name())
        {
            if can_map_field(file_field, table_field)? {
                field_mappings[table_idx] = Some(projection.len());
                projection.push(file_idx);
            }
        }
    }

    Ok((field_mappings, projection))
}

/// The SchemaMapping struct holds a mapping from the file schema to the table
/// schema and any necessary type conversions.
///
/// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch which
/// has the projected schema, since that's the schema which is supposed to come
/// out of the execution of this query. Thus `map_batch` uses
/// `projected_table_schema` as it can only operate on the projected fields.
///
/// [`map_batch`]: Self::map_batch
pub struct SchemaMapping {
    /// The schema of the table. This is the expected schema after conversion
    /// and it should match the schema of the query result.
    projected_table_schema: SchemaRef,
    /// Mapping from field index in `projected_table_schema` to index in
    /// projected file_schema.
    ///
    /// They are Options instead of just plain `usize`s because the table could
    /// have fields that don't exist in the file.
    field_mappings: Vec<Option<usize>>,
    /// Function used to adapt a column from the file schema to the table schema
    /// when it exists in both schemas
    cast_column: Arc<CastColumnFn>,
}

impl Debug for SchemaMapping {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SchemaMapping")
            .field("projected_table_schema", &self.projected_table_schema)
            .field("field_mappings", &self.field_mappings)
            .field("cast_column", &"<fn>")
            .finish()
    }
}

impl SchemaMapping {
    /// Creates a new SchemaMapping instance
    ///
    /// Initializes the field mappings needed to transform file data to the projected table schema
    pub fn new(
        projected_table_schema: SchemaRef,
        field_mappings: Vec<Option<usize>>,
        cast_column: Arc<CastColumnFn>,
    ) -> Self {
        Self {
            projected_table_schema,
            field_mappings,
            cast_column,
        }
    }
}

impl SchemaMapper for SchemaMapping {
    /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and
    /// conversions.
    /// The produced RecordBatch has a schema that contains only the projected columns.
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
        let (_old_schema, batch_cols, batch_rows) = batch.into_parts();

        let cols = self
            .projected_table_schema
            // go through each field in the projected schema
            .fields()
            .iter()
            // and zip it with the index that maps fields from the projected table schema to the
            // projected file schema in `batch`
            .zip(&self.field_mappings)
            // and for each one...
            .map(|(field, file_idx)| {
                file_idx.map_or_else(
                    // If this field only exists in the table, and not in the file, then we know
                    // that it's null, so just return that.
                    || Ok(new_null_array(field.data_type(), batch_rows)),
                    // However, if it does exist in both, use the cast_column function
                    // to perform any necessary conversions
                    |batch_idx| (self.cast_column)(&batch_cols[batch_idx], field),
                )
            })
            .collect::<datafusion_common::Result<Vec<_>, _>>()?;

        // Necessary to handle empty batches
        let options = RecordBatchOptions::new().with_row_count(Some(batch_rows));

        let schema = Arc::clone(&self.projected_table_schema);
        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
        Ok(record_batch)
    }

    /// Adapts file-level column `Statistics` to match the `table_schema`
    fn map_column_statistics(
        &self,
        file_col_statistics: &[ColumnStatistics],
    ) -> datafusion_common::Result<Vec<ColumnStatistics>> {
        let mut table_col_statistics = vec![];

        // Map the statistics for each field in the table schema to the
        // corresponding field in the file schema. If a field is not present in
        // the file schema, fill it with `ColumnStatistics::new_unknown`.
        for (_, file_col_idx) in self
            .projected_table_schema
            .fields()
            .iter()
            .zip(&self.field_mappings)
        {
            if let Some(file_col_idx) = file_col_idx {
                table_col_statistics.push(
                    file_col_statistics
                        .get(*file_col_idx)
                        .cloned()
                        .unwrap_or_default(),
                );
            } else {
                table_col_statistics.push(ColumnStatistics::new_unknown());
            }
        }

        Ok(table_col_statistics)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::{
        array::{Array, ArrayRef, StringBuilder, StructArray, TimestampMillisecondArray},
        compute::cast,
        datatypes::{DataType, Field, TimeUnit},
        record_batch::RecordBatch,
    };
    use datafusion_common::{stats::Precision, Result, ScalarValue, Statistics};

    #[test]
    fn test_schema_mapping_map_statistics_basic() {
        // Create table schema (a, b, c)
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Float64, true),
        ]));

        // Create file schema (b, a) - different order, missing c
        let file_schema = Schema::new(vec![
            Field::new("b", DataType::Utf8, true),
            Field::new("a", DataType::Int32, true),
        ]);

        // Create SchemaAdapter
        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        // Get mapper and projection
        let (mapper, projection) = adapter.map_schema(&file_schema).unwrap();

        // Should project columns 0,1 from file
        assert_eq!(projection, vec![0, 1]);

        // Create file statistics
        let mut file_stats = Statistics::default();

        // Statistics for column b (index 0 in file)
        let b_stats = ColumnStatistics {
            null_count: Precision::Exact(5),
            ..Default::default()
        };

        // Statistics for column a (index 1 in file)
        let a_stats = ColumnStatistics {
            null_count: Precision::Exact(10),
            ..Default::default()
        };

        file_stats.column_statistics = vec![b_stats, a_stats];

        // Map statistics
        let table_col_stats = mapper
            .map_column_statistics(&file_stats.column_statistics)
            .unwrap();

        // Verify stats
        assert_eq!(table_col_stats.len(), 3);
        assert_eq!(table_col_stats[0].null_count, Precision::Exact(10)); // a from file idx 1
        assert_eq!(table_col_stats[1].null_count, Precision::Exact(5)); // b from file idx 0
        assert_eq!(table_col_stats[2].null_count, Precision::Absent); // c (unknown)
    }

    #[test]
    fn test_schema_mapping_map_statistics_empty() {
        // Create schemas
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]);

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };
        let (mapper, _) = adapter.map_schema(&file_schema).unwrap();

        // Empty file statistics
        let file_stats = Statistics::default();
        let table_col_stats = mapper
            .map_column_statistics(&file_stats.column_statistics)
            .unwrap();

        // All stats should be unknown
        assert_eq!(table_col_stats.len(), 2);
        assert_eq!(table_col_stats[0], ColumnStatistics::new_unknown());
        assert_eq!(table_col_stats[1], ColumnStatistics::new_unknown());
    }

    #[test]
    fn test_can_cast_field() {
        // Same type should work
        let from_field = Field::new("col", DataType::Int32, true);
        let to_field = Field::new("col", DataType::Int32, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Casting Int32 to Float64 is allowed
        let from_field = Field::new("col", DataType::Int32, true);
        let to_field = Field::new("col", DataType::Float64, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Casting Float64 to Utf8 should work (converts to string)
        let from_field = Field::new("col", DataType::Float64, true);
        let to_field = Field::new("col", DataType::Utf8, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Binary to Decimal128 is not supported - this is an example of a cast
        // that should fail
        // Note: We use Binary instead of Utf8->Int32 because Arrow actually supports that cast
        let from_field = Field::new("col", DataType::Binary, true);
        let to_field = Field::new("col", DataType::Decimal128(10, 2), true);
        let result = can_cast_field(&from_field, &to_field);
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Cannot cast file schema field col"));
    }

    #[test]
    fn test_create_field_mapping() {
        // Define the table schema
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Float64, true),
        ]));

        // Define file schema: different order, missing column c, and b has different type
        let file_schema = Schema::new(vec![
            Field::new("b", DataType::Float64, true), // Different type but castable to Utf8
            Field::new("a", DataType::Int32, true),   // Same type
            Field::new("d", DataType::Boolean, true), // Not in table schema
        ]);

        // Custom can_map_field function that allows all mappings for testing
        let allow_all = |_: &Field, _: &Field| Ok(true);

        // Test field mapping
        let (field_mappings, projection) =
            create_field_mapping(&file_schema, &table_schema, allow_all).unwrap();

        // Expected:
        // - field_mappings[0] (a) maps to projection[1]
        // - field_mappings[1] (b) maps to projection[0]
        // - field_mappings[2] (c) is None (not in file)
        assert_eq!(field_mappings, vec![Some(1), Some(0), None]);
        assert_eq!(projection, vec![0, 1]); // Projecting file columns b, a

        // Test with a failing mapper
        let fails_all = |_: &Field, _: &Field| Ok(false);
        let (field_mappings, projection) =
            create_field_mapping(&file_schema, &table_schema, fails_all).unwrap();

        // Should have no mappings or projections if all cast checks fail
        assert_eq!(field_mappings, vec![None, None, None]);
        assert_eq!(projection, Vec::<usize>::new());

        // Test with error-producing mapper
        let error_mapper = |_: &Field, _: &Field| plan_err!("Test error");
        let result = create_field_mapping(&file_schema, &table_schema, error_mapper);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Test error"));
    }

    #[test]
    fn test_schema_mapping_new() {
        // Define the projected table schema
        let projected_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));

        // Define field mappings from table to file
        let field_mappings = vec![Some(1), Some(0)];

        // Create SchemaMapping manually
        let mapping = SchemaMapping::new(
            Arc::clone(&projected_schema),
            field_mappings.clone(),
            Arc::new(|array: &ArrayRef, field: &Field| cast_column(array, field)),
        );

        // Check that fields were set correctly
        assert_eq!(*mapping.projected_table_schema, *projected_schema);
        assert_eq!(mapping.field_mappings, field_mappings);

        // Test with a batch to ensure it works properly
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![
                Field::new("b_file", DataType::Utf8, true),
                Field::new("a_file", DataType::Int32, true),
            ])),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["hello", "world"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 2])),
            ],
        )
        .unwrap();

        // Test that map_batch works with our manually created mapping
        let mapped_batch = mapping.map_batch(batch).unwrap();

        // Verify the mapped batch has the correct schema and data
        assert_eq!(*mapped_batch.schema(), *projected_schema);
        assert_eq!(mapped_batch.num_columns(), 2);
        assert_eq!(mapped_batch.column(0).len(), 2); // a column
        assert_eq!(mapped_batch.column(1).len(), 2); // b column
    }

    #[test]
    fn test_map_schema_error_path() {
        // Define the table schema
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Decimal128(10, 2), true), // Use Decimal which has stricter cast rules
        ]));

        // Define file schema with incompatible type for column c
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Float64, true), // Different but castable
            Field::new("c", DataType::Binary, true),  // Not castable to Decimal128
        ]);

        // Create DefaultSchemaAdapter
        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        // map_schema should error due to incompatible types
        let result = adapter.map_schema(&file_schema);
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Cannot cast file schema field c"));
    }

    #[test]
    fn test_map_schema_happy_path() {
        // Define the table schema
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Decimal128(10, 2), true),
        ]));

        // Create DefaultSchemaAdapter
        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        // Define compatible file schema (missing column c)
        let compatible_file_schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true), // Can be cast to Int32
            Field::new("b", DataType::Float64, true), // Can be cast to Utf8
        ]);

        // Test successful schema mapping
        let (mapper, projection) = adapter.map_schema(&compatible_file_schema).unwrap();

        // Verify field_mappings and projection created correctly
        assert_eq!(projection, vec![0, 1]); // Projecting a and b

        // Verify the SchemaMapping works with actual data
        let file_batch = RecordBatch::try_new(
            Arc::new(compatible_file_schema.clone()),
            vec![
                Arc::new(arrow::array::Int64Array::from(vec![100, 200])),
                Arc::new(arrow::array::Float64Array::from(vec![1.5, 2.5])),
            ],
        )
        .unwrap();

        let mapped_batch = mapper.map_batch(file_batch).unwrap();

        // Verify correct schema mapping
        assert_eq!(*mapped_batch.schema(), *table_schema);
        assert_eq!(mapped_batch.num_columns(), 3); // a, b, c

        // Column c should be null since it wasn't in the file schema
        let c_array = mapped_batch.column(2);
        assert_eq!(c_array.len(), 2);
        assert_eq!(c_array.null_count(), 2);
    }

    #[test]
    fn test_adapt_struct_with_added_nested_fields() -> Result<()> {
        let (file_schema, table_schema) = create_test_schemas_with_nested_fields();
        let batch = create_test_batch_with_struct_data(&file_schema)?;

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };
        let (mapper, _) = adapter.map_schema(file_schema.as_ref())?;
        let mapped_batch = mapper.map_batch(batch)?;

        verify_adapted_batch_with_nested_fields(&mapped_batch, &table_schema)?;
        Ok(())
    }

    #[test]
    fn test_map_column_statistics_struct() -> Result<()> {
        let (file_schema, table_schema) = create_test_schemas_with_nested_fields();

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };
        let (mapper, _) = adapter.map_schema(file_schema.as_ref())?;

        let file_stats = vec![
            create_test_column_statistics(
                0,
                100,
                Some(ScalarValue::Int32(Some(1))),
                Some(ScalarValue::Int32(Some(100))),
                Some(ScalarValue::Int32(Some(5100))),
            ),
            create_test_column_statistics(10, 50, None, None, None),
        ];

        let table_stats = mapper.map_column_statistics(&file_stats)?;
        assert_eq!(table_stats.len(), 1);
        verify_column_statistics(
            &table_stats[0],
            Some(0),
            Some(100),
            Some(ScalarValue::Int32(Some(1))),
            Some(ScalarValue::Int32(Some(100))),
            Some(ScalarValue::Int32(Some(5100))),
        );
        let missing_stats = mapper.map_column_statistics(&[])?;
        assert_eq!(missing_stats.len(), 1);
        assert_eq!(missing_stats[0], ColumnStatistics::new_unknown());
        Ok(())
    }

    fn create_test_schemas_with_nested_fields() -> (SchemaRef, SchemaRef) {
        let file_schema = Arc::new(Schema::new(vec![Field::new(
            "info",
            DataType::Struct(
                vec![
                    Field::new("location", DataType::Utf8, true),
                    Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                        true,
                    ),
                ]
                .into(),
            ),
            true,
        )]));

        let table_schema = Arc::new(Schema::new(vec![Field::new(
            "info",
            DataType::Struct(
                vec![
                    Field::new("location", DataType::Utf8, true),
                    Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                        true,
                    ),
                    Field::new(
                        "reason",
                        DataType::Struct(
                            vec![
                                Field::new("_level", DataType::Float64, true),
                                Field::new(
                                    "details",
                                    DataType::Struct(
                                        vec![
                                            Field::new("rurl", DataType::Utf8, true),
                                            Field::new("s", DataType::Float64, true),
                                            Field::new("t", DataType::Utf8, true),
                                        ]
                                        .into(),
                                    ),
                                    true,
                                ),
                            ]
                            .into(),
                        ),
                        true,
                    ),
                ]
                .into(),
            ),
            true,
        )]));

        (file_schema, table_schema)
    }

    fn create_test_batch_with_struct_data(
        file_schema: &SchemaRef,
    ) -> Result<RecordBatch> {
        let mut location_builder = StringBuilder::new();
        location_builder.append_value("San Francisco");
        location_builder.append_value("New York");

        let timestamp_array = TimestampMillisecondArray::from(vec![
            Some(1640995200000),
            Some(1641081600000),
        ]);

        let timestamp_type =
            DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into()));
        let timestamp_array = cast(&timestamp_array, &timestamp_type)?;

        let info_struct = StructArray::from(vec![
            (
                Arc::new(Field::new("location", DataType::Utf8, true)),
                Arc::new(location_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(Field::new("timestamp_utc", timestamp_type, true)),
                timestamp_array,
            ),
        ]);

        Ok(RecordBatch::try_new(
            Arc::clone(file_schema),
            vec![Arc::new(info_struct)],
        )?)
    }

    fn verify_adapted_batch_with_nested_fields(
        mapped_batch: &RecordBatch,
        table_schema: &SchemaRef,
    ) -> Result<()> {
        assert_eq!(mapped_batch.schema(), *table_schema);
        assert_eq!(mapped_batch.num_rows(), 2);

        let info_col = mapped_batch.column(0);
        let info_array = info_col
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("Expected info column to be a StructArray");

        verify_preserved_fields(info_array)?;
        verify_reason_field_structure(info_array)?;
        Ok(())
    }

    fn verify_preserved_fields(info_array: &StructArray) -> Result<()> {
        let location_col = info_array
            .column_by_name("location")
            .expect("Expected location field in struct");
        let location_array = location_col
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .expect("Expected location to be a StringArray");
        assert_eq!(location_array.value(0), "San Francisco");
        assert_eq!(location_array.value(1), "New York");

        let timestamp_col = info_array
            .column_by_name("timestamp_utc")
            .expect("Expected timestamp_utc field in struct");
        let timestamp_array = timestamp_col
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .expect("Expected timestamp_utc to be a TimestampMillisecondArray");
        assert_eq!(timestamp_array.value(0), 1640995200000);
        assert_eq!(timestamp_array.value(1), 1641081600000);
        Ok(())
    }

    fn verify_reason_field_structure(info_array: &StructArray) -> Result<()> {
        let reason_col = info_array
            .column_by_name("reason")
            .expect("Expected reason field in struct");
        let reason_array = reason_col
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("Expected reason to be a StructArray");
        assert_eq!(reason_array.fields().len(), 2);
        assert!(reason_array.column_by_name("_level").is_some());
        assert!(reason_array.column_by_name("details").is_some());

        let details_col = reason_array
            .column_by_name("details")
            .expect("Expected details field in reason struct");
        let details_array = details_col
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("Expected details to be a StructArray");
        assert_eq!(details_array.fields().len(), 3);
        assert!(details_array.column_by_name("rurl").is_some());
        assert!(details_array.column_by_name("s").is_some());
        assert!(details_array.column_by_name("t").is_some());
        for i in 0..2 {
            assert!(reason_array.is_null(i), "reason field should be null");
        }
        Ok(())
    }

    fn verify_column_statistics(
        stats: &ColumnStatistics,
        expected_null_count: Option<usize>,
        expected_distinct_count: Option<usize>,
        expected_min: Option<ScalarValue>,
        expected_max: Option<ScalarValue>,
        expected_sum: Option<ScalarValue>,
    ) {
        if let Some(count) = expected_null_count {
            assert_eq!(
                stats.null_count,
                Precision::Exact(count),
                "Null count should match expected value"
            );
        }
        if let Some(count) = expected_distinct_count {
            assert_eq!(
                stats.distinct_count,
                Precision::Exact(count),
                "Distinct count should match expected value"
            );
        }
        if let Some(min) = expected_min {
            assert_eq!(
                stats.min_value,
                Precision::Exact(min),
                "Min value should match expected value"
            );
        }
        if let Some(max) = expected_max {
            assert_eq!(
                stats.max_value,
                Precision::Exact(max),
                "Max value should match expected value"
            );
        }
        if let Some(sum) = expected_sum {
            assert_eq!(
                stats.sum_value,
                Precision::Exact(sum),
                "Sum value should match expected value"
            );
        }
    }

    fn create_test_column_statistics(
        null_count: usize,
        distinct_count: usize,
        min_value: Option<ScalarValue>,
        max_value: Option<ScalarValue>,
        sum_value: Option<ScalarValue>,
    ) -> ColumnStatistics {
        ColumnStatistics {
            null_count: Precision::Exact(null_count),
            distinct_count: Precision::Exact(distinct_count),
            min_value: min_value.map_or_else(|| Precision::Absent, Precision::Exact),
            max_value: max_value.map_or_else(|| Precision::Absent, Precision::Exact),
            sum_value: sum_value.map_or_else(|| Precision::Absent, Precision::Exact),
        }
    }
}