datafusion_datasource_parquet/
file_format.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions
19
20use std::any::Any;
21use std::cell::RefCell;
22use std::fmt::Debug;
23use std::ops::Range;
24use std::rc::Rc;
25use std::sync::Arc;
26use std::{fmt, vec};
27
28use arrow::array::RecordBatch;
29use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
30use datafusion_datasource::TableSchema;
31use datafusion_datasource::file_compression_type::FileCompressionType;
32use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
33use datafusion_datasource::write::{
34    ObjectWriterBuilder, SharedBuffer, get_writer_schema,
35};
36
37use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
38use datafusion_datasource::write::demux::DemuxedStreamReceiver;
39
40use arrow::datatypes::{DataType, Field, FieldRef};
41use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
42use datafusion_common::encryption::FileDecryptionProperties;
43use datafusion_common::parsers::CompressionTypeVariant;
44use datafusion_common::{
45    DEFAULT_PARQUET_EXTENSION, DataFusionError, GetExt, HashSet, Result,
46    internal_datafusion_err, internal_err, not_impl_err,
47};
48use datafusion_common::{HashMap, Statistics};
49use datafusion_common_runtime::{JoinSet, SpawnedTask};
50use datafusion_datasource::display::FileGroupDisplay;
51use datafusion_datasource::file::FileSource;
52use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
53use datafusion_datasource::sink::{DataSink, DataSinkExec};
54use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
55use datafusion_execution::{SendableRecordBatchStream, TaskContext};
56use datafusion_expr::dml::InsertOp;
57use datafusion_physical_expr_common::sort_expr::LexRequirement;
58use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
59use datafusion_session::Session;
60
61use crate::metadata::DFParquetMetadata;
62use crate::reader::CachedParquetFileReaderFactory;
63use crate::source::{ParquetSource, parse_coerce_int96_string};
64use async_trait::async_trait;
65use bytes::Bytes;
66use datafusion_datasource::source::DataSourceExec;
67use datafusion_execution::cache::cache_manager::FileMetadataCache;
68use datafusion_execution::runtime_env::RuntimeEnv;
69use futures::future::BoxFuture;
70use futures::{FutureExt, StreamExt, TryStreamExt};
71use object_store::buffered::BufWriter;
72use object_store::path::Path;
73use object_store::{ObjectMeta, ObjectStore};
74use parquet::arrow::arrow_writer::{
75    ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, ArrowRowGroupWriterFactory,
76    ArrowWriterOptions, compute_leaves,
77};
78use parquet::arrow::async_reader::MetadataFetch;
79use parquet::arrow::{ArrowWriter, AsyncArrowWriter};
80use parquet::basic::Type;
81#[cfg(feature = "parquet_encryption")]
82use parquet::encryption::encrypt::FileEncryptionProperties;
83use parquet::errors::ParquetError;
84use parquet::file::metadata::ParquetMetaData;
85use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
86use parquet::file::writer::SerializedFileWriter;
87use parquet::schema::types::SchemaDescriptor;
88use tokio::io::{AsyncWrite, AsyncWriteExt};
89use tokio::sync::mpsc::{self, Receiver, Sender};
90
/// Initial writing buffer size (1 MiB). Note this is just a size hint for
/// efficiency. It will grow beyond the set value if needed.
const INITIAL_BUFFER_BYTES: usize = 1048576;

/// When writing parquet files in parallel, if the buffered Parquet data exceeds
/// this size (1000 KiB), it is flushed to object store
const BUFFER_FLUSH_BYTES: usize = 1024000;
98
99#[derive(Default)]
100/// Factory struct used to create [ParquetFormat]
101pub struct ParquetFormatFactory {
102    /// inner options for parquet
103    pub options: Option<TableParquetOptions>,
104}
105
106impl ParquetFormatFactory {
107    /// Creates an instance of [ParquetFormatFactory]
108    pub fn new() -> Self {
109        Self { options: None }
110    }
111
112    /// Creates an instance of [ParquetFormatFactory] with customized default options
113    pub fn new_with_options(options: TableParquetOptions) -> Self {
114        Self {
115            options: Some(options),
116        }
117    }
118}
119
120impl FileFormatFactory for ParquetFormatFactory {
121    fn create(
122        &self,
123        state: &dyn Session,
124        format_options: &std::collections::HashMap<String, String>,
125    ) -> Result<Arc<dyn FileFormat>> {
126        let parquet_options = match &self.options {
127            None => {
128                let mut table_options = state.default_table_options();
129                table_options.set_config_format(ConfigFileType::PARQUET);
130                table_options.alter_with_string_hash_map(format_options)?;
131                table_options.parquet
132            }
133            Some(parquet_options) => {
134                let mut parquet_options = parquet_options.clone();
135                for (k, v) in format_options {
136                    parquet_options.set(k, v)?;
137                }
138                parquet_options
139            }
140        };
141
142        Ok(Arc::new(
143            ParquetFormat::default().with_options(parquet_options),
144        ))
145    }
146
147    fn default(&self) -> Arc<dyn FileFormat> {
148        Arc::new(ParquetFormat::default())
149    }
150
151    fn as_any(&self) -> &dyn Any {
152        self
153    }
154}
155
156impl GetExt for ParquetFormatFactory {
157    fn get_ext(&self) -> String {
158        // Removes the dot, i.e. ".parquet" -> "parquet"
159        DEFAULT_PARQUET_EXTENSION[1..].to_string()
160    }
161}
162
163impl Debug for ParquetFormatFactory {
164    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
165        f.debug_struct("ParquetFormatFactory")
166            .field("ParquetFormatFactory", &self.options)
167            .finish()
168    }
169}
/// The Apache Parquet `FileFormat` implementation
#[derive(Debug, Default)]
pub struct ParquetFormat {
    /// Reader/writer options applied to all scans and writes for this format
    options: TableParquetOptions,
}
175
impl ParquetFormat {
    /// Construct a new Format with no local overrides
    pub fn new() -> Self {
        Self::default()
    }

    /// Activate statistics based row group level pruning
    /// - If `None`, defaults to value on `config_options`
    pub fn with_enable_pruning(mut self, enable: bool) -> Self {
        self.options.global.pruning = enable;
        self
    }

    /// Return `true` if pruning is enabled
    pub fn enable_pruning(&self) -> bool {
        self.options.global.pruning
    }

    /// Provide a hint to the size of the file metadata. If a hint is provided
    /// the reader will try and fetch the last `size_hint` bytes of the parquet file optimistically.
    /// Without a hint, two reads are required. One read to fetch the 8-byte parquet footer and then
    /// another read to fetch the metadata length encoded in the footer.
    ///
    /// - If `None`, defaults to value on `config_options`
    pub fn with_metadata_size_hint(mut self, size_hint: Option<usize>) -> Self {
        self.options.global.metadata_size_hint = size_hint;
        self
    }

    /// Return the metadata size hint if set
    pub fn metadata_size_hint(&self) -> Option<usize> {
        self.options.global.metadata_size_hint
    }

    /// Tell the parquet reader to skip any metadata that may be in
    /// the file Schema. This can help avoid schema conflicts due to
    /// metadata.
    ///
    /// - If `None`, defaults to value on `config_options`
    pub fn with_skip_metadata(mut self, skip_metadata: bool) -> Self {
        self.options.global.skip_metadata = skip_metadata;
        self
    }

    /// Returns `true` if schema metadata will be cleared prior to
    /// schema merging.
    pub fn skip_metadata(&self) -> bool {
        self.options.global.skip_metadata
    }

    /// Set Parquet options for the ParquetFormat
    pub fn with_options(mut self, options: TableParquetOptions) -> Self {
        self.options = options;
        self
    }

    /// Parquet options
    pub fn options(&self) -> &TableParquetOptions {
        &self.options
    }

    /// Return `true` if should use view types.
    ///
    /// If this returns true, DataFusion will instruct the parquet reader
    /// to read string / binary columns using view `StringView` or `BinaryView`
    /// if the table schema specifies those types, regardless of any embedded metadata
    /// that may specify an alternate Arrow type. The parquet reader is optimized
    /// for reading `StringView` and `BinaryView` and such queries are significantly faster.
    ///
    /// If this returns false, the parquet reader will read the columns according to the
    /// defaults or any embedded Arrow type information. This may result in reading
    /// `StringArrays` and then casting to `StringViewArray` which is less efficient.
    pub fn force_view_types(&self) -> bool {
        self.options.global.schema_force_view_types
    }

