Skip to main content

datafusion_datasource_parquet/
schema_coercion.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Arrow-schema coercion utilities used by the Parquet reader to make a
19//! file schema match the table schema (binary→string, regular→view,
20//! INT96→Timestamp).
21//!
22//! These helpers are independent of the [`ParquetFormat`](crate::file_format::ParquetFormat)
23//! type and several have been re-exported at the crate root for use by
24//! callers outside the format implementation.
25
26use std::cell::RefCell;
27use std::collections::{HashMap, HashSet};
28use std::rc::Rc;
29use std::sync::Arc;
30
31use arrow::datatypes::{DataType, Field, FieldRef, Schema, TimeUnit};
32use parquet::basic::Type;
33use parquet::schema::types::SchemaDescriptor;
34
35/// Apply necessary schema type coercions to make file schema match table schema.
36///
37/// This function performs two main types of transformations in a single pass:
38/// 1. Binary types to string types conversion - Converts binary data types to their
39///    corresponding string types when the table schema expects string data
40/// 2. Regular to view types conversion - Converts standard string/binary types to
41///    view types when the table schema uses view types
42///
43/// # Arguments
44/// * `table_schema` - The table schema containing the desired types
45/// * `file_schema` - The file schema to be transformed
46///
47/// # Returns
48/// * `Some(Schema)` - If any transformations were applied, returns the transformed schema
49/// * `None` - If no transformations were needed
50pub fn apply_file_schema_type_coercions(
51    table_schema: &Schema,
52    file_schema: &Schema,
53) -> Option<Schema> {
54    let mut needs_view_transform = false;
55    let mut needs_string_transform = false;
56
57    // Create a mapping of table field names to their data types for fast lookup
58    // and simultaneously check if we need any transformations
59    let table_fields: HashMap<_, _> = table_schema
60        .fields()
61        .iter()
62        .map(|f| {
63            let dt = f.data_type();
64            // Check if we need view type transformation
65            if matches!(dt, &DataType::Utf8View | &DataType::BinaryView) {
66                needs_view_transform = true;
67            }
68            // Check if we need string type transformation
69            if matches!(
70                dt,
71                &DataType::Utf8 | &DataType::LargeUtf8 | &DataType::Utf8View
72            ) {
73                needs_string_transform = true;
74            }
75
76            (f.name(), dt)
77        })
78        .collect();
79
80    // Early return if no transformation needed
81    if !needs_view_transform && !needs_string_transform {
82        return None;
83    }
84
85    let transformed_fields: Vec<Arc<Field>> = file_schema
86        .fields()
87        .iter()
88        .map(|field| {
89            let field_name = field.name();
90            let field_type = field.data_type();
91
92            // Look up the corresponding field type in the table schema
93            if let Some(table_type) = table_fields.get(field_name) {
94                match (table_type, field_type) {
95                    // table schema uses string type, coerce the file schema to use string type
96                    (
97                        &DataType::Utf8,
98                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
99                    ) => {
100                        return field_with_new_type(field, DataType::Utf8);
101                    }
102                    // table schema uses large string type, coerce the file schema to use large string type
103                    (
104                        &DataType::LargeUtf8,
105                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
106                    ) => {
107                        return field_with_new_type(field, DataType::LargeUtf8);
108                    }
109                    // table schema uses string view type, coerce the file schema to use view type
110                    (
111                        &DataType::Utf8View,
112                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
113                    ) => {
114                        return field_with_new_type(field, DataType::Utf8View);
115                    }
116                    // Handle view type conversions
117                    (&DataType::Utf8View, DataType::Utf8 | DataType::LargeUtf8) => {
118                        return field_with_new_type(field, DataType::Utf8View);
119                    }
120                    (&DataType::BinaryView, DataType::Binary | DataType::LargeBinary) => {
121                        return field_with_new_type(field, DataType::BinaryView);
122                    }
123                    _ => {}
124                }
125            }
126
127            // If no transformation is needed, keep the original field
128            Arc::clone(field)
129        })
130        .collect();
131
132    Some(Schema::new_with_metadata(
133        transformed_fields,
134        file_schema.metadata.clone(),
135    ))
136}
137
138/// Coerces the file schema's Timestamps to the provided TimeUnit if the
139/// Parquet schema contains INT96.
140///
141/// Deprecated wrapper around [`Int96Coercer`]; use the builder directly
142/// instead — it also supports attaching a timezone via
143/// [`Int96Coercer::with_timezone`].
144#[deprecated(since = "53.2.0", note = "use `Int96Coercer` instead")]
145pub fn coerce_int96_to_resolution(
146    parquet_schema: &SchemaDescriptor,
147    file_schema: &Schema,
148    time_unit: &TimeUnit,
149) -> Option<Schema> {
150    Int96Coercer::new(parquet_schema, file_schema, time_unit).coerce()
151}
152
153/// Builder for coercing INT96-originated Timestamp columns in `file_schema`
154/// to a specific [`TimeUnit`], optionally attaching a timezone.
155///
156/// INT96 is the legacy Parquet representation that systems like Spark use for
157/// timestamps. Arrow surfaces it as `Timestamp(Nanosecond, None)`, but the
158/// underlying values are written as UTC-adjusted instants. Use this builder
159/// to:
160///
161/// - Coerce INT96-derived columns to a smaller [`TimeUnit`] (e.g. microseconds)
162///   to extend the representable date range.
163/// - Optionally attach a timezone so the resulting Arrow type carries the
164///   timezone-aware semantic (`Timestamp(unit, Some(tz))`). Without a
165///   timezone, INT96-derived columns become `Timestamp(unit, None)` — the
166///   historical default.
167///
168/// Returns `None` if `file_schema` contains no INT96-derived columns.
169///
170/// # Example
171///
172/// ```ignore
173/// use std::sync::Arc;
174/// use arrow::datatypes::TimeUnit;
175/// use datafusion_datasource_parquet::Int96Coercer;
176///
177/// let coerced = Int96Coercer::new(parquet_schema, file_schema, &TimeUnit::Microsecond)
178///     .with_timezone(Some(Arc::from("UTC")))
179///     .coerce();
180/// ```
181pub struct Int96Coercer<'a> {
182    parquet_schema: &'a SchemaDescriptor,
183    file_schema: &'a Schema,
184    time_unit: &'a TimeUnit,
185    timezone: Option<Arc<str>>,
186}
187
188impl<'a> Int96Coercer<'a> {
189    /// Create a new builder. INT96-derived columns will coerce to
190    /// `Timestamp(time_unit, None)` unless [`Self::with_timezone`] is set.
191    pub fn new(
192        parquet_schema: &'a SchemaDescriptor,
193        file_schema: &'a Schema,
194        time_unit: &'a TimeUnit,
195    ) -> Self {
196        Self {
197            parquet_schema,
198            file_schema,
199            time_unit,
200            timezone: None,
201        }
202    }
203
204    /// Attach a timezone to INT96-derived columns. When `Some`, INT96-derived
205    /// columns coerce to `Timestamp(time_unit, Some(timezone))` instead of
206    /// the default `Timestamp(time_unit, None)`. Spark and other systems
207    /// write INT96 as UTC-adjusted instants, so callers that need the
208    /// resulting Arrow type to be timezone-aware should pass
209    /// `Some(Arc::from("UTC"))`.
210    pub fn with_timezone(mut self, timezone: Option<Arc<str>>) -> Self {
211        self.timezone = timezone;
212        self
213    }
214
215    /// Run the coercion, returning the rewritten schema or `None` if
216    /// `file_schema` contains no INT96-derived columns.
217    pub fn coerce(self) -> Option<Schema> {
218        let Self {
219            parquet_schema,
220            file_schema,
221            time_unit,
222            timezone,
223        } = self;
224        coerce_int96_to_resolution_impl(
225            parquet_schema,
226            file_schema,
227            time_unit,
228            timezone.as_ref(),
229        )
230    }
231}
232
233fn coerce_int96_to_resolution_impl(
234    parquet_schema: &SchemaDescriptor,
235    file_schema: &Schema,
236    time_unit: &TimeUnit,
237    timezone: Option<&Arc<str>>,
238) -> Option<Schema> {
239    // Traverse the parquet_schema columns looking for int96 physical types. If encountered, insert
240    // the field's full path into a set.
241    let int96_fields: HashSet<_> = parquet_schema
242        .columns()
243        .iter()
244        .filter(|f| f.physical_type() == Type::INT96)
245        .map(|f| f.path().string())
246        .collect();
247
248    if int96_fields.is_empty() {
249        // The schema doesn't contain any int96 fields, so skip the remaining logic.
250        return None;
251    }
252
253    // Do a DFS into the schema using a stack, looking for timestamp(nanos) fields that originated
254    // as int96 to coerce to the provided time_unit.
255
256    type NestedFields = Rc<RefCell<Vec<FieldRef>>>;
257    type StackContext<'a> = (
258        Vec<&'a str>, // The Parquet column path (e.g., "c0.list.element.c1") for the current field.
259        &'a FieldRef, // The current field to be processed.
260        NestedFields, // The parent's fields that this field will be (possibly) type-coerced and
261        // inserted into. All fields have a parent, so this is not an Option type.
262        Option<NestedFields>, // Nested types need to create their own vector of fields for their
263                              // children. For primitive types this will remain None. For nested
264                              // types it is None the first time they are processed. Then, we
265                              // instantiate a vector for its children, push the field back onto the
266                              // stack to be processed again, and DFS into its children. The next
267                              // time we process the field, we know we have DFS'd into the children
268                              // because this field is Some.
269    );
270
271    // This is our top-level fields from which we will construct our schema. We pass this into our
272    // initial stack context as the parent fields, and the DFS populates it.
273    let fields = Rc::new(RefCell::new(Vec::with_capacity(file_schema.fields.len())));
274
275    // TODO: It might be possible to only DFS into nested fields that we know contain an int96 if we
276    // use some sort of LPM data structure to check if we're currently DFS'ing nested types that are
277    // in a column path that contains an int96. That can be a future optimization for large schemas.
278    let transformed_schema = {
279        // Populate the stack with our top-level fields.
280        let mut stack: Vec<StackContext> = file_schema
281            .fields()
282            .iter()
283            .rev()
284            .map(|f| (vec![f.name().as_str()], f, Rc::clone(&fields), None))
285            .collect();
286
287        // Pop fields to DFS into until we have exhausted the stack.
288        while let Some((parquet_path, current_field, parent_fields, child_fields)) =
289            stack.pop()
290        {
291            match (current_field.data_type(), child_fields) {
292                (DataType::Struct(unprocessed_children), None) => {
293                    // This is the first time popping off this struct. We don't yet know the
294                    // correct types of its children (i.e., if they need coercing) so we create
295                    // a vector for child_fields, push the struct node back onto the stack to be
296                    // processed again (see below) after processing all its children.
297                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(
298                        unprocessed_children.len(),
299                    )));
300                    // Note that here we push the struct back onto the stack with its
301                    // parent_fields in the same position, now with Some(child_fields).
302                    stack.push((
303                        parquet_path.clone(),
304                        current_field,
305                        parent_fields,
306                        Some(Rc::clone(&child_fields)),
307                    ));
308                    // Push all the children in reverse to maintain original schema order due to
309                    // stack processing.
310                    for child in unprocessed_children.into_iter().rev() {
311                        let mut child_path = parquet_path.clone();
312                        // Build up a normalized path that we'll use as a key into the original
313                        // int96_fields set above to test if this originated as int96.
314                        child_path.push(".");
315                        child_path.push(child.name());
316                        // Note that here we push the field onto the stack using the struct's
317                        // new child_fields vector as the field's parent_fields.
318                        stack.push((child_path, child, Rc::clone(&child_fields), None));
319                    }
320                }
321                (DataType::Struct(unprocessed_children), Some(processed_children)) => {
322                    // This is the second time popping off this struct. The child_fields vector
323                    // now contains each field that has been DFS'd into, and we can construct
324                    // the resulting struct with correct child types.
325                    let processed_children = processed_children.borrow();
326                    assert_eq!(processed_children.len(), unprocessed_children.len());
327                    let processed_struct = Field::new_struct(
328                        current_field.name(),
329                        processed_children.as_slice(),
330                        current_field.is_nullable(),
331                    );
332                    parent_fields.borrow_mut().push(Arc::new(processed_struct));
333                }
334                (DataType::List(unprocessed_child), None) => {
335                    // This is the first time popping off this list. See struct docs above.
336                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
337                    stack.push((
338                        parquet_path.clone(),
339                        current_field,
340                        parent_fields,
341                        Some(Rc::clone(&child_fields)),
342                    ));
343                    let mut child_path = parquet_path.clone();
344                    // Spark uses a definition for arrays/lists that results in a group
345                    // named "list" that is not maintained when parsing to Arrow. We just push
346                    // this name into the path.
347                    child_path.push(".list.");
348                    child_path.push(unprocessed_child.name());
349                    stack.push((
350                        child_path.clone(),
351                        unprocessed_child,
352                        Rc::clone(&child_fields),
353                        None,
354                    ));
355                }
356                (DataType::List(_), Some(processed_children)) => {
357                    // This is the second time popping off this list. See struct docs above.
358                    let processed_children = processed_children.borrow();
359                    assert_eq!(processed_children.len(), 1);
360                    let processed_list = Field::new_list(
361                        current_field.name(),
362                        Arc::clone(&processed_children[0]),
363                        current_field.is_nullable(),
364                    );
365                    parent_fields.borrow_mut().push(Arc::new(processed_list));
366                }
367                (DataType::Map(unprocessed_child, _), None) => {
368                    // This is the first time popping off this map. See struct docs above.
369                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
370                    stack.push((
371                        parquet_path.clone(),
372                        current_field,
373                        parent_fields,
374                        Some(Rc::clone(&child_fields)),
375                    ));
376                    let mut child_path = parquet_path.clone();
377                    child_path.push(".");
378                    child_path.push(unprocessed_child.name());
379                    stack.push((
380                        child_path.clone(),
381                        unprocessed_child,
382                        Rc::clone(&child_fields),
383                        None,
384                    ));
385                }
386                (DataType::Map(_, sorted), Some(processed_children)) => {
387                    // This is the second time popping off this map. See struct docs above.
388                    let processed_children = processed_children.borrow();
389                    assert_eq!(processed_children.len(), 1);
390                    let processed_map = Field::new(
391                        current_field.name(),
392                        DataType::Map(Arc::clone(&processed_children[0]), *sorted),
393                        current_field.is_nullable(),
394                    );
395                    parent_fields.borrow_mut().push(Arc::new(processed_map));
396                }
397                (DataType::Timestamp(TimeUnit::Nanosecond, None), None)
398                    if int96_fields.contains(parquet_path.concat().as_str()) =>
399                // We found a timestamp(nanos) and it originated as int96. Coerce it to the correct
400                // time_unit, optionally attaching the requested timezone.
401                {
402                    parent_fields.borrow_mut().push(field_with_new_type(
403                        current_field,
404                        DataType::Timestamp(*time_unit, timezone.cloned()),
405                    ));
406                }
407                // Other types can be cloned as they are.
408                _ => parent_fields.borrow_mut().push(Arc::clone(current_field)),
409            }
410        }
411        assert_eq!(fields.borrow().len(), file_schema.fields.len());
412        Schema::new_with_metadata(
413            fields.borrow_mut().clone(),
414            file_schema.metadata.clone(),
415        )
416    };
417
418    Some(transformed_schema)
419}
420
421/// Coerces the file schema if the table schema uses a view type.
422#[deprecated(
423    since = "47.0.0",
424    note = "Use `apply_file_schema_type_coercions` instead"
425)]
426pub fn coerce_file_schema_to_view_type(
427    table_schema: &Schema,
428    file_schema: &Schema,
429) -> Option<Schema> {
430    let mut transform = false;
431    let table_fields: HashMap<_, _> = table_schema
432        .fields
433        .iter()
434        .map(|f| {
435            let dt = f.data_type();
436            if dt.equals_datatype(&DataType::Utf8View)
437                || dt.equals_datatype(&DataType::BinaryView)
438            {
439                transform = true;
440            }
441            (f.name(), dt)
442        })
443        .collect();
444
445    if !transform {
446        return None;
447    }
448
449    let transformed_fields: Vec<Arc<Field>> = file_schema
450        .fields
451        .iter()
452        .map(
453            |field| match (table_fields.get(field.name()), field.data_type()) {
454                (Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => {
455                    field_with_new_type(field, DataType::Utf8View)
456                }
457                (
458                    Some(DataType::BinaryView),
459                    DataType::Binary | DataType::LargeBinary,
460                ) => field_with_new_type(field, DataType::BinaryView),
461                _ => Arc::clone(field),
462            },
463        )
464        .collect();
465
466    Some(Schema::new_with_metadata(
467        transformed_fields,
468        file_schema.metadata.clone(),
469    ))
470}
471
472/// If the table schema uses a string type, coerce the file schema to use a string type.
473///
474/// See [`ParquetFormat::binary_as_string`](crate::file_format::ParquetFormat::binary_as_string) for details
475#[deprecated(
476    since = "47.0.0",
477    note = "Use `apply_file_schema_type_coercions` instead"
478)]
479pub fn coerce_file_schema_to_string_type(
480    table_schema: &Schema,
481    file_schema: &Schema,
482) -> Option<Schema> {
483    let mut transform = false;
484    let table_fields: HashMap<_, _> = table_schema
485        .fields
486        .iter()
487        .map(|f| (f.name(), f.data_type()))
488        .collect();
489    let transformed_fields: Vec<Arc<Field>> = file_schema
490        .fields
491        .iter()
492        .map(
493            |field| match (table_fields.get(field.name()), field.data_type()) {
494                // table schema uses string type, coerce the file schema to use string type
495                (
496                    Some(DataType::Utf8),
497                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
498                ) => {
499                    transform = true;
500                    field_with_new_type(field, DataType::Utf8)
501                }
502                // table schema uses large string type, coerce the file schema to use large string type
503                (
504                    Some(DataType::LargeUtf8),
505                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
506                ) => {
507                    transform = true;
508                    field_with_new_type(field, DataType::LargeUtf8)
509                }
510                // table schema uses string view type, coerce the file schema to use view type
511                (
512                    Some(DataType::Utf8View),
513                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
514                ) => {
515                    transform = true;
516                    field_with_new_type(field, DataType::Utf8View)
517                }
518                _ => Arc::clone(field),
519            },
520        )
521        .collect();
522
523    if !transform {
524        None
525    } else {
526        Some(Schema::new_with_metadata(
527            transformed_fields,
528            file_schema.metadata.clone(),
529        ))
530    }
531}
532
533/// Create a new field with the specified data type, copying the other
534/// properties from the input field
535fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
536    Arc::new(field.as_ref().clone().with_data_type(new_type))
537}
538
539/// Transform a schema to use view types for Utf8 and Binary
540///
541/// See [`ParquetFormat::force_view_types`](crate::file_format::ParquetFormat::force_view_types) for details
542pub fn transform_schema_to_view(schema: &Schema) -> Schema {
543    let transformed_fields: Vec<Arc<Field>> = schema
544        .fields
545        .iter()
546        .map(|field| match field.data_type() {
547            DataType::Utf8 | DataType::LargeUtf8 => {
548                field_with_new_type(field, DataType::Utf8View)
549            }
550            DataType::Binary | DataType::LargeBinary => {
551                field_with_new_type(field, DataType::BinaryView)
552            }
553            _ => Arc::clone(field),
554        })
555        .collect();
556    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
557}
558
559/// Transform a schema so that any binary types are strings
560pub fn transform_binary_to_string(schema: &Schema) -> Schema {
561    let transformed_fields: Vec<Arc<Field>> = schema
562        .fields
563        .iter()
564        .map(|field| match field.data_type() {
565            DataType::Binary => field_with_new_type(field, DataType::Utf8),
566            DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8),
567            DataType::BinaryView => field_with_new_type(field, DataType::Utf8View),
568            _ => Arc::clone(field),
569        })
570        .collect();
571    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
572}
573#[cfg(test)]
574mod tests {
575    use parquet::arrow::parquet_to_arrow_schema;
576
577    use super::*;
578
579    use parquet::schema::parser::parse_message_type;
580
581    #[test]
582    fn coerce_int96_to_resolution_with_mixed_timestamps() {
583        // Unclear if Spark (or other writer) could generate a file with mixed timestamps like this,
584        // but we want to test the scenario just in case since it's at least a valid schema as far
585        // as the Parquet spec is concerned.
586        let spark_schema = "
587        message spark_schema {
588          optional int96 c0;
589          optional int64 c1 (TIMESTAMP(NANOS,true));
590          optional int64 c2 (TIMESTAMP(NANOS,false));
591          optional int64 c3 (TIMESTAMP(MILLIS,true));
592          optional int64 c4 (TIMESTAMP(MILLIS,false));
593          optional int64 c5 (TIMESTAMP(MICROS,true));
594          optional int64 c6 (TIMESTAMP(MICROS,false));
595        }
596        ";
597
598        let schema = parse_message_type(spark_schema).expect("should parse schema");
599        let descr = SchemaDescriptor::new(Arc::new(schema));
600
601        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();
602
603        let result = Int96Coercer::new(&descr, &arrow_schema, &TimeUnit::Microsecond)
604            .coerce()
605            .unwrap();
606
607        // Only the first field (c0) should be converted to a microsecond timestamp because it's the
608        // only timestamp that originated from an INT96.
609        let expected_schema = Schema::new(vec![
610            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
611            Field::new(
612                "c1",
613                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
614                true,
615            ),
616            Field::new("c2", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
617            Field::new(
618                "c3",
619                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
620                true,
621            ),
622            Field::new("c4", DataType::Timestamp(TimeUnit::Millisecond, None), true),
623            Field::new(
624                "c5",
625                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
626                true,
627            ),
628            Field::new("c6", DataType::Timestamp(TimeUnit::Microsecond, None), true),
629        ]);
630
631        assert_eq!(result, expected_schema);
632    }
633
634    #[test]
635    fn coerce_int96_to_resolution_with_tz_applies_timezone() {
636        // Same input schema as `coerce_int96_to_resolution_with_mixed_timestamps`, but with a
637        // non-empty `timezone` argument. Only c0 (the INT96 column) should pick up the timezone;
638        // the other timestamp columns must keep whatever timezone they were declared with.
639        let spark_schema = "
640        message spark_schema {
641          optional int96 c0;
642          optional int64 c1 (TIMESTAMP(NANOS,true));
643          optional int64 c2 (TIMESTAMP(NANOS,false));
644          optional int64 c3 (TIMESTAMP(MILLIS,true));
645          optional int64 c4 (TIMESTAMP(MILLIS,false));
646          optional int64 c5 (TIMESTAMP(MICROS,true));
647          optional int64 c6 (TIMESTAMP(MICROS,false));
648        }
649        ";
650
651        let schema = parse_message_type(spark_schema).expect("should parse schema");
652        let descr = SchemaDescriptor::new(Arc::new(schema));
653
654        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();
655
656        let result = Int96Coercer::new(&descr, &arrow_schema, &TimeUnit::Microsecond)
657            .with_timezone(Some(Arc::from("UTC")))
658            .coerce()
659            .unwrap();
660
661        let expected_schema = Schema::new(vec![
662            Field::new(
663                "c0",
664                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
665                true,
666            ),
667            Field::new(
668                "c1",
669                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
670                true,
671            ),
672            Field::new("c2", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
673            Field::new(
674                "c3",
675                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
676                true,
677            ),
678            Field::new("c4", DataType::Timestamp(TimeUnit::Millisecond, None), true),
679            Field::new(
680                "c5",
681                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
682                true,
683            ),
684            Field::new("c6", DataType::Timestamp(TimeUnit::Microsecond, None), true),
685        ]);
686
687        assert_eq!(result, expected_schema);
688    }
689
690    #[test]
691    fn coerce_int96_to_resolution_with_nested_types() {
692        // This schema is derived from Comet's CometFuzzTestSuite ParquetGenerator only using int96
693        // primitive types with generateStruct, generateArray, and generateMap set to true, with one
694        // additional field added to c4's struct to make sure all fields in a struct get modified.
695        // https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala
696        let spark_schema = "
697        message spark_schema {
698          optional int96 c0;
699          optional group c1 {
700            optional int96 c0;
701          }
702          optional group c2 {
703            optional group c0 (LIST) {
704              repeated group list {
705                optional int96 element;
706              }
707            }
708          }
709          optional group c3 (LIST) {
710            repeated group list {
711              optional int96 element;
712            }
713          }
714          optional group c4 (LIST) {
715            repeated group list {
716              optional group element {
717                optional int96 c0;
718                optional int96 c1;
719              }
720            }
721          }
722          optional group c5 (MAP) {
723            repeated group key_value {
724              required int96 key;
725              optional int96 value;
726            }
727          }
728          optional group c6 (LIST) {
729            repeated group list {
730              optional group element (MAP) {
731                repeated group key_value {
732                  required int96 key;
733                  optional int96 value;
734                }
735              }
736            }
737          }
738        }
739        ";
740
741        let schema = parse_message_type(spark_schema).expect("should parse schema");
742        let descr = SchemaDescriptor::new(Arc::new(schema));
743
744        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();
745
746        let result = Int96Coercer::new(&descr, &arrow_schema, &TimeUnit::Microsecond)
747            .coerce()
748            .unwrap();
749
750        let expected_schema = Schema::new(vec![
751            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
752            Field::new_struct(
753                "c1",
754                vec![Field::new(
755                    "c0",
756                    DataType::Timestamp(TimeUnit::Microsecond, None),
757                    true,
758                )],
759                true,
760            ),
761            Field::new_struct(
762                "c2",
763                vec![Field::new_list(
764                    "c0",
765                    Field::new(
766                        "element",
767                        DataType::Timestamp(TimeUnit::Microsecond, None),
768                        true,
769                    ),
770                    true,
771                )],
772                true,
773            ),
774            Field::new_list(
775                "c3",
776                Field::new(
777                    "element",
778                    DataType::Timestamp(TimeUnit::Microsecond, None),
779                    true,
780                ),
781                true,
782            ),
783            Field::new_list(
784                "c4",
785                Field::new_struct(
786                    "element",
787                    vec![
788                        Field::new(
789                            "c0",
790                            DataType::Timestamp(TimeUnit::Microsecond, None),
791                            true,
792                        ),
793                        Field::new(
794                            "c1",
795                            DataType::Timestamp(TimeUnit::Microsecond, None),
796                            true,
797                        ),
798                    ],
799                    true,
800                ),
801                true,
802            ),
803            Field::new_map(
804                "c5",
805                "key_value",
806                Field::new(
807                    "key",
808                    DataType::Timestamp(TimeUnit::Microsecond, None),
809                    false,
810                ),
811                Field::new(
812                    "value",
813                    DataType::Timestamp(TimeUnit::Microsecond, None),
814                    true,
815                ),
816                false,
817                true,
818            ),
819            Field::new_list(
820                "c6",
821                Field::new_map(
822                    "element",
823                    "key_value",
824                    Field::new(
825                        "key",
826                        DataType::Timestamp(TimeUnit::Microsecond, None),
827                        false,
828                    ),
829                    Field::new(
830                        "value",
831                        DataType::Timestamp(TimeUnit::Microsecond, None),
832                        true,
833                    ),
834                    false,
835                    true,
836                ),
837                true,
838            ),
839        ]);
840
841        assert_eq!(result, expected_schema);
842    }
843}