datafusion_datasource_parquet/file_format.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions

use std::any::Any;
use std::cell::RefCell;
use std::fmt::Debug;
use std::ops::Range;
use std::rc::Rc;
use std::sync::Arc;
use std::{fmt, vec};

use arrow::array::RecordBatch;
use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
use datafusion_datasource::TableSchema;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
use datafusion_datasource::write::{
    ObjectWriterBuilder, SharedBuffer, get_writer_schema,
};

use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
use datafusion_datasource::write::demux::DemuxedStreamReceiver;

use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
use datafusion_common::encryption::FileDecryptionProperties;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
    DEFAULT_PARQUET_EXTENSION, DataFusionError, GetExt, HashSet, Result,
    internal_datafusion_err, internal_err, not_impl_err,
};
use datafusion_common::{HashMap, Statistics};
use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_session::Session;

use crate::metadata::{DFParquetMetadata, lex_ordering_to_sorting_columns};
use crate::reader::CachedParquetFileReaderFactory;
use crate::source::{ParquetSource, parse_coerce_int96_string};
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_datasource::source::DataSourceExec;
use datafusion_execution::cache::cache_manager::FileMetadataCache;
use datafusion_execution::runtime_env::RuntimeEnv;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt};
use parquet::arrow::arrow_writer::{
    ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, ArrowRowGroupWriterFactory,
    ArrowWriterOptions, compute_leaves,
};
use parquet::arrow::async_reader::MetadataFetch;
use parquet::arrow::{ArrowWriter, AsyncArrowWriter};
use parquet::basic::Type;
#[cfg(feature = "parquet_encryption")]
use parquet::encryption::encrypt::FileEncryptionProperties;
use parquet::errors::ParquetError;
use parquet::file::metadata::{ParquetMetaData, SortingColumn};
use parquet::file::properties::{
    DEFAULT_MAX_ROW_GROUP_ROW_COUNT, WriterProperties, WriterPropertiesBuilder,
};
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::types::SchemaDescriptor;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};

/// Initial writing buffer size (1 MiB). Note this is just a size hint for
/// efficiency; it will grow beyond the set value if needed.
const INITIAL_BUFFER_BYTES: usize = 1048576;

/// When writing parquet files in parallel, if the buffered Parquet data exceeds
/// this size, it is flushed to the object store
const BUFFER_FLUSH_BYTES: usize = 1024000;

/// Factory struct used to create [ParquetFormat]
#[derive(Default)]
pub struct ParquetFormatFactory {
    /// inner options for parquet
    pub options: Option<TableParquetOptions>,
}

impl ParquetFormatFactory {
    /// Creates an instance of [ParquetFormatFactory]
    pub fn new() -> Self {
        Self { options: None }
    }

    /// Creates an instance of [ParquetFormatFactory] with customized default options
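    ///
    /// A minimal usage sketch (assuming `compression` is a valid option key;
    /// see [`TableParquetOptions`] for the supported keys):
    /// ```ignore
    /// let mut options = TableParquetOptions::default();
    /// options.set("compression", "zstd(3)").unwrap();
    /// let factory = ParquetFormatFactory::new_with_options(options);
    /// ```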
    pub fn new_with_options(options: TableParquetOptions) -> Self {
        Self {
            options: Some(options),
        }
    }
}

impl FileFormatFactory for ParquetFormatFactory {
    fn create(
        &self,
        state: &dyn Session,
        format_options: &std::collections::HashMap<String, String>,
    ) -> Result<Arc<dyn FileFormat>> {
        let parquet_options = match &self.options {
            None => {
                let mut table_options = state.default_table_options();
                table_options.set_config_format(ConfigFileType::PARQUET);
                table_options.alter_with_string_hash_map(format_options)?;
                table_options.parquet
            }
            Some(parquet_options) => {
                let mut parquet_options = parquet_options.clone();
                for (k, v) in format_options {
                    parquet_options.set(k, v)?;
                }
                parquet_options
            }
        };

        Ok(Arc::new(
            ParquetFormat::default().with_options(parquet_options),
        ))
    }

    fn default(&self) -> Arc<dyn FileFormat> {
        Arc::new(ParquetFormat::default())
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl GetExt for ParquetFormatFactory {
    fn get_ext(&self) -> String {
        // Removes the dot, i.e. ".parquet" -> "parquet"
        DEFAULT_PARQUET_EXTENSION[1..].to_string()
    }
}

impl Debug for ParquetFormatFactory {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ParquetFormatFactory")
            .field("ParquetFormatFactory", &self.options)
            .finish()
    }
}

/// The Apache Parquet `FileFormat` implementation
#[derive(Debug, Default)]
pub struct ParquetFormat {
    options: TableParquetOptions,
}

impl ParquetFormat {
    /// Construct a new Format with no local overrides
    pub fn new() -> Self {
        Self::default()
    }

    /// Activate statistics based row group level pruning
    /// - If not set, defaults to the value in `config_options`
    pub fn with_enable_pruning(mut self, enable: bool) -> Self {
        self.options.global.pruning = enable;
        self
    }

    /// Return `true` if pruning is enabled
    pub fn enable_pruning(&self) -> bool {
        self.options.global.pruning
    }

    /// Provide a hint to the size of the file metadata. If a hint is provided
    /// the reader will try to fetch the last `size_hint` bytes of the parquet file optimistically.
    /// Without a hint, two reads are required: one to fetch the 8-byte parquet footer and then
    /// another to fetch the metadata length encoded in the footer.
    ///
    /// - If `None`, defaults to the value in `config_options`
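    ///
    /// A minimal sketch, assuming a 512 KiB hint comfortably covers the
    /// footer plus metadata of the files being read:
    /// ```ignore
    /// let format = ParquetFormat::default()
    ///     .with_metadata_size_hint(Some(512 * 1024));
    /// ```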
    pub fn with_metadata_size_hint(mut self, size_hint: Option<usize>) -> Self {
        self.options.global.metadata_size_hint = size_hint;
        self
    }

    /// Return the metadata size hint if set
    pub fn metadata_size_hint(&self) -> Option<usize> {
        self.options.global.metadata_size_hint
    }

    /// Tell the parquet reader to skip any metadata that may be in
    /// the file Schema. This can help avoid schema conflicts due to
    /// metadata.
    ///
    /// - If not set, defaults to the value in `config_options`
    pub fn with_skip_metadata(mut self, skip_metadata: bool) -> Self {
        self.options.global.skip_metadata = skip_metadata;
        self
    }

    /// Returns `true` if schema metadata will be cleared prior to
    /// schema merging.
    pub fn skip_metadata(&self) -> bool {
        self.options.global.skip_metadata
    }

    /// Set Parquet options for the ParquetFormat
    pub fn with_options(mut self, options: TableParquetOptions) -> Self {
        self.options = options;
        self
    }

    /// Parquet options
    pub fn options(&self) -> &TableParquetOptions {
        &self.options
    }

    /// Return `true` if view types should be used.
    ///
    /// If this returns true, DataFusion will instruct the parquet reader
    /// to read string / binary columns using the view types `StringView` or
    /// `BinaryView` if the table schema specifies those types, regardless of
    /// any embedded metadata that may specify an alternate Arrow type. The
    /// parquet reader is optimized for reading `StringView` and `BinaryView`
    /// and such queries are significantly faster.
    ///
    /// If this returns false, the parquet reader will read the columns according to the
    /// defaults or any embedded Arrow type information. This may result in reading
    /// `StringArrays` and then casting to `StringViewArray`, which is less efficient.
    pub fn force_view_types(&self) -> bool {
        self.options.global.schema_force_view_types
    }

    /// If true, will use view types. See [`Self::force_view_types`] for details
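    ///
    /// For example:
    /// ```ignore
    /// let format = ParquetFormat::new().with_force_view_types(true);
    /// assert!(format.force_view_types());
    /// ```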
    pub fn with_force_view_types(mut self, use_views: bool) -> Self {
        self.options.global.schema_force_view_types = use_views;
        self
    }

    /// Return `true` if binary types will be read as strings.
    ///
    /// If this returns true, DataFusion will instruct the parquet reader
    /// to read binary columns such as `Binary` or `BinaryView` as the
    /// corresponding string type such as `Utf8` or `LargeUtf8`.
    /// The parquet reader has special optimizations for `Utf8` and `LargeUtf8`
    /// validation, and such queries are significantly faster than reading
    /// binary columns and then casting to string columns.
    pub fn binary_as_string(&self) -> bool {
        self.options.global.binary_as_string
    }

    /// If true, will read binary types as strings. See [`Self::binary_as_string`] for details
    pub fn with_binary_as_string(mut self, binary_as_string: bool) -> Self {
        self.options.global.binary_as_string = binary_as_string;
        self
    }

    /// Return the time unit that INT96 timestamps are coerced to, if configured.
    pub fn coerce_int96(&self) -> Option<String> {
        self.options.global.coerce_int96.clone()
    }

    /// Set the time unit (e.g. `"ms"`) that INT96 timestamps are coerced to.
    /// See [`coerce_int96_to_resolution`] for details.
    pub fn with_coerce_int96(mut self, time_unit: Option<String>) -> Self {
        self.options.global.coerce_int96 = time_unit;
        self
    }
}

/// Clears all metadata (Schema level and field level) on an iterator
/// of Schemas
fn clear_metadata(
    schemas: impl IntoIterator<Item = Schema>,
) -> impl Iterator<Item = Schema> {
    schemas.into_iter().map(|schema| {
        let fields = schema
            .fields()
            .iter()
            .map(|field| {
                field.as_ref().clone().with_metadata(Default::default()) // clear meta
            })
            .collect::<Fields>();
        Schema::new(fields)
    })
}

#[cfg(feature = "parquet_encryption")]
async fn get_file_decryption_properties(
    state: &dyn Session,
    options: &TableParquetOptions,
    file_path: &Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
    Ok(match &options.crypto.file_decryption {
        Some(cfd) => Some(Arc::new(FileDecryptionProperties::from(cfd.clone()))),
        None => match &options.crypto.factory_id {
            Some(factory_id) => {
                let factory =
                    state.runtime_env().parquet_encryption_factory(factory_id)?;
                factory
                    .get_file_decryption_properties(
                        &options.crypto.factory_options,
                        file_path,
                    )
                    .await?
            }
            None => None,
        },
    })
}

#[cfg(not(feature = "parquet_encryption"))]
async fn get_file_decryption_properties(
    _state: &dyn Session,
    _options: &TableParquetOptions,
    _file_path: &Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
    Ok(None)
}

#[async_trait]
impl FileFormat for ParquetFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_ext(&self) -> String {
        ParquetFormatFactory::new().get_ext()
    }

    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> Result<String> {
        let ext = self.get_ext();
        match file_compression_type.get_variant() {
            CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
            _ => internal_err!("Parquet FileFormat does not support compression."),
        }
    }

    fn compression_type(&self) -> Option<FileCompressionType> {
        None
    }

    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef> {
        let coerce_int96 = match self.coerce_int96() {
            Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?),
            None => None,
        };

        let file_metadata_cache =
            state.runtime_env().cache_manager.get_file_metadata_cache();

        let mut schemas: Vec<_> = futures::stream::iter(objects)
            .map(|object| async {
                let file_decryption_properties = get_file_decryption_properties(
                    state,
                    &self.options,
                    &object.location,
                )
                .await?;
                let result = DFParquetMetadata::new(store.as_ref(), object)
                    .with_metadata_size_hint(self.metadata_size_hint())
                    .with_decryption_properties(file_decryption_properties)
                    .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
                    .with_coerce_int96(coerce_int96)
                    .fetch_schema_with_location()
                    .await?;
                Ok::<_, DataFusionError>(result)
            })
            .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
            // fetch schemas concurrently, if requested
            .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect()
            .await?;

        // Schema inference adds fields based on the order they are seen,
        // which depends on the order the files are processed. For some
        // object stores (like local file systems) the order returned from list
        // is not deterministic. Thus, to ensure deterministic schema inference,
        // sort the files first.
        // https://github.com/apache/datafusion/pull/6629
        schemas
            .sort_unstable_by(|(location1, _), (location2, _)| location1.cmp(location2));

        let schemas = schemas.into_iter().map(|(_, schema)| schema);

        let schema = if self.skip_metadata() {
            Schema::try_merge(clear_metadata(schemas))
        } else {
            Schema::try_merge(schemas)
        }?;

        let schema = if self.binary_as_string() {
            transform_binary_to_string(&schema)
        } else {
            schema
        };

        let schema = if self.force_view_types() {
            transform_schema_to_view(&schema)
        } else {
            schema
        };

        Ok(Arc::new(schema))
    }

    async fn infer_stats(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> Result<Statistics> {
        let file_decryption_properties =
            get_file_decryption_properties(state, &self.options, &object.location)
                .await?;
        let file_metadata_cache =
            state.runtime_env().cache_manager.get_file_metadata_cache();
        DFParquetMetadata::new(store, object)
            .with_metadata_size_hint(self.metadata_size_hint())
            .with_decryption_properties(file_decryption_properties)
            .with_file_metadata_cache(Some(file_metadata_cache))
            .fetch_statistics(&table_schema)
            .await
    }

    async fn infer_ordering(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> Result<Option<LexOrdering>> {
        let file_decryption_properties =
            get_file_decryption_properties(state, &self.options, &object.location)
                .await?;
        let file_metadata_cache =
            state.runtime_env().cache_manager.get_file_metadata_cache();
        let metadata = DFParquetMetadata::new(store, object)
            .with_metadata_size_hint(self.metadata_size_hint())
            .with_decryption_properties(file_decryption_properties)
            .with_file_metadata_cache(Some(file_metadata_cache))
            .fetch_metadata()
            .await?;
        crate::metadata::ordering_from_parquet_metadata(&metadata, &table_schema)
    }

    async fn infer_stats_and_ordering(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> Result<datafusion_datasource::file_format::FileMeta> {
        let file_decryption_properties =
            get_file_decryption_properties(state, &self.options, &object.location)
                .await?;
        let file_metadata_cache =
            state.runtime_env().cache_manager.get_file_metadata_cache();
        let metadata = DFParquetMetadata::new(store, object)
            .with_metadata_size_hint(self.metadata_size_hint())
            .with_decryption_properties(file_decryption_properties)
            .with_file_metadata_cache(Some(file_metadata_cache))
            .fetch_metadata()
            .await?;
        let statistics = DFParquetMetadata::statistics_from_parquet_metadata(
            &metadata,
            &table_schema,
        )?;
        let ordering =
            crate::metadata::ordering_from_parquet_metadata(&metadata, &table_schema)?;
        Ok(
            datafusion_datasource::file_format::FileMeta::new(statistics)
                .with_ordering(ordering),
        )
    }

    async fn create_physical_plan(
        &self,
        state: &dyn Session,
        conf: FileScanConfig,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let metadata_size_hint = self.metadata_size_hint();

        let mut source = conf
            .file_source()
            .as_any()
            .downcast_ref::<ParquetSource>()
            .cloned()
            .ok_or_else(|| internal_datafusion_err!("Expected ParquetSource"))?;
        source = source.with_table_parquet_options(self.options.clone());

        // Use the CachedParquetFileReaderFactory
        let metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
        let store = state
            .runtime_env()
            .object_store(conf.object_store_url.clone())?;
        let cached_parquet_read_factory =
            Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
        source = source.with_parquet_file_reader_factory(cached_parquet_read_factory);

        if let Some(metadata_size_hint) = metadata_size_hint {
            source = source.with_metadata_size_hint(metadata_size_hint)
        }

        source = self.set_source_encryption_factory(source, state)?;

        let conf = FileScanConfigBuilder::from(conf)
            .with_source(Arc::new(source))
            .build();
        Ok(DataSourceExec::from_data_source(conf))
    }

    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Parquet");
        }

        // Convert ordering requirements to Parquet SortingColumns for file metadata
        let sorting_columns = if let Some(ref requirements) = order_requirements {
            let ordering: LexOrdering = requirements.clone().into();
            // In cases like `COPY (... ORDER BY ...) TO ...` the ORDER BY clause
            // may not be compatible with Parquet sorting columns (e.g. ordering on `random()`).
            // So if we cannot create a Parquet sorting column from the ordering requirement,
            // we skip setting sorting columns on the Parquet sink.
            lex_ordering_to_sorting_columns(&ordering).ok()
        } else {
            None
        };

        let sink = Arc::new(
            ParquetSink::new(conf, self.options.clone())
                .with_sorting_columns(sorting_columns),
        );

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
        Arc::new(
            ParquetSource::new(table_schema)
                .with_table_parquet_options(self.options.clone()),
        )
    }
}

#[cfg(feature = "parquet_encryption")]
impl ParquetFormat {
    fn set_source_encryption_factory(
        &self,
        source: ParquetSource,
        state: &dyn Session,
    ) -> Result<ParquetSource> {
        if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
            Ok(source.with_encryption_factory(
                state
                    .runtime_env()
                    .parquet_encryption_factory(encryption_factory_id)?,
            ))
        } else {
            Ok(source)
        }
    }
}

#[cfg(not(feature = "parquet_encryption"))]
impl ParquetFormat {
    fn set_source_encryption_factory(
        &self,
        source: ParquetSource,
        _state: &dyn Session,
    ) -> Result<ParquetSource> {
        if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
            Err(DataFusionError::Configuration(format!(
                "Parquet encryption factory id is set to '{encryption_factory_id}' but the parquet_encryption feature is disabled"
            )))
        } else {
            Ok(source)
        }
    }
}

/// Apply necessary schema type coercions to make file schema match table schema.
///
/// This function performs two main types of transformations in a single pass:
/// 1. Binary types to string types conversion - Converts binary data types to their
///    corresponding string types when the table schema expects string data
/// 2. Regular to view types conversion - Converts standard string/binary types to
///    view types when the table schema uses view types
///
/// # Arguments
/// * `table_schema` - The table schema containing the desired types
/// * `file_schema` - The file schema to be transformed
///
/// # Returns
/// * `Some(Schema)` - If any transformations were applied, returns the transformed schema
/// * `None` - If no transformations were needed
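///
/// # Example
///
/// A minimal sketch: the table schema uses `Utf8View` while the file schema
/// uses plain `Utf8`, so the file field is coerced to `Utf8View`:
/// ```ignore
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let table_schema = Schema::new(vec![Field::new("c", DataType::Utf8View, true)]);
/// let file_schema = Schema::new(vec![Field::new("c", DataType::Utf8, true)]);
///
/// let coerced = apply_file_schema_type_coercions(&table_schema, &file_schema)
///     .expect("a transformation is needed");
/// assert_eq!(coerced.field(0).data_type(), &DataType::Utf8View);
/// ```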
pub fn apply_file_schema_type_coercions(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut needs_view_transform = false;
    let mut needs_string_transform = false;

    // Create a mapping of table field names to their data types for fast lookup
    // and simultaneously check if we need any transformations
    let table_fields: HashMap<_, _> = table_schema
        .fields()
        .iter()
        .map(|f| {
            let dt = f.data_type();
            // Check if we need view type transformation
            if matches!(dt, &DataType::Utf8View | &DataType::BinaryView) {
                needs_view_transform = true;
            }
            // Check if we need string type transformation
            if matches!(
                dt,
                &DataType::Utf8 | &DataType::LargeUtf8 | &DataType::Utf8View
            ) {
                needs_string_transform = true;
            }

            (f.name(), dt)
        })
        .collect();

    // Early return if no transformation needed
    if !needs_view_transform && !needs_string_transform {
        return None;
    }

    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields()
        .iter()
        .map(|field| {
            let field_name = field.name();
            let field_type = field.data_type();

            // Look up the corresponding field type in the table schema
            if let Some(table_type) = table_fields.get(field_name) {
                match (table_type, field_type) {
                    // table schema uses string type, coerce the file schema to use string type
                    (
                        &DataType::Utf8,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::Utf8);
                    }
                    // table schema uses large string type, coerce the file schema to use large string type
                    (
                        &DataType::LargeUtf8,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::LargeUtf8);
                    }
                    // table schema uses string view type, coerce the file schema to use view type
                    (
                        &DataType::Utf8View,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::Utf8View);
                    }
                    // Handle view type conversions
                    (&DataType::Utf8View, DataType::Utf8 | DataType::LargeUtf8) => {
                        return field_with_new_type(field, DataType::Utf8View);
                    }
                    (&DataType::BinaryView, DataType::Binary | DataType::LargeBinary) => {
                        return field_with_new_type(field, DataType::BinaryView);
                    }
                    _ => {}
                }
            }

            // If no transformation is needed, keep the original field
            Arc::clone(field)
        })
        .collect();

    Some(Schema::new_with_metadata(
        transformed_fields,
        file_schema.metadata.clone(),
    ))
}

/// Coerces the file schema's Timestamps to the provided TimeUnit if the Parquet schema contains INT96.
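///
/// A hedged sketch, assuming `parse_message_type` from the `parquet` crate to
/// build a descriptor whose `ts` column is physically INT96:
/// ```ignore
/// use std::sync::Arc;
/// use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
/// use parquet::schema::parser::parse_message_type;
/// use parquet::schema::types::SchemaDescriptor;
///
/// let message = "message spark_schema { required int96 ts; }";
/// let parquet_schema =
///     SchemaDescriptor::new(Arc::new(parse_message_type(message).unwrap()));
/// // Arrow reads int96 as nanosecond timestamps by default.
/// let file_schema = Schema::new(vec![Field::new(
///     "ts",
///     DataType::Timestamp(TimeUnit::Nanosecond, None),
///     false,
/// )]);
///
/// let coerced =
///     coerce_int96_to_resolution(&parquet_schema, &file_schema, &TimeUnit::Millisecond)
///         .expect("schema contains an int96 column");
/// assert_eq!(
///     coerced.field(0).data_type(),
///     &DataType::Timestamp(TimeUnit::Millisecond, None)
/// );
/// ```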
pub fn coerce_int96_to_resolution(
    parquet_schema: &SchemaDescriptor,
    file_schema: &Schema,
    time_unit: &TimeUnit,
) -> Option<Schema> {
    // Traverse the parquet_schema columns looking for int96 physical types. If encountered, insert
    // the field's full path into a set.
    let int96_fields: HashSet<_> = parquet_schema
        .columns()
        .iter()
        .filter(|f| f.physical_type() == Type::INT96)
        .map(|f| f.path().string())
        .collect();

    if int96_fields.is_empty() {
        // The schema doesn't contain any int96 fields, so skip the remaining logic.
        return None;
    }

    // Do a DFS into the schema using a stack, looking for timestamp(nanos) fields that originated
    // as int96 to coerce to the provided time_unit.

    type NestedFields = Rc<RefCell<Vec<FieldRef>>>;
    type StackContext<'a> = (
        Vec<&'a str>, // The Parquet column path (e.g., "c0.list.element.c1") for the current field.
        &'a FieldRef, // The current field to be processed.
        NestedFields, // The parent's fields that this field will be (possibly) type-coerced and
        // inserted into. All fields have a parent, so this is not an Option type.
        Option<NestedFields>, // Nested types need to create their own vector of fields for their
                              // children. For primitive types this will remain None. For nested
                              // types it is None the first time they are processed. Then, we
                              // instantiate a vector for its children, push the field back onto the
                              // stack to be processed again, and DFS into its children. The next
                              // time we process the field, we know we have DFS'd into the children
                              // because this field is Some.
    );

    // These are the top-level fields from which we will construct our schema. We pass this into
    // our initial stack context as the parent fields, and the DFS populates it.
    let fields = Rc::new(RefCell::new(Vec::with_capacity(file_schema.fields.len())));

    // TODO: It might be possible to only DFS into nested fields that we know contain an int96 if we
    // use some sort of LPM data structure to check if we're currently DFS'ing nested types that are
    // in a column path that contains an int96. That can be a future optimization for large schemas.
    let transformed_schema = {
        // Populate the stack with our top-level fields.
        let mut stack: Vec<StackContext> = file_schema
            .fields()
            .iter()
            .rev()
            .map(|f| (vec![f.name().as_str()], f, Rc::clone(&fields), None))
            .collect();

        // Pop fields to DFS into until we have exhausted the stack.
        while let Some((parquet_path, current_field, parent_fields, child_fields)) =
            stack.pop()
        {
            match (current_field.data_type(), child_fields) {
                (DataType::Struct(unprocessed_children), None) => {
                    // This is the first time popping off this struct. We don't yet know the
                    // correct types of its children (i.e., if they need coercing) so we create
                    // a vector for child_fields, push the struct node back onto the stack to be
                    // processed again (see below) after processing all its children.
                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(
                        unprocessed_children.len(),
                    )));
                    // Note that here we push the struct back onto the stack with its
                    // parent_fields in the same position, now with Some(child_fields).
                    stack.push((
                        parquet_path.clone(),
                        current_field,
                        parent_fields,
                        Some(Rc::clone(&child_fields)),
                    ));
                    // Push all the children in reverse to maintain original schema order due to
                    // stack processing.
                    for child in unprocessed_children.into_iter().rev() {
                        let mut child_path = parquet_path.clone();
                        // Build up a normalized path that we'll use as a key into the original
                        // int96_fields set above to test if this originated as int96.
                        child_path.push(".");
                        child_path.push(child.name());
                        // Note that here we push the field onto the stack using the struct's
                        // new child_fields vector as the field's parent_fields.
                        stack.push((child_path, child, Rc::clone(&child_fields), None));
                    }
                }
                (DataType::Struct(unprocessed_children), Some(processed_children)) => {
                    // This is the second time popping off this struct. The child_fields vector
                    // now contains each field that has been DFS'd into, and we can construct
                    // the resulting struct with correct child types.
                    let processed_children = processed_children.borrow();
                    assert_eq!(processed_children.len(), unprocessed_children.len());
                    let processed_struct = Field::new_struct(
                        current_field.name(),
                        processed_children.as_slice(),
                        current_field.is_nullable(),
                    );
                    parent_fields.borrow_mut().push(Arc::new(processed_struct));
                }
                (DataType::List(unprocessed_child), None) => {
                    // This is the first time popping off this list. See struct docs above.
                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
                    stack.push((
                        parquet_path.clone(),
                        current_field,
                        parent_fields,
                        Some(Rc::clone(&child_fields)),
                    ));
                    let mut child_path = parquet_path.clone();
                    // Spark uses a definition for arrays/lists that results in a group
                    // named "list" that is not maintained when parsing to Arrow. We just push
                    // this name into the path.
                    child_path.push(".list.");
                    child_path.push(unprocessed_child.name());
                    stack.push((
                        child_path.clone(),
                        unprocessed_child,
                        Rc::clone(&child_fields),
                        None,
                    ));
                }
                (DataType::List(_), Some(processed_children)) => {
                    // This is the second time popping off this list. See struct docs above.
                    let processed_children = processed_children.borrow();
                    assert_eq!(processed_children.len(), 1);
                    let processed_list = Field::new_list(
                        current_field.name(),
                        Arc::clone(&processed_children[0]),
                        current_field.is_nullable(),
                    );
                    parent_fields.borrow_mut().push(Arc::new(processed_list));
                }
                (DataType::Map(unprocessed_child, _), None) => {
                    // This is the first time popping off this map. See struct docs above.
                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
                    stack.push((
                        parquet_path.clone(),
                        current_field,
                        parent_fields,
                        Some(Rc::clone(&child_fields)),
                    ));
                    let mut child_path = parquet_path.clone();
                    child_path.push(".");
                    child_path.push(unprocessed_child.name());
                    stack.push((
                        child_path.clone(),
                        unprocessed_child,
                        Rc::clone(&child_fields),
                        None,
                    ));
                }
                (DataType::Map(_, sorted), Some(processed_children)) => {
                    // This is the second time popping off this map. See struct docs above.
                    let processed_children = processed_children.borrow();
                    assert_eq!(processed_children.len(), 1);
                    let processed_map = Field::new(
                        current_field.name(),
                        DataType::Map(Arc::clone(&processed_children[0]), *sorted),
                        current_field.is_nullable(),
                    );
                    parent_fields.borrow_mut().push(Arc::new(processed_map));
                }
                (DataType::Timestamp(TimeUnit::Nanosecond, None), None)
                    if int96_fields.contains(parquet_path.concat().as_str()) =>
                // We found a timestamp(nanos) and it originated as int96. Coerce it to the correct
                // time_unit.
                {
                    parent_fields.borrow_mut().push(field_with_new_type(
                        current_field,
                        DataType::Timestamp(*time_unit, None),
                    ));
                }
                // Other types can be cloned as they are.
                _ => parent_fields.borrow_mut().push(Arc::clone(current_field)),
            }
        }
        assert_eq!(fields.borrow().len(), file_schema.fields.len());
        Schema::new_with_metadata(
            fields.borrow_mut().clone(),
            file_schema.metadata.clone(),
        )
    };

    Some(transformed_schema)
}

/// Coerces the file schema if the table schema uses a view type.
#[deprecated(
    since = "47.0.0",
    note = "Use `apply_file_schema_type_coercions` instead"
)]
pub fn coerce_file_schema_to_view_type(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut transform = false;
    let table_fields: HashMap<_, _> = table_schema
        .fields
        .iter()
        .map(|f| {
            let dt = f.data_type();
            if dt.equals_datatype(&DataType::Utf8View)
                || dt.equals_datatype(&DataType::BinaryView)
            {
                transform = true;
            }
            (f.name(), dt)
        })
        .collect();

    if !transform {
        return None;
    }

    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields
        .iter()
        .map(
            |field| match (table_fields.get(field.name()), field.data_type()) {
                (Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => {
                    field_with_new_type(field, DataType::Utf8View)
                }
                (
                    Some(DataType::BinaryView),
                    DataType::Binary | DataType::LargeBinary,
                ) => field_with_new_type(field, DataType::BinaryView),
                _ => Arc::clone(field),
            },
        )
        .collect();

    Some(Schema::new_with_metadata(
        transformed_fields,
        file_schema.metadata.clone(),
    ))
}

/// If the table schema uses a string type, coerce the file schema to use a string type.
///
/// See [ParquetFormat::binary_as_string] for details
#[deprecated(
    since = "47.0.0",
    note = "Use `apply_file_schema_type_coercions` instead"
)]
pub fn coerce_file_schema_to_string_type(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut transform = false;
    let table_fields: HashMap<_, _> = table_schema
        .fields
        .iter()
        .map(|f| (f.name(), f.data_type()))
        .collect();
    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields
        .iter()
        .map(
            |field| match (table_fields.get(field.name()), field.data_type()) {
                // table schema uses string type, coerce the file schema to use string type
                (
                    Some(DataType::Utf8),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::Utf8)
                }
                // table schema uses large string type, coerce the file schema to use large string type
                (
                    Some(DataType::LargeUtf8),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::LargeUtf8)
                }
                // table schema uses string view type, coerce the file schema to use view type
                (
                    Some(DataType::Utf8View),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::Utf8View)
                }
                _ => Arc::clone(field),
            },
        )
        .collect();

    if !transform {
        None
    } else {
        Some(Schema::new_with_metadata(
            transformed_fields,
            file_schema.metadata.clone(),
        ))
    }
}

/// Create a new field with the specified data type, copying the other
/// properties from the input field
fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
    Arc::new(field.as_ref().clone().with_data_type(new_type))
}

/// Transform a schema to use view types for Utf8 and Binary
///
/// See [ParquetFormat::force_view_types] for details
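///
/// A minimal sketch of the mapping:
/// ```ignore
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let schema = Schema::new(vec![
///     Field::new("s", DataType::Utf8, true),
///     Field::new("b", DataType::Binary, true),
/// ]);
/// let viewed = transform_schema_to_view(&schema);
/// assert_eq!(viewed.field(0).data_type(), &DataType::Utf8View);
/// assert_eq!(viewed.field(1).data_type(), &DataType::BinaryView);
/// ```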
pub fn transform_schema_to_view(schema: &Schema) -> Schema {
    let transformed_fields: Vec<Arc<Field>> = schema
        .fields
        .iter()
        .map(|field| match field.data_type() {
            DataType::Utf8 | DataType::LargeUtf8 => {
                field_with_new_type(field, DataType::Utf8View)
            }
            DataType::Binary | DataType::LargeBinary => {
                field_with_new_type(field, DataType::BinaryView)
            }
            _ => Arc::clone(field),
        })
        .collect();
    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

/// Transform a schema so that any binary types are strings
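///
/// A minimal sketch of the mapping:
/// ```ignore
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let schema = Schema::new(vec![Field::new("b", DataType::Binary, true)]);
/// let coerced = transform_binary_to_string(&schema);
/// assert_eq!(coerced.field(0).data_type(), &DataType::Utf8);
/// ```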
pub fn transform_binary_to_string(schema: &Schema) -> Schema {
    let transformed_fields: Vec<Arc<Field>> = schema
        .fields
        .iter()
        .map(|field| match field.data_type() {
            DataType::Binary => field_with_new_type(field, DataType::Utf8),
            DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8),
            DataType::BinaryView => field_with_new_type(field, DataType::Utf8View),
            _ => Arc::clone(field),
        })
        .collect();
    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

/// [`MetadataFetch`] adapter for reading bytes from an [`ObjectStore`]
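///
/// A hedged sketch, assuming an `ObjectMeta` for an existing object is in
/// scope (field types vary across `object_store` versions):
/// ```ignore
/// use parquet::arrow::async_reader::MetadataFetch;
///
/// let mut fetch = ObjectStoreFetch::new(store.as_ref(), &meta);
/// // Read the 8-byte parquet footer at the end of the object.
/// let footer = fetch.fetch(meta.size - 8..meta.size).await?;
/// ```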
pub struct ObjectStoreFetch<'a> {
    store: &'a dyn ObjectStore,
    meta: &'a ObjectMeta,
}

impl<'a> ObjectStoreFetch<'a> {
    pub fn new(store: &'a dyn ObjectStore, meta: &'a ObjectMeta) -> Self {
        Self { store, meta }
    }
}

impl MetadataFetch for ObjectStoreFetch<'_> {
    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, ParquetError>> {
        async {
            self.store
                .get_range(&self.meta.location, range)
                .await
                .map_err(ParquetError::from)
        }
        .boxed()
    }
}

/// Fetches parquet metadata from the ObjectStore for the given object
///
/// This component is subject to **change** in the near future and is exposed for low level
/// integrations through [`ParquetFileReaderFactory`].
///
/// [`ParquetFileReaderFactory`]: crate::ParquetFileReaderFactory
#[deprecated(
    since = "50.0.0",
    note = "Use `DFParquetMetadata::fetch_metadata` instead"
)]
pub async fn fetch_parquet_metadata(
    store: &dyn ObjectStore,
    object_meta: &ObjectMeta,
    size_hint: Option<usize>,
    decryption_properties: Option<&FileDecryptionProperties>,
    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
) -> Result<Arc<ParquetMetaData>> {
    let decryption_properties = decryption_properties.cloned().map(Arc::new);
    DFParquetMetadata::new(store, object_meta)
        .with_metadata_size_hint(size_hint)
        .with_decryption_properties(decryption_properties)
        .with_file_metadata_cache(file_metadata_cache)
        .fetch_metadata()
        .await
}

/// Read and parse the statistics of the Parquet file for the given `file`
///
/// See [`statistics_from_parquet_meta_calc`] for more details
#[deprecated(
    since = "50.0.0",
    note = "Use `DFParquetMetadata::fetch_statistics` instead"
)]
pub async fn fetch_statistics(
    store: &dyn ObjectStore,
    table_schema: SchemaRef,
    file: &ObjectMeta,
    metadata_size_hint: Option<usize>,
    decryption_properties: Option<&FileDecryptionProperties>,
    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
) -> Result<Statistics> {
    let decryption_properties = decryption_properties.cloned().map(Arc::new);
    DFParquetMetadata::new(store, file)
        .with_metadata_size_hint(metadata_size_hint)
        .with_decryption_properties(decryption_properties)
        .with_file_metadata_cache(file_metadata_cache)
        .fetch_statistics(&table_schema)
        .await
}

#[deprecated(
    since = "50.0.0",
    note = "Use `DFParquetMetadata::statistics_from_parquet_metadata` instead"
)]
#[expect(clippy::needless_pass_by_value)]
pub fn statistics_from_parquet_meta_calc(
    metadata: &ParquetMetaData,
    table_schema: SchemaRef,
) -> Result<Statistics> {
    DFParquetMetadata::statistics_from_parquet_metadata(metadata, &table_schema)
}

/// Implements [`DataSink`] for writing to a parquet file.
pub struct ParquetSink {
    /// Config options for writing data
    config: FileSinkConfig,
    /// Underlying parquet options
    parquet_options: TableParquetOptions,
    /// File metadata from successfully produced parquet files. The Mutex is only used
    /// to allow inserting into the HashMap from behind a borrowed reference in DataSink::write_all.
    written: Arc<parking_lot::Mutex<HashMap<Path, ParquetMetaData>>>,
    /// Optional sorting columns to write to Parquet metadata
    sorting_columns: Option<Vec<SortingColumn>>,
}

impl Debug for ParquetSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ParquetSink").finish()
    }
}

impl DisplayAs for ParquetSink {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "ParquetSink(file_groups=")?;
                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
                write!(f, ")")
            }
            DisplayFormatType::TreeRender => {
                // TODO: collect info
                write!(f, "")
            }
        }
    }
}

impl ParquetSink {
    /// Create from config.
    pub fn new(config: FileSinkConfig, parquet_options: TableParquetOptions) -> Self {
        Self {
            config,
            parquet_options,
            written: Default::default(),
            sorting_columns: None,
        }
    }

    /// Set sorting columns for the Parquet file metadata.
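    ///
    /// A hedged sketch, assuming the output is sorted ascending on leaf
    /// column 0 with nulls first:
    /// ```ignore
    /// use parquet::file::metadata::SortingColumn;
    ///
    /// let sink = ParquetSink::new(config, parquet_options)
    ///     .with_sorting_columns(Some(vec![SortingColumn::new(0, false, true)]));
    /// ```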
    pub fn with_sorting_columns(
        mut self,
        sorting_columns: Option<Vec<SortingColumn>>,
    ) -> Self {
        self.sorting_columns = sorting_columns;
        self
    }

    /// Retrieve the file metadata for the written files, keyed to the path,
    /// which may be partitioned (in the case of hive style partitioning).
    pub fn written(&self) -> HashMap<Path, ParquetMetaData> {
        self.written.lock().clone()
    }

    /// Create writer properties based upon configuration settings,
    /// including partitioning and the inclusion of arrow schema metadata.
    async fn create_writer_props(
        &self,
        runtime: &Arc<RuntimeEnv>,
        path: &Path,
    ) -> Result<WriterProperties> {
        let schema = self.config.output_schema();

        // TODO: avoid this clone in follow up PR, where the writer properties & schema
        // are calculated once on `ParquetSink::new`
        let mut parquet_opts = self.parquet_options.clone();
        if !self.parquet_options.global.skip_arrow_metadata {
            parquet_opts.arrow_schema(schema);
        }

        let mut builder = WriterPropertiesBuilder::try_from(&parquet_opts)?;

        // Set sorting columns if configured
        if let Some(ref sorting_columns) = self.sorting_columns {
            builder = builder.set_sorting_columns(Some(sorting_columns.clone()));
        }

        builder = set_writer_encryption_properties(
            builder,
            runtime,
            parquet_opts,
            schema,
            path,
        )
        .await?;
        Ok(builder.build())
    }

    /// Creates an [`AsyncArrowWriter`] which serializes a parquet file to an ObjectStore.
    /// AsyncArrowWriters are used when individual parquet file serialization is not parallelized.
    async fn create_async_arrow_writer(
        &self,
        location: &Path,
        object_store: Arc<dyn ObjectStore>,
        context: &Arc<TaskContext>,
        parquet_props: WriterProperties,
    ) -> Result<AsyncArrowWriter<BufWriter>> {
        let buf_writer = BufWriter::with_capacity(
            object_store,
            location.clone(),
            context
                .session_config()
                .options()
                .execution
                .objectstore_writer_buffer_size,
        );
        let options = ArrowWriterOptions::new()
            .with_properties(parquet_props)
            .with_skip_arrow_metadata(self.parquet_options.global.skip_arrow_metadata);

        let writer = AsyncArrowWriter::try_new_with_options(
            buf_writer,
            get_writer_schema(&self.config),
            options,
        )?;
        Ok(writer)
    }

    /// Parquet options
    pub fn parquet_options(&self) -> &TableParquetOptions {
        &self.parquet_options
    }
}

#[cfg(feature = "parquet_encryption")]
async fn set_writer_encryption_properties(
    builder: WriterPropertiesBuilder,
    runtime: &Arc<RuntimeEnv>,
    parquet_opts: TableParquetOptions,
    schema: &Arc<Schema>,
    path: &Path,
) -> Result<WriterPropertiesBuilder> {
    if let Some(file_encryption_properties) = parquet_opts.crypto.file_encryption {
        // Encryption properties have been specified directly
        return Ok(builder.with_file_encryption_properties(Arc::new(
            FileEncryptionProperties::from(file_encryption_properties),
        )));
    } else if let Some(encryption_factory_id) = &parquet_opts.crypto.factory_id.as_ref() {
        // Encryption properties will be generated by an encryption factory
        let encryption_factory =
            runtime.parquet_encryption_factory(encryption_factory_id)?;
        let file_encryption_properties = encryption_factory
            .get_file_encryption_properties(
                &parquet_opts.crypto.factory_options,
                schema,
                path,
            )
            .await?;
        if let Some(file_encryption_properties) = file_encryption_properties {
            return Ok(
                builder.with_file_encryption_properties(file_encryption_properties)
            );
        }
    }
    Ok(builder)
}

#[cfg(not(feature = "parquet_encryption"))]
async fn set_writer_encryption_properties(
    builder: WriterPropertiesBuilder,
    _runtime: &Arc<RuntimeEnv>,
    _parquet_opts: TableParquetOptions,
    _schema: &Arc<Schema>,
    _path: &Path,
) -> Result<WriterPropertiesBuilder> {
    Ok(builder)
}
1322
1323#[async_trait]
1324impl FileSink for ParquetSink {
1325    fn config(&self) -> &FileSinkConfig {
1326        &self.config
1327    }
1328
1329    async fn spawn_writer_tasks_and_join(
1330        &self,
1331        context: &Arc<TaskContext>,
1332        demux_task: SpawnedTask<Result<()>>,
1333        mut file_stream_rx: DemuxedStreamReceiver,
1334        object_store: Arc<dyn ObjectStore>,
1335    ) -> Result<u64> {
1336        let parquet_opts = &self.parquet_options;
1337
1338        let mut file_write_tasks: JoinSet<
1339            std::result::Result<(Path, ParquetMetaData), DataFusionError>,
1340        > = JoinSet::new();
1341
1342        let runtime = context.runtime_env();
1343        let parallel_options = ParallelParquetWriterOptions {
1344            max_parallel_row_groups: parquet_opts
1345                .global
1346                .maximum_parallel_row_group_writers,
1347            max_buffered_record_batches_per_stream: parquet_opts
1348                .global
1349                .maximum_buffered_record_batches_per_stream,
1350        };
1351
1352        while let Some((path, mut rx)) = file_stream_rx.recv().await {
1353            let parquet_props = self.create_writer_props(&runtime, &path).await?;
1354            if !parquet_opts.global.allow_single_file_parallelism {
1355                let mut writer = self
1356                    .create_async_arrow_writer(
1357                        &path,
1358                        Arc::clone(&object_store),
1359                        context,
1360                        parquet_props.clone(),
1361                    )
1362                    .await?;
1363                let mut reservation = MemoryConsumer::new(format!("ParquetSink[{path}]"))
1364                    .register(context.memory_pool());
1365                file_write_tasks.spawn(async move {
1366                    while let Some(batch) = rx.recv().await {
1367                        writer.write(&batch).await?;
1368                        reservation.try_resize(writer.memory_size())?;
1369                    }
1370                    let parquet_meta_data = writer
1371                        .close()
1372                        .await
1373                        .map_err(|e| DataFusionError::ParquetError(Box::new(e)))?;
1374                    Ok((path, parquet_meta_data))
1375                });
1376            } else {
1377                let writer = ObjectWriterBuilder::new(
1378                    // Parquet files as a whole are never compressed, since they
1379                    // manage compressed blocks themselves.
1380                    FileCompressionType::UNCOMPRESSED,
1381                    &path,
1382                    Arc::clone(&object_store),
1383                )
1384                .with_buffer_size(Some(
1385                    context
1386                        .session_config()
1387                        .options()
1388                        .execution
1389                        .objectstore_writer_buffer_size,
1390                ))
1391                .build()?;
1392                let schema = get_writer_schema(&self.config);
1393                let props = parquet_props.clone();
1394                let skip_arrow_metadata = self.parquet_options.global.skip_arrow_metadata;
1395                let parallel_options_clone = parallel_options.clone();
1396                let pool = Arc::clone(context.memory_pool());
1397                file_write_tasks.spawn(async move {
1398                    let parquet_meta_data = output_single_parquet_file_parallelized(
1399                        writer,
1400                        rx,
1401                        schema,
1402                        &props,
1403                        skip_arrow_metadata,
1404                        parallel_options_clone,
1405                        pool,
1406                    )
1407                    .await?;
1408                    Ok((path, parquet_meta_data))
1409                });
1410            }
1411        }
1412
1413        let mut row_count = 0;
1414        while let Some(result) = file_write_tasks.join_next().await {
1415            match result {
1416                Ok(r) => {
1417                    let (path, parquet_meta_data) = r?;
1418                    row_count += parquet_meta_data.file_metadata().num_rows();
1419                    let mut written_files = self.written.lock();
1420                    written_files
1421                        .try_insert(path.clone(), parquet_meta_data)
1422                        .map_err(|e| internal_datafusion_err!("duplicate entry detected for partitioned file {path}: {e}"))?;
1423                    drop(written_files);
1424                }
1425                Err(e) => {
1426                    if e.is_panic() {
1427                        std::panic::resume_unwind(e.into_panic());
1428                    } else {
1429                        unreachable!();
1430                    }
1431                }
1432            }
1433        }
1434
1435        demux_task
1436            .join_unwind()
1437            .await
1438            .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
1439
1440        Ok(row_count as u64)
1441    }
1442}
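
// A hedged sketch of the options consulted by `spawn_writer_tasks_and_join`
// above: `allow_single_file_parallelism` switches between the serial
// `AsyncArrowWriter` path and the parallelized single-file path, and the two
// `maximum_*` knobs bound `ParallelParquetWriterOptions`. The values shown
// are illustrative, not recommendations.
#[allow(dead_code)]
fn example_parallel_writer_options() -> TableParquetOptions {
    let mut opts = TableParquetOptions::default();
    // `false` would force one serial writer task per output file.
    opts.global.allow_single_file_parallelism = true;
    // Bounds the number of row groups serialized concurrently per file.
    opts.global.maximum_parallel_row_group_writers = 2;
    // Bounds the record batches queued per column serialization stream.
    opts.global.maximum_buffered_record_batches_per_stream = 32;
    opts
}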
1443
1444#[async_trait]
1445impl DataSink for ParquetSink {
1446    fn as_any(&self) -> &dyn Any {
1447        self
1448    }
1449
1450    fn schema(&self) -> &SchemaRef {
1451        self.config.output_schema()
1452    }
1453
1454    async fn write_all(
1455        &self,
1456        data: SendableRecordBatchStream,
1457        context: &Arc<TaskContext>,
1458    ) -> Result<u64> {
1459        FileSink::write_all(self, data, context).await
1460    }
1461}
1462
1463/// Consumes a stream of [ArrowLeafColumn] via a channel and serializes them using an [ArrowColumnWriter].
1464/// Once the channel is exhausted, returns the ArrowColumnWriter.
1465async fn column_serializer_task(
1466    mut rx: Receiver<ArrowLeafColumn>,
1467    mut writer: ArrowColumnWriter,
1468    mut reservation: MemoryReservation,
1469) -> Result<(ArrowColumnWriter, MemoryReservation)> {
1470    while let Some(col) = rx.recv().await {
1471        writer.write(&col)?;
1472        reservation.try_resize(writer.memory_size())?;
1473    }
1474    Ok((writer, reservation))
1475}
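
// A usage sketch for `column_serializer_task` under the same contract as the
// callers below: feed leaf columns over a bounded channel, then drop the
// sender so the task finishes and hands the `ArrowColumnWriter` back. The
// function name and channel capacity are illustrative.
#[allow(dead_code)]
async fn example_drive_column_serializer(
    writer: ArrowColumnWriter,
    pool: &Arc<dyn MemoryPool>,
) -> Result<ArrowColumnWriter> {
    let (tx, rx) = mpsc::channel::<ArrowLeafColumn>(8);
    let reservation = MemoryConsumer::new("example").register(pool);
    let task = SpawnedTask::spawn(column_serializer_task(rx, writer, reservation));
    // ... `tx.send(leaf).await` for each leaf column would go here ...
    drop(tx); // closing the channel lets the task return the writer
    let (writer, _reservation) = task
        .join_unwind()
        .await
        .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
    Ok(writer)
}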
1476
1477type ColumnWriterTask = SpawnedTask<Result<(ArrowColumnWriter, MemoryReservation)>>;
1478type ColSender = Sender<ArrowLeafColumn>;
1479
1480/// Spawns a parallel serialization task for each column
1481/// Returns join handles for each column's serialization task along with a send channel
1482/// to send arrow arrays to each serialization task.
1483fn spawn_column_parallel_row_group_writer(
1484    col_writers: Vec<ArrowColumnWriter>,
1485    max_buffer_size: usize,
1486    pool: &Arc<dyn MemoryPool>,
1487) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
1488    let num_columns = col_writers.len();
1489
1490    let mut col_writer_tasks = Vec::with_capacity(num_columns);
1491    let mut col_array_channels = Vec::with_capacity(num_columns);
1492    for writer in col_writers.into_iter() {
1493        // Buffer size of this channel limits the number of arrays queued up for column level serialization
1494        let (send_array, receive_array) =
1495            mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
1496        col_array_channels.push(send_array);
1497
1498        let reservation =
1499            MemoryConsumer::new("ParquetSink(ArrowColumnWriter)").register(pool);
1500        let task = SpawnedTask::spawn(column_serializer_task(
1501            receive_array,
1502            writer,
1503            reservation,
1504        ));
1505        col_writer_tasks.push(task);
1506    }
1507
1508    Ok((col_writer_tasks, col_array_channels))
1509}
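
// How the two helpers compose, as a sketch: spawn one serializer task per
// column, close the channels, then hand the join handles to
// `spawn_rg_join_and_finalize_task` (defined below) to produce an (empty)
// row group. Illustrative of the handoff only.
#[allow(dead_code)]
async fn example_spawn_and_finalize(
    col_writers: Vec<ArrowColumnWriter>,
    pool: Arc<dyn MemoryPool>,
) -> RBStreamSerializeResult {
    let (tasks, channels) =
        spawn_column_parallel_row_group_writer(col_writers, 8, &pool)?;
    drop(channels); // no rows sent: each column task returns immediately
    spawn_rg_join_and_finalize_task(tasks, 0, &pool)
        .join_unwind()
        .await
        .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))?
}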
1510
1511/// Settings related to writing parquet files in parallel
1512#[derive(Clone)]
1513struct ParallelParquetWriterOptions {
1514    max_parallel_row_groups: usize,
1515    max_buffered_record_batches_per_stream: usize,
1516}
1517
1518/// This is the return type of calling [ArrowColumnWriter].close() on each column
1519/// i.e. the Vec of encoded columns which can be appended to a row group
1520type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, MemoryReservation, usize)>;
1521
1522/// Sends the ArrowArrays in the passed [RecordBatch] through the channels to their respective
1523/// parallel column serializers.
1524async fn send_arrays_to_col_writers(
1525    col_array_channels: &[ColSender],
1526    rb: &RecordBatch,
1527    schema: Arc<Schema>,
1528) -> Result<()> {
1529    // Each leaf column has its own channel, increment next_channel for each leaf column sent.
1530    let mut next_channel = 0;
1531    for (array, field) in rb.columns().iter().zip(schema.fields()) {
1532        for c in compute_leaves(field, array)? {
1533            // Do not surface error from closed channel (means something
1534            // else hit an error, and the plan is shutting down).
1535            if col_array_channels[next_channel].send(c).await.is_err() {
1536                return Ok(());
1537            }
1538
1539            next_channel += 1;
1540        }
1541    }
1542
1543    Ok(())
1544}
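
// The channel routing above depends on `compute_leaves` expanding each
// top-level column into leaf arrays in a stable order (nested types produce
// several leaves). A small sketch that counts the leaf channels a batch
// would need; the helper function itself is illustrative.
#[allow(dead_code)]
fn example_count_leaf_channels(rb: &RecordBatch, schema: &Schema) -> Result<usize> {
    let mut leaves = 0;
    for (array, field) in rb.columns().iter().zip(schema.fields()) {
        // One `ArrowLeafColumn` (and thus one channel) per parquet leaf.
        leaves += compute_leaves(field, array)?.len();
    }
    Ok(leaves)
}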
1545
1546/// Spawns a tokio task which joins the parallel column writer tasks,
1547/// and finalizes the row group
1548fn spawn_rg_join_and_finalize_task(
1549    column_writer_tasks: Vec<ColumnWriterTask>,
1550    rg_rows: usize,
1551    pool: &Arc<dyn MemoryPool>,
1552) -> SpawnedTask<RBStreamSerializeResult> {
1553    let mut rg_reservation =
1554        MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(pool);
1555
1556    SpawnedTask::spawn(async move {
1557        let num_cols = column_writer_tasks.len();
1558        let mut finalized_rg = Vec::with_capacity(num_cols);
1559        for task in column_writer_tasks.into_iter() {
1560            let (writer, _col_reservation) = task
1561                .join_unwind()
1562                .await
1563                .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
1564            let encoded_size = writer.get_estimated_total_bytes();
1565            rg_reservation.grow(encoded_size);
1566            finalized_rg.push(writer.close()?);
1567        }
1568
1569        Ok((finalized_rg, rg_reservation, rg_rows))
1570    })
1571}
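
// The reservation pattern used above, in isolation: a row group's estimated
// encoded size is accounted for while its finalized chunks wait to be
// appended, and freed once they are written out. Names are illustrative.
#[allow(dead_code)]
fn example_row_group_accounting(pool: &Arc<dyn MemoryPool>, encoded_size: usize) {
    let mut rg_reservation = MemoryConsumer::new("example-rg").register(pool);
    rg_reservation.grow(encoded_size); // held while chunks await concatenation
    rg_reservation.free(); // released after `append_to_row_group` succeeds
}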
1572
1573/// This task coordinates the serialization of a parquet file in parallel.
1574/// As the query produces RecordBatches, these are written to a RowGroup
1575/// via parallel [ArrowColumnWriter] tasks. Once the desired max rows per
1576/// row group is reached, the parallel tasks are joined on another separate task
1577/// and their output is sent to a concatenation task, while this serialization
1578/// task immediately starts on the next row group. Parquet serialization is thus
1579/// parallelized across both columns and row groups, with a theoretical maximum
1580/// of n_columns * num_row_groups parallel tasks.
1581fn spawn_parquet_parallel_serialization_task(
1582    row_group_writer_factory: ArrowRowGroupWriterFactory,
1583    mut data: Receiver<RecordBatch>,
1584    serialize_tx: Sender<SpawnedTask<RBStreamSerializeResult>>,
1585    schema: Arc<Schema>,
1586    writer_props: Arc<WriterProperties>,
1587    parallel_options: Arc<ParallelParquetWriterOptions>,
1588    pool: Arc<dyn MemoryPool>,
1589) -> SpawnedTask<Result<(), DataFusionError>> {
1590    SpawnedTask::spawn(async move {
1591        let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream;
1592        let max_row_group_rows = writer_props
1593            .max_row_group_row_count()
1594            .unwrap_or(DEFAULT_MAX_ROW_GROUP_ROW_COUNT);
1595        let mut row_group_index = 0;
1596        let col_writers =
1597            row_group_writer_factory.create_column_writers(row_group_index)?;
1598        let (mut column_writer_handles, mut col_array_channels) =
1599            spawn_column_parallel_row_group_writer(col_writers, max_buffer_rb, &pool)?;
1600        let mut current_rg_rows = 0;
1601
1602        while let Some(mut rb) = data.recv().await {
1603            // This loop allows the "else" block to repeatedly split the RecordBatch to handle the case
1604            // where max_row_group_rows < execution.batch_size, as an alternative to a recursive async
1605            // function.
1606            loop {
1607                if current_rg_rows + rb.num_rows() < max_row_group_rows {
1608                    send_arrays_to_col_writers(
1609                        &col_array_channels,
1610                        &rb,
1611                        Arc::clone(&schema),
1612                    )
1613                    .await?;
1614                    current_rg_rows += rb.num_rows();
1615                    break;
1616                } else {
1617                    let rows_left = max_row_group_rows - current_rg_rows;
1618                    let a = rb.slice(0, rows_left);
1619                    send_arrays_to_col_writers(
1620                        &col_array_channels,
1621                        &a,
1622                        Arc::clone(&schema),
1623                    )
1624                    .await?;
1625
1626                    // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup
1627                    // on a separate task, so that we can immediately start on the next RG before waiting
1628                    // for the current one to finish.
1629                    drop(col_array_channels);
1630                    let finalize_rg_task = spawn_rg_join_and_finalize_task(
1631                        column_writer_handles,
1632                        max_row_group_rows,
1633                        &pool,
1634                    );
1635
1636                    // Do not surface error from closed channel (means something
1637                    // else hit an error, and the plan is shutting down).
1638                    if serialize_tx.send(finalize_rg_task).await.is_err() {
1639                        return Ok(());
1640                    }
1641
1642                    current_rg_rows = 0;
1643                    rb = rb.slice(rows_left, rb.num_rows() - rows_left);
1644
1645                    row_group_index += 1;
1646                    let col_writers = row_group_writer_factory
1647                        .create_column_writers(row_group_index)?;
1648                    (column_writer_handles, col_array_channels) =
1649                        spawn_column_parallel_row_group_writer(
1650                            col_writers,
1651                            max_buffer_rb,
1652                            &pool,
1653                        )?;
1654                }
1655            }
1656        }
1657
1658        drop(col_array_channels);
1659        // Handle leftover rows as final rowgroup, which may be smaller than max_row_group_rows
1660        if current_rg_rows > 0 {
1661            let finalize_rg_task = spawn_rg_join_and_finalize_task(
1662                column_writer_handles,
1663                current_rg_rows,
1664                &pool,
1665            );
1666
1667            // Do not surface error from closed channel (means something
1668            // else hit an error, and the plan is shutting down).
1669            if serialize_tx.send(finalize_rg_task).await.is_err() {
1670                return Ok(());
1671            }
1672        }
1673
1674        Ok(())
1675    })
1676}
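
// The slicing arithmetic from the loop above as a self-contained sketch: a
// batch that would overflow the current row group is split so the first
// slice exactly fills it and the remainder carries over. Illustrative only.
#[allow(dead_code)]
fn example_split_at_row_group_boundary(
    rb: &RecordBatch,
    current_rg_rows: usize,
    max_row_group_rows: usize,
) -> (RecordBatch, Option<RecordBatch>) {
    if current_rg_rows + rb.num_rows() < max_row_group_rows {
        // Fits entirely in the current row group.
        (rb.clone(), None)
    } else {
        let rows_left = max_row_group_rows - current_rg_rows;
        // First slice completes the row group; the rest starts the next one.
        (
            rb.slice(0, rows_left),
            Some(rb.slice(rows_left, rb.num_rows() - rows_left)),
        )
    }
}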
1677
1678/// Consume RowGroups serialized by other parallel tasks and concatenate them into
1679/// the final parquet file, while flushing finalized bytes to an [ObjectStore]
1680async fn concatenate_parallel_row_groups(
1681    mut parquet_writer: SerializedFileWriter<SharedBuffer>,
1682    merged_buff: SharedBuffer,
1683    mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
1684    mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
1685    pool: Arc<dyn MemoryPool>,
1686) -> Result<ParquetMetaData> {
1687    let mut file_reservation =
1688        MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);
1689
1690    while let Some(task) = serialize_rx.recv().await {
1691        let result = task.join_unwind().await;
1692        let (serialized_columns, mut rg_reservation, _cnt) =
1693            result.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
1694
1695        let mut rg_out = parquet_writer.next_row_group()?;
1696        for chunk in serialized_columns {
1697            chunk.append_to_row_group(&mut rg_out)?;
1698            rg_reservation.free();
1699
1700            let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
1701            file_reservation.try_resize(buff_to_flush.len())?;
1702
1703            if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
1704                object_store_writer
1705                    .write_all(buff_to_flush.as_slice())
1706                    .await?;
1707                buff_to_flush.clear();
1708                file_reservation.try_resize(buff_to_flush.len())?; // will set to zero
1709            }
1710        }
1711        rg_out.close()?;
1712    }
1713
1714    let parquet_meta_data = parquet_writer.close()?;
1715    let final_buff = merged_buff.buffer.try_lock().unwrap();
1716
1717    object_store_writer.write_all(final_buff.as_slice()).await?;
1718    object_store_writer.shutdown().await?;
1719    file_reservation.free();
1720
1721    Ok(parquet_meta_data)
1722}
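
// The flush policy above in miniature: bytes accumulate in memory and are
// pushed to the object store writer only once they exceed
// `BUFFER_FLUSH_BYTES`, keeping uploads in large sequential parts.
#[allow(dead_code)]
async fn example_flush_if_needed(
    buff_to_flush: &mut Vec<u8>,
    object_store_writer: &mut Box<dyn AsyncWrite + Send + Unpin>,
) -> Result<()> {
    if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
        object_store_writer.write_all(buff_to_flush.as_slice()).await?;
        buff_to_flush.clear();
    }
    Ok(())
}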
1723
1724/// Parallelizes the serialization of a single parquet file, by first serializing N
1725/// independent RecordBatch streams in parallel to RowGroups in memory. Another
1726/// task then stitches these independent RowGroups together and streams this large
1727/// single parquet file to an ObjectStore in multiple parts.
1728async fn output_single_parquet_file_parallelized(
1729    object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
1730    data: Receiver<RecordBatch>,
1731    output_schema: Arc<Schema>,
1732    parquet_props: &WriterProperties,
1733    skip_arrow_metadata: bool,
1734    parallel_options: ParallelParquetWriterOptions,
1735    pool: Arc<dyn MemoryPool>,
1736) -> Result<ParquetMetaData> {
1737    let max_rowgroups = parallel_options.max_parallel_row_groups;
1738    // Buffer size of this channel limits maximum number of RowGroups being worked on in parallel
1739    let (serialize_tx, serialize_rx) =
1740        mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups);
1741
1742    let arc_props = Arc::new(parquet_props.clone());
1743    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
1744    let options = ArrowWriterOptions::new()
1745        .with_properties(parquet_props.clone())
1746        .with_skip_arrow_metadata(skip_arrow_metadata);
1747    let writer = ArrowWriter::try_new_with_options(
1748        merged_buff.clone(),
1749        Arc::clone(&output_schema),
1750        options,
1751    )?;
1752    let (writer, row_group_writer_factory) = writer.into_serialized_writer()?;
1753
1754    let launch_serialization_task = spawn_parquet_parallel_serialization_task(
1755        row_group_writer_factory,
1756        data,
1757        serialize_tx,
1758        Arc::clone(&output_schema),
1759        Arc::clone(&arc_props),
1760        parallel_options.into(),
1761        Arc::clone(&pool),
1762    );
1763    let parquet_meta_data = concatenate_parallel_row_groups(
1764        writer,
1765        merged_buff,
1766        serialize_rx,
1767        object_store_writer,
1768        pool,
1769    )
1770    .await?;
1771
1772    launch_serialization_task
1773        .join_unwind()
1774        .await
1775        .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
1776    Ok(parquet_meta_data)
1777}
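
// A hedged sketch of driving `output_single_parquet_file_parallelized`
// directly; in the sink it is only reached via `spawn_writer_tasks_and_join`.
// The in-memory channel setup, default writer properties, and option values
// are illustrative.
#[allow(dead_code)]
async fn example_parallel_single_file_write(
    schema: Arc<Schema>,
    batches: Vec<RecordBatch>,
    object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
    pool: Arc<dyn MemoryPool>,
) -> Result<ParquetMetaData> {
    let (tx, rx) = mpsc::channel::<RecordBatch>(batches.len().max(1));
    for batch in batches {
        // In the sink this channel is fed by the demux task instead.
        let _ = tx.send(batch).await;
    }
    drop(tx); // signal end of input
    output_single_parquet_file_parallelized(
        object_store_writer,
        rx,
        schema,
        &WriterProperties::default(),
        /* skip_arrow_metadata = */ false,
        ParallelParquetWriterOptions {
            max_parallel_row_groups: 2,
            max_buffered_record_batches_per_stream: 32,
        },
        pool,
    )
    .await
}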
1778
1779#[cfg(test)]
1780mod tests {
1781    use parquet::arrow::parquet_to_arrow_schema;
1782    use std::sync::Arc;
1783
1784    use super::*;
1785
1786    use arrow::datatypes::DataType;
1787    use parquet::schema::parser::parse_message_type;
1788
1789    #[test]
1790    fn coerce_int96_to_resolution_with_mixed_timestamps() {
1791        // Unclear if Spark (or other writer) could generate a file with mixed timestamps like this,
1792        // but we want to test the scenario just in case since it's at least a valid schema as far
1793        // as the Parquet spec is concerned.
1794        let spark_schema = "
1795        message spark_schema {
1796          optional int96 c0;
1797          optional int64 c1 (TIMESTAMP(NANOS,true));
1798          optional int64 c2 (TIMESTAMP(NANOS,false));
1799          optional int64 c3 (TIMESTAMP(MILLIS,true));
1800          optional int64 c4 (TIMESTAMP(MILLIS,false));
1801          optional int64 c5 (TIMESTAMP(MICROS,true));
1802          optional int64 c6 (TIMESTAMP(MICROS,false));
1803        }
1804        ";
1805
1806        let schema = parse_message_type(spark_schema).expect("should parse schema");
1807        let descr = SchemaDescriptor::new(Arc::new(schema));
1808
1809        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();
1810
1811        let result =
1812            coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond)
1813                .unwrap();
1814
1815        // Only the first field (c0) should be converted to a microsecond timestamp because it's the
1816        // only timestamp that originated from an INT96.
1817        let expected_schema = Schema::new(vec![
1818            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
1819            Field::new(
1820                "c1",
1821                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
1822                true,
1823            ),
1824            Field::new("c2", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
1825            Field::new(
1826                "c3",
1827                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1828                true,
1829            ),
1830            Field::new("c4", DataType::Timestamp(TimeUnit::Millisecond, None), true),
1831            Field::new(
1832                "c5",
1833                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1834                true,
1835            ),
1836            Field::new("c6", DataType::Timestamp(TimeUnit::Microsecond, None), true),
1837        ]);
1838
1839        assert_eq!(result, expected_schema);
1840    }
1841
1842    #[test]
1843    fn coerce_int96_to_resolution_with_nested_types() {
1844        // This schema is derived from Comet's CometFuzzTestSuite ParquetGenerator only using int96
1845        // primitive types with generateStruct, generateArray, and generateMap set to true, with one
1846        // additional field added to c4's struct to make sure all fields in a struct get modified.
1847        // https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala
1848        let spark_schema = "
1849        message spark_schema {
1850          optional int96 c0;
1851          optional group c1 {
1852            optional int96 c0;
1853          }
1854          optional group c2 {
1855            optional group c0 (LIST) {
1856              repeated group list {
1857                optional int96 element;
1858              }
1859            }
1860          }
1861          optional group c3 (LIST) {
1862            repeated group list {
1863              optional int96 element;
1864            }
1865          }
1866          optional group c4 (LIST) {
1867            repeated group list {
1868              optional group element {
1869                optional int96 c0;
1870                optional int96 c1;
1871              }
1872            }
1873          }
1874          optional group c5 (MAP) {
1875            repeated group key_value {
1876              required int96 key;
1877              optional int96 value;
1878            }
1879          }
1880          optional group c6 (LIST) {
1881            repeated group list {
1882              optional group element (MAP) {
1883                repeated group key_value {
1884                  required int96 key;
1885                  optional int96 value;
1886                }
1887              }
1888            }
1889          }
1890        }
1891        ";
1892
1893        let schema = parse_message_type(spark_schema).expect("should parse schema");
1894        let descr = SchemaDescriptor::new(Arc::new(schema));
1895
1896        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();
1897
1898        let result =
1899            coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond)
1900                .unwrap();
1901
1902        let expected_schema = Schema::new(vec![
1903            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
1904            Field::new_struct(
1905                "c1",
1906                vec![Field::new(
1907                    "c0",
1908                    DataType::Timestamp(TimeUnit::Microsecond, None),
1909                    true,
1910                )],
1911                true,
1912            ),
1913            Field::new_struct(
1914                "c2",
1915                vec![Field::new_list(
1916                    "c0",
1917                    Field::new(
1918                        "element",
1919                        DataType::Timestamp(TimeUnit::Microsecond, None),
1920                        true,
1921                    ),
1922                    true,
1923                )],
1924                true,
1925            ),
1926            Field::new_list(
1927                "c3",
1928                Field::new(
1929                    "element",
1930                    DataType::Timestamp(TimeUnit::Microsecond, None),
1931                    true,
1932                ),
1933                true,
1934            ),
1935            Field::new_list(
1936                "c4",
1937                Field::new_struct(
1938                    "element",
1939                    vec![
1940                        Field::new(
1941                            "c0",
1942                            DataType::Timestamp(TimeUnit::Microsecond, None),
1943                            true,
1944                        ),
1945                        Field::new(
1946                            "c1",
1947                            DataType::Timestamp(TimeUnit::Microsecond, None),
1948                            true,
1949                        ),
1950                    ],
1951                    true,
1952                ),
1953                true,
1954            ),
1955            Field::new_map(
1956                "c5",
1957                "key_value",
1958                Field::new(
1959                    "key",
1960                    DataType::Timestamp(TimeUnit::Microsecond, None),
1961                    false,
1962                ),
1963                Field::new(
1964                    "value",
1965                    DataType::Timestamp(TimeUnit::Microsecond, None),
1966                    true,
1967                ),
1968                false,
1969                true,
1970            ),
1971            Field::new_list(
1972                "c6",
1973                Field::new_map(
1974                    "element",
1975                    "key_value",
1976                    Field::new(
1977                        "key",
1978                        DataType::Timestamp(TimeUnit::Microsecond, None),
1979                        false,
1980                    ),
1981                    Field::new(
1982                        "value",
1983                        DataType::Timestamp(TimeUnit::Microsecond, None),
1984                        true,
1985                    ),
1986                    false,
1987                    true,
1988                ),
1989                true,
1990            ),
1991        ]);
1992
1993        assert_eq!(result, expected_schema);
1994    }
1995}