datafusion_datasource/schema_adapter.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`SchemaAdapter`] and [`SchemaAdapterFactory`] to adapt file-level record batches to a table schema.
//!
//! An adapter provides a way to translate the `RecordBatch`es that come out of the
//! physical file format into the form DataFusion expects. For instance, a schema
//! stored externally to a Parquet file may map Parquet logical types to Arrow types.

use arrow::{
    array::{new_null_array, ArrayRef, RecordBatch, RecordBatchOptions},
    compute::can_cast_types,
    datatypes::{DataType, Field, Schema, SchemaRef},
};
use datafusion_common::{
    format::DEFAULT_CAST_OPTIONS,
    nested_struct::{cast_column, validate_struct_compatibility},
    plan_err, ColumnStatistics,
};
use std::{fmt::Debug, sync::Arc};

/// Function used by [`SchemaMapping`] to adapt a column from the file schema to
/// the table schema.
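///
/// For example, a cast function can simply delegate to arrow's cast kernel.
/// This is a minimal sketch (it does not handle nested structs the way the
/// default adapter's [`cast_column`] does):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow::array::ArrayRef;
/// # use arrow::datatypes::Field;
/// # use datafusion_datasource::schema_adapter::CastColumnFn;
/// let cast_fn: Arc<CastColumnFn> = Arc::new(
///     |array: &ArrayRef, field: &Field, opts: &arrow::compute::CastOptions| {
///         // cast the array to the table field's data type
///         arrow::compute::cast_with_options(array, field.data_type(), opts)
///             .map_err(datafusion_common::DataFusionError::from)
///     },
/// );
/// ```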
pub type CastColumnFn = dyn Fn(
        &ArrayRef,
        &Field,
        &arrow::compute::CastOptions,
    ) -> datafusion_common::Result<ArrayRef>
    + Send
    + Sync;

/// Factory for creating [`SchemaAdapter`]
///
/// This interface provides a way to implement custom schema adaptation logic
/// for DataSourceExec (for example, to fill missing columns with a default
/// value other than null).
///
/// Most users should use [`DefaultSchemaAdapterFactory`]. See that struct for
/// more details and examples.
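///
/// For example, a custom factory can delegate to the default implementation.
/// This is a minimal sketch; the hypothetical `MyFactory` just forwards to
/// [`DefaultSchemaAdapterFactory`], where a real implementation would return
/// its own [`SchemaAdapter`]:
///
/// ```
/// # use arrow::datatypes::SchemaRef;
/// # use datafusion_datasource::schema_adapter::{
/// #     DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
/// # };
/// #[derive(Debug)]
/// struct MyFactory;
///
/// impl SchemaAdapterFactory for MyFactory {
///     fn create(
///         &self,
///         projected_table_schema: SchemaRef,
///         table_schema: SchemaRef,
///     ) -> Box<dyn SchemaAdapter> {
///         // custom adaptation logic would go here
///         DefaultSchemaAdapterFactory.create(projected_table_schema, table_schema)
///     }
/// }
/// ```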
pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
    /// Create a [`SchemaAdapter`]
    ///
    /// Arguments:
    ///
    /// * `projected_table_schema`: The schema for the table, projected to
    ///   include only the fields being output (projected) by this mapping.
    ///
    /// * `table_schema`: The entire table schema for the table
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter>;

    /// Create a [`SchemaAdapter`] using only the projected table schema.
    ///
    /// This is a convenience method for cases where the table schema and the
    /// projected table schema are the same.
    fn create_with_projected_schema(
        &self,
        projected_table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        self.create(Arc::clone(&projected_table_schema), projected_table_schema)
    }
}

/// Creates [`SchemaMapper`]s to map file-level [`RecordBatch`]es to a table
/// schema, which may have been obtained by merging multiple file-level
/// schemas.
///
/// This is useful for implementing schema evolution in partitioned datasets.
///
/// See [`DefaultSchemaAdapterFactory`] for more details and examples.
pub trait SchemaAdapter: Send + Sync {
    /// Map a column index in the table schema to a column index in a particular
    /// file schema
    ///
    /// This is used while reading a file to push down projections by mapping
    /// projected column indexes from the table schema to the file schema.
    ///
    /// Panics if `index` is not in range for the table schema.
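    ///
    /// For example, the default adapter maps columns by name; a minimal
    /// sketch:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
    /// let table_schema = Arc::new(Schema::new(vec![
    ///     Field::new("a", DataType::Int32, true),
    ///     Field::new("b", DataType::Utf8, true),
    /// ]));
    /// let file_schema = Schema::new(vec![Field::new("b", DataType::Utf8, true)]);
    /// let adapter = DefaultSchemaAdapterFactory::from_schema(table_schema);
    /// // "a" does not exist in the file; "b" is the file's column 0
    /// assert_eq!(adapter.map_column_index(0, &file_schema), None);
    /// assert_eq!(adapter.map_column_index(1, &file_schema), Some(0));
    /// ```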
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;

    /// Creates a mapping for casting columns from the file schema to the table
    /// schema.
    ///
    /// This is used after reading a record batch. The returned [`SchemaMapper`]:
    ///
    /// 1. Maps columns to the expected column indexes
    /// 2. Handles missing values (e.g. fills nulls or a default value) for
    ///    columns in the table schema that are not in the file schema
    /// 3. Handles different types: if the column in the file schema has a
    ///    different type than `table_schema`, the mapper will resolve this
    ///    difference (e.g. by casting to the appropriate type)
    ///
    /// Returns:
    /// * a [`SchemaMapper`]
    /// * an ordered list of columns to project from the file
    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
}

/// Maps columns from a specific file schema to the table schema.
///
/// See [`DefaultSchemaAdapterFactory`] for more details and examples.
pub trait SchemaMapper: Debug + Send + Sync {
    /// Adapts a `RecordBatch` to match the `table_schema`
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;

    /// Adapts file-level column `Statistics` to match the `table_schema`
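    ///
    /// For example, with the default mapper, file column statistics are
    /// reordered to their table positions, and columns missing from the file
    /// get [`ColumnStatistics::new_unknown`]. A minimal sketch:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_common::{stats::Precision, ColumnStatistics};
    /// # use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
    /// let table_schema = Arc::new(Schema::new(vec![
    ///     Field::new("a", DataType::Int32, true),
    ///     Field::new("b", DataType::Utf8, true),
    /// ]));
    /// // the file only contains column "a"
    /// let file_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    /// let adapter = DefaultSchemaAdapterFactory::from_schema(table_schema);
    /// let (mapper, _) = adapter.map_schema(&file_schema).unwrap();
    ///
    /// let file_stats = vec![ColumnStatistics {
    ///     null_count: Precision::Exact(3),
    ///     ..Default::default()
    /// }];
    /// let table_stats = mapper.map_column_statistics(&file_stats).unwrap();
    /// assert_eq!(table_stats[0].null_count, Precision::Exact(3));
    /// assert_eq!(table_stats[1], ColumnStatistics::new_unknown());
    /// ```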
    fn map_column_statistics(
        &self,
        file_col_statistics: &[ColumnStatistics],
    ) -> datafusion_common::Result<Vec<ColumnStatistics>>;
}

/// Default [`SchemaAdapterFactory`] for mapping schemas.
///
/// This can be used to adapt file-level record batches to a table schema and
/// implement schema evolution.
///
/// Given an input file schema and a table schema, this factory returns a
/// [`SchemaAdapter`] that returns [`SchemaMapper`]s that:
///
/// 1. Reorder columns
/// 2. Cast columns to the correct type
/// 3. Fill missing columns with nulls
///
/// # Errors:
///
/// * If a column in the table schema is non-nullable but is not present in the
///   file schema (i.e. it is missing), the returned mapper fills it with
///   nulls, which results in a schema error.
///
/// # Illustration of Schema Mapping
///
/// ```text
/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
///  ┌───────┐   ┌───────┐ │                  ┌───────┐   ┌───────┐   ┌───────┐ │
/// ││  1.0  │   │ "foo" │                   ││ NULL  │   │ "foo" │   │ "1.0" │
///  ├───────┤   ├───────┤ │ Schema mapping   ├───────┤   ├───────┤   ├───────┤ │
/// ││  2.0  │   │ "bar" │                   ││  NULL │   │ "bar" │   │ "2.0" │
///  └───────┘   └───────┘ │────────────────▶ └───────┘   └───────┘   └───────┘ │
/// │                                        │
///  column "c"  column "b"│                  column "a"  column "b"  column "c"│
/// │ Float64       Utf8                     │  Int32        Utf8        Utf8
///  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘                  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
///     Input Record Batch                         Output Record Batch
///
///     Schema {                                   Schema {
///      "c": Float64,                              "a": Int32,
///      "b": Utf8,                                 "b": Utf8,
///     }                                           "c": Utf8,
///                                                }
/// ```
///
/// # Example of using the `DefaultSchemaAdapterFactory` to map [`RecordBatch`]s
///
/// Note `SchemaMapping` also supports mapping partial batches, which is used as
/// part of predicate pushdown.
///
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};
/// # use datafusion_common::record_batch;
/// // Table has fields "a", "b" and "c"
/// let table_schema = Schema::new(vec![
///     Field::new("a", DataType::Int32, true),
///     Field::new("b", DataType::Utf8, true),
///     Field::new("c", DataType::Utf8, true),
/// ]);
///
/// // create an adapter to map the table schema to the file schema
/// let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
///
/// // The file schema has fields "c" and "b" but "b" is stored as a 'Float64'
/// // instead of 'Utf8'
/// let file_schema = Schema::new(vec![
///    Field::new("c", DataType::Utf8, true),
///    Field::new("b", DataType::Float64, true),
/// ]);
///
/// // Get a mapping from the file schema to the table schema
/// let (mapper, _indices) = adapter.map_schema(&file_schema).unwrap();
///
/// let file_batch = record_batch!(
///     ("c", Utf8, vec!["foo", "bar"]),
///     ("b", Float64, vec![1.0, 2.0])
/// ).unwrap();
///
/// let mapped_batch = mapper.map_batch(file_batch).unwrap();
///
/// // the mapped batch has the correct schema and the "b" column has been cast to Utf8
/// let expected_batch = record_batch!(
///    ("a", Int32, vec![None, None]),  // missing column filled with nulls
///    ("b", Utf8, vec!["1.0", "2.0"]), // b was cast to string and order was changed
///    ("c", Utf8, vec!["foo", "bar"])
/// ).unwrap();
/// assert_eq!(mapped_batch, expected_batch);
/// ```
#[derive(Clone, Debug, Default)]
pub struct DefaultSchemaAdapterFactory;

impl DefaultSchemaAdapterFactory {
    /// Create a new factory for mapping batches from a file schema to a table
    /// schema.
    ///
    /// This is a convenience for [`DefaultSchemaAdapterFactory::create`] with
    /// the same schema for both the projected table schema and the table
    /// schema.
    pub fn from_schema(table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
        Self.create(Arc::clone(&table_schema), table_schema)
    }
}

impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        _table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        Box::new(DefaultSchemaAdapter {
            projected_table_schema,
        })
    }
}

/// This SchemaAdapter requires only the projected table schema. See
/// [`SchemaMapping`] for more details.
#[derive(Clone, Debug)]
pub(crate) struct DefaultSchemaAdapter {
    /// The schema for the table, projected to include only the fields being
    /// output (projected) by the associated ParquetSource
    projected_table_schema: SchemaRef,
}

/// Checks if a file field can be cast to a table field
///
/// Returns `Ok(true)` if casting is possible, or an error explaining why the
/// cast is not possible.
pub(crate) fn can_cast_field(
    file_field: &Field,
    table_field: &Field,
) -> datafusion_common::Result<bool> {
    match (file_field.data_type(), table_field.data_type()) {
        (DataType::Struct(source_fields), DataType::Struct(target_fields)) => {
            // validate_struct_compatibility returns Result<()>; on success we can cast structs
            validate_struct_compatibility(source_fields, target_fields)?;
            Ok(true)
        }
        _ => {
            if can_cast_types(file_field.data_type(), table_field.data_type()) {
                Ok(true)
            } else {
                plan_err!(
                    "Cannot cast file schema field {} of type {} to table schema field of type {}",
                    file_field.name(),
                    file_field.data_type(),
                    table_field.data_type()
                )
            }
        }
    }
}

impl SchemaAdapter for DefaultSchemaAdapter {
    /// Map a column index in the table schema to a column index in a particular
    /// file schema
    ///
    /// Panics if index is not in range for the table schema
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        let field = self.projected_table_schema.field(index);
        Some(file_schema.fields.find(field.name())?.0)
    }

    /// Creates a `SchemaMapping` for casting or mapping the columns from the
    /// file schema to the table schema.
    ///
    /// If the provided `file_schema` contains columns of a different type to
    /// the expected `table_schema`, the method will attempt to cast the array
    /// data from the file schema to the table schema where possible.
    ///
    /// Returns a [`SchemaMapping`] that can be applied to the output batch
    /// along with an ordered list of columns to project from the file
    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        let (field_mappings, projection) = create_field_mapping(
            file_schema,
            &self.projected_table_schema,
            can_cast_field,
        )?;

        Ok((
            Arc::new(SchemaMapping::new(
                Arc::clone(&self.projected_table_schema),
                field_mappings,
                Arc::new(
                    |array: &ArrayRef,
                     field: &Field,
                     opts: &arrow::compute::CastOptions| {
                        cast_column(array, field, opts)
                    },
                ),
            )),
            projection,
        ))
    }
}

/// Helper function that creates field mappings between file schema and table schema
///
/// Maps columns from the file schema to their corresponding positions in the table schema,
/// applying type compatibility checking via the provided predicate function.
///
/// Returns field mappings (for column reordering) and a projection (for field selection).
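///
/// For example (mirroring `test_create_field_mapping` below): with a table
/// schema `(a, b, c)` and a file schema `(b, a, d)` where every field is
/// mappable, the result is:
///
/// ```text
/// field_mappings = [Some(1), Some(0), None] // a → proj[1], b → proj[0], c missing
/// projection     = [0, 1]                   // read file columns b, a
/// ```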
pub(crate) fn create_field_mapping<F>(
    file_schema: &Schema,
    projected_table_schema: &SchemaRef,
    can_map_field: F,
) -> datafusion_common::Result<(Vec<Option<usize>>, Vec<usize>)>
where
    F: Fn(&Field, &Field) -> datafusion_common::Result<bool>,
{
    let mut projection = Vec::with_capacity(file_schema.fields().len());
    let mut field_mappings = vec![None; projected_table_schema.fields().len()];

    for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
        if let Some((table_idx, table_field)) =
            projected_table_schema.fields().find(file_field.name())
        {
            if can_map_field(file_field, table_field)? {
                field_mappings[table_idx] = Some(projection.len());
                projection.push(file_idx);
            }
        }
    }

    Ok((field_mappings, projection))
}

/// The SchemaMapping struct holds a mapping from the file schema to the table
/// schema and any necessary type conversions.
///
/// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch which
/// has the projected schema, since that's the schema which is supposed to come
/// out of the execution of this query. Thus `map_batch` uses
/// `projected_table_schema` as it can only operate on the projected fields.
///
/// [`map_batch`]: Self::map_batch
pub struct SchemaMapping {
    /// The schema of the table. This is the expected schema after conversion
    /// and it should match the schema of the query result.
    projected_table_schema: SchemaRef,
    /// Mapping from field index in `projected_table_schema` to index in
    /// the projected file schema.
    ///
    /// They are Options instead of just plain `usize`s because the table could
    /// have fields that don't exist in the file.
    field_mappings: Vec<Option<usize>>,
    /// Function used to adapt a column from the file schema to the table schema
    /// when it exists in both schemas
    cast_column: Arc<CastColumnFn>,
}

impl Debug for SchemaMapping {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SchemaMapping")
            .field("projected_table_schema", &self.projected_table_schema)
            .field("field_mappings", &self.field_mappings)
            .field("cast_column", &"<fn>")
            .finish()
    }
}

impl SchemaMapping {
    /// Creates a new SchemaMapping instance
    ///
    /// Initializes the field mappings needed to transform file data to the projected table schema
    pub fn new(
        projected_table_schema: SchemaRef,
        field_mappings: Vec<Option<usize>>,
        cast_column: Arc<CastColumnFn>,
    ) -> Self {
        Self {
            projected_table_schema,
            field_mappings,
            cast_column,
        }
    }
}

impl SchemaMapper for SchemaMapping {
    /// Adapts a `RecordBatch` to match the `projected_table_schema` using the
    /// stored mapping and conversions. The produced RecordBatch has a schema
    /// that contains only the projected columns.
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
        let (_old_schema, batch_cols, batch_rows) = batch.into_parts();

        let cols = self
            .projected_table_schema
            // go through each field in the projected schema
            .fields()
            .iter()
            // and zip it with the index that maps fields from the projected table schema to the
            // projected file schema in `batch`
            .zip(&self.field_mappings)
            // and for each one...
            .map(|(field, file_idx)| {
                file_idx.map_or_else(
                    // If this field only exists in the table, and not in the file, then we know
                    // that it's null, so just return that.
                    || Ok(new_null_array(field.data_type(), batch_rows)),
                    // However, if it does exist in both, use the cast_column function
                    // to perform any necessary conversions
                    |batch_idx| {
                        (self.cast_column)(
                            &batch_cols[batch_idx],
                            field,
                            &DEFAULT_CAST_OPTIONS,
                        )
                    },
                )
            })
            .collect::<datafusion_common::Result<Vec<_>, _>>()?;

        // Necessary to handle empty batches
        let options = RecordBatchOptions::new().with_row_count(Some(batch_rows));

        let schema = Arc::clone(&self.projected_table_schema);
        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
        Ok(record_batch)
    }

    /// Adapts file-level column `Statistics` to match the `table_schema`
    fn map_column_statistics(
        &self,
        file_col_statistics: &[ColumnStatistics],
    ) -> datafusion_common::Result<Vec<ColumnStatistics>> {
        let mut table_col_statistics = vec![];

        // Map the statistics for each field in the file schema to the corresponding field in the
        // table schema. If a field is not present in the file schema, fill it with
        // `ColumnStatistics::new_unknown`.
        for (_, file_col_idx) in self
            .projected_table_schema
            .fields()
            .iter()
            .zip(&self.field_mappings)
        {
            if let Some(file_col_idx) = file_col_idx {
                table_col_statistics.push(
                    file_col_statistics
                        .get(*file_col_idx)
                        .cloned()
                        .unwrap_or_default(),
                );
            } else {
                table_col_statistics.push(ColumnStatistics::new_unknown());
            }
        }

        Ok(table_col_statistics)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::{
        array::{Array, ArrayRef, StringBuilder, StructArray, TimestampMillisecondArray},
        compute::cast,
        datatypes::{DataType, Field, TimeUnit},
        record_batch::RecordBatch,
    };
    use datafusion_common::{stats::Precision, Result, ScalarValue, Statistics};

    #[test]
    fn test_schema_mapping_map_statistics_basic() {
        // Create table schema (a, b, c)
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Float64, true),
        ]));

        // Create file schema (b, a) - different order, missing c
        let file_schema = Schema::new(vec![
            Field::new("b", DataType::Utf8, true),
            Field::new("a", DataType::Int32, true),
        ]);

        // Create SchemaAdapter
        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        // Get mapper and projection
        let (mapper, projection) = adapter.map_schema(&file_schema).unwrap();

        // Should project columns 0,1 from file
        assert_eq!(projection, vec![0, 1]);

        // Create file statistics
        let mut file_stats = Statistics::default();

        // Statistics for column b (index 0 in file)
        let b_stats = ColumnStatistics {
            null_count: Precision::Exact(5),
            ..Default::default()
        };

        // Statistics for column a (index 1 in file)
        let a_stats = ColumnStatistics {
            null_count: Precision::Exact(10),
            ..Default::default()
        };

        file_stats.column_statistics = vec![b_stats, a_stats];

        // Map statistics
        let table_col_stats = mapper
            .map_column_statistics(&file_stats.column_statistics)
            .unwrap();

        // Verify stats
        assert_eq!(table_col_stats.len(), 3);
        assert_eq!(table_col_stats[0].null_count, Precision::Exact(10)); // a from file idx 1
        assert_eq!(table_col_stats[1].null_count, Precision::Exact(5)); // b from file idx 0
        assert_eq!(table_col_stats[2].null_count, Precision::Absent); // c (unknown)
    }

    #[test]
    fn test_schema_mapping_map_statistics_empty() {
        // Create schemas
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]);

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };
        let (mapper, _) = adapter.map_schema(&file_schema).unwrap();

        // Empty file statistics
        let file_stats = Statistics::default();
        let table_col_stats = mapper
            .map_column_statistics(&file_stats.column_statistics)
            .unwrap();

        // All stats should be unknown
        assert_eq!(table_col_stats.len(), 2);
        assert_eq!(table_col_stats[0], ColumnStatistics::new_unknown());
        assert_eq!(table_col_stats[1], ColumnStatistics::new_unknown());
    }

    #[test]
    fn test_can_cast_field() {
        // Same type should work
        let from_field = Field::new("col", DataType::Int32, true);
        let to_field = Field::new("col", DataType::Int32, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Casting Int32 to Float64 is allowed
        let from_field = Field::new("col", DataType::Int32, true);
        let to_field = Field::new("col", DataType::Float64, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Casting Float64 to Utf8 should work (converts to string)
        let from_field = Field::new("col", DataType::Float64, true);
        let to_field = Field::new("col", DataType::Utf8, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Binary to Decimal128 is not supported - this is an example of a cast
        // that should fail
        // Note: We use Binary instead of Utf8->Int32 because Arrow actually supports that cast
        let from_field = Field::new("col", DataType::Binary, true);
        let to_field = Field::new("col", DataType::Decimal128(10, 2), true);
        let result = can_cast_field(&from_field, &to_field);
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Cannot cast file schema field col"));
    }

    #[test]
    fn test_create_field_mapping() {
        // Define the table schema
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Float64, true),
        ]));

        // Define file schema: different order, missing column c, and b has different type
        let file_schema = Schema::new(vec![
            Field::new("b", DataType::Float64, true), // Different type but castable to Utf8
            Field::new("a", DataType::Int32, true),   // Same type
            Field::new("d", DataType::Boolean, true), // Not in table schema
        ]);

        // Custom can_map_field function that allows all mappings for testing
        let allow_all = |_: &Field, _: &Field| Ok(true);

        // Test field mapping
        let (field_mappings, projection) =
            create_field_mapping(&file_schema, &table_schema, allow_all).unwrap();

        // Expected:
        // - field_mappings[0] (a) maps to projection[1]
        // - field_mappings[1] (b) maps to projection[0]
        // - field_mappings[2] (c) is None (not in file)
        assert_eq!(field_mappings, vec![Some(1), Some(0), None]);
        assert_eq!(projection, vec![0, 1]); // Projecting file columns b, a

        // Test with a failing mapper
        let fails_all = |_: &Field, _: &Field| Ok(false);
        let (field_mappings, projection) =
            create_field_mapping(&file_schema, &table_schema, fails_all).unwrap();

        // Should have no mappings or projections if all cast checks fail
        assert_eq!(field_mappings, vec![None, None, None]);
        assert_eq!(projection, Vec::<usize>::new());

        // Test with error-producing mapper
        let error_mapper = |_: &Field, _: &Field| plan_err!("Test error");
        let result = create_field_mapping(&file_schema, &table_schema, error_mapper);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Test error"));
    }

    #[test]
    fn test_schema_mapping_new() {
        // Define the projected table schema
        let projected_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));

        // Define field mappings from table to file
        let field_mappings = vec![Some(1), Some(0)];

        // Create SchemaMapping manually
        let mapping = SchemaMapping::new(
            Arc::clone(&projected_schema),
            field_mappings.clone(),
            Arc::new(
                |array: &ArrayRef, field: &Field, opts: &arrow::compute::CastOptions| {
                    cast_column(array, field, opts)
                },
            ),
        );

        // Check that fields were set correctly
        assert_eq!(*mapping.projected_table_schema, *projected_schema);
        assert_eq!(mapping.field_mappings, field_mappings);

        // Test with a batch to ensure it works properly
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![
                Field::new("b_file", DataType::Utf8, true),
                Field::new("a_file", DataType::Int32, true),
            ])),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["hello", "world"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 2])),
            ],
        )
        .unwrap();

        // Test that map_batch works with our manually created mapping
        let mapped_batch = mapping.map_batch(batch).unwrap();

        // Verify the mapped batch has the correct schema and data
        assert_eq!(*mapped_batch.schema(), *projected_schema);
        assert_eq!(mapped_batch.num_columns(), 2);
        assert_eq!(mapped_batch.column(0).len(), 2); // a column
        assert_eq!(mapped_batch.column(1).len(), 2); // b column
    }

    #[test]
    fn test_map_schema_error_path() {
        // Define the table schema
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Decimal128(10, 2), true), // Use Decimal which has stricter cast rules
        ]));

        // Define file schema with incompatible type for column c
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Float64, true), // Different but castable
            Field::new("c", DataType::Binary, true),  // Not castable to Decimal128
        ]);

        // Create DefaultSchemaAdapter
        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        // map_schema should error due to incompatible types
        let result = adapter.map_schema(&file_schema);
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Cannot cast file schema field c"));
    }

    #[test]
    fn test_map_schema_happy_path() {
        // Define the table schema
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Decimal128(10, 2), true),
        ]));

        // Create DefaultSchemaAdapter
        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        // Define compatible file schema (missing column c)
        let compatible_file_schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true), // Can be cast to Int32
            Field::new("b", DataType::Float64, true), // Can be cast to Utf8
        ]);

        // Test successful schema mapping
        let (mapper, projection) = adapter.map_schema(&compatible_file_schema).unwrap();

        // Verify field_mappings and projection created correctly
        assert_eq!(projection, vec![0, 1]); // Projecting a and b

        // Verify the SchemaMapping works with actual data
        let file_batch = RecordBatch::try_new(
            Arc::new(compatible_file_schema.clone()),
            vec![
                Arc::new(arrow::array::Int64Array::from(vec![100, 200])),
                Arc::new(arrow::array::Float64Array::from(vec![1.5, 2.5])),
            ],
        )
        .unwrap();

        let mapped_batch = mapper.map_batch(file_batch).unwrap();

        // Verify correct schema mapping
        assert_eq!(*mapped_batch.schema(), *table_schema);
        assert_eq!(mapped_batch.num_columns(), 3); // a, b, c

        // Column c should be null since it wasn't in the file schema
        let c_array = mapped_batch.column(2);
        assert_eq!(c_array.len(), 2);
        assert_eq!(c_array.null_count(), 2);
    }

    #[test]
    fn test_adapt_struct_with_added_nested_fields() -> Result<()> {
        let (file_schema, table_schema) = create_test_schemas_with_nested_fields();
        let batch = create_test_batch_with_struct_data(&file_schema)?;

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };
        let (mapper, _) = adapter.map_schema(file_schema.as_ref())?;
        let mapped_batch = mapper.map_batch(batch)?;

        verify_adapted_batch_with_nested_fields(&mapped_batch, &table_schema)?;
        Ok(())
    }

    #[test]
    fn test_map_column_statistics_struct() -> Result<()> {
        let (file_schema, table_schema) = create_test_schemas_with_nested_fields();

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };
        let (mapper, _) = adapter.map_schema(file_schema.as_ref())?;

        let file_stats = vec![
            create_test_column_statistics(
                0,
                100,
                Some(ScalarValue::Int32(Some(1))),
                Some(ScalarValue::Int32(Some(100))),
                Some(ScalarValue::Int32(Some(5100))),
            ),
            create_test_column_statistics(10, 50, None, None, None),
        ];

        let table_stats = mapper.map_column_statistics(&file_stats)?;
        assert_eq!(table_stats.len(), 1);
        verify_column_statistics(
            &table_stats[0],
            Some(0),
            Some(100),
            Some(ScalarValue::Int32(Some(1))),
            Some(ScalarValue::Int32(Some(100))),
            Some(ScalarValue::Int32(Some(5100))),
        );
        let missing_stats = mapper.map_column_statistics(&[])?;
        assert_eq!(missing_stats.len(), 1);
        assert_eq!(missing_stats[0], ColumnStatistics::new_unknown());
        Ok(())
    }

    fn create_test_schemas_with_nested_fields() -> (SchemaRef, SchemaRef) {
        let file_schema = Arc::new(Schema::new(vec![Field::new(
            "info",
            DataType::Struct(
                vec![
                    Field::new("location", DataType::Utf8, true),
                    Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                        true,
                    ),
                ]
                .into(),
            ),
            true,
        )]));

        let table_schema = Arc::new(Schema::new(vec![Field::new(
            "info",
            DataType::Struct(
                vec![
                    Field::new("location", DataType::Utf8, true),
                    Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                        true,
                    ),
                    Field::new(
                        "reason",
                        DataType::Struct(
                            vec![
                                Field::new("_level", DataType::Float64, true),
                                Field::new(
                                    "details",
                                    DataType::Struct(
                                        vec![
                                            Field::new("rurl", DataType::Utf8, true),
                                            Field::new("s", DataType::Float64, true),
                                            Field::new("t", DataType::Utf8, true),
                                        ]
                                        .into(),
                                    ),
                                    true,
                                ),
                            ]
                            .into(),
                        ),
                        true,
                    ),
                ]
                .into(),
            ),
            true,
        )]));

        (file_schema, table_schema)
    }

    fn create_test_batch_with_struct_data(
        file_schema: &SchemaRef,
    ) -> Result<RecordBatch> {
        let mut location_builder = StringBuilder::new();
        location_builder.append_value("San Francisco");
        location_builder.append_value("New York");

        let timestamp_array = TimestampMillisecondArray::from(vec![
            Some(1640995200000),
            Some(1641081600000),
        ]);

        let timestamp_type =
            DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into()));
        let timestamp_array = cast(&timestamp_array, &timestamp_type)?;

        let info_struct = StructArray::from(vec![
            (
                Arc::new(Field::new("location", DataType::Utf8, true)),
                Arc::new(location_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(Field::new("timestamp_utc", timestamp_type, true)),
                timestamp_array,
            ),
        ]);

        Ok(RecordBatch::try_new(
            Arc::clone(file_schema),
            vec![Arc::new(info_struct)],
        )?)
    }

    fn verify_adapted_batch_with_nested_fields(
        mapped_batch: &RecordBatch,
        table_schema: &SchemaRef,
    ) -> Result<()> {
        assert_eq!(mapped_batch.schema(), *table_schema);
        assert_eq!(mapped_batch.num_rows(), 2);

        let info_col = mapped_batch.column(0);
        let info_array = info_col
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("Expected info column to be a StructArray");

        verify_preserved_fields(info_array)?;
        verify_reason_field_structure(info_array)?;
        Ok(())
    }

    fn verify_preserved_fields(info_array: &StructArray) -> Result<()> {
        let location_col = info_array
            .column_by_name("location")
            .expect("Expected location field in struct");
        let location_array = location_col
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .expect("Expected location to be a StringArray");
        assert_eq!(location_array.value(0), "San Francisco");
        assert_eq!(location_array.value(1), "New York");

        let timestamp_col = info_array
            .column_by_name("timestamp_utc")
            .expect("Expected timestamp_utc field in struct");
        let timestamp_array = timestamp_col
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .expect("Expected timestamp_utc to be a TimestampMillisecondArray");
        assert_eq!(timestamp_array.value(0), 1640995200000);
        assert_eq!(timestamp_array.value(1), 1641081600000);
        Ok(())
    }

    fn verify_reason_field_structure(info_array: &StructArray) -> Result<()> {
        let reason_col = info_array
            .column_by_name("reason")
            .expect("Expected reason field in struct");
        let reason_array = reason_col
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("Expected reason to be a StructArray");
        assert_eq!(reason_array.fields().len(), 2);
        assert!(reason_array.column_by_name("_level").is_some());
        assert!(reason_array.column_by_name("details").is_some());

        let details_col = reason_array
            .column_by_name("details")
            .expect("Expected details field in reason struct");
        let details_array = details_col
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("Expected details to be a StructArray");
        assert_eq!(details_array.fields().len(), 3);
        assert!(details_array.column_by_name("rurl").is_some());
        assert!(details_array.column_by_name("s").is_some());
        assert!(details_array.column_by_name("t").is_some());
        for i in 0..2 {
            assert!(reason_array.is_null(i), "reason field should be null");
        }
        Ok(())
    }

    fn verify_column_statistics(
        stats: &ColumnStatistics,
        expected_null_count: Option<usize>,
        expected_distinct_count: Option<usize>,
        expected_min: Option<ScalarValue>,
        expected_max: Option<ScalarValue>,
        expected_sum: Option<ScalarValue>,
    ) {
        if let Some(count) = expected_null_count {
            assert_eq!(
                stats.null_count,
                Precision::Exact(count),
                "Null count should match expected value"
            );
        }
        if let Some(count) = expected_distinct_count {
            assert_eq!(
                stats.distinct_count,
                Precision::Exact(count),
                "Distinct count should match expected value"
            );
        }
        if let Some(min) = expected_min {
            assert_eq!(
                stats.min_value,
                Precision::Exact(min),
                "Min value should match expected value"
            );
        }
        if let Some(max) = expected_max {
            assert_eq!(
                stats.max_value,
                Precision::Exact(max),
                "Max value should match expected value"
            );
        }
        if let Some(sum) = expected_sum {
            assert_eq!(
                stats.sum_value,
                Precision::Exact(sum),
                "Sum value should match expected value"
            );
        }
    }

    fn create_test_column_statistics(
        null_count: usize,
        distinct_count: usize,
        min_value: Option<ScalarValue>,
        max_value: Option<ScalarValue>,
        sum_value: Option<ScalarValue>,
    ) -> ColumnStatistics {
        ColumnStatistics {
            null_count: Precision::Exact(null_count),
            distinct_count: Precision::Exact(distinct_count),
            min_value: min_value.map_or_else(|| Precision::Absent, Precision::Exact),
            max_value: max_value.map_or_else(|| Precision::Absent, Precision::Exact),
            sum_value: sum_value.map_or_else(|| Precision::Absent, Precision::Exact),
        }
    }
}