datafusion_datasource_parquet/schema_coercion.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Arrow-schema coercion utilities used by the Parquet reader to make a
19//! file schema match the table schema (binary→string, regular→view,
20//! INT96→Timestamp).
21//!
22//! These helpers are independent of the [`ParquetFormat`](crate::file_format::ParquetFormat)
23//! type and several have been re-exported at the crate root for use by
24//! callers outside the format implementation.
25
26use std::cell::RefCell;
27use std::collections::{HashMap, HashSet};
28use std::rc::Rc;
29use std::sync::Arc;
30
31use arrow::datatypes::{DataType, Field, FieldRef, Schema, TimeUnit};
32use parquet::basic::Type;
33use parquet::schema::types::SchemaDescriptor;
34
35/// Apply necessary schema type coercions to make file schema match table schema.
36///
37/// This function performs two main types of transformations in a single pass:
38/// 1. Binary types to string types conversion - Converts binary data types to their
39/// corresponding string types when the table schema expects string data
40/// 2. Regular to view types conversion - Converts standard string/binary types to
41/// view types when the table schema uses view types
42///
43/// # Arguments
44/// * `table_schema` - The table schema containing the desired types
45/// * `file_schema` - The file schema to be transformed
46///
47/// # Returns
48/// * `Some(Schema)` - If any transformations were applied, returns the transformed schema
49/// * `None` - If no transformations were needed
50pub fn apply_file_schema_type_coercions(
51 table_schema: &Schema,
52 file_schema: &Schema,
53) -> Option<Schema> {
54 let mut needs_view_transform = false;
55 let mut needs_string_transform = false;
56
57 // Create a mapping of table field names to their data types for fast lookup
58 // and simultaneously check if we need any transformations
59 let table_fields: HashMap<_, _> = table_schema
60 .fields()
61 .iter()
62 .map(|f| {
63 let dt = f.data_type();
64 // Check if we need view type transformation
65 if matches!(dt, &DataType::Utf8View | &DataType::BinaryView) {
66 needs_view_transform = true;
67 }
68 // Check if we need string type transformation
69 if matches!(
70 dt,
71 &DataType::Utf8 | &DataType::LargeUtf8 | &DataType::Utf8View
72 ) {
73 needs_string_transform = true;
74 }
75
76 (f.name(), dt)
77 })
78 .collect();
79
80 // Early return if no transformation needed
81 if !needs_view_transform && !needs_string_transform {
82 return None;
83 }
84
85 let transformed_fields: Vec<Arc<Field>> = file_schema
86 .fields()
87 .iter()
88 .map(|field| {
89 let field_name = field.name();
90 let field_type = field.data_type();
91
92 // Look up the corresponding field type in the table schema
93 if let Some(table_type) = table_fields.get(field_name) {
94 match (table_type, field_type) {
95 // table schema uses string type, coerce the file schema to use string type
96 (
97 &DataType::Utf8,
98 DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
99 ) => {
100 return field_with_new_type(field, DataType::Utf8);
101 }
102 // table schema uses large string type, coerce the file schema to use large string type
103 (
104 &DataType::LargeUtf8,
105 DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
106 ) => {
107 return field_with_new_type(field, DataType::LargeUtf8);
108 }
109 // table schema uses string view type, coerce the file schema to use view type
110 (
111 &DataType::Utf8View,
112 DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
113 ) => {
114 return field_with_new_type(field, DataType::Utf8View);
115 }
116 // Handle view type conversions
117 (&DataType::Utf8View, DataType::Utf8 | DataType::LargeUtf8) => {
118 return field_with_new_type(field, DataType::Utf8View);
119 }
120 (&DataType::BinaryView, DataType::Binary | DataType::LargeBinary) => {
121 return field_with_new_type(field, DataType::BinaryView);
122 }
123 _ => {}
124 }
125 }
126
127 // If no transformation is needed, keep the original field
128 Arc::clone(field)
129 })
130 .collect();
131
132 Some(Schema::new_with_metadata(
133 transformed_fields,
134 file_schema.metadata.clone(),
135 ))
136}
137
138/// Coerces the file schema's Timestamps to the provided TimeUnit if the
139/// Parquet schema contains INT96.
140///
141/// Deprecated wrapper around [`Int96Coercer`]; use the builder directly
142/// instead — it also supports attaching a timezone via
143/// [`Int96Coercer::with_timezone`].
144#[deprecated(since = "53.2.0", note = "use `Int96Coercer` instead")]
145pub fn coerce_int96_to_resolution(
146 parquet_schema: &SchemaDescriptor,
147 file_schema: &Schema,
148 time_unit: &TimeUnit,
149) -> Option<Schema> {
150 Int96Coercer::new(parquet_schema, file_schema, time_unit).coerce()
151}
152
153/// Builder for coercing INT96-originated Timestamp columns in `file_schema`
154/// to a specific [`TimeUnit`], optionally attaching a timezone.
155///
156/// INT96 is the legacy Parquet representation that systems like Spark use for
157/// timestamps. Arrow surfaces it as `Timestamp(Nanosecond, None)`, but the
158/// underlying values are written as UTC-adjusted instants. Use this builder
159/// to:
160///
161/// - Coerce INT96-derived columns to a smaller [`TimeUnit`] (e.g. microseconds)
162/// to extend the representable date range.
163/// - Optionally attach a timezone so the resulting Arrow type carries the
164/// timezone-aware semantic (`Timestamp(unit, Some(tz))`). Without a
165/// timezone, INT96-derived columns become `Timestamp(unit, None)` — the
166/// historical default.
167///
168/// Returns `None` if `file_schema` contains no INT96-derived columns.
169///
170/// # Example
171///
172/// ```ignore
173/// use std::sync::Arc;
174/// use arrow::datatypes::TimeUnit;
175/// use datafusion_datasource_parquet::Int96Coercer;
176///
177/// let coerced = Int96Coercer::new(parquet_schema, file_schema, &TimeUnit::Microsecond)
178/// .with_timezone(Some(Arc::from("UTC")))
179/// .coerce();
180/// ```
181pub struct Int96Coercer<'a> {
182 parquet_schema: &'a SchemaDescriptor,
183 file_schema: &'a Schema,
184 time_unit: &'a TimeUnit,
185 timezone: Option<Arc<str>>,
186}
187
188impl<'a> Int96Coercer<'a> {
189 /// Create a new builder. INT96-derived columns will coerce to
190 /// `Timestamp(time_unit, None)` unless [`Self::with_timezone`] is set.
191 pub fn new(
192 parquet_schema: &'a SchemaDescriptor,
193 file_schema: &'a Schema,
194 time_unit: &'a TimeUnit,
195 ) -> Self {
196 Self {
197 parquet_schema,
198 file_schema,
199 time_unit,
200 timezone: None,
201 }
202 }
203
204 /// Attach a timezone to INT96-derived columns. When `Some`, INT96-derived
205 /// columns coerce to `Timestamp(time_unit, Some(timezone))` instead of
206 /// the default `Timestamp(time_unit, None)`. Spark and other systems
207 /// write INT96 as UTC-adjusted instants, so callers that need the
208 /// resulting Arrow type to be timezone-aware should pass
209 /// `Some(Arc::from("UTC"))`.
210 pub fn with_timezone(mut self, timezone: Option<Arc<str>>) -> Self {
211 self.timezone = timezone;
212 self
213 }
214
215 /// Run the coercion, returning the rewritten schema or `None` if
216 /// `file_schema` contains no INT96-derived columns.
217 pub fn coerce(self) -> Option<Schema> {
218 let Self {
219 parquet_schema,
220 file_schema,
221 time_unit,
222 timezone,
223 } = self;
224 coerce_int96_to_resolution_impl(
225 parquet_schema,
226 file_schema,
227 time_unit,
228 timezone.as_ref(),
229 )
230 }
231}
232
233fn coerce_int96_to_resolution_impl(
234 parquet_schema: &SchemaDescriptor,
235 file_schema: &Schema,
236 time_unit: &TimeUnit,
237 timezone: Option<&Arc<str>>,
238) -> Option<Schema> {
239 // Traverse the parquet_schema columns looking for int96 physical types. If encountered, insert
240 // the field's full path into a set.
241 let int96_fields: HashSet<_> = parquet_schema
242 .columns()
243 .iter()
244 .filter(|f| f.physical_type() == Type::INT96)
245 .map(|f| f.path().string())
246 .collect();
247
248 if int96_fields.is_empty() {
249 // The schema doesn't contain any int96 fields, so skip the remaining logic.
250 return None;
251 }
252
253 // Do a DFS into the schema using a stack, looking for timestamp(nanos) fields that originated
254 // as int96 to coerce to the provided time_unit.
255
256 type NestedFields = Rc<RefCell<Vec<FieldRef>>>;
257 type StackContext<'a> = (
258 Vec<&'a str>, // The Parquet column path (e.g., "c0.list.element.c1") for the current field.
259 &'a FieldRef, // The current field to be processed.
260 NestedFields, // The parent's fields that this field will be (possibly) type-coerced and
261 // inserted into. All fields have a parent, so this is not an Option type.
262 Option<NestedFields>, // Nested types need to create their own vector of fields for their
263 // children. For primitive types this will remain None. For nested
264 // types it is None the first time they are processed. Then, we
265 // instantiate a vector for its children, push the field back onto the
266 // stack to be processed again, and DFS into its children. The next
267 // time we process the field, we know we have DFS'd into the children
268 // because this field is Some.
269 );
270
271 // This is our top-level fields from which we will construct our schema. We pass this into our
272 // initial stack context as the parent fields, and the DFS populates it.
273 let fields = Rc::new(RefCell::new(Vec::with_capacity(file_schema.fields.len())));
274
275 // TODO: It might be possible to only DFS into nested fields that we know contain an int96 if we
276 // use some sort of LPM data structure to check if we're currently DFS'ing nested types that are
277 // in a column path that contains an int96. That can be a future optimization for large schemas.
278 let transformed_schema = {
279 // Populate the stack with our top-level fields.
280 let mut stack: Vec<StackContext> = file_schema
281 .fields()
282 .iter()
283 .rev()
284 .map(|f| (vec![f.name().as_str()], f, Rc::clone(&fields), None))
285 .collect();
286
287 // Pop fields to DFS into until we have exhausted the stack.
288 while let Some((parquet_path, current_field, parent_fields, child_fields)) =
289 stack.pop()
290 {
291 match (current_field.data_type(), child_fields) {
292 (DataType::Struct(unprocessed_children), None) => {
293 // This is the first time popping off this struct. We don't yet know the
294 // correct types of its children (i.e., if they need coercing) so we create
295 // a vector for child_fields, push the struct node back onto the stack to be
296 // processed again (see below) after processing all its children.
297 let child_fields = Rc::new(RefCell::new(Vec::with_capacity(
298 unprocessed_children.len(),
299 )));
300 // Note that here we push the struct back onto the stack with its
301 // parent_fields in the same position, now with Some(child_fields).
302 stack.push((
303 parquet_path.clone(),
304 current_field,
305 parent_fields,
306 Some(Rc::clone(&child_fields)),
307 ));
308 // Push all the children in reverse to maintain original schema order due to
309 // stack processing.
310 for child in unprocessed_children.into_iter().rev() {
311 let mut child_path = parquet_path.clone();
312 // Build up a normalized path that we'll use as a key into the original
313 // int96_fields set above to test if this originated as int96.
314 child_path.push(".");
315 child_path.push(child.name());
316 // Note that here we push the field onto the stack using the struct's
317 // new child_fields vector as the field's parent_fields.
318 stack.push((child_path, child, Rc::clone(&child_fields), None));
319 }
320 }
321 (DataType::Struct(unprocessed_children), Some(processed_children)) => {
322 // This is the second time popping off this struct. The child_fields vector
323 // now contains each field that has been DFS'd into, and we can construct
324 // the resulting struct with correct child types.
325 let processed_children = processed_children.borrow();
326 assert_eq!(processed_children.len(), unprocessed_children.len());
327 let processed_struct = Field::new_struct(
328 current_field.name(),
329 processed_children.as_slice(),
330 current_field.is_nullable(),
331 );
332 parent_fields.borrow_mut().push(Arc::new(processed_struct));
333 }
334 (DataType::List(unprocessed_child), None) => {
335 // This is the first time popping off this list. See struct docs above.
336 let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
337 stack.push((
338 parquet_path.clone(),
339 current_field,
340 parent_fields,
341 Some(Rc::clone(&child_fields)),
342 ));
343 let mut child_path = parquet_path.clone();
344 // Spark uses a definition for arrays/lists that results in a group
345 // named "list" that is not maintained when parsing to Arrow. We just push
346 // this name into the path.
347 child_path.push(".list.");
348 child_path.push(unprocessed_child.name());
349 stack.push((
350 child_path.clone(),
351 unprocessed_child,
352 Rc::clone(&child_fields),
353 None,
354 ));
355 }
356 (DataType::List(_), Some(processed_children)) => {
357 // This is the second time popping off this list. See struct docs above.
358 let processed_children = processed_children.borrow();
359 assert_eq!(processed_children.len(), 1);
360 let processed_list = Field::new_list(
361 current_field.name(),
362 Arc::clone(&processed_children[0]),
363 current_field.is_nullable(),
364 );
365 parent_fields.borrow_mut().push(Arc::new(processed_list));
366 }
367 (DataType::Map(unprocessed_child, _), None) => {
368 // This is the first time popping off this map. See struct docs above.
369 let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
370 stack.push((
371 parquet_path.clone(),
372 current_field,
373 parent_fields,
374 Some(Rc::clone(&child_fields)),
375 ));
376 let mut child_path = parquet_path.clone();
377 child_path.push(".");
378 child_path.push(unprocessed_child.name());
379 stack.push((
380 child_path.clone(),
381 unprocessed_child,
382 Rc::clone(&child_fields),
383 None,
384 ));
385 }
386 (DataType::Map(_, sorted), Some(processed_children)) => {
387 // This is the second time popping off this map. See struct docs above.
388 let processed_children = processed_children.borrow();
389 assert_eq!(processed_children.len(), 1);
390 let processed_map = Field::new(
391 current_field.name(),
392 DataType::Map(Arc::clone(&processed_children[0]), *sorted),
393 current_field.is_nullable(),
394 );
395 parent_fields.borrow_mut().push(Arc::new(processed_map));
396 }
397 (DataType::Timestamp(TimeUnit::Nanosecond, None), None)
398 if int96_fields.contains(parquet_path.concat().as_str()) =>
399 // We found a timestamp(nanos) and it originated as int96. Coerce it to the correct
400 // time_unit, optionally attaching the requested timezone.
401 {
402 parent_fields.borrow_mut().push(field_with_new_type(
403 current_field,
404 DataType::Timestamp(*time_unit, timezone.cloned()),
405 ));
406 }
407 // Other types can be cloned as they are.
408 _ => parent_fields.borrow_mut().push(Arc::clone(current_field)),
409 }
410 }
411 assert_eq!(fields.borrow().len(), file_schema.fields.len());
412 Schema::new_with_metadata(
413 fields.borrow_mut().clone(),
414 file_schema.metadata.clone(),
415 )
416 };
417
418 Some(transformed_schema)
419}
420
421/// Coerces the file schema if the table schema uses a view type.
422#[deprecated(
423 since = "47.0.0",
424 note = "Use `apply_file_schema_type_coercions` instead"
425)]
426pub fn coerce_file_schema_to_view_type(
427 table_schema: &Schema,
428 file_schema: &Schema,
429) -> Option<Schema> {
430 let mut transform = false;
431 let table_fields: HashMap<_, _> = table_schema
432 .fields
433 .iter()
434 .map(|f| {
435 let dt = f.data_type();
436 if dt.equals_datatype(&DataType::Utf8View)
437 || dt.equals_datatype(&DataType::BinaryView)
438 {
439 transform = true;
440 }
441 (f.name(), dt)
442 })
443 .collect();
444
445 if !transform {
446 return None;
447 }
448
449 let transformed_fields: Vec<Arc<Field>> = file_schema
450 .fields
451 .iter()
452 .map(
453 |field| match (table_fields.get(field.name()), field.data_type()) {
454 (Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => {
455 field_with_new_type(field, DataType::Utf8View)
456 }
457 (
458 Some(DataType::BinaryView),
459 DataType::Binary | DataType::LargeBinary,
460 ) => field_with_new_type(field, DataType::BinaryView),
461 _ => Arc::clone(field),
462 },
463 )
464 .collect();
465
466 Some(Schema::new_with_metadata(
467 transformed_fields,
468 file_schema.metadata.clone(),
469 ))
470}
471
472/// If the table schema uses a string type, coerce the file schema to use a string type.
473///
474/// See [`ParquetFormat::binary_as_string`](crate::file_format::ParquetFormat::binary_as_string) for details
475#[deprecated(
476 since = "47.0.0",
477 note = "Use `apply_file_schema_type_coercions` instead"
478)]
479pub fn coerce_file_schema_to_string_type(
480 table_schema: &Schema,
481 file_schema: &Schema,
482) -> Option<Schema> {
483 let mut transform = false;
484 let table_fields: HashMap<_, _> = table_schema
485 .fields
486 .iter()
487 .map(|f| (f.name(), f.data_type()))
488 .collect();
489 let transformed_fields: Vec<Arc<Field>> = file_schema
490 .fields
491 .iter()
492 .map(
493 |field| match (table_fields.get(field.name()), field.data_type()) {
494 // table schema uses string type, coerce the file schema to use string type
495 (
496 Some(DataType::Utf8),
497 DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
498 ) => {
499 transform = true;
500 field_with_new_type(field, DataType::Utf8)
501 }
502 // table schema uses large string type, coerce the file schema to use large string type
503 (
504 Some(DataType::LargeUtf8),
505 DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
506 ) => {
507 transform = true;
508 field_with_new_type(field, DataType::LargeUtf8)
509 }
510 // table schema uses string view type, coerce the file schema to use view type
511 (
512 Some(DataType::Utf8View),
513 DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
514 ) => {
515 transform = true;
516 field_with_new_type(field, DataType::Utf8View)
517 }
518 _ => Arc::clone(field),
519 },
520 )
521 .collect();
522
523 if !transform {
524 None
525 } else {
526 Some(Schema::new_with_metadata(
527 transformed_fields,
528 file_schema.metadata.clone(),
529 ))
530 }
531}
532
533/// Create a new field with the specified data type, copying the other
534/// properties from the input field
535fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
536 Arc::new(field.as_ref().clone().with_data_type(new_type))
537}
538
539/// Transform a schema to use view types for Utf8 and Binary
540///
541/// See [`ParquetFormat::force_view_types`](crate::file_format::ParquetFormat::force_view_types) for details
542pub fn transform_schema_to_view(schema: &Schema) -> Schema {
543 let transformed_fields: Vec<Arc<Field>> = schema
544 .fields
545 .iter()
546 .map(|field| match field.data_type() {
547 DataType::Utf8 | DataType::LargeUtf8 => {
548 field_with_new_type(field, DataType::Utf8View)
549 }
550 DataType::Binary | DataType::LargeBinary => {
551 field_with_new_type(field, DataType::BinaryView)
552 }
553 _ => Arc::clone(field),
554 })
555 .collect();
556 Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
557}
558
559/// Transform a schema so that any binary types are strings
560pub fn transform_binary_to_string(schema: &Schema) -> Schema {
561 let transformed_fields: Vec<Arc<Field>> = schema
562 .fields
563 .iter()
564 .map(|field| match field.data_type() {
565 DataType::Binary => field_with_new_type(field, DataType::Utf8),
566 DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8),
567 DataType::BinaryView => field_with_new_type(field, DataType::Utf8View),
568 _ => Arc::clone(field),
569 })
570 .collect();
571 Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
572}
#[cfg(test)]
mod tests {
    use parquet::arrow::parquet_to_arrow_schema;
    use parquet::schema::parser::parse_message_type;