    /// If true, will use view types. See [`Self::force_view_types`] for details
    pub fn with_force_view_types(mut self, use_views: bool) -> Self {
        self.options.global.schema_force_view_types = use_views;
        self
    }

    /// Return `true` if binary types will be read as strings.
    ///
    /// If this returns true, DataFusion will instruct the parquet reader
    /// to read binary columns such as `Binary` or `BinaryView` as the
    /// corresponding string type such as `Utf8` or `LargeUtf8`.
    /// The parquet reader has special optimizations for `Utf8` and `LargeUtf8`
    /// validation, and such queries are significantly faster than reading
    /// binary columns and then casting to string columns.
    pub fn binary_as_string(&self) -> bool {
        self.options.global.binary_as_string
    }

    /// If true, will read binary types as strings. See [`Self::binary_as_string`] for details
    pub fn with_binary_as_string(mut self, binary_as_string: bool) -> Self {
        self.options.global.binary_as_string = binary_as_string;
        self
    }

    /// Return the time unit that INT96 timestamps should be coerced to, if
    /// configured. See [`Self::with_coerce_int96`] for details.
    pub fn coerce_int96(&self) -> Option<String> {
        self.options.global.coerce_int96.clone()
    }

    /// Set the time unit that INT96 parquet timestamps are coerced to during
    /// schema inference. The string is parsed by `parse_coerce_int96_string`;
    /// `None` leaves INT96 handling at its default.
    pub fn with_coerce_int96(mut self, time_unit: Option<String>) -> Self {
        self.options.global.coerce_int96 = time_unit;
        self
    }
}
285
286/// Clears all metadata (Schema level and field level) on an iterator
287/// of Schemas
288fn clear_metadata(
289    schemas: impl IntoIterator<Item = Schema>,
290) -> impl Iterator<Item = Schema> {
291    schemas.into_iter().map(|schema| {
292        let fields = schema
293            .fields()
294            .iter()
295            .map(|field| {
296                field.as_ref().clone().with_metadata(Default::default()) // clear meta
297            })
298            .collect::<Fields>();
299        Schema::new(fields)
300    })
301}
302
#[cfg(feature = "parquet_encryption")]
async fn get_file_decryption_properties(
    state: &dyn Session,
    options: &TableParquetOptions,
    file_path: &Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
    // An explicit decryption configuration takes precedence over a factory.
    if let Some(cfd) = &options.crypto.file_decryption {
        return Ok(Some(Arc::new(FileDecryptionProperties::from(cfd.clone()))));
    }

    // Otherwise fall back to a registered encryption factory, if configured.
    let Some(factory_id) = &options.crypto.factory_id else {
        return Ok(None);
    };
    let factory = state.runtime_env().parquet_encryption_factory(factory_id)?;
    let properties = factory
        .get_file_decryption_properties(&options.crypto.factory_options, file_path)
        .await?;
    Ok(properties)
}
326
/// Stub used when the `parquet_encryption` feature is disabled: decryption
/// properties can never be configured, so this always returns `Ok(None)`.
#[cfg(not(feature = "parquet_encryption"))]
async fn get_file_decryption_properties(
    _state: &dyn Session,
    _options: &TableParquetOptions,
    _file_path: &Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
    Ok(None)
}
335
336#[async_trait]
337impl FileFormat for ParquetFormat {
338    fn as_any(&self) -> &dyn Any {
339        self
340    }
341
342    fn get_ext(&self) -> String {
343        ParquetFormatFactory::new().get_ext()
344    }
345
346    fn get_ext_with_compression(
347        &self,
348        file_compression_type: &FileCompressionType,
349    ) -> Result<String> {
350        let ext = self.get_ext();
351        match file_compression_type.get_variant() {
352            CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
353            _ => internal_err!("Parquet FileFormat does not support compression."),
354        }
355    }
356
357    fn compression_type(&self) -> Option<FileCompressionType> {
358        None
359    }
360
361    async fn infer_schema(
362        &self,
363        state: &dyn Session,
364        store: &Arc<dyn ObjectStore>,
365        objects: &[ObjectMeta],
366    ) -> Result<SchemaRef> {
367        let coerce_int96 = match self.coerce_int96() {
368            Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?),
369            None => None,
370        };
371
372        let file_metadata_cache =
373            state.runtime_env().cache_manager.get_file_metadata_cache();
374
375        let mut schemas: Vec<_> = futures::stream::iter(objects)
376            .map(|object| async {
377                let file_decryption_properties = get_file_decryption_properties(
378                    state,
379                    &self.options,
380                    &object.location,
381                )
382                .await?;
383                let result = DFParquetMetadata::new(store.as_ref(), object)
384                    .with_metadata_size_hint(self.metadata_size_hint())
385                    .with_decryption_properties(file_decryption_properties)
386                    .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
387                    .with_coerce_int96(coerce_int96)
388                    .fetch_schema_with_location()
389                    .await?;
390                Ok::<_, DataFusionError>(result)
391            })
392            .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
393            // fetch schemas concurrently, if requested
394            .buffered(state.config_options().execution.meta_fetch_concurrency)
395            .try_collect()
396            .await?;
397
398        // Schema inference adds fields based the order they are seen
399        // which depends on the order the files are processed. For some
400        // object stores (like local file systems) the order returned from list
401        // is not deterministic. Thus, to ensure deterministic schema inference
402        // sort the files first.
403        // https://github.com/apache/datafusion/pull/6629
404        schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2));
405
406        let schemas = schemas
407            .into_iter()
408            .map(|(_, schema)| schema)
409            .collect::<Vec<_>>();
410
411        let schema = if self.skip_metadata() {
412            Schema::try_merge(clear_metadata(schemas))
413        } else {
414            Schema::try_merge(schemas)
415        }?;
416
417        let schema = if self.binary_as_string() {
418            transform_binary_to_string(&schema)
419        } else {
420            schema
421        };
422
423        let schema = if self.force_view_types() {
424            transform_schema_to_view(&schema)
425        } else {
426            schema
427        };
428
429        Ok(Arc::new(schema))
430    }
431
432    async fn infer_stats(
433        &self,
434        state: &dyn Session,
435        store: &Arc<dyn ObjectStore>,
436        table_schema: SchemaRef,
437        object: &ObjectMeta,
438    ) -> Result<Statistics> {
439        let file_decryption_properties =
440            get_file_decryption_properties(state, &self.options, &object.location)
441                .await?;
442        let file_metadata_cache =
443            state.runtime_env().cache_manager.get_file_metadata_cache();
444        DFParquetMetadata::new(store, object)
445            .with_metadata_size_hint(self.metadata_size_hint())
446            .with_decryption_properties(file_decryption_properties)
447            .with_file_metadata_cache(Some(file_metadata_cache))
448            .fetch_statistics(&table_schema)
449            .await
450    }
451
452    async fn create_physical_plan(
453        &self,
454        state: &dyn Session,
455        conf: FileScanConfig,
456    ) -> Result<Arc<dyn ExecutionPlan>> {
457        let mut metadata_size_hint = None;
458
459        if let Some(metadata) = self.metadata_size_hint() {
460            metadata_size_hint = Some(metadata);
461        }
462
463        let mut source = conf
464            .file_source()
465            .as_any()
466            .downcast_ref::<ParquetSource>()
467            .cloned()
468            .ok_or_else(|| internal_datafusion_err!("Expected ParquetSource"))?;
469        source = source.with_table_parquet_options(self.options.clone());
470
471        // Use the CachedParquetFileReaderFactory
472        let metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
473        let store = state
474            .runtime_env()
475            .object_store(conf.object_store_url.clone())?;
476        let cached_parquet_read_factory =
477            Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
478        source = source.with_parquet_file_reader_factory(cached_parquet_read_factory);
479
480        if let Some(metadata_size_hint) = metadata_size_hint {
481            source = source.with_metadata_size_hint(metadata_size_hint)
482        }
483
484        source = self.set_source_encryption_factory(source, state)?;
485
486        let conf = FileScanConfigBuilder::from(conf)
487            .with_source(Arc::new(source))
488            .build();
489        Ok(DataSourceExec::from_data_source(conf))
490    }
491
492    async fn create_writer_physical_plan(
493        &self,
494        input: Arc<dyn ExecutionPlan>,
495        _state: &dyn Session,
496        conf: FileSinkConfig,
497        order_requirements: Option<LexRequirement>,
498    ) -> Result<Arc<dyn ExecutionPlan>> {
499        if conf.insert_op != InsertOp::Append {
500            return not_impl_err!("Overwrites are not implemented yet for Parquet");
501        }
502
503        let sink = Arc::new(ParquetSink::new(conf, self.options.clone()));
504
505        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
506    }
507
508    fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
509        Arc::new(
510            ParquetSource::new(table_schema)
511                .with_table_parquet_options(self.options.clone()),
512        )
513    }
514}
515
#[cfg(feature = "parquet_encryption")]
impl ParquetFormat {
    /// Attaches the configured encryption factory (if any) to `source`.
    fn set_source_encryption_factory(
        &self,
        source: ParquetSource,
        state: &dyn Session,
    ) -> Result<ParquetSource> {
        match &self.options.crypto.factory_id {
            None => Ok(source),
            Some(encryption_factory_id) => {
                let factory = state
                    .runtime_env()
                    .parquet_encryption_factory(encryption_factory_id)?;
                Ok(source.with_encryption_factory(factory))
            }
        }
    }
}
534
535#[cfg(not(feature = "parquet_encryption"))]
536impl ParquetFormat {
537    fn set_source_encryption_factory(
538        &self,
539        source: ParquetSource,
540        _state: &dyn Session,
541    ) -> Result<ParquetSource> {
542        if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
543            Err(DataFusionError::Configuration(format!(
544                "Parquet encryption factory id is set to '{encryption_factory_id}' but the parquet_encryption feature is disabled"
545            )))
546        } else {
547            Ok(source)
548        }
549    }
550}
551
552/// Apply necessary schema type coercions to make file schema match table schema.
553///
554/// This function performs two main types of transformations in a single pass:
555/// 1. Binary types to string types conversion - Converts binary data types to their
556///    corresponding string types when the table schema expects string data
557/// 2. Regular to view types conversion - Converts standard string/binary types to
558///    view types when the table schema uses view types
559///
560/// # Arguments
561/// * `table_schema` - The table schema containing the desired types
562/// * `file_schema` - The file schema to be transformed
563///
564/// # Returns
565/// * `Some(Schema)` - If any transformations were applied, returns the transformed schema
566/// * `None` - If no transformations were needed
567pub fn apply_file_schema_type_coercions(
568    table_schema: &Schema,
569    file_schema: &Schema,
570) -> Option<Schema> {
571    let mut needs_view_transform = false;
572    let mut needs_string_transform = false;
573
574    // Create a mapping of table field names to their data types for fast lookup
575    // and simultaneously check if we need any transformations
576    let table_fields: HashMap<_, _> = table_schema
577        .fields()
578        .iter()
579        .map(|f| {
580            let dt = f.data_type();
581            // Check if we need view type transformation
582            if matches!(dt, &DataType::Utf8View | &DataType::BinaryView) {
583                needs_view_transform = true;
584            }
585            // Check if we need string type transformation
586            if matches!(
587                dt,
588                &DataType::Utf8 | &DataType::LargeUtf8 | &DataType::Utf8View
589            ) {
590                needs_string_transform = true;
591            }
592
593            (f.name(), dt)
594        })
595        .collect();
596
597    // Early return if no transformation needed
598    if !needs_view_transform && !needs_string_transform {
599        return None;
600    }
601
602    let transformed_fields: Vec<Arc<Field>> = file_schema
603        .fields()
604        .iter()
605        .map(|field| {
606            let field_name = field.name();
607            let field_type = field.data_type();
608
609            // Look up the corresponding field type in the table schema
610            if let Some(table_type) = table_fields.get(field_name) {
611                match (table_type, field_type) {
612                    // table schema uses string type, coerce the file schema to use string type
613                    (
614                        &DataType::Utf8,
615                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
616                    ) => {
617                        return field_with_new_type(field, DataType::Utf8);
618                    }
619                    // table schema uses large string type, coerce the file schema to use large string type
620                    (
621                        &DataType::LargeUtf8,
622                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
623                    ) => {
624                        return field_with_new_type(field, DataType::LargeUtf8);
625                    }
626                    // table schema uses string view type, coerce the file schema to use view type
627                    (
628                        &DataType::Utf8View,
629                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
630                    ) => {
631                        return field_with_new_type(field, DataType::Utf8View);
632                    }
633                    // Handle view type conversions
634                    (&DataType::Utf8View, DataType::Utf8 | DataType::LargeUtf8) => {
635                        return field_with_new_type(field, DataType::Utf8View);
636                    }
637                    (&DataType::BinaryView, DataType::Binary | DataType::LargeBinary) => {
638                        return field_with_new_type(field, DataType::BinaryView);
639                    }
640                    _ => {}
641                }
642            }
643
644            // If no transformation is needed, keep the original field
645            Arc::clone(field)
646        })
647        .collect();
648
649    Some(Schema::new_with_metadata(
650        transformed_fields,
651        file_schema.metadata.clone(),
652    ))
653}
654
655/// Coerces the file schema's Timestamps to the provided TimeUnit if Parquet schema contains INT96.
656pub fn coerce_int96_to_resolution(
657    parquet_schema: &SchemaDescriptor,
658    file_schema: &Schema,
659    time_unit: &TimeUnit,
660) -> Option<Schema> {
661    // Traverse the parquet_schema columns looking for int96 physical types. If encountered, insert
662    // the field's full path into a set.
663    let int96_fields: HashSet<_> = parquet_schema
664        .columns()
665        .iter()
666        .filter(|f| f.physical_type() == Type::INT96)
667        .map(|f| f.path().string())
668        .collect();
669
670    if int96_fields.is_empty() {
671        // The schema doesn't contain any int96 fields, so skip the remaining logic.
672        return None;
673    }
674
675    // Do a DFS into the schema using a stack, looking for timestamp(nanos) fields that originated
676    // as int96 to coerce to the provided time_unit.
677
678    type NestedFields = Rc<RefCell<Vec<FieldRef>>>;
679    type StackContext<'a> = (
680        Vec<&'a str>, // The Parquet column path (e.g., "c0.list.element.c1") for the current field.
681        &'a FieldRef, // The current field to be processed.
682        NestedFields, // The parent's fields that this field will be (possibly) type-coerced and
683        // inserted into. All fields have a parent, so this is not an Option type.
684        Option<NestedFields>, // Nested types need to create their own vector of fields for their
685                              // children. For primitive types this will remain None. For nested
686                              // types it is None the first time they are processed. Then, we
687                              // instantiate a vector for its children, push the field back onto the
688                              // stack to be processed again, and DFS into its children. The next
689                              // time we process the field, we know we have DFS'd into the children
690                              // because this field is Some.
691    );
692
693    // This is our top-level fields from which we will construct our schema. We pass this into our
694    // initial stack context as the parent fields, and the DFS populates it.
695    let fields = Rc::new(RefCell::new(Vec::with_capacity(file_schema.fields.len())));
696
697    // TODO: It might be possible to only DFS into nested fields that we know contain an int96 if we
698    // use some sort of LPM data structure to check if we're currently DFS'ing nested types that are
699    // in a column path that contains an int96. That can be a future optimization for large schemas.
700    let transformed_schema = {
701        // Populate the stack with our top-level fields.
702        let mut stack: Vec<StackContext> = file_schema
703            .fields()
704            .iter()
705            .rev()
706            .map(|f| (vec![f.name().as_str()], f, Rc::clone(&fields), None))
707            .collect();
708
709        // Pop fields to DFS into until we have exhausted the stack.
710        while let Some((parquet_path, current_field, parent_fields, child_fields)) =
711            stack.pop()
712        {
713            match (current_field.data_type(), child_fields) {
714                (DataType::Struct(unprocessed_children), None) => {
715                    // This is the first time popping off this struct. We don't yet know the
716                    // correct types of its children (i.e., if they need coercing) so we create
717                    // a vector for child_fields, push the struct node back onto the stack to be
718                    // processed again (see below) after processing all its children.
719                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(
720                        unprocessed_children.len(),
721                    )));
722                    // Note that here we push the struct back onto the stack with its
723                    // parent_fields in the same position, now with Some(child_fields).
724                    stack.push((
725                        parquet_path.clone(),
726                        current_field,
727                        parent_fields,
728                        Some(Rc::clone(&child_fields)),
729                    ));
730                    // Push all the children in reverse to maintain original schema order due to
731                    // stack processing.
732                    for child in unprocessed_children.into_iter().rev() {
733                        let mut child_path = parquet_path.clone();
734                        // Build up a normalized path that we'll use as a key into the original
735                        // int96_fields set above to test if this originated as int96.
736                        child_path.push(".");
737                        child_path.push(child.name());
738                        // Note that here we push the field onto the stack using the struct's
739                        // new child_fields vector as the field's parent_fields.
740                        stack.push((child_path, child, Rc::clone(&child_fields), None));
741                    }
742                }
743                (DataType::Struct(unprocessed_children), Some(processed_children)) => {
744                    // This is the second time popping off this struct. The child_fields vector
745                    // now contains each field that has been DFS'd into, and we can construct
746                    // the resulting struct with correct child types.
747                    let processed_children = processed_children.borrow();
748                    assert_eq!(processed_children.len(), unprocessed_children.len());
749                    let processed_struct = Field::new_struct(
750                        current_field.name(),
751                        processed_children.as_slice(),
752                        current_field.is_nullable(),
753                    );
754                    parent_fields.borrow_mut().push(Arc::new(processed_struct));
755                }
756                (DataType::List(unprocessed_child), None) => {
757                    // This is the first time popping off this list. See struct docs above.
758                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
759                    stack.push((
760                        parquet_path.clone(),
761                        current_field,
762                        parent_fields,
763                        Some(Rc::clone(&child_fields)),
764                    ));
765                    let mut child_path = parquet_path.clone();
766                    // Spark uses a definition for arrays/lists that results in a group
767                    // named "list" that is not maintained when parsing to Arrow. We just push
768                    // this name into the path.
769                    child_path.push(".list.");
770                    child_path.push(unprocessed_child.name());
771                    stack.push((
772                        child_path.clone(),
773                        unprocessed_child,
774                        Rc::clone(&child_fields),
775                        None,
776                    ));
777                }
778                (DataType::List(_), Some(processed_children)) => {
779                    // This is the second time popping off this list. See struct docs above.
780                    let processed_children = processed_children.borrow();
781                    assert_eq!(processed_children.len(), 1);
782                    let processed_list = Field::new_list(
783                        current_field.name(),
784                        Arc::clone(&processed_children[0]),
785                        current_field.is_nullable(),
786                    );
787                    parent_fields.borrow_mut().push(Arc::new(processed_list));
788                }
789                (DataType::Map(unprocessed_child, _), None) => {
790                    // This is the first time popping off this map. See struct docs above.
791                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
792                    stack.push((
793                        parquet_path.clone(),
794                        current_field,
795                        parent_fields,
796                        Some(Rc::clone(&child_fields)),
797                    ));
798                    let mut child_path = parquet_path.clone();
799                    child_path.push(".");
800                    child_path.push(unprocessed_child.name());
801                    stack.push((
802                        child_path.clone(),
803                        unprocessed_child,
804                        Rc::clone(&child_fields),
805                        None,
806                    ));
807                }
808                (DataType::Map(_, sorted), Some(processed_children)) => {
809                    // This is the second time popping off this map. See struct docs above.
810                    let processed_children = processed_children.borrow();
811                    assert_eq!(processed_children.len(), 1);
812                    let processed_map = Field::new(
813                        current_field.name(),
814                        DataType::Map(Arc::clone(&processed_children[0]), *sorted),
815                        current_field.is_nullable(),
816                    );
817                    parent_fields.borrow_mut().push(Arc::new(processed_map));
818                }
819                (DataType::Timestamp(TimeUnit::Nanosecond, None), None)
820                    if int96_fields.contains(parquet_path.concat().as_str()) =>
821                // We found a timestamp(nanos) and it originated as int96. Coerce it to the correct
822                // time_unit.
823                {
824                    parent_fields.borrow_mut().push(field_with_new_type(
825                        current_field,
826                        DataType::Timestamp(*time_unit, None),
827                    ));
828                }
829                // Other types can be cloned as they are.
830                _ => parent_fields.borrow_mut().push(Arc::clone(current_field)),
831            }
832        }
833        assert_eq!(fields.borrow().len(), file_schema.fields.len());
834        Schema::new_with_metadata(
835            fields.borrow_mut().clone(),
836            file_schema.metadata.clone(),
837        )
838    };
839
840    Some(transformed_schema)
841}
842
843/// Coerces the file schema if the table schema uses a view type.
844#[deprecated(
845    since = "47.0.0",
846    note = "Use `apply_file_schema_type_coercions` instead"
847)]
848pub fn coerce_file_schema_to_view_type(
849    table_schema: &Schema,
850    file_schema: &Schema,
851) -> Option<Schema> {
852    let mut transform = false;
853    let table_fields: HashMap<_, _> = table_schema
854        .fields
855        .iter()
856        .map(|f| {
857            let dt = f.data_type();
858            if dt.equals_datatype(&DataType::Utf8View)
859                || dt.equals_datatype(&DataType::BinaryView)
860            {
861                transform = true;
862            }
863            (f.name(), dt)
864        })
865        .collect();
866
867    if !transform {
868        return None;
869    }
870
871    let transformed_fields: Vec<Arc<Field>> = file_schema
872        .fields
873        .iter()
874        .map(
875            |field| match (table_fields.get(field.name()), field.data_type()) {
876                (Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => {
877                    field_with_new_type(field, DataType::Utf8View)
878                }
879                (
880                    Some(DataType::BinaryView),
881                    DataType::Binary | DataType::LargeBinary,
882                ) => field_with_new_type(field, DataType::BinaryView),
883                _ => Arc::clone(field),
884            },
885        )
886        .collect();
887
888    Some(Schema::new_with_metadata(
889        transformed_fields,
890        file_schema.metadata.clone(),
891    ))
892}
893
894/// If the table schema uses a string type, coerce the file schema to use a string type.
895///
896/// See [ParquetFormat::binary_as_string] for details
897#[deprecated(
898    since = "47.0.0",
899    note = "Use `apply_file_schema_type_coercions` instead"
900)]
901pub fn coerce_file_schema_to_string_type(
902    table_schema: &Schema,
903    file_schema: &Schema,
904) -> Option<Schema> {
905    let mut transform = false;
906    let table_fields: HashMap<_, _> = table_schema
907        .fields
908        .iter()
909        .map(|f| (f.name(), f.data_type()))
910        .collect();
911    let transformed_fields: Vec<Arc<Field>> = file_schema
912        .fields
913        .iter()
914        .map(
915            |field| match (table_fields.get(field.name()), field.data_type()) {
916                // table schema uses string type, coerce the file schema to use string type
917                (
918                    Some(DataType::Utf8),
919                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
920                ) => {
921                    transform = true;
922                    field_with_new_type(field, DataType::Utf8)
923                }
924                // table schema uses large string type, coerce the file schema to use large string type
925                (
926                    Some(DataType::LargeUtf8),
927                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
928                ) => {
929                    transform = true;
930                    field_with_new_type(field, DataType::LargeUtf8)
931                }
932                // table schema uses string view type, coerce the file schema to use view type
933                (
934                    Some(DataType::Utf8View),
935                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
936                ) => {
937                    transform = true;
938                    field_with_new_type(field, DataType::Utf8View)
939                }
940                _ => Arc::clone(field),
941            },
942        )
943        .collect();
944
945    if !transform {
946        None
947    } else {
948        Some(Schema::new_with_metadata(
949            transformed_fields,
950            file_schema.metadata.clone(),
951        ))
952    }
953}
954
955/// Create a new field with the specified data type, copying the other
956/// properties from the input field
957fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
958    Arc::new(field.as_ref().clone().with_data_type(new_type))
959}
960
961/// Transform a schema to use view types for Utf8 and Binary
962///
963/// See [ParquetFormat::force_view_types] for details
964pub fn transform_schema_to_view(schema: &Schema) -> Schema {
965    let transformed_fields: Vec<Arc<Field>> = schema
966        .fields
967        .iter()
968        .map(|field| match field.data_type() {
969            DataType::Utf8 | DataType::LargeUtf8 => {
970                field_with_new_type(field, DataType::Utf8View)
971            }
972            DataType::Binary | DataType::LargeBinary => {
973                field_with_new_type(field, DataType::BinaryView)
974            }
975            _ => Arc::clone(field),
976        })
977        .collect();
978    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
979}
980
981/// Transform a schema so that any binary types are strings
982pub fn transform_binary_to_string(schema: &Schema) -> Schema {
983    let transformed_fields: Vec<Arc<Field>> = schema
984        .fields
985        .iter()
986        .map(|field| match field.data_type() {
987            DataType::Binary => field_with_new_type(field, DataType::Utf8),
988            DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8),
989            DataType::BinaryView => field_with_new_type(field, DataType::Utf8View),
990            _ => Arc::clone(field),
991        })
992        .collect();
993    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
994}
995
/// [`MetadataFetch`] adapter for reading bytes from an [`ObjectStore`]
pub struct ObjectStoreFetch<'a> {
    /// Store from which byte ranges are fetched
    store: &'a dyn ObjectStore,
    /// Location (and metadata) of the object being read
    meta: &'a ObjectMeta,
}
1001
impl<'a> ObjectStoreFetch<'a> {
    /// Create an adapter that fetches ranges of `meta` from `store`
    pub fn new(store: &'a dyn ObjectStore, meta: &'a ObjectMeta) -> Self {
        Self { store, meta }
    }
}
1007
1008impl MetadataFetch for ObjectStoreFetch<'_> {
1009    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, ParquetError>> {
1010        async {
1011            self.store
1012                .get_range(&self.meta.location, range)
1013                .await
1014                .map_err(ParquetError::from)
1015        }
1016        .boxed()
1017    }
1018}
1019
1020/// Fetches parquet metadata from ObjectStore for given object
1021///
1022/// This component is a subject to **change** in near future and is exposed for low level integrations
1023/// through [`ParquetFileReaderFactory`].
1024///
1025/// [`ParquetFileReaderFactory`]: crate::ParquetFileReaderFactory
1026#[deprecated(
1027    since = "50.0.0",
1028    note = "Use `DFParquetMetadata::fetch_metadata` instead"
1029)]
1030pub async fn fetch_parquet_metadata(
1031    store: &dyn ObjectStore,
1032    object_meta: &ObjectMeta,
1033    size_hint: Option<usize>,
1034    decryption_properties: Option<&FileDecryptionProperties>,
1035    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
1036) -> Result<Arc<ParquetMetaData>> {
1037    let decryption_properties = decryption_properties.cloned().map(Arc::new);
1038    DFParquetMetadata::new(store, object_meta)
1039        .with_metadata_size_hint(size_hint)
1040        .with_decryption_properties(decryption_properties)
1041        .with_file_metadata_cache(file_metadata_cache)
1042        .fetch_metadata()
1043        .await
1044}
1045
1046/// Read and parse the statistics of the Parquet file at location `path`
1047///
1048/// See [`statistics_from_parquet_meta_calc`] for more details
1049#[deprecated(
1050    since = "50.0.0",
1051    note = "Use `DFParquetMetadata::fetch_statistics` instead"
1052)]
1053pub async fn fetch_statistics(
1054    store: &dyn ObjectStore,
1055    table_schema: SchemaRef,
1056    file: &ObjectMeta,
1057    metadata_size_hint: Option<usize>,
1058    decryption_properties: Option<&FileDecryptionProperties>,
1059    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
1060) -> Result<Statistics> {
1061    let decryption_properties = decryption_properties.cloned().map(Arc::new);
1062    DFParquetMetadata::new(store, file)
1063        .with_metadata_size_hint(metadata_size_hint)
1064        .with_decryption_properties(decryption_properties)
1065        .with_file_metadata_cache(file_metadata_cache)
1066        .fetch_statistics(&table_schema)
1067        .await
1068}
1069
/// Compute [`Statistics`] for `table_schema` from already-parsed parquet
/// metadata.
///
/// Deprecated shim that forwards to
/// [`DFParquetMetadata::statistics_from_parquet_metadata`].
#[deprecated(
    since = "50.0.0",
    note = "Use `DFParquetMetadata::statistics_from_parquet_metadata` instead"
)]
#[expect(clippy::needless_pass_by_value)]
pub fn statistics_from_parquet_meta_calc(
    metadata: &ParquetMetaData,
    table_schema: SchemaRef,
) -> Result<Statistics> {
    DFParquetMetadata::statistics_from_parquet_metadata(metadata, &table_schema)
}
1081
/// Implements [`DataSink`] for writing to a parquet file.
///
/// Records the [`ParquetMetaData`] of each successfully written file; see
/// [`ParquetSink::written`] to retrieve it after writing completes.
pub struct ParquetSink {
    /// Config options for writing data
    config: FileSinkConfig,
    /// Underlying parquet options
    parquet_options: TableParquetOptions,
    /// File metadata from successfully produced parquet files. The Mutex is only used
    /// to allow inserting to HashMap from behind borrowed reference in DataSink::write_all.
    written: Arc<parking_lot::Mutex<HashMap<Path, ParquetMetaData>>>,
}
1092
1093impl Debug for ParquetSink {
1094    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1095        f.debug_struct("ParquetSink").finish()
1096    }
1097}
1098
1099impl DisplayAs for ParquetSink {
1100    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1101        match t {
1102            DisplayFormatType::Default | DisplayFormatType::Verbose => {
1103                write!(f, "ParquetSink(file_groups=",)?;
1104                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
1105                write!(f, ")")
1106            }
1107            DisplayFormatType::TreeRender => {
1108                // TODO: collect info
1109                write!(f, "")
1110            }
1111        }
1112    }
1113}
1114
1115impl ParquetSink {
1116    /// Create from config.
1117    pub fn new(config: FileSinkConfig, parquet_options: TableParquetOptions) -> Self {
1118        Self {
1119            config,
1120            parquet_options,
1121            written: Default::default(),
1122        }
1123    }
1124
1125    /// Retrieve the file metadata for the written files, keyed to the path
1126    /// which may be partitioned (in the case of hive style partitioning).
1127    pub fn written(&self) -> HashMap<Path, ParquetMetaData> {
1128        self.written.lock().clone()
1129    }
1130
1131    /// Create writer properties based upon configuration settings,
1132    /// including partitioning and the inclusion of arrow schema metadata.
1133    async fn create_writer_props(
1134        &self,
1135        runtime: &Arc<RuntimeEnv>,
1136        path: &Path,
1137    ) -> Result<WriterProperties> {
1138        let schema = self.config.output_schema();
1139
1140        // TODO: avoid this clone in follow up PR, where the writer properties & schema
1141        // are calculated once on `ParquetSink::new`
1142        let mut parquet_opts = self.parquet_options.clone();
1143        if !self.parquet_options.global.skip_arrow_metadata {
1144            parquet_opts.arrow_schema(schema);
1145        }
1146
1147        let mut builder = WriterPropertiesBuilder::try_from(&parquet_opts)?;
1148        builder = set_writer_encryption_properties(
1149            builder,
1150            runtime,
1151            parquet_opts,
1152            schema,
1153            path,
1154        )
1155        .await?;
1156        Ok(builder.build())
1157    }
1158
1159    /// Creates an AsyncArrowWriter which serializes a parquet file to an ObjectStore
1160    /// AsyncArrowWriters are used when individual parquet file serialization is not parallelized
1161    async fn create_async_arrow_writer(
1162        &self,
1163        location: &Path,
1164        object_store: Arc<dyn ObjectStore>,
1165        context: &Arc<TaskContext>,
1166        parquet_props: WriterProperties,
1167    ) -> Result<AsyncArrowWriter<BufWriter>> {
1168        let buf_writer = BufWriter::with_capacity(
1169            object_store,
1170            location.clone(),
1171            context
1172                .session_config()
1173                .options()
1174                .execution
1175                .objectstore_writer_buffer_size,
1176        );
1177        let options = ArrowWriterOptions::new()
1178            .with_properties(parquet_props)
1179            .with_skip_arrow_metadata(self.parquet_options.global.skip_arrow_metadata);
1180
1181        let writer = AsyncArrowWriter::try_new_with_options(
1182            buf_writer,
1183            get_writer_schema(&self.config),
1184            options,
1185        )?;
1186        Ok(writer)
1187    }
1188
1189    /// Parquet options
1190    pub fn parquet_options(&self) -> &TableParquetOptions {
1191        &self.parquet_options
1192    }
1193}
1194
/// Configure file encryption on the [`WriterPropertiesBuilder`].
///
/// Encryption properties specified directly in the table options take
/// precedence; otherwise, if an encryption factory is configured, it is
/// looked up in the runtime environment and asked to produce properties
/// for this file's `schema` and `path`. When neither source yields
/// properties, the builder is returned unchanged.
#[cfg(feature = "parquet_encryption")]
async fn set_writer_encryption_properties(
    builder: WriterPropertiesBuilder,
    runtime: &Arc<RuntimeEnv>,
    parquet_opts: TableParquetOptions,
    schema: &Arc<Schema>,
    path: &Path,
) -> Result<WriterPropertiesBuilder> {
    if let Some(file_encryption_properties) = parquet_opts.crypto.file_encryption {
        // Encryption properties have been specified directly
        return Ok(builder.with_file_encryption_properties(Arc::new(
            FileEncryptionProperties::from(file_encryption_properties),
        )));
    }

    // `as_ref()` alone yields `Option<&String>`; the previous extra `&`
    // produced a needless double reference (`&&String`).
    if let Some(encryption_factory_id) = parquet_opts.crypto.factory_id.as_ref() {
        // Encryption properties will be generated by an encryption factory
        let encryption_factory =
            runtime.parquet_encryption_factory(encryption_factory_id)?;
        let file_encryption_properties = encryption_factory
            .get_file_encryption_properties(
                &parquet_opts.crypto.factory_options,
                schema,
                path,
            )
            .await?;
        if let Some(file_encryption_properties) = file_encryption_properties {
            return Ok(
                builder.with_file_encryption_properties(file_encryption_properties)
            );
        }
    }
    Ok(builder)
}
1227
/// No-op stand-in for the encrypted variant, compiled when the
/// `parquet_encryption` feature is disabled: returns the builder unchanged.
#[cfg(not(feature = "parquet_encryption"))]
async fn set_writer_encryption_properties(
    builder: WriterPropertiesBuilder,
    _runtime: &Arc<RuntimeEnv>,
    _parquet_opts: TableParquetOptions,
    _schema: &Arc<Schema>,
    _path: &Path,
) -> Result<WriterPropertiesBuilder> {
    Ok(builder)
}
1238
#[async_trait]
impl FileSink for ParquetSink {
    fn config(&self) -> &FileSinkConfig {
        &self.config
    }

