datafusion_datasource/schema_adapter.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`SchemaAdapter`] and [`SchemaAdapterFactory`] to adapt file-level record batches to a table schema.
//!
//! An adapter translates the `RecordBatch`es that come out of the physical
//! file format into the form DataFusion expects.  For instance, a schema
//! can be stored external to a parquet file that maps parquet logical types to arrow types.

use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions};
use arrow::compute::{can_cast_types, cast};
use arrow::datatypes::{Field, Schema, SchemaRef};
use datafusion_common::{plan_err, ColumnStatistics};
use std::fmt::Debug;
use std::sync::Arc;

/// Factory for creating [`SchemaAdapter`]
///
/// This interface provides a way to implement custom schema adaptation logic
/// for DataSourceExec (for example, to fill missing columns with a default
/// value other than null).
///
/// Most users should use [`DefaultSchemaAdapterFactory`]. See that struct for
/// more details and examples.
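///
/// # Example: implementing a custom factory
///
/// A minimal sketch of a custom factory (the name `MyFactory` is illustrative).
/// A real implementation would typically return its own [`SchemaAdapter`];
/// this one simply delegates to the default factory:
///
/// ```
/// # use arrow::datatypes::SchemaRef;
/// # use datafusion_datasource::schema_adapter::{
/// #     DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
/// # };
/// #[derive(Debug)]
/// struct MyFactory;
///
/// impl SchemaAdapterFactory for MyFactory {
///     fn create(
///         &self,
///         projected_table_schema: SchemaRef,
///         table_schema: SchemaRef,
///     ) -> Box<dyn SchemaAdapter> {
///         // delegate to the default adapter; a custom adapter could be
///         // returned here instead (e.g. one that fills missing columns
///         // with non-null default values)
///         DefaultSchemaAdapterFactory.create(projected_table_schema, table_schema)
///     }
/// }
/// ```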
pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
    /// Create a [`SchemaAdapter`]
    ///
    /// Arguments:
    ///
    /// * `projected_table_schema`: The schema for the table, projected to
    ///   include only the fields being output (projected) by this mapping.
    ///
    /// * `table_schema`: The entire table schema for the table
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter>;
}

/// Creates [`SchemaMapper`]s to map file-level [`RecordBatch`]es to a table
/// schema, which may itself have been obtained by merging multiple file-level
/// schemas.
///
/// This is useful for implementing schema evolution in partitioned datasets.
///
/// See [`DefaultSchemaAdapterFactory`] for more details and examples.
pub trait SchemaAdapter: Send + Sync {
    /// Map a column index in the table schema to a column index in a particular
    /// file schema
    ///
    /// This is used while reading a file to push down projections by mapping
    /// projected column indexes from the table schema to the file schema
    ///
    /// Panics if index is not in range for the table schema
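    ///
    /// For example, with the default adapter, if the table schema is `(a, b, c)`
    /// and the file schema is `(c, a)`, then `map_column_index(0, &file_schema)`
    /// returns `Some(1)` (the position of `a` in the file), while
    /// `map_column_index(1, &file_schema)` returns `None` because the file has
    /// no column `b`.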
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;

    /// Creates a mapping for casting columns from the file schema to the table
    /// schema.
    ///
    /// This is used after reading a record batch. The returned [`SchemaMapper`]:
    ///
    /// 1. Maps columns to the expected column indexes
    /// 2. Handles missing values (e.g. fills nulls or a default value) for
    ///    columns in the table schema that are not in the file schema
    /// 3. Handles different types: if the column in the file schema has a
    ///    different type than `table_schema`, the mapper will resolve this
    ///    difference (e.g. by casting to the appropriate type)
    ///
    /// Returns:
    /// * a [`SchemaMapper`]
    /// * an ordered list of columns to project from the file
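    ///
    /// For example, with table schema `(a, b, c)` and file schema
    /// `(c, b, extra)`, the default adapter returns a projection of `[0, 1]`:
    /// read file columns `c` and `b`, in file order, and ignore `extra`.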
    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
}

/// Maps columns from a specific file schema to the table schema.
///
/// See [`DefaultSchemaAdapterFactory`] for more details and examples.
pub trait SchemaMapper: Debug + Send + Sync {
    /// Adapts a `RecordBatch` to match the `table_schema`
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;

    /// Adapts file-level column `Statistics` to match the `table_schema`
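    ///
    /// For table columns that are missing from the file, implementations can
    /// return [`ColumnStatistics::new_unknown`], as the default implementation
    /// ([`SchemaMapping`]) does.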
    fn map_column_statistics(
        &self,
        file_col_statistics: &[ColumnStatistics],
    ) -> datafusion_common::Result<Vec<ColumnStatistics>>;
}

/// Default [`SchemaAdapterFactory`] for mapping schemas.
///
/// This can be used to adapt file-level record batches to a table schema and
/// implement schema evolution.
///
/// Given an input file schema and a table schema, this factory returns a
/// [`SchemaAdapter`] that returns [`SchemaMapper`]s that:
///
/// 1. Reorder columns
/// 2. Cast columns to the correct type
/// 3. Fill missing columns with nulls
///
/// # Errors:
///
/// * If a column in the table schema is non-nullable but is not present in the
///   file schema (i.e. it is missing), the returned mapper fills it with nulls,
///   which results in a schema error.
///
/// # Illustration of Schema Mapping
///
/// ```text
/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
///  ┌───────┐   ┌───────┐ │                  ┌───────┐   ┌───────┐   ┌───────┐ │
/// ││  1.0  │   │ "foo" │                   ││ NULL  │   │ "foo" │   │ "1.0" │
///  ├───────┤   ├───────┤ │ Schema mapping   ├───────┤   ├───────┤   ├───────┤ │
/// ││  2.0  │   │ "bar" │                   ││  NULL │   │ "bar" │   │ "2.0" │
///  └───────┘   └───────┘ │────────────────▶ └───────┘   └───────┘   └───────┘ │
/// │                                        │
///  column "c"  column "b"│                  column "a"  column "b"  column "c"│
/// │ Float64       Utf8                     │  Int32        Utf8        Utf8
///  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘                  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
///     Input Record Batch                         Output Record Batch
///
///     Schema {                                   Schema {
///      "c": Float64,                              "a": Int32,
///      "b": Utf8,                                 "b": Utf8,
///     }                                           "c": Utf8,
///                                                }
/// ```
///
/// # Example of using the `DefaultSchemaAdapterFactory` to map [`RecordBatch`]s
///
/// Note `SchemaMapping` also supports mapping partial batches, which is used as
/// part of predicate pushdown.
///
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};
/// # use datafusion_common::record_batch;
/// // Table has fields "a", "b" and "c"
/// let table_schema = Schema::new(vec![
///     Field::new("a", DataType::Int32, true),
///     Field::new("b", DataType::Utf8, true),
///     Field::new("c", DataType::Utf8, true),
/// ]);
///
/// // create an adapter to map the table schema to the file schema
/// let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
///
/// // The file schema has fields "c" and "b" but "b" is stored as a 'Float64'
/// // instead of 'Utf8'
/// let file_schema = Schema::new(vec![
///    Field::new("c", DataType::Utf8, true),
///    Field::new("b", DataType::Float64, true),
/// ]);
///
/// // Get a mapping from the file schema to the table schema
/// let (mapper, _indices) = adapter.map_schema(&file_schema).unwrap();
///
/// let file_batch = record_batch!(
///     ("c", Utf8, vec!["foo", "bar"]),
///     ("b", Float64, vec![1.0, 2.0])
/// ).unwrap();
///
/// let mapped_batch = mapper.map_batch(file_batch).unwrap();
///
/// // the mapped batch has the correct schema and the "b" column has been cast to Utf8
/// let expected_batch = record_batch!(
///    ("a", Int32, vec![None, None]),  // missing column filled with nulls
///    ("b", Utf8, vec!["1.0", "2.0"]), // b was cast to string and order was changed
///    ("c", Utf8, vec!["foo", "bar"])
/// ).unwrap();
/// assert_eq!(mapped_batch, expected_batch);
/// ```
#[derive(Clone, Debug, Default)]
pub struct DefaultSchemaAdapterFactory;

impl DefaultSchemaAdapterFactory {
    /// Create a new factory for mapping batches from a file schema to a table
    /// schema.
    ///
    /// This is a convenience for [`DefaultSchemaAdapterFactory::create`] with
    /// the same schema for both the projected table schema and the table
    /// schema.
    pub fn from_schema(table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
        Self.create(Arc::clone(&table_schema), table_schema)
    }
}

impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        _table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        Box::new(DefaultSchemaAdapter {
            projected_table_schema,
        })
    }
}

/// This SchemaAdapter only needs the projected table schema. See
/// [`SchemaMapping`] for more details
#[derive(Clone, Debug)]
pub(crate) struct DefaultSchemaAdapter {
    /// The schema for the table, projected to include only the fields being output (projected) by the
    /// associated ParquetSource
    projected_table_schema: SchemaRef,
}

/// Checks if a file field can be cast to a table field
///
/// Returns Ok(true) if casting is possible, or an error explaining why casting is not possible
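///
/// For example, Arrow can cast `Int64` to `Int32` and `Float64` to `Utf8`, so
/// those pairs return `Ok(true)`, while `Binary` to `Decimal128` is not
/// castable and returns a plan error (see `test_can_cast_field` below).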
pub(crate) fn can_cast_field(
    file_field: &Field,
    table_field: &Field,
) -> datafusion_common::Result<bool> {
    if can_cast_types(file_field.data_type(), table_field.data_type()) {
        Ok(true)
    } else {
        plan_err!(
            "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
            file_field.name(),
            file_field.data_type(),
            table_field.data_type()
        )
    }
}

impl SchemaAdapter for DefaultSchemaAdapter {
    /// Map a column index in the table schema to a column index in a particular
    /// file schema
    ///
    /// Panics if index is not in range for the table schema
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        let field = self.projected_table_schema.field(index);
        Some(file_schema.fields.find(field.name())?.0)
    }

    /// Creates a `SchemaMapping` for casting or mapping the columns from the
    /// file schema to the table schema.
    ///
    /// If the provided `file_schema` contains columns of a different type to
    /// the expected `table_schema`, the method will attempt to cast the array
    /// data from the file schema to the table schema where possible.
    ///
    /// Returns a [`SchemaMapping`] that can be applied to the output batch
    /// along with an ordered list of columns to project from the file
    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        let (field_mappings, projection) = create_field_mapping(
            file_schema,
            &self.projected_table_schema,
            can_cast_field,
        )?;

        Ok((
            Arc::new(SchemaMapping::new(
                Arc::clone(&self.projected_table_schema),
                field_mappings,
            )),
            projection,
        ))
    }
}

/// Helper function that creates field mappings between file schema and table schema
///
/// Maps columns from the file schema to their corresponding positions in the table schema,
/// applying type compatibility checking via the provided predicate function.
///
/// Returns field mappings (for column reordering) and a projection (for field selection).
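///
/// For example, with file schema `(b, a, d)` and table schema `(a, b, c)` the
/// returned field mappings are `[Some(1), Some(0), None]` and the projection
/// is `[0, 1]` (see `test_create_field_mapping` below).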
pub(crate) fn create_field_mapping<F>(
    file_schema: &Schema,
    projected_table_schema: &SchemaRef,
    can_map_field: F,
) -> datafusion_common::Result<(Vec<Option<usize>>, Vec<usize>)>
where
    F: Fn(&Field, &Field) -> datafusion_common::Result<bool>,
{
    let mut projection = Vec::with_capacity(file_schema.fields().len());
    let mut field_mappings = vec![None; projected_table_schema.fields().len()];

    for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
        if let Some((table_idx, table_field)) =
            projected_table_schema.fields().find(file_field.name())
        {
            if can_map_field(file_field, table_field)? {
                field_mappings[table_idx] = Some(projection.len());
                projection.push(file_idx);
            }
        }
    }

    Ok((field_mappings, projection))
}

/// The SchemaMapping struct holds a mapping from the file schema to the table
/// schema and any necessary type conversions.
///
/// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch which
/// has the projected schema, since that's the schema which is supposed to come
/// out of the execution of this query. Thus `map_batch` uses
/// `projected_table_schema` as it can only operate on the projected fields.
///
/// [`map_batch`]: Self::map_batch
#[derive(Debug)]
pub struct SchemaMapping {
    /// The schema of the table. This is the expected schema after conversion
    /// and it should match the schema of the query result.
    projected_table_schema: SchemaRef,
    /// Mapping from field index in `projected_table_schema` to index in
    /// the projected file schema.
    ///
    /// They are Options instead of just plain `usize`s because the table could
    /// have fields that don't exist in the file.
    field_mappings: Vec<Option<usize>>,
}

impl SchemaMapping {
    /// Creates a new SchemaMapping instance
    ///
    /// Initializes the field mappings needed to transform file data to the projected table schema
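    ///
    /// A minimal construction sketch (the mapping below says table column 0
    /// comes from projected file column 1, and table column 1 from projected
    /// file column 0):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_datasource::schema_adapter::SchemaMapping;
    /// let projected_table_schema = Arc::new(Schema::new(vec![
    ///     Field::new("a", DataType::Int32, true),
    ///     Field::new("b", DataType::Utf8, true),
    /// ]));
    /// let mapping = SchemaMapping::new(projected_table_schema, vec![Some(1), Some(0)]);
    /// ```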
    pub fn new(
        projected_table_schema: SchemaRef,
        field_mappings: Vec<Option<usize>>,
    ) -> Self {
        Self {
            projected_table_schema,
            field_mappings,
        }
    }
}

impl SchemaMapper for SchemaMapping {
    /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and
    /// conversions.
    /// The produced RecordBatch has a schema that contains only the projected columns.
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
        let batch_rows = batch.num_rows();
        let batch_cols = batch.columns().to_vec();

        let cols = self
            .projected_table_schema
            // go through each field in the projected schema
            .fields()
            .iter()
            // and zip it with the index that maps fields from the projected table schema to the
            // projected file schema in `batch`
            .zip(&self.field_mappings)
            // and for each one...
            .map(|(field, file_idx)| {
                file_idx.map_or_else(
                    // If this field only exists in the table, and not in the file, then we know
                    // that it's null, so just return that.
                    || Ok(new_null_array(field.data_type(), batch_rows)),
                    // However, if it does exist in both, then try to cast it to the correct output
                    // type
                    |batch_idx| cast(&batch_cols[batch_idx], field.data_type()),
                )
            })
            .collect::<datafusion_common::Result<Vec<_>, _>>()?;

        // Necessary to handle empty batches
        let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));

        let schema = Arc::clone(&self.projected_table_schema);
        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
        Ok(record_batch)
    }

    /// Adapts file-level column `Statistics` to match the `table_schema`
    fn map_column_statistics(
        &self,
        file_col_statistics: &[ColumnStatistics],
    ) -> datafusion_common::Result<Vec<ColumnStatistics>> {
        let mut table_col_statistics = vec![];

        // Map the statistics for each field in the file schema to the corresponding field in the
        // table schema; if a field is not present in the file schema, fill it with
        // `ColumnStatistics::new_unknown`
        for (_, file_col_idx) in self
            .projected_table_schema
            .fields()
            .iter()
            .zip(&self.field_mappings)
        {
            if let Some(file_col_idx) = file_col_idx {
                table_col_statistics.push(
                    file_col_statistics
                        .get(*file_col_idx)
                        .cloned()
                        .unwrap_or_default(),
                );
            } else {
                table_col_statistics.push(ColumnStatistics::new_unknown());
            }
        }

        Ok(table_col_statistics)
    }
}

#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field};
    use datafusion_common::{stats::Precision, Statistics};

    use super::*;

    #[test]
    fn test_schema_mapping_map_statistics_basic() {
        // Create table schema (a, b, c)
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Float64, true),
        ]));

        // Create file schema (b, a) - different order, missing c
        let file_schema = Schema::new(vec![
            Field::new("b", DataType::Utf8, true),
            Field::new("a", DataType::Int32, true),
        ]);

        // Create SchemaAdapter
        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        // Get mapper and projection
        let (mapper, projection) = adapter.map_schema(&file_schema).unwrap();

        // Should project columns 0,1 from file
        assert_eq!(projection, vec![0, 1]);

        // Create file statistics
        let mut file_stats = Statistics::default();

        // Statistics for column b (index 0 in file)
        let b_stats = ColumnStatistics {
            null_count: Precision::Exact(5),
            ..Default::default()
        };

        // Statistics for column a (index 1 in file)
        let a_stats = ColumnStatistics {
            null_count: Precision::Exact(10),
            ..Default::default()
        };

        file_stats.column_statistics = vec![b_stats, a_stats];

        // Map statistics
        let table_col_stats = mapper
            .map_column_statistics(&file_stats.column_statistics)
            .unwrap();

        // Verify stats
        assert_eq!(table_col_stats.len(), 3);
        assert_eq!(table_col_stats[0].null_count, Precision::Exact(10)); // a from file idx 1
        assert_eq!(table_col_stats[1].null_count, Precision::Exact(5)); // b from file idx 0
        assert_eq!(table_col_stats[2].null_count, Precision::Absent); // c (unknown)
    }

    #[test]
    fn test_schema_mapping_map_statistics_empty() {
        // Create schemas
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]);

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };
        let (mapper, _) = adapter.map_schema(&file_schema).unwrap();

        // Empty file statistics
        let file_stats = Statistics::default();
        let table_col_stats = mapper
            .map_column_statistics(&file_stats.column_statistics)
            .unwrap();

        // All stats should be unknown
        assert_eq!(table_col_stats.len(), 2);
        assert_eq!(table_col_stats[0], ColumnStatistics::new_unknown());
        assert_eq!(table_col_stats[1], ColumnStatistics::new_unknown());
    }

    #[test]
    fn test_can_cast_field() {
        // Same type should work
        let from_field = Field::new("col", DataType::Int32, true);
        let to_field = Field::new("col", DataType::Int32, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Casting Int32 to Float64 is allowed
        let from_field = Field::new("col", DataType::Int32, true);
        let to_field = Field::new("col", DataType::Float64, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Casting Float64 to Utf8 should work (converts to string)
        let from_field = Field::new("col", DataType::Float64, true);
        let to_field = Field::new("col", DataType::Utf8, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Binary to Decimal128 is not supported - this is an example of a cast that
        // should fail. Note: we use Binary rather than, say, Utf8 -> Int32 because
        // Arrow actually supports that cast.
        let from_field = Field::new("col", DataType::Binary, true);
        let to_field = Field::new("col", DataType::Decimal128(10, 2), true);
        let result = can_cast_field(&from_field, &to_field);
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Cannot cast file schema field col"));
    }

    #[test]
    fn test_create_field_mapping() {
        // Define the table schema
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Float64, true),
        ]));

        // Define file schema: different order, missing column c, and b has different type
        let file_schema = Schema::new(vec![
            Field::new("b", DataType::Float64, true), // Different type but castable to Utf8
            Field::new("a", DataType::Int32, true),   // Same type
            Field::new("d", DataType::Boolean, true), // Not in table schema
        ]);

        // Custom can_map_field function that allows all mappings for testing
        let allow_all = |_: &Field, _: &Field| Ok(true);

        // Test field mapping
        let (field_mappings, projection) =
            create_field_mapping(&file_schema, &table_schema, allow_all).unwrap();

        // Expected:
        // - field_mappings[0] (a) maps to projection[1]
        // - field_mappings[1] (b) maps to projection[0]
        // - field_mappings[2] (c) is None (not in file)
        assert_eq!(field_mappings, vec![Some(1), Some(0), None]);
        assert_eq!(projection, vec![0, 1]); // Projecting file columns b, a

        // Test with a failing mapper
        let fails_all = |_: &Field, _: &Field| Ok(false);
        let (field_mappings, projection) =
            create_field_mapping(&file_schema, &table_schema, fails_all).unwrap();

        // Should have no mappings or projections if all cast checks fail
        assert_eq!(field_mappings, vec![None, None, None]);
        assert_eq!(projection, Vec::<usize>::new());

        // Test with error-producing mapper
        let error_mapper = |_: &Field, _: &Field| plan_err!("Test error");
        let result = create_field_mapping(&file_schema, &table_schema, error_mapper);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Test error"));
    }

    #[test]
    fn test_schema_mapping_new() {
        // Define the projected table schema
        let projected_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));

        // Define field mappings from table to file
        let field_mappings = vec![Some(1), Some(0)];

        // Create SchemaMapping manually
        let mapping =
            SchemaMapping::new(Arc::clone(&projected_schema), field_mappings.clone());

        // Check that fields were set correctly
        assert_eq!(*mapping.projected_table_schema, *projected_schema);
        assert_eq!(mapping.field_mappings, field_mappings);

        // Test with a batch to ensure it works properly
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![
                Field::new("b_file", DataType::Utf8, true),
                Field::new("a_file", DataType::Int32, true),
            ])),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["hello", "world"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 2])),
            ],
        )
        .unwrap();

        // Test that map_batch works with our manually created mapping
        let mapped_batch = mapping.map_batch(batch).unwrap();

        // Verify the mapped batch has the correct schema and data
        assert_eq!(*mapped_batch.schema(), *projected_schema);
        assert_eq!(mapped_batch.num_columns(), 2);
        assert_eq!(mapped_batch.column(0).len(), 2); // a column
        assert_eq!(mapped_batch.column(1).len(), 2); // b column
    }

    #[test]
    fn test_map_schema_error_path() {
        // Define the table schema
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Decimal128(10, 2), true), // Use Decimal which has stricter cast rules
        ]));

        // Define file schema with incompatible type for column c
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Float64, true), // Different but castable
            Field::new("c", DataType::Binary, true),  // Not castable to Decimal128
        ]);

        // Create DefaultSchemaAdapter
        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        // map_schema should error due to incompatible types
        let result = adapter.map_schema(&file_schema);
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Cannot cast file schema field c"));
    }

    #[test]
    fn test_map_schema_happy_path() {
        // Define the table schema
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Decimal128(10, 2), true),
        ]));

        // Create DefaultSchemaAdapter
        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        // Define compatible file schema (missing column c)
        let compatible_file_schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true), // Can be cast to Int32
            Field::new("b", DataType::Float64, true), // Can be cast to Utf8
        ]);

        // Test successful schema mapping
        let (mapper, projection) = adapter.map_schema(&compatible_file_schema).unwrap();

        // Verify field_mappings and projection created correctly
        assert_eq!(projection, vec![0, 1]); // Projecting a and b

        // Verify the SchemaMapping works with actual data
        let file_batch = RecordBatch::try_new(
            Arc::new(compatible_file_schema.clone()),
            vec![
                Arc::new(arrow::array::Int64Array::from(vec![100, 200])),
                Arc::new(arrow::array::Float64Array::from(vec![1.5, 2.5])),
            ],
        )
        .unwrap();

        let mapped_batch = mapper.map_batch(file_batch).unwrap();

        // Verify correct schema mapping
        assert_eq!(*mapped_batch.schema(), *table_schema);
        assert_eq!(mapped_batch.num_columns(), 3); // a, b, c

        // Column c should be null since it wasn't in the file schema
        let c_array = mapped_batch.column(2);
        assert_eq!(c_array.len(), 2);
        assert_eq!(c_array.null_count(), 2);
    }
}