    use super::*;

    /// One INT96 column (`c0`) next to INT64 timestamps of every resolution, both
    /// UTC-adjusted and not.
    const MIXED_TIMESTAMPS_SCHEMA: &str = "
        message spark_schema {
            optional int96 c0;
            optional int64 c1 (TIMESTAMP(NANOS,true));
            optional int64 c2 (TIMESTAMP(NANOS,false));
            optional int64 c3 (TIMESTAMP(MILLIS,true));
            optional int64 c4 (TIMESTAMP(MILLIS,false));
            optional int64 c5 (TIMESTAMP(MICROS,true));
            optional int64 c6 (TIMESTAMP(MICROS,false));
        }
    ";

    /// Parses `message` into a Parquet schema descriptor plus the Arrow schema that
    /// `parquet_to_arrow_schema` derives from it (without embedded Arrow metadata).
    fn parse_to_descriptor_and_arrow(message: &str) -> (SchemaDescriptor, Schema) {
        let parsed = parse_message_type(message).expect("should parse schema");
        let descriptor = SchemaDescriptor::new(Arc::new(parsed));
        let arrow = parquet_to_arrow_schema(&descriptor, None).unwrap();
        (descriptor, arrow)
    }

    /// A `Timestamp(Microsecond, None)` field with the given name and nullability.
    fn micros_field(name: &str, nullable: bool) -> Field {
        Field::new(
            name,
            DataType::Timestamp(TimeUnit::Microsecond, None),
            nullable,
        )
    }