    /// Spawns one writer task per output file produced by the demuxer and
    /// joins them, returning the total number of rows written.
    ///
    /// Depending on `allow_single_file_parallelism`, each file is written
    /// either by a single serial `AsyncArrowWriter`, or by the
    /// column/row-group parallel serializer.
    async fn spawn_writer_tasks_and_join(
        &self,
        context: &Arc<TaskContext>,
        demux_task: SpawnedTask<Result<()>>,
        mut file_stream_rx: DemuxedStreamReceiver,
        object_store: Arc<dyn ObjectStore>,
    ) -> Result<u64> {
        let parquet_opts = &self.parquet_options;

        // One task per output file; each yields the file's path and the
        // metadata of the parquet file it produced.
        let mut file_write_tasks: JoinSet<
            std::result::Result<(Path, ParquetMetaData), DataFusionError>,
        > = JoinSet::new();

        let runtime = context.runtime_env();
        let parallel_options = ParallelParquetWriterOptions {
            max_parallel_row_groups: parquet_opts
                .global
                .maximum_parallel_row_group_writers,
            max_buffered_record_batches_per_stream: parquet_opts
                .global
                .maximum_buffered_record_batches_per_stream,
        };

        // The demuxer sends one (path, record batch stream) pair per file.
        while let Some((path, mut rx)) = file_stream_rx.recv().await {
            let parquet_props = self.create_writer_props(&runtime, &path).await?;
            if !parquet_opts.global.allow_single_file_parallelism {
                // Serial path: a single AsyncArrowWriter consumes the stream.
                let mut writer = self
                    .create_async_arrow_writer(
                        &path,
                        Arc::clone(&object_store),
                        context,
                        parquet_props.clone(),
                    )
                    .await?;
                let mut reservation = MemoryConsumer::new(format!("ParquetSink[{path}]"))
                    .register(context.memory_pool());
                file_write_tasks.spawn(async move {
                    while let Some(batch) = rx.recv().await {
                        writer.write(&batch).await?;
                        // Track the writer's buffered bytes in the memory pool.
                        reservation.try_resize(writer.memory_size())?;
                    }
                    let parquet_meta_data = writer
                        .close()
                        .await
                        .map_err(|e| DataFusionError::ParquetError(Box::new(e)))?;
                    Ok((path, parquet_meta_data))
                });
            } else {
                // Parallel path: serialize columns / row groups concurrently.
                let writer = ObjectWriterBuilder::new(
                    // Parquet files as a whole are never compressed, since they
                    // manage compressed blocks themselves.
                    FileCompressionType::UNCOMPRESSED,
                    &path,
                    Arc::clone(&object_store),
                )
                .with_buffer_size(Some(
                    context
                        .session_config()
                        .options()
                        .execution
                        .objectstore_writer_buffer_size,
                ))
                .build()?;
                let schema = get_writer_schema(&self.config);
                let props = parquet_props.clone();
                let skip_arrow_metadata = self.parquet_options.global.skip_arrow_metadata;
                let parallel_options_clone = parallel_options.clone();
                let pool = Arc::clone(context.memory_pool());
                file_write_tasks.spawn(async move {
                    let parquet_meta_data = output_single_parquet_file_parallelized(
                        writer,
                        rx,
                        schema,
                        &props,
                        skip_arrow_metadata,
                        parallel_options_clone,
                        pool,
                    )
                    .await?;
                    Ok((path, parquet_meta_data))
                });
            }
        }

        // Join all writer tasks, summing row counts and recording metadata
        // for each written file.
        let mut row_count = 0;
        while let Some(result) = file_write_tasks.join_next().await {
            match result {
                Ok(r) => {
                    let (path, parquet_meta_data) = r?;
                    row_count += parquet_meta_data.file_metadata().num_rows();
                    let mut written_files = self.written.lock();
                    written_files
                        .try_insert(path.clone(), parquet_meta_data)
                        .map_err(|e| internal_datafusion_err!("duplicate entry detected for partitioned file {path}: {e}"))?;
                    drop(written_files);
                }
                Err(e) => {
                    if e.is_panic() {
                        // Re-raise panics from writer tasks on this thread.
                        std::panic::resume_unwind(e.into_panic());
                    } else {
                        // Tasks are never cancelled here, so a join error
                        // can only be a panic.
                        unreachable!();
                    }
                }
            }
        }

        // Surface any error from the demux task after all writers finish.
        demux_task
            .join_unwind()
            .await
            .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;

        Ok(row_count as u64)
    }
}
1359
1360#[async_trait]
1361impl DataSink for ParquetSink {
1362    fn as_any(&self) -> &dyn Any {
1363        self
1364    }
1365
1366    fn schema(&self) -> &SchemaRef {
1367        self.config.output_schema()
1368    }
1369
1370    async fn write_all(
1371        &self,
1372        data: SendableRecordBatchStream,
1373        context: &Arc<TaskContext>,
1374    ) -> Result<u64> {
1375        FileSink::write_all(self, data, context).await
1376    }
1377}
1378
1379/// Consumes a stream of [ArrowLeafColumn] via a channel and serializes them using an [ArrowColumnWriter]
1380/// Once the channel is exhausted, returns the ArrowColumnWriter.
1381async fn column_serializer_task(
1382    mut rx: Receiver<ArrowLeafColumn>,
1383    mut writer: ArrowColumnWriter,
1384    mut reservation: MemoryReservation,
1385) -> Result<(ArrowColumnWriter, MemoryReservation)> {
1386    while let Some(col) = rx.recv().await {
1387        writer.write(&col)?;
1388        reservation.try_resize(writer.memory_size())?;
1389    }
1390    Ok((writer, reservation))
1391}
1392
/// Handle to a spawned task that serializes one column; yields the column's
/// writer and its memory reservation once the input channel closes.
type ColumnWriterTask = SpawnedTask<Result<(ArrowColumnWriter, MemoryReservation)>>;
/// Sending half of a per-column channel of leaf arrays.
type ColSender = Sender<ArrowLeafColumn>;
1395
1396/// Spawns a parallel serialization task for each column
1397/// Returns join handles for each columns serialization task along with a send channel
1398/// to send arrow arrays to each serialization task.
1399fn spawn_column_parallel_row_group_writer(
1400    col_writers: Vec<ArrowColumnWriter>,
1401    max_buffer_size: usize,
1402    pool: &Arc<dyn MemoryPool>,
1403) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
1404    let num_columns = col_writers.len();
1405
1406    let mut col_writer_tasks = Vec::with_capacity(num_columns);
1407    let mut col_array_channels = Vec::with_capacity(num_columns);
1408    for writer in col_writers.into_iter() {
1409        // Buffer size of this channel limits the number of arrays queued up for column level serialization
1410        let (send_array, receive_array) =
1411            mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
1412        col_array_channels.push(send_array);
1413
1414        let reservation =
1415            MemoryConsumer::new("ParquetSink(ArrowColumnWriter)").register(pool);
1416        let task = SpawnedTask::spawn(column_serializer_task(
1417            receive_array,
1418            writer,
1419            reservation,
1420        ));
1421        col_writer_tasks.push(task);
1422    }
1423
1424    Ok((col_writer_tasks, col_array_channels))
1425}
1426
/// Settings related to writing parquet files in parallel
#[derive(Clone)]
struct ParallelParquetWriterOptions {
    /// Maximum number of row groups that may be serialized concurrently
    max_parallel_row_groups: usize,
    /// Capacity of each per-column channel of queued leaf arrays
    max_buffered_record_batches_per_stream: usize,
}
1433
/// This is the return type of calling [ArrowColumnWriter].close() on each column
/// i.e. the Vec of encoded columns which can be appended to a row group,
/// together with the memory reservation covering them and the row count.
type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, MemoryReservation, usize)>;
1437
1438/// Sends the ArrowArrays in passed [RecordBatch] through the channels to their respective
1439/// parallel column serializers.
1440async fn send_arrays_to_col_writers(
1441    col_array_channels: &[ColSender],
1442    rb: &RecordBatch,
1443    schema: Arc<Schema>,
1444) -> Result<()> {
1445    // Each leaf column has its own channel, increment next_channel for each leaf column sent.
1446    let mut next_channel = 0;
1447    for (array, field) in rb.columns().iter().zip(schema.fields()) {
1448        for c in compute_leaves(field, array)? {
1449            // Do not surface error from closed channel (means something
1450            // else hit an error, and the plan is shutting down).
1451            if col_array_channels[next_channel].send(c).await.is_err() {
1452                return Ok(());
1453            }
1454
1455            next_channel += 1;
1456        }
1457    }
1458
1459    Ok(())
1460}
1461
1462/// Spawns a tokio task which joins the parallel column writer tasks,
1463/// and finalizes the row group
1464fn spawn_rg_join_and_finalize_task(
1465    column_writer_tasks: Vec<ColumnWriterTask>,
1466    rg_rows: usize,
1467    pool: &Arc<dyn MemoryPool>,
1468) -> SpawnedTask<RBStreamSerializeResult> {
1469    let mut rg_reservation =
1470        MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(pool);
1471
1472    SpawnedTask::spawn(async move {
1473        let num_cols = column_writer_tasks.len();
1474        let mut finalized_rg = Vec::with_capacity(num_cols);
1475        for task in column_writer_tasks.into_iter() {
1476            let (writer, _col_reservation) = task
1477                .join_unwind()
1478                .await
1479                .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
1480            let encoded_size = writer.get_estimated_total_bytes();
1481            rg_reservation.grow(encoded_size);
1482            finalized_rg.push(writer.close()?);
1483        }
1484
1485        Ok((finalized_rg, rg_reservation, rg_rows))
1486    })
1487}
1488
1489/// This task coordinates the serialization of a parquet file in parallel.
1490/// As the query produces RecordBatches, these are written to a RowGroup
1491/// via parallel [ArrowColumnWriter] tasks. Once the desired max rows per
1492/// row group is reached, the parallel tasks are joined on another separate task
1493/// and sent to a concatenation task. This task immediately continues to work
1494/// on the next row group in parallel. So, parquet serialization is parallelized
1495/// across both columns and row_groups, with a theoretical max number of parallel tasks
1496/// given by n_columns * num_row_groups.
fn spawn_parquet_parallel_serialization_task(
    row_group_writer_factory: ArrowRowGroupWriterFactory,
    mut data: Receiver<RecordBatch>,
    serialize_tx: Sender<SpawnedTask<RBStreamSerializeResult>>,
    schema: Arc<Schema>,
    writer_props: Arc<WriterProperties>,
    parallel_options: Arc<ParallelParquetWriterOptions>,
    pool: Arc<dyn MemoryPool>,
) -> SpawnedTask<Result<(), DataFusionError>> {
    SpawnedTask::spawn(async move {
        // Per-column channel buffer depth, and the row count at which the
        // current row group is closed and a new one begins.
        let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream;
        let max_row_group_rows = writer_props.max_row_group_size();
        let mut row_group_index = 0;
        // Column writers for the first row group; each column gets its own
        // serialization task fed through a dedicated channel.
        let col_writers =
            row_group_writer_factory.create_column_writers(row_group_index)?;
        let (mut column_writer_handles, mut col_array_channels) =
            spawn_column_parallel_row_group_writer(col_writers, max_buffer_rb, &pool)?;
        let mut current_rg_rows = 0;

        while let Some(mut rb) = data.recv().await {
            // This loop allows the "else" block to repeatedly split the RecordBatch to handle the case
            // when max_row_group_rows < execution.batch_size as an alternative to a recursive async
            // function.
            loop {
                if current_rg_rows + rb.num_rows() < max_row_group_rows {
                    // Whole batch fits in the current row group: forward it to
                    // the per-column writer tasks and await the next batch.
                    send_arrays_to_col_writers(
                        &col_array_channels,
                        &rb,
                        Arc::clone(&schema),
                    )
                    .await?;
                    current_rg_rows += rb.num_rows();
                    break;
                } else {
                    // Batch would fill (or overflow) the row group: send only
                    // the rows that fit, then finalize this row group.
                    let rows_left = max_row_group_rows - current_rg_rows;
                    let a = rb.slice(0, rows_left);
                    send_arrays_to_col_writers(
                        &col_array_channels,
                        &a,
                        Arc::clone(&schema),
                    )
                    .await?;

                    // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup
                    // on a separate task, so that we can immediately start on the next RG before waiting
                    // for the current one to finish.
                    drop(col_array_channels);
                    let finalize_rg_task = spawn_rg_join_and_finalize_task(
                        column_writer_handles,
                        max_row_group_rows,
                        &pool,
                    );

                    // Do not surface error from closed channel (means something
                    // else hit an error, and the plan is shutting down).
                    if serialize_tx.send(finalize_rg_task).await.is_err() {
                        return Ok(());
                    }

                    // Keep the remainder of the batch (possibly zero rows) and
                    // loop again against a fresh row group.
                    current_rg_rows = 0;
                    rb = rb.slice(rows_left, rb.num_rows() - rows_left);

                    row_group_index += 1;
                    let col_writers = row_group_writer_factory
                        .create_column_writers(row_group_index)?;
                    (column_writer_handles, col_array_channels) =
                        spawn_column_parallel_row_group_writer(
                            col_writers,
                            max_buffer_rb,
                            &pool,
                        )?;
                }
            }
        }

        // Input stream exhausted; dropping the channels signals the column
        // writer tasks that no more data is coming.
        drop(col_array_channels);
        // Handle leftover rows as final rowgroup, which may be smaller than max_row_group_rows
        if current_rg_rows > 0 {
            let finalize_rg_task = spawn_rg_join_and_finalize_task(
                column_writer_handles,
                current_rg_rows,
                &pool,
            );

            // Do not surface error from closed channel (means something
            // else hit an error, and the plan is shutting down).
            if serialize_tx.send(finalize_rg_task).await.is_err() {
                return Ok(());
            }
        }

        Ok(())
    })
}
1591
/// Consume RowGroups serialized by other parallel tasks and concatenate them in
/// to the final parquet file, while flushing finalized bytes to an [ObjectStore]
async fn concatenate_parallel_row_groups(
    mut parquet_writer: SerializedFileWriter<SharedBuffer>,
    merged_buff: SharedBuffer,
    mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
    mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
    pool: Arc<dyn MemoryPool>,
) -> Result<ParquetMetaData> {
    // Tracks the bytes currently accumulated in `merged_buff` but not yet
    // flushed to the object store.
    let mut file_reservation =
        MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);

    // Row groups arrive in submission order; each finalize task yields the
    // serialized column chunks plus the memory reservation covering them.
    while let Some(task) = serialize_rx.recv().await {
        let result = task.join_unwind().await;
        let (serialized_columns, mut rg_reservation, _cnt) =
            result.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;

        let mut rg_out = parquet_writer.next_row_group()?;
        for chunk in serialized_columns {
            // Appending moves the chunk's bytes into `merged_buff`; release the
            // row-group reservation and re-account them under the file
            // reservation instead.
            chunk.append_to_row_group(&mut rg_out)?;
            rg_reservation.free();

            // NOTE(review): `try_lock().unwrap()` assumes no other task holds
            // the buffer lock here — presumably true since the column tasks for
            // this row group have already completed; confirm if refactoring.
            let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
            file_reservation.try_resize(buff_to_flush.len())?;

            // Flush to the object store once enough bytes have accumulated.
            if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
                object_store_writer
                    .write_all(buff_to_flush.as_slice())
                    .await?;
                buff_to_flush.clear();
                file_reservation.try_resize(buff_to_flush.len())?; // will set to zero
            }
        }
        rg_out.close()?;
    }

    // All row groups written: close the file (this appends the parquet footer
    // to the buffer), then flush the remaining bytes and finalize the upload.
    let parquet_meta_data = parquet_writer.close()?;
    let final_buff = merged_buff.buffer.try_lock().unwrap();

    object_store_writer.write_all(final_buff.as_slice()).await?;
    object_store_writer.shutdown().await?;
    file_reservation.free();

    Ok(parquet_meta_data)
}
1637
1638/// Parallelizes the serialization of a single parquet file, by first serializing N
1639/// independent RecordBatch streams in parallel to RowGroups in memory. Another
1640/// task then stitches these independent RowGroups together and streams this large
1641/// single parquet file to an ObjectStore in multiple parts.
1642async fn output_single_parquet_file_parallelized(
1643    object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
1644    data: Receiver<RecordBatch>,
1645    output_schema: Arc<Schema>,
1646    parquet_props: &WriterProperties,
1647    skip_arrow_metadata: bool,
1648    parallel_options: ParallelParquetWriterOptions,
1649    pool: Arc<dyn MemoryPool>,
1650) -> Result<ParquetMetaData> {
1651    let max_rowgroups = parallel_options.max_parallel_row_groups;
1652    // Buffer size of this channel limits maximum number of RowGroups being worked on in parallel
1653    let (serialize_tx, serialize_rx) =
1654        mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups);
1655
1656    let arc_props = Arc::new(parquet_props.clone());
1657    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
1658    let options = ArrowWriterOptions::new()
1659        .with_properties(parquet_props.clone())
1660        .with_skip_arrow_metadata(skip_arrow_metadata);
1661    let writer = ArrowWriter::try_new_with_options(
1662        merged_buff.clone(),
1663        Arc::clone(&output_schema),
1664        options,
1665    )?;
1666    let (writer, row_group_writer_factory) = writer.into_serialized_writer()?;
1667
1668    let launch_serialization_task = spawn_parquet_parallel_serialization_task(
1669        row_group_writer_factory,
1670        data,
1671        serialize_tx,
1672        Arc::clone(&output_schema),
1673        Arc::clone(&arc_props),
1674        parallel_options.into(),
1675        Arc::clone(&pool),
1676    );
1677    let parquet_meta_data = concatenate_parallel_row_groups(
1678        writer,
1679        merged_buff,
1680        serialize_rx,
1681        object_store_writer,
1682        pool,
1683    )
1684    .await?;
1685
1686    launch_serialization_task
1687        .join_unwind()
1688        .await
1689        .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
1690    Ok(parquet_meta_data)
1691}
1692
#[cfg(test)]
mod tests {
    use parquet::arrow::parquet_to_arrow_schema;
    use std::sync::Arc;

