datafusion_datasource/schema_adapter.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`SchemaAdapter`] and [`SchemaAdapterFactory`] to adapt file-level record batches to a table schema.
//!
//! Adapters provide a way of translating the `RecordBatch`es that come out of the
//! physical format into the form DataFusion expects. For instance, a schema
//! stored external to a parquet file can map parquet logical types to arrow types.

use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions};
use arrow::compute::{can_cast_types, cast};
use arrow::datatypes::{Field, Schema, SchemaRef};
use datafusion_common::{plan_err, ColumnStatistics};
use std::fmt::Debug;
use std::sync::Arc;

/// Factory for creating [`SchemaAdapter`]s
///
/// This interface provides a way to implement custom schema adaptation logic
/// for DataSourceExec (for example, to fill missing columns with a default
/// value other than null).
///
/// Most users should use [`DefaultSchemaAdapterFactory`]. See that struct for
/// more details and examples.
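///
/// # Example: custom factory (a minimal sketch)
///
/// A hypothetical `MyFactory` that simply delegates to the default logic; a
/// real implementation would return its own [`SchemaAdapter`] here:
///
/// ```
/// # use arrow::datatypes::SchemaRef;
/// # use datafusion_datasource::schema_adapter::{
/// #     DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
/// # };
/// #[derive(Debug)]
/// struct MyFactory;
///
/// impl SchemaAdapterFactory for MyFactory {
///     fn create(
///         &self,
///         projected_table_schema: SchemaRef,
///         table_schema: SchemaRef,
///     ) -> Box<dyn SchemaAdapter> {
///         // custom adaptation logic would go here; this sketch reuses the
///         // default behavior
///         DefaultSchemaAdapterFactory.create(projected_table_schema, table_schema)
///     }
/// }
/// ```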
pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
    /// Create a [`SchemaAdapter`]
    ///
    /// Arguments:
    ///
    /// * `projected_table_schema`: The schema for the table, projected to
    ///   include only the fields being output (projected) by this mapping.
    ///
    /// * `table_schema`: The entire table schema for the table
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter>;
}

/// Creates [`SchemaMapper`]s to map file-level [`RecordBatch`]es to a table
/// schema, which may have been obtained by merging multiple file-level
/// schemas.
///
/// This is useful for implementing schema evolution in partitioned datasets.
///
/// See [`DefaultSchemaAdapterFactory`] for more details and examples.
pub trait SchemaAdapter: Send + Sync {
    /// Map a column index in the table schema to a column index in a particular
    /// file schema
    ///
    /// This is used while reading a file to push down projections by mapping
    /// projected column indexes from the table schema to the file schema
    ///
    /// Panics if index is not in range for the table schema
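    ///
    /// # Example
    ///
    /// A minimal sketch using [`DefaultSchemaAdapterFactory`], with an assumed
    /// table of columns `a` and `b` and a file containing only `b`:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
    /// let table_schema = Arc::new(Schema::new(vec![
    ///     Field::new("a", DataType::Int32, true),
    ///     Field::new("b", DataType::Utf8, true),
    /// ]));
    /// let adapter = DefaultSchemaAdapterFactory::from_schema(table_schema);
    /// let file_schema = Schema::new(vec![Field::new("b", DataType::Utf8, true)]);
    /// // table column 1 ("b") is column 0 of this file; "a" is not present
    /// assert_eq!(adapter.map_column_index(1, &file_schema), Some(0));
    /// assert_eq!(adapter.map_column_index(0, &file_schema), None);
    /// ```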
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;

    /// Creates a mapping for casting columns from the file schema to the table
    /// schema.
    ///
    /// This is used after reading a record batch. The returned [`SchemaMapper`]:
    ///
    /// 1. Maps columns to the expected column indexes
    /// 2. Handles missing values (e.g. fills nulls or a default value) for
    ///    columns in the table schema that are not in the file schema
    /// 3. Handles different types: if the column in the file schema has a
    ///    different type than `table_schema`, the mapper will resolve this
    ///    difference (e.g. by casting to the appropriate type)
    ///
    /// Returns:
    /// * a [`SchemaMapper`]
    /// * an ordered list of columns to project from the file
    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
}

/// Maps columns from a specific file schema to the table schema.
///
/// See [`DefaultSchemaAdapterFactory`] for more details and examples.
pub trait SchemaMapper: Debug + Send + Sync {
    /// Adapts a `RecordBatch` to match the `table_schema`
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;

    /// Adapts file-level column `Statistics` to match the `table_schema`
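    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a table of columns `a` and `b` whose file
    /// contains only `b`: the output has one entry per table column, with
    /// unknown statistics for the missing `a`:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_common::ColumnStatistics;
    /// # use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
    /// let table_schema = Arc::new(Schema::new(vec![
    ///     Field::new("a", DataType::Int32, true),
    ///     Field::new("b", DataType::Utf8, true),
    /// ]));
    /// let adapter = DefaultSchemaAdapterFactory::from_schema(table_schema);
    /// let file_schema = Schema::new(vec![Field::new("b", DataType::Utf8, true)]);
    /// let (mapper, _) = adapter.map_schema(&file_schema).unwrap();
    ///
    /// // one `ColumnStatistics` per file column in, one per table column out
    /// let table_stats = mapper
    ///     .map_column_statistics(&[ColumnStatistics::new_unknown()])
    ///     .unwrap();
    /// assert_eq!(table_stats.len(), 2);
    /// assert_eq!(table_stats[0], ColumnStatistics::new_unknown());
    /// ```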
    fn map_column_statistics(
        &self,
        file_col_statistics: &[ColumnStatistics],
    ) -> datafusion_common::Result<Vec<ColumnStatistics>>;
}

/// Default [`SchemaAdapterFactory`] for mapping schemas.
///
/// This can be used to adapt file-level record batches to a table schema and
/// implement schema evolution.
///
/// Given an input file schema and a table schema, this factory returns
/// [`SchemaAdapter`]s that return [`SchemaMapper`]s that:
///
/// 1. Reorder columns
/// 2. Cast columns to the correct type
/// 3. Fill missing columns with nulls
///
/// # Errors
///
/// * If a column in the table schema is non-nullable but is not present in the
///   file schema (i.e. it is missing), the returned mapper tries to fill it with
///   nulls, resulting in a schema error.
///
/// # Illustration of Schema Mapping
///
/// ```text
/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
///  ┌───────┐   ┌───────┐ │                  │ ┌───────┐ ┌───────┐ ┌───────┐ │
/// ││  1.0  │   │ "foo" │                      │ NULL  │ │ "foo" │ │ "1.0" │
///  ├───────┤   ├───────┤ │  Schema mapping  │ ├───────┤ ├───────┤ ├───────┤ │
/// ││  2.0  │   │ "bar" │                      │ NULL  │ │ "bar" │ │ "2.0" │
///  └───────┘   └───────┘ │────────────────▶ │ └───────┘ └───────┘ └───────┘ │
/// │                                         │
///  column "c"  column "b"│                    column "a" column "b" column "c"│
/// │ Float64     Utf8      │                  │  Int32      Utf8      Utf8
///  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘                   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
///     Input Record Batch                         Output Record Batch
///
///     Schema {                                   Schema {
///       "c": Float64,                              "a": Int32,
///       "b": Utf8,                                 "b": Utf8,
///     }                                            "c": Utf8,
///                                                }
/// ```
///
/// # Example of using the `DefaultSchemaAdapterFactory` to map [`RecordBatch`]s
///
/// Note `SchemaMapping` also supports mapping partial batches, which is used as
/// part of predicate pushdown.
///
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};
/// # use datafusion_common::record_batch;
/// // Table has fields "a", "b" and "c"
/// let table_schema = Schema::new(vec![
///     Field::new("a", DataType::Int32, true),
///     Field::new("b", DataType::Utf8, true),
///     Field::new("c", DataType::Utf8, true),
/// ]);
///
/// // create an adapter to map the table schema to the file schema
/// let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
///
/// // The file schema has fields "c" and "b" but "b" is stored as a 'Float64'
/// // instead of 'Utf8'
/// let file_schema = Schema::new(vec![
///     Field::new("c", DataType::Utf8, true),
///     Field::new("b", DataType::Float64, true),
/// ]);
///
/// // Get a mapping from the file schema to the table schema
/// let (mapper, _indices) = adapter.map_schema(&file_schema).unwrap();
///
/// let file_batch = record_batch!(
///     ("c", Utf8, vec!["foo", "bar"]),
///     ("b", Float64, vec![1.0, 2.0])
/// ).unwrap();
///
/// let mapped_batch = mapper.map_batch(file_batch).unwrap();
///
/// // the mapped batch has the correct schema and the "b" column has been cast to Utf8
/// let expected_batch = record_batch!(
///     ("a", Int32, vec![None, None]),  // missing column filled with nulls
///     ("b", Utf8, vec!["1.0", "2.0"]), // b was cast to string and order was changed
///     ("c", Utf8, vec!["foo", "bar"])
/// ).unwrap();
/// assert_eq!(mapped_batch, expected_batch);
/// ```
#[derive(Clone, Debug, Default)]
pub struct DefaultSchemaAdapterFactory;

impl DefaultSchemaAdapterFactory {
    /// Create a new factory for mapping batches from a file schema to a table
    /// schema.
    ///
    /// This is a convenience for [`DefaultSchemaAdapterFactory::create`] with
    /// the same schema for both the projected table schema and the table
    /// schema.
    pub fn from_schema(table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
        Self.create(Arc::clone(&table_schema), table_schema)
    }
}

impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        _table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        Box::new(DefaultSchemaAdapter {
            projected_table_schema,
        })
    }
}

/// Default [`SchemaAdapter`] implementation that maps file columns to the
/// projected table schema. See [`SchemaMapping`] for more details.
#[derive(Clone, Debug)]
pub(crate) struct DefaultSchemaAdapter {
    /// The schema for the table, projected to include only the fields being
    /// output (projected) by the associated ParquetSource
    projected_table_schema: SchemaRef,
}

/// Checks if a file field can be cast to a table field
///
/// Returns `Ok(true)` if casting is possible, or a plan error explaining why
/// the cast is not possible.
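///
/// For example, `Int32 -> Float64` yields `Ok(true)`, while
/// `Binary -> Decimal128(10, 2)` yields an error (see `test_can_cast_field`
/// below).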
pub(crate) fn can_cast_field(
    file_field: &Field,
    table_field: &Field,
) -> datafusion_common::Result<bool> {
    if can_cast_types(file_field.data_type(), table_field.data_type()) {
        Ok(true)
    } else {
        plan_err!(
            "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
            file_field.name(),
            file_field.data_type(),
            table_field.data_type()
        )
    }
}

impl SchemaAdapter for DefaultSchemaAdapter {
    /// Map a column index in the table schema to a column index in a particular
    /// file schema
    ///
    /// Panics if index is not in range for the table schema
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        let field = self.projected_table_schema.field(index);
        Some(file_schema.fields.find(field.name())?.0)
    }

    /// Creates a `SchemaMapping` for casting or mapping the columns from the
    /// file schema to the table schema.
    ///
    /// If the provided `file_schema` contains columns of a different type to
    /// the expected `table_schema`, the method will attempt to cast the array
    /// data from the file schema to the table schema where possible.
    ///
    /// Returns a [`SchemaMapping`] that can be applied to the output batch
    /// along with an ordered list of columns to project from the file
    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        let (field_mappings, projection) = create_field_mapping(
            file_schema,
            &self.projected_table_schema,
            can_cast_field,
        )?;

        Ok((
            Arc::new(SchemaMapping::new(
                Arc::clone(&self.projected_table_schema),
                field_mappings,
            )),
            projection,
        ))
    }
}

/// Helper function that creates field mappings between file schema and table schema
///
/// Maps columns from the file schema to their corresponding positions in the table schema,
/// applying type compatibility checking via the provided predicate function.
///
/// Returns field mappings (for column reordering) and a projection (for field selection).
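///
/// For example, with table schema `(a, b, c)` and file schema `(b, a)`, the
/// returned field mappings are `[Some(1), Some(0), None]` (table column `c`
/// is not present in the file) and the projection is `[0, 1]`.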
pub(crate) fn create_field_mapping<F>(
    file_schema: &Schema,
    projected_table_schema: &SchemaRef,
    can_map_field: F,
) -> datafusion_common::Result<(Vec<Option<usize>>, Vec<usize>)>
where
    F: Fn(&Field, &Field) -> datafusion_common::Result<bool>,
{
    let mut projection = Vec::with_capacity(file_schema.fields().len());
    let mut field_mappings = vec![None; projected_table_schema.fields().len()];

    for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
        if let Some((table_idx, table_field)) =
            projected_table_schema.fields().find(file_field.name())
        {
            if can_map_field(file_field, table_field)? {
                field_mappings[table_idx] = Some(projection.len());
                projection.push(file_idx);
            }
        }
    }

    Ok((field_mappings, projection))
}

/// The SchemaMapping struct holds a mapping from the file schema to the table
/// schema and any necessary type conversions.
///
/// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch which
/// has the projected schema, since that's the schema which is supposed to come
/// out of the execution of this query. Thus `map_batch` uses
/// `projected_table_schema` as it can only operate on the projected fields.
///
/// [`map_batch`]: Self::map_batch
#[derive(Debug)]
pub struct SchemaMapping {
    /// The schema of the table. This is the expected schema after conversion
    /// and it should match the schema of the query result.
    projected_table_schema: SchemaRef,
    /// Mapping from field index in `projected_table_schema` to index in
    /// the projected file schema.
    ///
    /// They are Options instead of just plain `usize`s because the table could
    /// have fields that don't exist in the file.
    field_mappings: Vec<Option<usize>>,
}

impl SchemaMapping {
    /// Creates a new SchemaMapping instance
    ///
    /// Initializes the field mappings needed to transform file data to the
    /// projected table schema
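    ///
    /// A minimal construction sketch (mirroring `test_schema_mapping_new`
    /// below), where each table column reads the opposite projected-file
    /// column:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_datasource::schema_adapter::SchemaMapping;
    /// let projected = Arc::new(Schema::new(vec![
    ///     Field::new("a", DataType::Int32, true),
    ///     Field::new("b", DataType::Utf8, true),
    /// ]));
    /// // table column 0 ("a") reads projected-file column 1, and vice versa
    /// let mapping = SchemaMapping::new(projected, vec![Some(1), Some(0)]);
    /// ```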
    pub fn new(
        projected_table_schema: SchemaRef,
        field_mappings: Vec<Option<usize>>,
    ) -> Self {
        Self {
            projected_table_schema,
            field_mappings,
        }
    }
}

impl SchemaMapper for SchemaMapping {
    /// Adapts a `RecordBatch` to match the `projected_table_schema` using the
    /// stored mapping and conversions. The produced RecordBatch has a schema
    /// that contains only the projected columns.
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
        let batch_rows = batch.num_rows();
        let batch_cols = batch.columns().to_vec();

        let cols = self
            .projected_table_schema
            // go through each field in the projected schema
            .fields()
            .iter()
            // and zip it with the index that maps fields from the projected table schema to the
            // projected file schema in `batch`
            .zip(&self.field_mappings)
            // and for each one...
            .map(|(field, file_idx)| {
                file_idx.map_or_else(
                    // If this field only exists in the table, and not in the file, then we know
                    // that it's null, so just return that.
                    || Ok(new_null_array(field.data_type(), batch_rows)),
                    // However, if it does exist in both, then try to cast it to the correct output
                    // type
                    |batch_idx| cast(&batch_cols[batch_idx], field.data_type()),
                )
            })
            .collect::<datafusion_common::Result<Vec<_>, _>>()?;

        // Necessary to handle empty batches
        let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));

        let schema = Arc::clone(&self.projected_table_schema);
        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
        Ok(record_batch)
    }

    /// Adapts file-level column `Statistics` to match the `table_schema`
    fn map_column_statistics(
        &self,
        file_col_statistics: &[ColumnStatistics],
    ) -> datafusion_common::Result<Vec<ColumnStatistics>> {
        let mut table_col_statistics = vec![];

        // Map the statistics for each field in the file schema to the corresponding field in the
        // table schema. If a field is not present in the file schema, fill it with
        // `ColumnStatistics::new_unknown`.
        for (_, file_col_idx) in self
            .projected_table_schema
            .fields()
            .iter()
            .zip(&self.field_mappings)
        {
            if let Some(file_col_idx) = file_col_idx {
                table_col_statistics.push(
                    file_col_statistics
                        .get(*file_col_idx)
                        .cloned()
                        .unwrap_or_default(),
                );
            } else {
                table_col_statistics.push(ColumnStatistics::new_unknown());
            }
        }

        Ok(table_col_statistics)
    }
}

#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field};
    use datafusion_common::{stats::Precision, Statistics};

    use super::*;

    #[test]
    fn test_schema_mapping_map_statistics_basic() {
        // Create table schema (a, b, c)
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Float64, true),
        ]));

        // Create file schema (b, a) - different order, missing c
        let file_schema = Schema::new(vec![
            Field::new("b", DataType::Utf8, true),
            Field::new("a", DataType::Int32, true),
        ]);

        // Create SchemaAdapter
        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        // Get mapper and projection
        let (mapper, projection) = adapter.map_schema(&file_schema).unwrap();

        // Should project columns 0,1 from file
        assert_eq!(projection, vec![0, 1]);

        // Create file statistics
        let mut file_stats = Statistics::default();

        // Statistics for column b (index 0 in file)
        let b_stats = ColumnStatistics {
            null_count: Precision::Exact(5),
            ..Default::default()
        };

        // Statistics for column a (index 1 in file)
        let a_stats = ColumnStatistics {
            null_count: Precision::Exact(10),
            ..Default::default()
        };

        file_stats.column_statistics = vec![b_stats, a_stats];

        // Map statistics
        let table_col_stats = mapper
            .map_column_statistics(&file_stats.column_statistics)
            .unwrap();

        // Verify stats
        assert_eq!(table_col_stats.len(), 3);
        assert_eq!(table_col_stats[0].null_count, Precision::Exact(10)); // a from file idx 1
        assert_eq!(table_col_stats[1].null_count, Precision::Exact(5)); // b from file idx 0
        assert_eq!(table_col_stats[2].null_count, Precision::Absent); // c (unknown)
    }

    #[test]
    fn test_schema_mapping_map_statistics_empty() {
        // Create schemas
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]);

        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };
        let (mapper, _) = adapter.map_schema(&file_schema).unwrap();

        // Empty file statistics
        let file_stats = Statistics::default();
        let table_col_stats = mapper
            .map_column_statistics(&file_stats.column_statistics)
            .unwrap();

        // All stats should be unknown
        assert_eq!(table_col_stats.len(), 2);
        assert_eq!(table_col_stats[0], ColumnStatistics::new_unknown());
        assert_eq!(table_col_stats[1], ColumnStatistics::new_unknown());
    }

    #[test]
    fn test_can_cast_field() {
        // Same type should work
        let from_field = Field::new("col", DataType::Int32, true);
        let to_field = Field::new("col", DataType::Int32, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Casting Int32 to Float64 is allowed
        let from_field = Field::new("col", DataType::Int32, true);
        let to_field = Field::new("col", DataType::Float64, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Casting Float64 to Utf8 should work (converts to string)
        let from_field = Field::new("col", DataType::Float64, true);
        let to_field = Field::new("col", DataType::Utf8, true);
        assert!(can_cast_field(&from_field, &to_field).unwrap());

        // Binary to Decimal128 is not supported - this is an example of a cast that should fail
        // Note: We use Binary instead of Utf8->Int32 because Arrow actually supports that cast
        let from_field = Field::new("col", DataType::Binary, true);
        let to_field = Field::new("col", DataType::Decimal128(10, 2), true);
        let result = can_cast_field(&from_field, &to_field);
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Cannot cast file schema field col"));
    }

    #[test]
    fn test_create_field_mapping() {
        // Define the table schema
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Float64, true),
        ]));

        // Define file schema: different order, missing column c, and b has a different type
        let file_schema = Schema::new(vec![
            Field::new("b", DataType::Float64, true), // Different type but castable to Utf8
            Field::new("a", DataType::Int32, true),   // Same type
            Field::new("d", DataType::Boolean, true), // Not in table schema
        ]);

        // Custom can_map_field function that allows all mappings for testing
        let allow_all = |_: &Field, _: &Field| Ok(true);

        // Test field mapping
        let (field_mappings, projection) =
            create_field_mapping(&file_schema, &table_schema, allow_all).unwrap();

        // Expected:
        // - field_mappings[0] (a) maps to projection[1]
        // - field_mappings[1] (b) maps to projection[0]
        // - field_mappings[2] (c) is None (not in file)
        assert_eq!(field_mappings, vec![Some(1), Some(0), None]);
        assert_eq!(projection, vec![0, 1]); // Projecting file columns b, a

        // Test with a failing mapper
        let fails_all = |_: &Field, _: &Field| Ok(false);
        let (field_mappings, projection) =
            create_field_mapping(&file_schema, &table_schema, fails_all).unwrap();

        // Should have no mappings or projections if all cast checks fail
        assert_eq!(field_mappings, vec![None, None, None]);
        assert_eq!(projection, Vec::<usize>::new());

        // Test with error-producing mapper
        let error_mapper = |_: &Field, _: &Field| plan_err!("Test error");
        let result = create_field_mapping(&file_schema, &table_schema, error_mapper);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Test error"));
    }

    #[test]
    fn test_schema_mapping_new() {
        // Define the projected table schema
        let projected_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));

        // Define field mappings from table to file
        let field_mappings = vec![Some(1), Some(0)];

        // Create SchemaMapping manually
        let mapping =
            SchemaMapping::new(Arc::clone(&projected_schema), field_mappings.clone());

        // Check that fields were set correctly
        assert_eq!(*mapping.projected_table_schema, *projected_schema);
        assert_eq!(mapping.field_mappings, field_mappings);

        // Test with a batch to ensure it works properly
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![
                Field::new("b_file", DataType::Utf8, true),
                Field::new("a_file", DataType::Int32, true),
            ])),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["hello", "world"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 2])),
            ],
        )
        .unwrap();

        // Test that map_batch works with our manually created mapping
        let mapped_batch = mapping.map_batch(batch).unwrap();

        // Verify the mapped batch has the correct schema and data
        assert_eq!(*mapped_batch.schema(), *projected_schema);
        assert_eq!(mapped_batch.num_columns(), 2);
        assert_eq!(mapped_batch.column(0).len(), 2); // a column
        assert_eq!(mapped_batch.column(1).len(), 2); // b column
    }

    #[test]
    fn test_map_schema_error_path() {
        // Define the table schema
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Decimal128(10, 2), true), // Use Decimal which has stricter cast rules
        ]));

        // Define file schema with incompatible type for column c
        let file_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Float64, true), // Different but castable
            Field::new("c", DataType::Binary, true),  // Not castable to Decimal128
        ]);

        // Create DefaultSchemaAdapter
        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        // map_schema should error due to incompatible types
        let result = adapter.map_schema(&file_schema);
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Cannot cast file schema field c"));
    }

    #[test]
    fn test_map_schema_happy_path() {
        // Define the table schema
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Decimal128(10, 2), true),
        ]));

        // Create DefaultSchemaAdapter
        let adapter = DefaultSchemaAdapter {
            projected_table_schema: Arc::clone(&table_schema),
        };

        // Define compatible file schema (missing column c)
        let compatible_file_schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true), // Can be cast to Int32
            Field::new("b", DataType::Float64, true), // Can be cast to Utf8
        ]);

        // Test successful schema mapping
        let (mapper, projection) = adapter.map_schema(&compatible_file_schema).unwrap();

        // Verify field_mappings and projection created correctly
        assert_eq!(projection, vec![0, 1]); // Projecting a and b

        // Verify the SchemaMapping works with actual data
        let file_batch = RecordBatch::try_new(
            Arc::new(compatible_file_schema.clone()),
            vec![
                Arc::new(arrow::array::Int64Array::from(vec![100, 200])),
                Arc::new(arrow::array::Float64Array::from(vec![1.5, 2.5])),
            ],
        )
        .unwrap();

        let mapped_batch = mapper.map_batch(file_batch).unwrap();

        // Verify correct schema mapping
        assert_eq!(*mapped_batch.schema(), *table_schema);
        assert_eq!(mapped_batch.num_columns(), 3); // a, b, c

        // Column c should be null since it wasn't in the file schema
        let c_array = mapped_batch.column(2);
        assert_eq!(c_array.len(), 2);
        assert_eq!(c_array.null_count(), 2);
    }
}