    /// The schema `MIXED_TIMESTAMPS_SCHEMA` should coerce to at microsecond resolution.
    /// Only `c0` (the INT96 column) changes; it ends up with `c0_timezone`.
    fn expected_mixed_timestamps(c0_timezone: Option<&str>) -> Schema {
        let utc: Option<Arc<str>> = Some(Arc::from("UTC"));
        let ts = |name: &str, unit: TimeUnit, tz: Option<Arc<str>>| {
            Field::new(name, DataType::Timestamp(unit, tz), true)
        };
        Schema::new(vec![
            ts("c0", TimeUnit::Microsecond, c0_timezone.map(Arc::<str>::from)),
            ts("c1", TimeUnit::Nanosecond, utc.clone()),
            ts("c2", TimeUnit::Nanosecond, None),
            ts("c3", TimeUnit::Millisecond, utc.clone()),
            ts("c4", TimeUnit::Millisecond, None),
            ts("c5", TimeUnit::Microsecond, utc),
            ts("c6", TimeUnit::Microsecond, None),
        ])
    }

    #[test]
    fn coerce_int96_to_resolution_with_mixed_timestamps() {
        // Unclear if Spark (or other writer) could generate a file with mixed timestamps like
        // this, but we want to test the scenario just in case since it's at least a valid
        // schema as far as the Parquet spec is concerned.
        let (descr, arrow_schema) = parse_to_descriptor_and_arrow(MIXED_TIMESTAMPS_SCHEMA);

        let result = Int96Coercer::new(&descr, &arrow_schema, &TimeUnit::Microsecond)
            .coerce()
            .unwrap();

        // Only the first field (c0) should be converted to a microsecond timestamp because
        // it's the only timestamp that originated from an INT96.
        assert_eq!(result, expected_mixed_timestamps(None));
    }

