datafusion_datasource/schema_adapter.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`SchemaAdapter`] and [`SchemaAdapterFactory`] to adapt file-level record batches to a table schema.
//!
//! An adapter translates the `RecordBatch`es that come out of the physical
//! file format into the form DataFusion expects. For instance, a schema
//! stored externally to a parquet file can map parquet logical types to
//! arrow types.

use arrow::{
    array::{new_null_array, ArrayRef, RecordBatch, RecordBatchOptions},
    compute::can_cast_types,
    datatypes::{DataType, Field, Schema, SchemaRef},
};
use datafusion_common::{
    nested_struct::{cast_column, validate_struct_compatibility},
    plan_err, ColumnStatistics,
};
use std::{fmt::Debug, sync::Arc};

/// Function used by [`SchemaMapping`] to adapt a column from the file schema to
/// the table schema.
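///
/// For example, a minimal cast function could simply delegate to arrow's
/// `cast` kernel (a sketch; the default adapter instead uses
/// `datafusion_common::nested_struct::cast_column`, which also handles
/// nested structs):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow::{array::ArrayRef, compute::cast, datatypes::Field};
///
/// let cast_fn: Arc<CastColumnFn> = Arc::new(|array: &ArrayRef, field: &Field| {
///     // Cast the input array to the target field's data type
///     Ok(cast(array, field.data_type())?)
/// });
/// ```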
pub type CastColumnFn =
    dyn Fn(&ArrayRef, &Field) -> datafusion_common::Result<ArrayRef> + Send + Sync;

/// Factory for creating [`SchemaAdapter`]
///
/// This interface provides a way to implement custom schema adaptation logic
/// for DataSourceExec (for example, to fill missing columns with a default
/// value other than null).
///
/// Most users should use [`DefaultSchemaAdapterFactory`]. See that struct for
/// more details and examples.
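///
/// For illustration, a custom factory might look like the following sketch
/// (here it just delegates to [`DefaultSchemaAdapterFactory`]; a real
/// implementation would return its own [`SchemaAdapter`]):
///
/// ```ignore
/// #[derive(Debug)]
/// struct MyAdapterFactory;
///
/// impl SchemaAdapterFactory for MyAdapterFactory {
///     fn create(
///         &self,
///         projected_table_schema: SchemaRef,
///         _table_schema: SchemaRef,
///     ) -> Box<dyn SchemaAdapter> {
///         // Custom adaptation logic (e.g. non-null defaults) would go here.
///         DefaultSchemaAdapterFactory.create_with_projected_schema(projected_table_schema)
///     }
/// }
/// ```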
pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
    /// Create a [`SchemaAdapter`]
    ///
    /// Arguments:
    ///
    /// * `projected_table_schema`: The schema for the table, projected to
    ///   include only the fields being output (projected) by this mapping.
    ///
    /// * `table_schema`: The entire table schema for the table
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter>;

    /// Create a [`SchemaAdapter`] using only the projected table schema.
    ///
    /// This is a convenience method for cases where the table schema and the
    /// projected table schema are the same.
    fn create_with_projected_schema(
        &self,
        projected_table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        self.create(Arc::clone(&projected_table_schema), projected_table_schema)
    }
}

/// Creates [`SchemaMapper`]s to map file-level [`RecordBatch`]es to a table
/// schema, which may have been obtained by merging multiple file-level
/// schemas.
///
/// This is useful for implementing schema evolution in partitioned datasets.
///
/// See [`DefaultSchemaAdapterFactory`] for more details and examples.
pub trait SchemaAdapter: Send + Sync {
    /// Map a column index in the table schema to a column index in a particular
    /// file schema
    ///
    /// This is used while reading a file to push down projections by mapping
    /// projected column indexes from the table schema to the file schema
    ///
    /// Panics if index is not in range for the table schema
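    ///
    /// For example (a sketch): with a projected table schema `(a, b)` and a
    /// file schema containing only `b`, `map_column_index(1, &file_schema)`
    /// returns `Some(0)` and `map_column_index(0, &file_schema)` returns
    /// `None`.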
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;

    /// Creates a mapping for casting columns from the file schema to the table
    /// schema.
    ///
    /// This is used after reading a record batch. The returned [`SchemaMapper`]:
    ///
    /// 1. Maps columns to the expected column indexes
    /// 2. Handles missing values (e.g. fills nulls or a default value) for
    ///    columns in the table schema that are not in the file schema
    /// 3. Handles different types: if the column in the file schema has a
    ///    different type than `table_schema`, the mapper will resolve this
    ///    difference (e.g. by casting to the appropriate type)
    ///
    /// Returns:
    /// * a [`SchemaMapper`]
    /// * an ordered list of columns to project from the file
    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
}

/// Maps columns from a specific file schema to the table schema.
///
/// See [`DefaultSchemaAdapterFactory`] for more details and examples.
pub trait SchemaMapper: Debug + Send + Sync {
    /// Adapts a `RecordBatch` to match the `table_schema`
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;

    /// Adapts file-level column `Statistics` to match the `table_schema`
    fn map_column_statistics(
        &self,
        file_col_statistics: &[ColumnStatistics],
    ) -> datafusion_common::Result<Vec<ColumnStatistics>>;
}

/// Default [`SchemaAdapterFactory`] for mapping schemas.
///
/// This can be used to adapt file-level record batches to a table schema and
/// implement schema evolution.
///
/// Given an input file schema and a table schema, this factory returns
/// [`SchemaAdapter`]s that return [`SchemaMapper`]s that:
///
/// 1. Reorder columns
/// 2. Cast columns to the correct type
/// 3. Fill missing columns with nulls
///
/// # Errors:
///
/// * If a column in the table schema is non-nullable but is not present in the
///   file schema (i.e. it is missing), the returned mapper tries to fill it
///   with nulls, resulting in a schema error.
///
/// # Illustration of Schema Mapping
///
/// ```text
/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
///  ┌───────┐   ┌───────┐ │                  ┌───────┐   ┌───────┐   ┌───────┐ │
/// ││  1.0  │   │ "foo" │                   ││ NULL  │   │ "foo" │   │ "1.0" │
///  ├───────┤   ├───────┤ │ Schema mapping   ├───────┤   ├───────┤   ├───────┤ │
/// ││  2.0  │   │ "bar" │                   ││  NULL │   │ "bar" │   │ "2.0" │
///  └───────┘   └───────┘ │────────────────▶ └───────┘   └───────┘   └───────┘ │
/// │                                        │
///  column "c"  column "b"│                  column "a"  column "b"  column "c"│
/// │ Float64       Utf8                     │  Int32        Utf8        Utf8
///  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘                  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
///     Input Record Batch                         Output Record Batch
///
///     Schema {                                   Schema {
///      "c": Float64,                              "a": Int32,
///      "b": Utf8,                                 "b": Utf8,
///     }                                           "c": Utf8,
///                                                }
/// ```
///
/// # Example of using the `DefaultSchemaAdapterFactory` to map [`RecordBatch`]s
///
/// Note `SchemaMapping` also supports mapping partial batches, which is used as
/// part of predicate pushdown.
///
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};
/// # use datafusion_common::record_batch;
/// // Table has fields "a", "b" and "c"
/// let table_schema = Schema::new(vec![
///     Field::new("a", DataType::Int32, true),
///     Field::new("b", DataType::Utf8, true),
///     Field::new("c", DataType::Utf8, true),
/// ]);
///
/// // create an adapter to map the table schema to the file schema
/// let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
///
/// // The file schema has fields "c" and "b" but "b" is stored as a 'Float64'
/// // instead of 'Utf8'
/// let file_schema = Schema::new(vec![
///    Field::new("c", DataType::Utf8, true),
///    Field::new("b", DataType::Float64, true),
/// ]);
///
/// // Get a mapping from the file schema to the table schema
/// let (mapper, _indices) = adapter.map_schema(&file_schema).unwrap();
///
/// let file_batch = record_batch!(
///     ("c", Utf8, vec!["foo", "bar"]),
///     ("b", Float64, vec![1.0, 2.0])
/// ).unwrap();
///
/// let mapped_batch = mapper.map_batch(file_batch).unwrap();
///
/// // the mapped batch has the correct schema and the "b" column has been cast to Utf8
/// let expected_batch = record_batch!(
///    ("a", Int32, vec![None, None]),  // missing column filled with nulls
///    ("b", Utf8, vec!["1.0", "2.0"]), // b was cast to string and order was changed
///    ("c", Utf8, vec!["foo", "bar"])
/// ).unwrap();
/// assert_eq!(mapped_batch, expected_batch);
/// ```
#[derive(Clone, Debug, Default)]
pub struct DefaultSchemaAdapterFactory;

impl DefaultSchemaAdapterFactory {
    /// Create a new [`SchemaAdapter`] for mapping batches from a file schema
    /// to a table schema.
    ///
    /// This is a convenience for [`DefaultSchemaAdapterFactory::create`] with
    /// the same schema for both the projected table schema and the table
    /// schema.
    pub fn from_schema(table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
        Self.create(Arc::clone(&table_schema), table_schema)
    }
}

impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        _table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        Box::new(DefaultSchemaAdapter {
            projected_table_schema,
        })
    }
}

/// This [`SchemaAdapter`] operates on the projected table schema. See
/// [`SchemaMapping`] for more details.
#[derive(Clone, Debug)]
pub(crate) struct DefaultSchemaAdapter {
    /// The schema for the table, projected to include only the fields being
    /// output (projected) by the associated ParquetSource
    projected_table_schema: SchemaRef,
}

/// Checks if a file field can be cast to a table field
///
/// Returns Ok(true) if casting is possible, or an error explaining why casting is not possible
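///
/// For example (a sketch, mirroring the unit tests below): casting
/// `Int32 -> Float64` is allowed, while `Binary -> Decimal128` is not:
///
/// ```ignore
/// let int_field = Field::new("col", DataType::Int32, true);
/// let float_field = Field::new("col", DataType::Float64, true);
/// assert!(can_cast_field(&int_field, &float_field).unwrap());
///
/// let bin_field = Field::new("col", DataType::Binary, true);
/// let dec_field = Field::new("col", DataType::Decimal128(10, 2), true);
/// assert!(can_cast_field(&bin_field, &dec_field).is_err());
/// ```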
pub(crate) fn can_cast_field(
    file_field: &Field,
    table_field: &Field,
) -> datafusion_common::Result<bool> {
    match (file_field.data_type(), table_field.data_type()) {
        (DataType::Struct(source_fields), DataType::Struct(target_fields)) => {
            validate_struct_compatibility(source_fields, target_fields)
        }
        _ => {
            if can_cast_types(file_field.data_type(), table_field.data_type()) {
                Ok(true)
            } else {
                plan_err!(
                    "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
                    file_field.name(),
                    file_field.data_type(),
                    table_field.data_type()
                )
            }
        }
    }
}

impl SchemaAdapter for DefaultSchemaAdapter {
    /// Map a column index in the table schema to a column index in a particular
    /// file schema
    ///
    /// Panics if index is not in range for the table schema
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        let field = self.projected_table_schema.field(index);
        Some(file_schema.fields.find(field.name())?.0)
    }

    /// Creates a `SchemaMapping` for casting or mapping the columns from the
    /// file schema to the table schema.
    ///
    /// If the provided `file_schema` contains columns of a different type to
    /// the expected `table_schema`, the method will attempt to cast the array
    /// data from the file schema to the table schema where possible.
    ///
    /// Returns a [`SchemaMapping`] that can be applied to the output batch
    /// along with an ordered list of columns to project from the file
    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        let (field_mappings, projection) = create_field_mapping(
            file_schema,
            &self.projected_table_schema,
            can_cast_field,
        )?;

        Ok((
            Arc::new(SchemaMapping::new(
                Arc::clone(&self.projected_table_schema),
                field_mappings,
                Arc::new(|array: &ArrayRef, field: &Field| cast_column(array, field)),
            )),
            projection,
        ))
    }
}

/// Helper function that creates field mappings between file schema and table schema
///
/// Maps columns from the file schema to their corresponding positions in the table schema,
/// applying type compatibility checking via the provided predicate function.
///
/// Returns field mappings (for column reordering) and a projection (for field selection).
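///
/// For example (a sketch): given a table schema `(a, b, c)` and a file schema
/// `(b, a)`, this returns `field_mappings = [Some(1), Some(0), None]` (column
/// `c` is missing from the file) and `projection = [0, 1]`.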
pub(crate) fn create_field_mapping<F>(
    file_schema: &Schema,
    projected_table_schema: &SchemaRef,
    can_map_field: F,
) -> datafusion_common::Result<(Vec<Option<usize>>, Vec<usize>)>
where
    F: Fn(&Field, &Field) -> datafusion_common::Result<bool>,
{
    let mut projection = Vec::with_capacity(file_schema.fields().len());
    let mut field_mappings = vec![None; projected_table_schema.fields().len()];

    for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
        if let Some((table_idx, table_field)) =
            projected_table_schema.fields().find(file_field.name())
        {
            if can_map_field(file_field, table_field)? {
                field_mappings[table_idx] = Some(projection.len());
                projection.push(file_idx);
            }
        }
    }

    Ok((field_mappings, projection))
}

/// The SchemaMapping struct holds a mapping from the file schema to the table
/// schema and any necessary type conversions.
///
/// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch which
/// has the projected schema, since that's the schema which is supposed to come
/// out of the execution of this query. Thus `map_batch` uses
/// `projected_table_schema` as it can only operate on the projected fields.
///
/// [`map_batch`]: Self::map_batch
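///
/// A `SchemaMapping` can also be built directly (a sketch, mirroring the unit
/// tests below; `cast_column` here is
/// `datafusion_common::nested_struct::cast_column`, the same function the
/// default adapter uses):
///
/// ```ignore
/// // Table field 0 comes from file column 1, table field 1 from file column 0.
/// let mapping = SchemaMapping::new(
///     Arc::clone(&projected_schema),
///     vec![Some(1), Some(0)],
///     Arc::new(|array: &ArrayRef, field: &Field| cast_column(array, field)),
/// );
/// ```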
pub struct SchemaMapping {
    /// The schema of the table. This is the expected schema after conversion
    /// and it should match the schema of the query result.
    projected_table_schema: SchemaRef,
    /// Mapping from field index in `projected_table_schema` to index in
    /// the projected file schema.
    ///
    /// They are Options instead of just plain `usize`s because the table could
    /// have fields that don't exist in the file.
    field_mappings: Vec<Option<usize>>,
    /// Function used to adapt a column from the file schema to the table schema
    /// when it exists in both schemas
    cast_column: Arc<CastColumnFn>,
}

impl Debug for SchemaMapping {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SchemaMapping")
            .field("projected_table_schema", &self.projected_table_schema)
            .field("field_mappings", &self.field_mappings)
            .field("cast_column", &"<fn>")
            .finish()
    }
}

impl SchemaMapping {
    /// Creates a new SchemaMapping instance
    ///
    /// Initializes the field mappings needed to transform file data to the projected table schema
    pub fn new(
        projected_table_schema: SchemaRef,
        field_mappings: Vec<Option<usize>>,
        cast_column: Arc<CastColumnFn>,
    ) -> Self {
        Self {
            projected_table_schema,
            field_mappings,
            cast_column,
        }
    }
}

impl SchemaMapper for SchemaMapping {
    /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and
    /// conversions.
    /// The produced RecordBatch has a schema that contains only the projected columns.
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
        let batch_rows = batch.num_rows();
        let batch_cols = batch.columns().to_vec();

        let cols = self
            .projected_table_schema
            // go through each field in the projected schema
            .fields()
            .iter()
            // and zip it with the index that maps fields from the projected table schema to the
            // projected file schema in `batch`
            .zip(&self.field_mappings)
            // and for each one...
            .map(|(field, file_idx)| {
                file_idx.map_or_else(
                    // If this field only exists in the table, and not in the file, then we know
                    // that it's null, so just return that.
                    || Ok(new_null_array(field.data_type(), batch_rows)),
                    // However, if it does exist in both, use the cast_column function
                    // to perform any necessary conversions
                    |batch_idx| (self.cast_column)(&batch_cols[batch_idx], field),
                )
            })
            .collect::<datafusion_common::Result<Vec<_>, _>>()?;

        // Necessary to handle empty batches
        let options = RecordBatchOptions::new().with_row_count(Some(batch_rows));

        let schema = Arc::clone(&self.projected_table_schema);
        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
        Ok(record_batch)
    }

    /// Adapts file-level column `Statistics` to match the `table_schema`
    fn map_column_statistics(
        &self,
        file_col_statistics: &[ColumnStatistics],
    ) -> datafusion_common::Result<Vec<ColumnStatistics>> {
        let mut table_col_statistics = vec![];

        // Map the statistics for each field in the file schema to the corresponding field in the
        // table schema. If a field is not present in the file schema, fill its entry with
        // `ColumnStatistics::new_unknown`.
        for (_, file_col_idx) in self
            .projected_table_schema
            .fields()
            .iter()
            .zip(&self.field_mappings)
        {
            if let Some(file_col_idx) = file_col_idx {
                table_col_statistics.push(
                    file_col_statistics
                        .get(*file_col_idx)
                        .cloned()
                        .unwrap_or_default(),
                );
            } else {
                table_col_statistics.push(ColumnStatistics::new_unknown());
            }
        }

        Ok(table_col_statistics)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::{
        array::{Array, ArrayRef, StringBuilder, StructArray, TimestampMillisecondArray},
        compute::cast,
        datatypes::{DataType, Field, TimeUnit},
        record_batch::RecordBatch,
    };
    use datafusion_common::{stats::Precision, Result, ScalarValue, Statistics};

    #[test]
    fn test_schema_mapping_map_statistics_basic() {
        // Create table schema (a, b, c)
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Float64, true),
        ]));

        // Create file schema (b, a) - different order, missing c
        let file_schema = Schema::new(vec![
            Field::new("b", DataType::Utf8, true),
            Field::new("a", DataType::Int32, true),
        ]);

        // Create SchemaAdapter
        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        // Get mapper and projection
        let (mapper, projection) = adapter.map_schema(&file_schema).unwrap();

        // Should project columns 0,1 from file
        assert_eq!(projection, vec![0, 1]);

        // Create file statistics
        let mut file_stats = Statistics::default();

        // Statistics for column b (index 0 in file)
        let b_stats = ColumnStatistics {
            null_count: Precision::Exact(5),
            ..Default::default()
        };

        // Statistics for column a (index 1 in file)
        let a_stats = ColumnStatistics {
            null_count: Precision::Exact(10),
            ..Default::default()
        };

        file_stats.column_statistics = vec![b_stats, a_stats];

        // Map statistics
        let table_col_stats = mapper
            .map_column_statistics(&file_stats.column_statistics)
            .unwrap();

        // Verify stats
        assert_eq!(table_col_stats.len(), 3);
        assert_eq!(table_col_stats[0].null_count, Precision::Exact(10)); // a from file idx 1
        assert_eq!(table_col_stats[1].null_count, Precision::Exact(5)); // b from file idx 0
        assert_eq!(table_col_stats[2].null_count, Precision::Absent); // c (unknown)
    }

    #[test]
    fn test_schema_mapping_map_statistics_empty() {
        // Create schemas
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]);

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };
        let (mapper, _) = adapter.map_schema(&file_schema).unwrap();

        // Empty file statistics
        let file_stats = Statistics::default();
        let table_col_stats = mapper
            .map_column_statistics(&file_stats.column_statistics)
            .unwrap();

        // All stats should be unknown
        assert_eq!(table_col_stats.len(), 2);
        assert_eq!(table_col_stats[0], ColumnStatistics::new_unknown());
        assert_eq!(table_col_stats[1], ColumnStatistics::new_unknown());
    }

    #[test]
    fn test_can_cast_field() {
        // Same type should work
        let from_field = Field::new("col", DataType::Int32, true);
        let to_field = Field::new("col", DataType::Int32, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Casting Int32 to Float64 is allowed
        let from_field = Field::new("col", DataType::Int32, true);
        let to_field = Field::new("col", DataType::Float64, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Casting Float64 to Utf8 should work (converts to string)
        let from_field = Field::new("col", DataType::Float64, true);
        let to_field = Field::new("col", DataType::Utf8, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Binary to Decimal128 is not supported - this is an example of a cast that should fail
        // Note: We use Binary -> Decimal128 instead of Utf8 -> Int32 because Arrow
        // actually supports the latter cast
        let from_field = Field::new("col", DataType::Binary, true);
        let to_field = Field::new("col", DataType::Decimal128(10, 2), true);
        let result = can_cast_field(&from_field, &to_field);
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Cannot cast file schema field col"));
    }

    #[test]
    fn test_create_field_mapping() {
        // Define the table schema
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Float64, true),
        ]));

        // Define file schema: different order, missing column c, and b has different type
        let file_schema = Schema::new(vec![
            Field::new("b", DataType::Float64, true), // Different type but castable to Utf8
            Field::new("a", DataType::Int32, true),   // Same type
            Field::new("d", DataType::Boolean, true), // Not in table schema
        ]);

        // Custom can_map_field function that allows all mappings for testing
        let allow_all = |_: &Field, _: &Field| Ok(true);

        // Test field mapping
        let (field_mappings, projection) =
            create_field_mapping(&file_schema, &table_schema, allow_all).unwrap();

        // Expected:
        // - field_mappings[0] (a) maps to projection[1]
        // - field_mappings[1] (b) maps to projection[0]
        // - field_mappings[2] (c) is None (not in file)
        assert_eq!(field_mappings, vec![Some(1), Some(0), None]);
        assert_eq!(projection, vec![0, 1]); // Projecting file columns b, a

        // Test with a failing mapper
        let fails_all = |_: &Field, _: &Field| Ok(false);
        let (field_mappings, projection) =
            create_field_mapping(&file_schema, &table_schema, fails_all).unwrap();

        // Should have no mappings or projections if all cast checks fail
        assert_eq!(field_mappings, vec![None, None, None]);
        assert_eq!(projection, Vec::<usize>::new());

        // Test with error-producing mapper
        let error_mapper = |_: &Field, _: &Field| plan_err!("Test error");
        let result = create_field_mapping(&file_schema, &table_schema, error_mapper);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Test error"));
    }

    #[test]
    fn test_schema_mapping_new() {
        // Define the projected table schema
        let projected_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));

        // Define field mappings from table to file
        let field_mappings = vec![Some(1), Some(0)];

        // Create SchemaMapping manually
        let mapping = SchemaMapping::new(
            Arc::clone(&projected_schema),
            field_mappings.clone(),
            Arc::new(|array: &ArrayRef, field: &Field| cast_column(array, field)),
        );

        // Check that fields were set correctly
        assert_eq!(*mapping.projected_table_schema, *projected_schema);
        assert_eq!(mapping.field_mappings, field_mappings);

        // Test with a batch to ensure it works properly
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![
                Field::new("b_file", DataType::Utf8, true),
                Field::new("a_file", DataType::Int32, true),
            ])),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["hello", "world"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 2])),
            ],
        )
        .unwrap();

        // Test that map_batch works with our manually created mapping
        let mapped_batch = mapping.map_batch(batch).unwrap();

        // Verify the mapped batch has the correct schema and data
        assert_eq!(*mapped_batch.schema(), *projected_schema);
        assert_eq!(mapped_batch.num_columns(), 2);
        assert_eq!(mapped_batch.column(0).len(), 2); // a column
        assert_eq!(mapped_batch.column(1).len(), 2); // b column
    }

    #[test]
    fn test_map_schema_error_path() {
        // Define the table schema
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Decimal128(10, 2), true), // Use Decimal which has stricter cast rules
        ]));

        // Define file schema with incompatible type for column c
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Float64, true), // Different but castable
            Field::new("c", DataType::Binary, true),  // Not castable to Decimal128
        ]);

        // Create DefaultSchemaAdapter
        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        // map_schema should error due to incompatible types
        let result = adapter.map_schema(&file_schema);
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Cannot cast file schema field c"));
    }

    #[test]
    fn test_map_schema_happy_path() {
        // Define the table schema
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Decimal128(10, 2), true),
        ]));

        // Create DefaultSchemaAdapter
        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        // Define compatible file schema (missing column c)
        let compatible_file_schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true), // Can be cast to Int32
            Field::new("b", DataType::Float64, true), // Can be cast to Utf8
        ]);

        // Test successful schema mapping
        let (mapper, projection) = adapter.map_schema(&compatible_file_schema).unwrap();

        // Verify field_mappings and projection created correctly
        assert_eq!(projection, vec![0, 1]); // Projecting a and b

        // Verify the SchemaMapping works with actual data
        let file_batch = RecordBatch::try_new(
            Arc::new(compatible_file_schema.clone()),
            vec![
                Arc::new(arrow::array::Int64Array::from(vec![100, 200])),
                Arc::new(arrow::array::Float64Array::from(vec![1.5, 2.5])),
            ],
        )
        .unwrap();

        let mapped_batch = mapper.map_batch(file_batch).unwrap();

        // Verify correct schema mapping
        assert_eq!(*mapped_batch.schema(), *table_schema);
        assert_eq!(mapped_batch.num_columns(), 3); // a, b, c

        // Column c should be null since it wasn't in the file schema
        let c_array = mapped_batch.column(2);
        assert_eq!(c_array.len(), 2);
        assert_eq!(c_array.null_count(), 2);
    }

    #[test]
    fn test_adapt_struct_with_added_nested_fields() -> Result<()> {
        let (file_schema, table_schema) = create_test_schemas_with_nested_fields();
        let batch = create_test_batch_with_struct_data(&file_schema)?;

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };
        let (mapper, _) = adapter.map_schema(file_schema.as_ref())?;
        let mapped_batch = mapper.map_batch(batch)?;

        verify_adapted_batch_with_nested_fields(&mapped_batch, &table_schema)?;
        Ok(())
    }

    #[test]
    fn test_map_column_statistics_struct() -> Result<()> {
        let (file_schema, table_schema) = create_test_schemas_with_nested_fields();

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };
        let (mapper, _) = adapter.map_schema(file_schema.as_ref())?;

        let file_stats = vec![
            create_test_column_statistics(
                0,
                100,
                Some(ScalarValue::Int32(Some(1))),
                Some(ScalarValue::Int32(Some(100))),
                Some(ScalarValue::Int32(Some(5100))),
            ),
            create_test_column_statistics(10, 50, None, None, None),
        ];

        let table_stats = mapper.map_column_statistics(&file_stats)?;
        assert_eq!(table_stats.len(), 1);
        verify_column_statistics(
            &table_stats[0],
            Some(0),
            Some(100),
            Some(ScalarValue::Int32(Some(1))),
            Some(ScalarValue::Int32(Some(100))),
            Some(ScalarValue::Int32(Some(5100))),
        );
        let missing_stats = mapper.map_column_statistics(&[])?;
        assert_eq!(missing_stats.len(), 1);
        assert_eq!(missing_stats[0], ColumnStatistics::new_unknown());
        Ok(())
    }

    fn create_test_schemas_with_nested_fields() -> (SchemaRef, SchemaRef) {
        let file_schema = Arc::new(Schema::new(vec![Field::new(
            "info",
            DataType::Struct(
                vec![
                    Field::new("location", DataType::Utf8, true),
                    Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                        true,
                    ),
                ]
                .into(),
            ),
            true,
        )]));

        let table_schema = Arc::new(Schema::new(vec![Field::new(
            "info",
            DataType::Struct(
                vec![
                    Field::new("location", DataType::Utf8, true),
                    Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                        true,
                    ),
                    Field::new(
                        "reason",
                        DataType::Struct(
                            vec![
                                Field::new("_level", DataType::Float64, true),
                                Field::new(
                                    "details",
                                    DataType::Struct(
                                        vec![
                                            Field::new("rurl", DataType::Utf8, true),
                                            Field::new("s", DataType::Float64, true),
                                            Field::new("t", DataType::Utf8, true),
                                        ]
                                        .into(),
                                    ),
                                    true,
                                ),
                            ]
                            .into(),
                        ),
                        true,
                    ),
                ]
                .into(),
            ),
            true,
        )]));

        (file_schema, table_schema)
    }

    fn create_test_batch_with_struct_data(
        file_schema: &SchemaRef,
    ) -> Result<RecordBatch> {
        let mut location_builder = StringBuilder::new();
        location_builder.append_value("San Francisco");
        location_builder.append_value("New York");

        let timestamp_array = TimestampMillisecondArray::from(vec![
            Some(1640995200000),
            Some(1641081600000),
        ]);

        let timestamp_type =
            DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into()));
        let timestamp_array = cast(&timestamp_array, &timestamp_type)?;

        let info_struct = StructArray::from(vec![
            (
                Arc::new(Field::new("location", DataType::Utf8, true)),
                Arc::new(location_builder.finish()) as ArrayRef,
            ),
            (
                Arc::new(Field::new("timestamp_utc", timestamp_type, true)),
                timestamp_array,
            ),
        ]);

        Ok(RecordBatch::try_new(
            Arc::clone(file_schema),
            vec![Arc::new(info_struct)],
        )?)
    }

    fn verify_adapted_batch_with_nested_fields(
        mapped_batch: &RecordBatch,
        table_schema: &SchemaRef,
    ) -> Result<()> {
        assert_eq!(mapped_batch.schema(), *table_schema);
        assert_eq!(mapped_batch.num_rows(), 2);

        let info_col = mapped_batch.column(0);
        let info_array = info_col
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("Expected info column to be a StructArray");

        verify_preserved_fields(info_array)?;
        verify_reason_field_structure(info_array)?;
        Ok(())
    }

    fn verify_preserved_fields(info_array: &StructArray) -> Result<()> {
        let location_col = info_array
            .column_by_name("location")
            .expect("Expected location field in struct");
        let location_array = location_col
            .as_any()
            .downcast_ref::<arrow::array::StringArray>()
            .expect("Expected location to be a StringArray");
        assert_eq!(location_array.value(0), "San Francisco");
        assert_eq!(location_array.value(1), "New York");

        let timestamp_col = info_array
            .column_by_name("timestamp_utc")
            .expect("Expected timestamp_utc field in struct");
        let timestamp_array = timestamp_col
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .expect("Expected timestamp_utc to be a TimestampMillisecondArray");
        assert_eq!(timestamp_array.value(0), 1640995200000);
        assert_eq!(timestamp_array.value(1), 1641081600000);
        Ok(())
    }

    fn verify_reason_field_structure(info_array: &StructArray) -> Result<()> {
        let reason_col = info_array
            .column_by_name("reason")
            .expect("Expected reason field in struct");
        let reason_array = reason_col
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("Expected reason to be a StructArray");
        assert_eq!(reason_array.fields().len(), 2);
        assert!(reason_array.column_by_name("_level").is_some());
        assert!(reason_array.column_by_name("details").is_some());

        let details_col = reason_array
            .column_by_name("details")
            .expect("Expected details field in reason struct");
        let details_array = details_col
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("Expected details to be a StructArray");
        assert_eq!(details_array.fields().len(), 3);
        assert!(details_array.column_by_name("rurl").is_some());
        assert!(details_array.column_by_name("s").is_some());
        assert!(details_array.column_by_name("t").is_some());
        for i in 0..2 {
            assert!(reason_array.is_null(i), "reason field should be null");
        }
        Ok(())
    }

    fn verify_column_statistics(
        stats: &ColumnStatistics,
        expected_null_count: Option<usize>,
        expected_distinct_count: Option<usize>,
        expected_min: Option<ScalarValue>,
        expected_max: Option<ScalarValue>,
        expected_sum: Option<ScalarValue>,
    ) {
        if let Some(count) = expected_null_count {
            assert_eq!(
                stats.null_count,
                Precision::Exact(count),
                "Null count should match expected value"
            );
        }
        if let Some(count) = expected_distinct_count {
            assert_eq!(
                stats.distinct_count,
                Precision::Exact(count),
                "Distinct count should match expected value"
            );
        }
        if let Some(min) = expected_min {
            assert_eq!(
                stats.min_value,
                Precision::Exact(min),
                "Min value should match expected value"
            );
        }
        if let Some(max) = expected_max {
            assert_eq!(
                stats.max_value,
                Precision::Exact(max),
                "Max value should match expected value"
            );
        }
        if let Some(sum) = expected_sum {
            assert_eq!(
                stats.sum_value,
                Precision::Exact(sum),
                "Sum value should match expected value"
            );
        }
    }

    fn create_test_column_statistics(
        null_count: usize,
        distinct_count: usize,
        min_value: Option<ScalarValue>,
        max_value: Option<ScalarValue>,
        sum_value: Option<ScalarValue>,
    ) -> ColumnStatistics {
        ColumnStatistics {
            null_count: Precision::Exact(null_count),
            distinct_count: Precision::Exact(distinct_count),
            min_value: min_value.map_or_else(|| Precision::Absent, Precision::Exact),
            max_value: max_value.map_or_else(|| Precision::Absent, Precision::Exact),
            sum_value: sum_value.map_or_else(|| Precision::Absent, Precision::Exact),
        }
    }
}