    use super::*;

    use arrow::datatypes::DataType;
    use parquet::schema::parser::parse_message_type;

    /// Parses a Parquet message-type string, converts it to an Arrow schema,
    /// and applies [`coerce_int96_to_resolution`] with microsecond resolution.
    fn coerce_to_micros(message_type: &str) -> Schema {
        let parsed = parse_message_type(message_type).expect("should parse schema");
        let descr = SchemaDescriptor::new(Arc::new(parsed));
        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();
        coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond)
            .unwrap()
    }

    /// Timezone-less timestamp type with the given unit.
    fn ts(unit: TimeUnit) -> DataType {
        DataType::Timestamp(unit, None)
    }

    /// UTC timestamp type with the given unit.
    fn ts_utc(unit: TimeUnit) -> DataType {
        DataType::Timestamp(unit, Some("UTC".into()))
    }

    #[test]
    fn coerce_int96_to_resolution_with_mixed_timestamps() {
        // Unclear if Spark (or other writer) could generate a file with mixed timestamps like this,
        // but we want to test the scenario just in case since it's at least a valid schema as far
        // as the Parquet spec is concerned.
        let spark_schema = "
        message spark_schema {
          optional int96 c0;
          optional int64 c1 (TIMESTAMP(NANOS,true));
          optional int64 c2 (TIMESTAMP(NANOS,false));
          optional int64 c3 (TIMESTAMP(MILLIS,true));
          optional int64 c4 (TIMESTAMP(MILLIS,false));
          optional int64 c5 (TIMESTAMP(MICROS,true));
          optional int64 c6 (TIMESTAMP(MICROS,false));
        }
        ";

        let coerced = coerce_to_micros(spark_schema);

        // Only the first field (c0) should be converted to a microsecond timestamp because it's the
        // only timestamp that originated from an INT96.
        let expected = Schema::new(vec![
            Field::new("c0", ts(TimeUnit::Microsecond), true),
            Field::new("c1", ts_utc(TimeUnit::Nanosecond), true),
            Field::new("c2", ts(TimeUnit::Nanosecond), true),
            Field::new("c3", ts_utc(TimeUnit::Millisecond), true),
            Field::new("c4", ts(TimeUnit::Millisecond), true),
            Field::new("c5", ts_utc(TimeUnit::Microsecond), true),
            Field::new("c6", ts(TimeUnit::Microsecond), true),
        ]);

        assert_eq!(coerced, expected);
    }

    #[test]
    fn coerce_int96_to_resolution_with_nested_types() {
        // This schema is derived from Comet's CometFuzzTestSuite ParquetGenerator only using int96
        // primitive types with generateStruct, generateArray, and generateMap set to true, with one
        // additional field added to c4's struct to make sure all fields in a struct get modified.
        // https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala
        let spark_schema = "
        message spark_schema {
          optional int96 c0;
          optional group c1 {
            optional int96 c0;
          }
          optional group c2 {
            optional group c0 (LIST) {
              repeated group list {
                optional int96 element;
              }
            }
          }
          optional group c3 (LIST) {
            repeated group list {
              optional int96 element;
            }
          }
          optional group c4 (LIST) {
            repeated group list {
              optional group element {
                optional int96 c0;
                optional int96 c1;
              }
            }
          }
          optional group c5 (MAP) {
            repeated group key_value {
              required int96 key;
              optional int96 value;
            }
          }
          optional group c6 (LIST) {
            repeated group list {
              optional group element (MAP) {
                repeated group key_value {
                  required int96 key;
                  optional int96 value;
                }
              }
            }
          }
        }
        ";

        let coerced = coerce_to_micros(spark_schema);

        // Every INT96 leaf, at any nesting depth, becomes a microsecond
        // timestamp; the surrounding struct/list/map shape is preserved.
        let micros = ts(TimeUnit::Microsecond);
        let expected = Schema::new(vec![
            Field::new("c0", micros.clone(), true),
            Field::new_struct(
                "c1",
                vec![Field::new("c0", micros.clone(), true)],
                true,
            ),
            Field::new_struct(
                "c2",
                vec![Field::new_list(
                    "c0",
                    Field::new("element", micros.clone(), true),
                    true,
                )],
                true,
            ),
            Field::new_list(
                "c3",
                Field::new("element", micros.clone(), true),
                true,
            ),
            Field::new_list(
                "c4",
                Field::new_struct(
                    "element",
                    vec![
                        Field::new("c0", micros.clone(), true),
                        Field::new("c1", micros.clone(), true),
                    ],
                    true,
                ),
                true,
            ),
            Field::new_map(
                "c5",
                "key_value",
                Field::new("key", micros.clone(), false),
                Field::new("value", micros.clone(), true),
                false,
                true,
            ),
            Field::new_list(
                "c6",
                Field::new_map(
                    "element",
                    "key_value",
                    Field::new("key", micros.clone(), false),
                    Field::new("value", micros.clone(), true),
                    false,
                    true,
                ),
                true,
            ),
        ]);

        assert_eq!(coerced, expected);
    }
}