    #[test]
    fn coerce_int96_to_resolution_with_tz_applies_timezone() {
        // Same input schema as `coerce_int96_to_resolution_with_mixed_timestamps`, but with a
        // non-empty `timezone` argument. Only c0 (the INT96 column) should pick up the
        // timezone; the other timestamp columns must keep whatever timezone they were
        // declared with.
        let (descr, arrow_schema) = parse_to_descriptor_and_arrow(MIXED_TIMESTAMPS_SCHEMA);

        let result = Int96Coercer::new(&descr, &arrow_schema, &TimeUnit::Microsecond)
            .with_timezone(Some(Arc::from("UTC")))
            .coerce()
            .unwrap();

        assert_eq!(result, expected_mixed_timestamps(Some("UTC")));
    }

    #[test]
    fn coerce_int96_to_resolution_with_nested_types() {
        // This schema is derived from Comet's CometFuzzTestSuite ParquetGenerator only using
        // int96 primitive types with generateStruct, generateArray, and generateMap set to
        // true, with one additional field added to c4's struct to make sure all fields in a
        // struct get modified.
        // https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala
        let spark_schema = "
        message spark_schema {
            optional int96 c0;
            optional group c1 {
                optional int96 c0;
            }
            optional group c2 {
                optional group c0 (LIST) {
                    repeated group list {
                        optional int96 element;
                    }
                }
            }
            optional group c3 (LIST) {
                repeated group list {
                    optional int96 element;
                }
            }
            optional group c4 (LIST) {
                repeated group list {
                    optional group element {
                        optional int96 c0;
                        optional int96 c1;
                    }
                }
            }
            optional group c5 (MAP) {
                repeated group key_value {
                    required int96 key;
                    optional int96 value;
                }
            }
            optional group c6 (LIST) {
                repeated group list {
                    optional group element (MAP) {
                        repeated group key_value {
                            required int96 key;
                            optional int96 value;
                        }
                    }
                }
            }
        }
    ";

        let (descr, arrow_schema) = parse_to_descriptor_and_arrow(spark_schema);

        let result = Int96Coercer::new(&descr, &arrow_schema, &TimeUnit::Microsecond)
            .coerce()
            .unwrap();

        let expected_schema = Schema::new(vec![
            micros_field("c0", true),
            Field::new_struct("c1", vec![micros_field("c0", true)], true),
            Field::new_struct(
                "c2",
                vec![Field::new_list("c0", micros_field("element", true), true)],
                true,
            ),
            Field::new_list("c3", micros_field("element", true), true),
            Field::new_list(
                "c4",
                Field::new_struct(
                    "element",
                    vec![micros_field("c0", true), micros_field("c1", true)],
                    true,
                ),
                true,
            ),
            Field::new_map(
                "c5",
                "key_value",
                micros_field("key", false),
                micros_field("value", true),
                false,
                true,
            ),
            Field::new_list(
                "c6",
                Field::new_map(
                    "element",
                    "key_value",
                    micros_field("key", false),
                    micros_field("value", true),
                    false,
                    true,
                ),
                true,
            ),
        ]);

        assert_eq!(result, expected_schema);
    }
}