datafusion_datasource/schema_adapter.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`SchemaAdapter`] and [`SchemaAdapterFactory`] to adapt file-level record batches to a table schema.
//!
//! An adapter provides a way to translate the RecordBatches that come out of the
//! physical file format into the form DataFusion expects. For instance, a schema
//! stored externally to a parquet file can map parquet logical types to arrow types.

use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions};
use arrow::compute::{can_cast_types, cast};
use arrow::datatypes::{Schema, SchemaRef};
use datafusion_common::plan_err;
use std::fmt::Debug;
use std::sync::Arc;

/// Factory for creating [`SchemaAdapter`]s.
///
/// This interface provides a way to implement custom schema adaptation logic
/// for DataSourceExec (for example, to fill missing columns with default
/// values other than null).
///
/// Most users should use [`DefaultSchemaAdapterFactory`]. See that struct for
/// more details and examples.
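///
/// A custom factory can also wrap the default behavior. A minimal sketch
/// (the `PassThroughFactory` name is hypothetical, for illustration only):
///
/// ```
/// # use arrow::datatypes::SchemaRef;
/// # use datafusion_datasource::schema_adapter::{
/// #     DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
/// # };
/// // Hypothetical factory that just delegates to the default implementation;
/// // a real one could return an adapter that fills missing columns with
/// // default values instead of nulls.
/// #[derive(Debug)]
/// struct PassThroughFactory;
///
/// impl SchemaAdapterFactory for PassThroughFactory {
///     fn create(
///         &self,
///         projected_table_schema: SchemaRef,
///         table_schema: SchemaRef,
///     ) -> Box<dyn SchemaAdapter> {
///         // delegate to the default adapter
///         DefaultSchemaAdapterFactory.create(projected_table_schema, table_schema)
///     }
/// }
/// ```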
pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
    /// Create a [`SchemaAdapter`]
    ///
    /// Arguments:
    ///
    /// * `projected_table_schema`: The schema for the table, projected to
    ///   include only the fields being output (projected) by this mapping.
    ///
    /// * `table_schema`: The entire schema for the table
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter>;
}

/// Creates [`SchemaMapper`]s to map file-level [`RecordBatch`]es to a table
/// schema, which may have been obtained by merging multiple file-level
/// schemas.
///
/// This is useful for implementing schema evolution in partitioned datasets.
///
/// See [`DefaultSchemaAdapterFactory`] for more details and examples.
pub trait SchemaAdapter: Send + Sync {
    /// Map a column index in the table schema to a column index in a particular
    /// file schema
    ///
    /// This is used while reading a file to push down projections by mapping
    /// projected column indexes from the table schema to the file schema
    ///
    /// Panics if `index` is out of range for the table schema
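    ///
    /// For example, with the [`DefaultSchemaAdapterFactory`] (a minimal
    /// sketch; the schemas are made up for illustration):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
    /// // table has columns "a" and "b"; the file only has "b"
    /// let table_schema = Arc::new(Schema::new(vec![
    ///     Field::new("a", DataType::Int32, true),
    ///     Field::new("b", DataType::Utf8, true),
    /// ]));
    /// let file_schema = Schema::new(vec![Field::new("b", DataType::Utf8, true)]);
    /// let adapter = DefaultSchemaAdapterFactory::from_schema(table_schema);
    /// // "a" (table index 0) is not in the file
    /// assert_eq!(adapter.map_column_index(0, &file_schema), None);
    /// // "b" (table index 1) is column 0 of the file
    /// assert_eq!(adapter.map_column_index(1, &file_schema), Some(0));
    /// ```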
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;

    /// Creates a mapping for casting columns from the file schema to the table
    /// schema.
    ///
    /// This is used after reading a record batch. The returned [`SchemaMapper`]:
    ///
    /// 1. Maps columns to the expected column indexes
    /// 2. Handles missing values (e.g. fills nulls or a default value) for
    ///    columns in the table schema that are not in the file schema
    /// 3. Handles different types: if a column in the file schema has a
    ///    different type than the `table_schema`, the mapper resolves the
    ///    difference (e.g. by casting to the appropriate type)
    ///
    /// Returns:
    /// * a [`SchemaMapper`]
    /// * an ordered list of columns to project from the file
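    ///
    /// For example, if the table schema is `(a, b)` and the file schema is
    /// `(b, extra, a)`, the projection is `[0, 2]`: only `b` and `a` are read
    /// from the file, in file order (a sketch using the default adapter;
    /// the schemas are made up for illustration):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::{DataType, Field, Schema};
    /// # use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
    /// let table_schema = Arc::new(Schema::new(vec![
    ///     Field::new("a", DataType::Int32, true),
    ///     Field::new("b", DataType::Utf8, true),
    /// ]));
    /// let file_schema = Schema::new(vec![
    ///     Field::new("b", DataType::Utf8, true),
    ///     Field::new("extra", DataType::Int64, true),
    ///     Field::new("a", DataType::Int32, true),
    /// ]);
    /// let adapter = DefaultSchemaAdapterFactory::from_schema(table_schema);
    /// let (_mapper, projection) = adapter.map_schema(&file_schema).unwrap();
    /// // "extra" is not part of the table schema and is skipped
    /// assert_eq!(projection, vec![0, 2]);
    /// ```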
    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
}

/// Maps columns from a specific file schema to the table schema.
///
/// See [`DefaultSchemaAdapterFactory`] for more details and examples.
pub trait SchemaMapper: Debug + Send + Sync {
    /// Adapts a `RecordBatch` to match the `table_schema`
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
}

/// Default [`SchemaAdapterFactory`] for mapping schemas.
///
/// This can be used to adapt file-level record batches to a table schema and
/// implement schema evolution.
///
/// Given an input file schema and a table schema, this factory returns a
/// [`SchemaAdapter`] that returns [`SchemaMapper`]s which:
///
/// 1. Reorder columns
/// 2. Cast columns to the correct type
/// 3. Fill missing columns with nulls
///
/// # Errors:
///
/// * If a column in the table schema is non-nullable but missing from the
///   file schema, the returned mapper tries to fill it with nulls, which
///   results in a schema error.
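///
/// A minimal sketch of that failure mode (the schemas here are made up for
/// illustration):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
/// # use datafusion_common::record_batch;
/// // "a" is non-nullable in the table but absent from the file
/// let table_schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
/// let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
/// let file_schema = Schema::new(vec![Field::new("b", DataType::Utf8, true)]);
/// let (mapper, _) = adapter.map_schema(&file_schema).unwrap();
/// let file_batch = record_batch!(("b", Utf8, vec!["x"])).unwrap();
/// // filling "a" with nulls violates its non-nullable constraint
/// assert!(mapper.map_batch(file_batch).is_err());
/// ```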
///
/// # Illustration of Schema Mapping
///
/// ```text
/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐                ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
///  ┌───────┐   ┌───────┐                       ┌───────┐  ┌───────┐  ┌───────┐
/// ││  1.0  │   │ "foo" │  │                  ││ NULL  │  │ "foo" │  │ "1.0" │  │
///  ├───────┤   ├───────┤      Schema mapping   ├───────┤  ├───────┤  ├───────┤
/// ││  2.0  │   │ "bar" │  │ ────────────────▶ ││ NULL  │  │ "bar" │  │ "2.0" │  │
///  └───────┘   └───────┘                       └───────┘  └───────┘  └───────┘
/// │                        │                 │                                 │
///  column "c"  column "b"                      column "a" column "b" column "c"
/// │  Float64      Utf8     │                 │   Int32      Utf8       Utf8    │
///  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                    ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
///     Input Record Batch                          Output Record Batch
///
///     Schema {                                    Schema {
///       "c": Float64,                               "a": Int32,
///       "b": Utf8,                                  "b": Utf8,
///     }                                             "c": Utf8,
///                                                 }
/// ```
///
/// # Example of using the `DefaultSchemaAdapterFactory` to map [`RecordBatch`]s
///
/// Note `SchemaMapping` also supports mapping partial batches, which is used as
/// part of predicate pushdown.
///
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};
/// # use datafusion_common::record_batch;
/// // Table has fields "a", "b" and "c"
/// let table_schema = Schema::new(vec![
///     Field::new("a", DataType::Int32, true),
///     Field::new("b", DataType::Utf8, true),
///     Field::new("c", DataType::Utf8, true),
/// ]);
///
/// // create an adapter to map the table schema to the file schema
/// let adapter = DefaultSchemaAdapterFactory::from_schema(Arc::new(table_schema));
///
/// // The file schema has fields "c" and "b", but "b" is stored as a `Float64`
/// // instead of `Utf8`
/// let file_schema = Schema::new(vec![
///     Field::new("c", DataType::Utf8, true),
///     Field::new("b", DataType::Float64, true),
/// ]);
///
/// // Get a mapping from the file schema to the table schema
/// let (mapper, _indices) = adapter.map_schema(&file_schema).unwrap();
///
/// let file_batch = record_batch!(
///     ("c", Utf8, vec!["foo", "bar"]),
///     ("b", Float64, vec![1.0, 2.0])
/// ).unwrap();
///
/// let mapped_batch = mapper.map_batch(file_batch).unwrap();
///
/// // the mapped batch has the correct schema and the "b" column has been cast to Utf8
/// let expected_batch = record_batch!(
///     ("a", Int32, vec![None, None]),  // missing column filled with nulls
///     ("b", Utf8, vec!["1.0", "2.0"]), // "b" was cast to a string and reordered
///     ("c", Utf8, vec!["foo", "bar"])
/// ).unwrap();
/// assert_eq!(mapped_batch, expected_batch);
/// ```
#[derive(Clone, Debug, Default)]
pub struct DefaultSchemaAdapterFactory;

impl DefaultSchemaAdapterFactory {
    /// Create a new factory for mapping batches from a file schema to a table
    /// schema.
    ///
    /// This is a convenience for [`DefaultSchemaAdapterFactory::create`] with
    /// the same schema for both the projected table schema and the table
    /// schema.
    pub fn from_schema(table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
        Self.create(Arc::clone(&table_schema), table_schema)
    }
}

impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        _table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        Box::new(DefaultSchemaAdapter {
            projected_table_schema,
        })
    }
}

/// The default [`SchemaAdapter`] implementation.
///
/// It is created with both the table schema and the projected table schema,
/// but only needs to store the projected schema. See [`SchemaMapping`] for
/// more details.
#[derive(Clone, Debug)]
pub(crate) struct DefaultSchemaAdapter {
    /// The schema for the table, projected to include only the fields being
    /// output (projected) by the associated ParquetSource
    projected_table_schema: SchemaRef,
}

impl SchemaAdapter for DefaultSchemaAdapter {
    /// Map a column index in the table schema to a column index in a particular
    /// file schema
    ///
    /// Panics if `index` is out of range for the table schema
    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
        let field = self.projected_table_schema.field(index);
        Some(file_schema.fields.find(field.name())?.0)
    }

    /// Creates a `SchemaMapping` for casting or mapping the columns from the
    /// file schema to the table schema.
    ///
    /// If the provided `file_schema` contains columns of a different type than
    /// the expected `table_schema`, the method will attempt to cast the array
    /// data from the file schema to the table schema where possible.
    ///
    /// Returns a [`SchemaMapping`] that can be applied to the output batch
    /// along with an ordered list of columns to project from the file
    fn map_schema(
        &self,
        file_schema: &Schema,
    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
        let mut projection = Vec::with_capacity(file_schema.fields().len());
        let mut field_mappings = vec![None; self.projected_table_schema.fields().len()];

        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
            if let Some((table_idx, table_field)) =
                self.projected_table_schema.fields().find(file_field.name())
            {
                // Only keep file columns that can be cast to the table type;
                // otherwise report a planning error
                if can_cast_types(file_field.data_type(), table_field.data_type()) {
                    field_mappings[table_idx] = Some(projection.len());
                    projection.push(file_idx);
                } else {
                    return plan_err!(
                        "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
                        file_field.name(),
                        file_field.data_type(),
                        table_field.data_type()
                    );
                }
            }
        }

        Ok((
            Arc::new(SchemaMapping {
                projected_table_schema: Arc::clone(&self.projected_table_schema),
                field_mappings,
            }),
            projection,
        ))
    }
}

/// The SchemaMapping struct holds a mapping from the file schema to the table
/// schema and any necessary type conversions.
///
/// [`map_batch`] is used by the ParquetOpener to produce a RecordBatch that
/// has the projected schema, since that is the schema expected to come out of
/// the execution of this query. Thus `map_batch` uses `projected_table_schema`
/// and can only operate on the projected fields.
///
/// [`map_batch`]: Self::map_batch
#[derive(Debug)]
pub struct SchemaMapping {
    /// The schema of the table. This is the expected schema after conversion
    /// and it should match the schema of the query result.
    projected_table_schema: SchemaRef,
    /// Mapping from field index in `projected_table_schema` to index in the
    /// projected file schema.
    ///
    /// These are `Option`s instead of plain `usize`s because the table may
    /// have fields that don't exist in the file.
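    ///
    /// For example (illustrative values, not from the source): if the
    /// projected table schema is `(a, b, c)` and the file provides only
    /// `(c, b)`, then `field_mappings` is `[None, Some(1), Some(0)]`.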
    field_mappings: Vec<Option<usize>>,
}

impl SchemaMapper for SchemaMapping {
    /// Adapts a `RecordBatch` to match the `projected_table_schema` using the
    /// stored mapping and conversions. The produced RecordBatch has a schema
    /// that contains only the projected columns.
    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
        let batch_rows = batch.num_rows();
        let batch_cols = batch.columns().to_vec();

        let cols = self
            .projected_table_schema
            // go through each field in the projected schema
            .fields()
            .iter()
            // and zip it with the index that maps fields from the projected table schema to the
            // projected file schema in `batch`
            .zip(&self.field_mappings)
            // and for each one...
            .map(|(field, file_idx)| {
                file_idx.map_or_else(
                    // If this field only exists in the table, and not in the file, then we know
                    // that it's null, so just return that.
                    || Ok(new_null_array(field.data_type(), batch_rows)),
                    // However, if it does exist in both, then try to cast it to the correct output
                    // type
                    |batch_idx| cast(&batch_cols[batch_idx], field.data_type()),
                )
            })
            .collect::<datafusion_common::Result<Vec<_>, _>>()?;

        // Necessary to handle empty batches
        let options = RecordBatchOptions::new().with_row_count(Some(batch_rows));

        let schema = Arc::clone(&self.projected_table_schema);
        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
        Ok(record_batch)
    }
}