// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`SchemaAdapter`] and [`SchemaAdapterFactory`] to adapt file-level record batches to a table schema.
//!
//! An adapter translates the `RecordBatch`es that come out of the physical
//! file format into the form DataFusion expects. For instance, a schema stored
//! externally to a parquet file can map parquet logical types to arrow types.

use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions};
use arrow::compute::{can_cast_types, cast};
use arrow::datatypes::{Schema, SchemaRef};
use datafusion_common::plan_err;
use std::fmt::Debug;
use std::sync::Arc;

/// Factory for creating [`SchemaAdapter`]s
///
/// This interface provides a way to implement custom schema adaptation logic
/// for DataSourceExec (for example, to fill missing columns with a default
/// value other than null).
///
/// Most users should use [`DefaultSchemaAdapterFactory`]. See that struct for
/// more details and examples.
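///
/// # Example: custom factory
///
/// A minimal sketch of a custom factory (the name `MySchemaAdapterFactory` is
/// hypothetical) that simply delegates to the default implementation; a real
/// factory would build and return its own [`SchemaAdapter`] here:
///
/// ```
/// # use arrow::datatypes::SchemaRef;
/// # use datafusion_datasource::schema_adapter::{
/// #     DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
/// # };
/// #[derive(Debug)]
/// struct MySchemaAdapterFactory;
///
/// impl SchemaAdapterFactory for MySchemaAdapterFactory {
///     fn create(
///         &self,
///         projected_table_schema: SchemaRef,
///         table_schema: SchemaRef,
///     ) -> Box<dyn SchemaAdapter> {
///         // custom adaptation logic would go here; this sketch just
///         // falls back to the default adapter
///         DefaultSchemaAdapterFactory.create(projected_table_schema, table_schema)
///     }
/// }
/// ```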
pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
    /// Create a [`SchemaAdapter`]
    ///
    /// Arguments:
    ///
    /// * `projected_table_schema`: The schema for the table, projected to
    ///   include only the fields being output (projected) by this mapping.
    ///
    /// * `table_schema`: The entire table schema for the table
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter>;
}

/// Creates [`SchemaMapper`]s to map file-level [`RecordBatch`]es to a table
/// schema, which may have a schema obtained from merging multiple file-level
/// schemas.
///
/// This is useful for implementing schema evolution in partitioned datasets.
///
/// See [`DefaultSchemaAdapterFactory`] for more details and examples.
pub trait SchemaAdapter: Send + Sync {
    /// Map a column index in the table schema to a column index in a particular
    /// file schema
    ///
    /// This is used while reading a file to push down projections by mapping
    /// projected column indexes from the table schema to the file schema
    ///
    /// Panics if index is not in range for the table schema
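    ///
    /// # Example
    ///
    /// For example, using the adapter returned by
    /// [`DefaultSchemaAdapterFactory::from_schema`]:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
    /// // the table has columns "a" then "b", but the file only has "b"
    /// let table_schema = Arc::new(Schema::new(vec![
    ///     Field::new("a", DataType::Int32, true),
    ///     Field::new("b", DataType::Utf8, true),
    /// ]));
    /// let file_schema = Schema::new(vec![Field::new("b", DataType::Utf8, true)]);
    /// let adapter = DefaultSchemaAdapterFactory::from_schema(table_schema);
    /// assert_eq!(adapter.map_column_index(0, &file_schema), None); // "a" not in file
    /// assert_eq!(adapter.map_column_index(1, &file_schema), Some(0)); // "b" is file column 0
    /// ```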
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;

    /// Creates a mapping for casting columns from the file schema to the table
    /// schema.
    ///
    /// This is used after reading a record batch. The returned [`SchemaMapper`]:
    ///
    /// 1. Maps columns to the expected column indexes
    /// 2. Handles missing values (e.g. fills nulls or a default value) for
    ///    columns in the table schema not in the file schema
    /// 3. Handles different types: if the column in the file schema has a
    ///    different type than `table_schema`, the mapper will resolve this
    ///    difference (e.g. by casting to the appropriate type)
    ///
    /// Returns:
    /// * a [`SchemaMapper`]
    /// * an ordered list of columns to project from the file
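    ///
    /// # Example
    ///
    /// With the default adapter, file columns that are not part of the table
    /// schema are dropped from the returned projection:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
    /// let table_schema = Arc::new(Schema::new(vec![
    ///     Field::new("a", DataType::Int32, true),
    ///     Field::new("b", DataType::Utf8, true),
    /// ]));
    /// // the file has an extra column "extra" that the table does not know about
    /// let file_schema = Schema::new(vec![
    ///     Field::new("extra", DataType::Int64, true),
    ///     Field::new("b", DataType::Utf8, true),
    /// ]);
    /// let adapter = DefaultSchemaAdapterFactory::from_schema(table_schema);
    /// let (_mapper, projection) = adapter.map_schema(&file_schema).unwrap();
    /// // only file column 1 ("b") needs to be read
    /// assert_eq!(projection, vec![1]);
    /// ```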
    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
}

/// Maps columns from a specific file schema to the table schema.
///
/// See [`DefaultSchemaAdapterFactory`] for more details and examples.
pub trait SchemaMapper: Debug + Send + Sync {
    /// Adapts a `RecordBatch` to match the `table_schema`
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
}

/// Default [`SchemaAdapterFactory`] for mapping schemas.
///
/// This can be used to adapt file-level record batches to a table schema and
/// implement schema evolution.
///
/// Given an input file schema and a table schema, this factory returns a
/// [`SchemaAdapter`] that returns [`SchemaMapper`]s that:
///
/// 1. Reorder columns
/// 2. Cast columns to the correct type
/// 3. Fill missing columns with nulls
///
/// # Errors:
///
/// * If a column in the table schema is non-nullable but missing from the
///   file schema, the returned mapper attempts to fill it with nulls, which
///   results in a schema error.
///
/// # Illustration of Schema Mapping
///
/// ```text
/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
///  ┌───────┐   ┌───────┐ │                  ┌───────┐   ┌───────┐   ┌───────┐ │
/// ││  1.0  │   │ "foo" │                   ││ NULL  │   │ "foo" │   │ "1.0" │
///  ├───────┤   ├───────┤ │ Schema mapping   ├───────┤   ├───────┤   ├───────┤ │
/// ││  2.0  │   │ "bar" │                   ││  NULL │   │ "bar" │   │ "2.0" │
///  └───────┘   └───────┘ │────────────────▶ └───────┘   └───────┘   └───────┘ │
/// │                                        │
///  column "c"  column "b"│                  column "a"  column "b"  column "c"│
/// │ Float64       Utf8                     │  Int32        Utf8        Utf8
///  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘                  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
///     Input Record Batch                         Output Record Batch
///
///     Schema {                                   Schema {
///      "c": Float64,                              "a": Int32,
///      "b": Utf8,                                 "b": Utf8,
///     }                                           "c": Utf8,
///                                                }
/// ```
///
/// # Example of using the `DefaultSchemaAdapterFactory` to map [`RecordBatch`]s
///
/// Note `SchemaMapping` also supports mapping partial batches, which is used as
/// part of predicate pushdown.
///
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};
/// # use datafusion_common::record_batch;
/// // Table has fields "a", "b" and "c"
/// let table_schema = Schema::new(vec![
///     Field::new("a", DataType::Int32, true),
///     Field::new("b", DataType::Utf8, true),
///     Field::new("c", DataType::Utf8, true),
/// ]);
///
/// // create an adapter to map the table schema to the file schema
/// let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
///
/// // The file schema has fields "c" and "b", but "b" is stored as a `Float64`
/// // instead of `Utf8`
/// let file_schema = Schema::new(vec![
///    Field::new("c", DataType::Utf8, true),
///    Field::new("b", DataType::Float64, true),
/// ]);
///
/// // Get a mapping from the file schema to the table schema
/// let (mapper, _indices) = adapter.map_schema(&file_schema).unwrap();
///
/// let file_batch = record_batch!(
///     ("c", Utf8, vec!["foo", "bar"]),
///     ("b", Float64, vec![1.0, 2.0])
/// ).unwrap();
///
/// let mapped_batch = mapper.map_batch(file_batch).unwrap();
///
/// // the mapped batch has the correct schema and the "b" column has been cast to Utf8
/// let expected_batch = record_batch!(
///    ("a", Int32, vec![None, None]),  // missing column filled with nulls
///    ("b", Utf8, vec!["1.0", "2.0"]), // b was cast to string and order was changed
///    ("c", Utf8, vec!["foo", "bar"])
/// ).unwrap();
/// assert_eq!(mapped_batch, expected_batch);
/// ```
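///
/// # Example: missing non-nullable column
///
/// A sketch of the error case described above: filling a missing non-nullable
/// column with nulls fails when the mapped batch is built.
///
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
/// # use datafusion_common::record_batch;
/// // "a" is non-nullable but missing from the file
/// let table_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
/// let file_schema = Schema::new(vec![Field::new("b", DataType::Utf8, true)]);
/// let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
/// let (mapper, _indices) = adapter.map_schema(&file_schema).unwrap();
///
/// let file_batch = record_batch!(("b", Utf8, vec!["foo"])).unwrap();
/// // filling "a" with nulls violates its non-nullability
/// assert!(mapper.map_batch(file_batch).is_err());
/// ```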
#[derive(Clone, Debug, Default)]
pub struct DefaultSchemaAdapterFactory;

impl DefaultSchemaAdapterFactory {
    /// Create a new factory for mapping batches from a file schema to a table
    /// schema.
    ///
    /// This is a convenience for [`DefaultSchemaAdapterFactory::create`] with
    /// the same schema for both the projected table schema and the table
    /// schema.
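    ///
    /// # Example
    ///
    /// A minimal usage sketch: when the table and file schemas are identical,
    /// the mapping is one to one.
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    /// let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::clone(&schema));
    /// // the single file column "a" projects straight through
    /// let (_mapper, projection) = adapter.map_schema(&schema).unwrap();
    /// assert_eq!(projection, vec![0]);
    /// ```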
    pub fn from_schema(table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
        Self.create(Arc::clone(&table_schema), table_schema)
    }
}

impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        _table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        Box::new(DefaultSchemaAdapter {
            projected_table_schema,
        })
    }
}

/// Default [`SchemaAdapter`] implementation, which maps file schemas to the
/// projected table schema. See [`SchemaMapping`] for more details.
#[derive(Clone, Debug)]
pub(crate) struct DefaultSchemaAdapter {
    /// The schema for the table, projected to include only the fields being output (projected) by the
    /// associated ParquetSource
    projected_table_schema: SchemaRef,
}

impl SchemaAdapter for DefaultSchemaAdapter {
    /// Map a column index in the table schema to a column index in a particular
    /// file schema
    ///
    /// Panics if index is not in range for the table schema
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        let field = self.projected_table_schema.field(index);
        Some(file_schema.fields.find(field.name())?.0)
    }

    /// Creates a `SchemaMapping` for casting or mapping the columns from the
    /// file schema to the table schema.
    ///
    /// If the provided `file_schema` contains columns of a different type to
    /// the expected `table_schema`, the method will attempt to cast the array
    /// data from the file schema to the table schema where possible.
    ///
    /// Returns a [`SchemaMapping`] that can be applied to the output batch
    /// along with an ordered list of columns to project from the file
    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        let mut projection = Vec::with_capacity(file_schema.fields().len());
        let mut field_mappings = vec![None; self.projected_table_schema.fields().len()];

        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
            if let Some((table_idx, table_field)) =
                self.projected_table_schema.fields().find(file_field.name())
            {
                match can_cast_types(file_field.data_type(), table_field.data_type()) {
                    true => {
                        field_mappings[table_idx] = Some(projection.len());
                        projection.push(file_idx);
                    }
                    false => {
                        return plan_err!(
                            "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
                            file_field.name(),
                            file_field.data_type(),
                            table_field.data_type()
                        )
                    }
                }
            }
        }

        Ok((
            Arc::new(SchemaMapping {
                projected_table_schema: Arc::clone(&self.projected_table_schema),
                field_mappings,
            }),
            projection,
        ))
    }
}

/// The SchemaMapping struct holds a mapping from the file schema to the table
/// schema and any necessary type conversions.
///
/// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch which
/// has the projected schema, since that's the schema which is supposed to come
/// out of the execution of this query. Thus `map_batch` uses
/// `projected_table_schema` as it can only operate on the projected fields.
///
/// [`map_batch`]: Self::map_batch
#[derive(Debug)]
pub struct SchemaMapping {
    /// The schema of the table. This is the expected schema after conversion
    /// and it should match the schema of the query result.
    projected_table_schema: SchemaRef,
    /// Mapping from field index in `projected_table_schema` to index in the
    /// projected file schema.
    ///
    /// These are `Option`s rather than plain `usize`s because the table may
    /// have fields that don't exist in the file.
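    ///
    /// For example, if the projected table schema is `(a, b, c)` and the file
    /// provides columns `(c, b)`, this is `[None, Some(1), Some(0)]`: `a` is
    /// absent from the file, while `b` and `c` map to projected file columns
    /// 1 and 0.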
    field_mappings: Vec<Option<usize>>,
}

impl SchemaMapper for SchemaMapping {
    /// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and
    /// conversions.
    /// The produced RecordBatch has a schema that contains only the projected columns.
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
        let batch_rows = batch.num_rows();
        let batch_cols = batch.columns().to_vec();

        let cols = self
            .projected_table_schema
            // go through each field in the projected schema
            .fields()
            .iter()
            // and zip it with the index that maps fields from the projected table schema to the
            // projected file schema in `batch`
            .zip(&self.field_mappings)
            // and for each one...
            .map(|(field, file_idx)| {
                file_idx.map_or_else(
                    // If this field only exists in the table, and not in the file, then we know
                    // that it's null, so just return that.
                    || Ok(new_null_array(field.data_type(), batch_rows)),
                    // However, if it does exist in both, then try to cast it to the correct output
                    // type
                    |batch_idx| cast(&batch_cols[batch_idx], field.data_type()),
                )
            })
            .collect::<datafusion_common::Result<Vec<_>, _>>()?;

        // Necessary to handle empty batches
        let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));

        let schema = Arc::clone(&self.projected_table_schema);
        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
        Ok(record_batch)
    }
}