datafusion_datasource_parquet/
file_format.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions
19
20use std::any::Any;
21use std::cell::RefCell;
22use std::fmt::Debug;
23use std::ops::Range;
24use std::rc::Rc;
25use std::sync::Arc;
26use std::{fmt, vec};
27
28use arrow::array::RecordBatch;
29use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
30use datafusion_datasource::file_compression_type::FileCompressionType;
31use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
32use datafusion_datasource::write::{
33    get_writer_schema, ObjectWriterBuilder, SharedBuffer,
34};
35
36use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
37use datafusion_datasource::write::demux::DemuxedStreamReceiver;
38
39use arrow::datatypes::{DataType, Field, FieldRef};
40use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
41#[cfg(feature = "parquet_encryption")]
42use datafusion_common::encryption::map_config_decryption_to_decryption;
43use datafusion_common::encryption::FileDecryptionProperties;
44use datafusion_common::parsers::CompressionTypeVariant;
45use datafusion_common::{
46    internal_datafusion_err, internal_err, not_impl_err, DataFusionError, GetExt,
47    HashSet, Result, DEFAULT_PARQUET_EXTENSION,
48};
49use datafusion_common::{HashMap, Statistics};
50use datafusion_common_runtime::{JoinSet, SpawnedTask};
51use datafusion_datasource::display::FileGroupDisplay;
52use datafusion_datasource::file::FileSource;
53use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
54use datafusion_datasource::sink::{DataSink, DataSinkExec};
55use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
56use datafusion_execution::{SendableRecordBatchStream, TaskContext};
57use datafusion_expr::dml::InsertOp;
58use datafusion_physical_expr_common::sort_expr::LexRequirement;
59use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
60use datafusion_session::Session;
61
62use crate::reader::CachedParquetFileReaderFactory;
63use crate::source::{parse_coerce_int96_string, ParquetSource};
64use async_trait::async_trait;
65use bytes::Bytes;
66use datafusion_datasource::source::DataSourceExec;
67use datafusion_execution::runtime_env::RuntimeEnv;
68use futures::future::BoxFuture;
69use futures::{FutureExt, StreamExt, TryStreamExt};
70use object_store::buffered::BufWriter;
71use object_store::path::Path;
72use object_store::{ObjectMeta, ObjectStore};
73use parquet::arrow::arrow_writer::{
74    compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
75    ArrowLeafColumn, ArrowWriterOptions,
76};
77use parquet::arrow::async_reader::MetadataFetch;
78use parquet::arrow::{ArrowSchemaConverter, AsyncArrowWriter};
79use parquet::basic::Type;
80
81use crate::metadata::DFParquetMetadata;
82use datafusion_execution::cache::cache_manager::FileMetadataCache;
83use parquet::errors::ParquetError;
84use parquet::file::metadata::ParquetMetaData;
85use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
86use parquet::file::writer::SerializedFileWriter;
87use parquet::format::FileMetaData;
88use parquet::schema::types::SchemaDescriptor;
89use tokio::io::{AsyncWrite, AsyncWriteExt};
90use tokio::sync::mpsc::{self, Receiver, Sender};
91
92/// Initial writing buffer size. Note this is just a size hint for efficiency. It
93/// will grow beyond the set value if needed.
94const INITIAL_BUFFER_BYTES: usize = 1048576;
95
96/// When writing parquet files in parallel, if the buffered Parquet data exceeds
97/// this size, it is flushed to object store
98const BUFFER_FLUSH_BYTES: usize = 1024000;
99
100#[derive(Default)]
101/// Factory struct used to create [ParquetFormat]
102pub struct ParquetFormatFactory {
103    /// inner options for parquet
104    pub options: Option<TableParquetOptions>,
105}
106
107impl ParquetFormatFactory {
108    /// Creates an instance of [ParquetFormatFactory]
109    pub fn new() -> Self {
110        Self { options: None }
111    }
112
113    /// Creates an instance of [ParquetFormatFactory] with customized default options
114    pub fn new_with_options(options: TableParquetOptions) -> Self {
115        Self {
116            options: Some(options),
117        }
118    }
119}
120
121impl FileFormatFactory for ParquetFormatFactory {
122    fn create(
123        &self,
124        state: &dyn Session,
125        format_options: &std::collections::HashMap<String, String>,
126    ) -> Result<Arc<dyn FileFormat>> {
127        let parquet_options = match &self.options {
128            None => {
129                let mut table_options = state.default_table_options();
130                table_options.set_config_format(ConfigFileType::PARQUET);
131                table_options.alter_with_string_hash_map(format_options)?;
132                table_options.parquet
133            }
134            Some(parquet_options) => {
135                let mut parquet_options = parquet_options.clone();
136                for (k, v) in format_options {
137                    parquet_options.set(k, v)?;
138                }
139                parquet_options
140            }
141        };
142
143        Ok(Arc::new(
144            ParquetFormat::default().with_options(parquet_options),
145        ))
146    }
147
148    fn default(&self) -> Arc<dyn FileFormat> {
149        Arc::new(ParquetFormat::default())
150    }
151
152    fn as_any(&self) -> &dyn Any {
153        self
154    }
155}
156
157impl GetExt for ParquetFormatFactory {
158    fn get_ext(&self) -> String {
159        // Removes the dot, i.e. ".parquet" -> "parquet"
160        DEFAULT_PARQUET_EXTENSION[1..].to_string()
161    }
162}
163
164impl Debug for ParquetFormatFactory {
165    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
166        f.debug_struct("ParquetFormatFactory")
167            .field("ParquetFormatFactory", &self.options)
168            .finish()
169    }
170}
171/// The Apache Parquet `FileFormat` implementation
172#[derive(Debug, Default)]
173pub struct ParquetFormat {
174    options: TableParquetOptions,
175}
176
177impl ParquetFormat {
178    /// Construct a new Format with no local overrides
179    pub fn new() -> Self {
180        Self::default()
181    }
182
183    /// Activate statistics based row group level pruning
184    /// - If `None`, defaults to value on `config_options`
185    pub fn with_enable_pruning(mut self, enable: bool) -> Self {
186        self.options.global.pruning = enable;
187        self
188    }
189
190    /// Return `true` if pruning is enabled
191    pub fn enable_pruning(&self) -> bool {
192        self.options.global.pruning
193    }
194
195    /// Provide a hint to the size of the file metadata. If a hint is provided
196    /// the reader will try and fetch the last `size_hint` bytes of the parquet file optimistically.
197    /// Without a hint, two read are required. One read to fetch the 8-byte parquet footer and then
198    /// another read to fetch the metadata length encoded in the footer.
199    ///
200    /// - If `None`, defaults to value on `config_options`
201    pub fn with_metadata_size_hint(mut self, size_hint: Option<usize>) -> Self {
202        self.options.global.metadata_size_hint = size_hint;
203        self
204    }
205
206    /// Return the metadata size hint if set
207    pub fn metadata_size_hint(&self) -> Option<usize> {
208        self.options.global.metadata_size_hint
209    }
210
211    /// Tell the parquet reader to skip any metadata that may be in
212    /// the file Schema. This can help avoid schema conflicts due to
213    /// metadata.
214    ///
215    /// - If `None`, defaults to value on `config_options`
216    pub fn with_skip_metadata(mut self, skip_metadata: bool) -> Self {
217        self.options.global.skip_metadata = skip_metadata;
218        self
219    }
220
221    /// Returns `true` if schema metadata will be cleared prior to
222    /// schema merging.
223    pub fn skip_metadata(&self) -> bool {
224        self.options.global.skip_metadata
225    }
226
227    /// Set Parquet options for the ParquetFormat
228    pub fn with_options(mut self, options: TableParquetOptions) -> Self {
229        self.options = options;
230        self
231    }
232
233    /// Parquet options
234    pub fn options(&self) -> &TableParquetOptions {
235        &self.options
236    }
237
238    /// Return `true` if should use view types.
239    ///
240    /// If this returns true, DataFusion will instruct the parquet reader
241    /// to read string / binary columns using view `StringView` or `BinaryView`
242    /// if the table schema specifies those types, regardless of any embedded metadata
243    /// that may specify an alternate Arrow type. The parquet reader is optimized
244    /// for reading `StringView` and `BinaryView` and such queries are significantly faster.
245    ///
246    /// If this returns false, the parquet reader will read the columns according to the
247    /// defaults or any embedded Arrow type information. This may result in reading
248    /// `StringArrays` and then casting to `StringViewArray` which is less efficient.
249    pub fn force_view_types(&self) -> bool {
250        self.options.global.schema_force_view_types
251    }
252
253    /// If true, will use view types. See [`Self::force_view_types`] for details
254    pub fn with_force_view_types(mut self, use_views: bool) -> Self {
255        self.options.global.schema_force_view_types = use_views;
256        self
257    }
258
259    /// Return `true` if binary types will be read as strings.
260    ///
261    /// If this returns true, DataFusion will instruct the parquet reader
262    /// to read binary columns such as `Binary` or `BinaryView` as the
263    /// corresponding string type such as `Utf8` or `LargeUtf8`.
264    /// The parquet reader has special optimizations for `Utf8` and `LargeUtf8`
265    /// validation, and such queries are significantly faster than reading
266    /// binary columns and then casting to string columns.
267    pub fn binary_as_string(&self) -> bool {
268        self.options.global.binary_as_string
269    }
270
271    /// If true, will read binary types as strings. See [`Self::binary_as_string`] for details
272    pub fn with_binary_as_string(mut self, binary_as_string: bool) -> Self {
273        self.options.global.binary_as_string = binary_as_string;
274        self
275    }
276
277    pub fn coerce_int96(&self) -> Option<String> {
278        self.options.global.coerce_int96.clone()
279    }
280
281    pub fn with_coerce_int96(mut self, time_unit: Option<String>) -> Self {
282        self.options.global.coerce_int96 = time_unit;
283        self
284    }
285}
286
287/// Clears all metadata (Schema level and field level) on an iterator
288/// of Schemas
289fn clear_metadata(
290    schemas: impl IntoIterator<Item = Schema>,
291) -> impl Iterator<Item = Schema> {
292    schemas.into_iter().map(|schema| {
293        let fields = schema
294            .fields()
295            .iter()
296            .map(|field| {
297                field.as_ref().clone().with_metadata(Default::default()) // clear meta
298            })
299            .collect::<Fields>();
300        Schema::new(fields)
301    })
302}
303
304#[cfg(feature = "parquet_encryption")]
305async fn get_file_decryption_properties(
306    state: &dyn Session,
307    options: &TableParquetOptions,
308    file_path: &Path,
309) -> Result<Option<FileDecryptionProperties>> {
310    let file_decryption_properties: Option<FileDecryptionProperties> =
311        match &options.crypto.file_decryption {
312            Some(cfd) => Some(map_config_decryption_to_decryption(cfd)),
313            None => match &options.crypto.factory_id {
314                Some(factory_id) => {
315                    let factory =
316                        state.runtime_env().parquet_encryption_factory(factory_id)?;
317                    factory
318                        .get_file_decryption_properties(
319                            &options.crypto.factory_options,
320                            file_path,
321                        )
322                        .await?
323                }
324                None => None,
325            },
326        };
327    Ok(file_decryption_properties)
328}
329
330#[cfg(not(feature = "parquet_encryption"))]
331async fn get_file_decryption_properties(
332    _state: &dyn Session,
333    _options: &TableParquetOptions,
334    _file_path: &Path,
335) -> Result<Option<FileDecryptionProperties>> {
336    Ok(None)
337}
338
339#[async_trait]
340impl FileFormat for ParquetFormat {
341    fn as_any(&self) -> &dyn Any {
342        self
343    }
344
345    fn get_ext(&self) -> String {
346        ParquetFormatFactory::new().get_ext()
347    }
348
349    fn get_ext_with_compression(
350        &self,
351        file_compression_type: &FileCompressionType,
352    ) -> Result<String> {
353        let ext = self.get_ext();
354        match file_compression_type.get_variant() {
355            CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
356            _ => internal_err!("Parquet FileFormat does not support compression."),
357        }
358    }
359
360    fn compression_type(&self) -> Option<FileCompressionType> {
361        None
362    }
363
364    async fn infer_schema(
365        &self,
366        state: &dyn Session,
367        store: &Arc<dyn ObjectStore>,
368        objects: &[ObjectMeta],
369    ) -> Result<SchemaRef> {
370        let coerce_int96 = match self.coerce_int96() {
371            Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?),
372            None => None,
373        };
374
375        let file_metadata_cache =
376            state.runtime_env().cache_manager.get_file_metadata_cache();
377
378        let mut schemas: Vec<_> = futures::stream::iter(objects)
379            .map(|object| async {
380                let file_decryption_properties = get_file_decryption_properties(
381                    state,
382                    &self.options,
383                    &object.location,
384                )
385                .await?;
386                let result = DFParquetMetadata::new(store.as_ref(), object)
387                    .with_metadata_size_hint(self.metadata_size_hint())
388                    .with_decryption_properties(file_decryption_properties.as_ref())
389                    .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
390                    .with_coerce_int96(coerce_int96)
391                    .fetch_schema_with_location()
392                    .await?;
393                Ok::<_, DataFusionError>(result)
394            })
395            .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
396            // fetch schemas concurrently, if requested
397            .buffered(state.config_options().execution.meta_fetch_concurrency)
398            .try_collect()
399            .await?;
400
401        // Schema inference adds fields based the order they are seen
402        // which depends on the order the files are processed. For some
403        // object stores (like local file systems) the order returned from list
404        // is not deterministic. Thus, to ensure deterministic schema inference
405        // sort the files first.
406        // https://github.com/apache/datafusion/pull/6629
407        schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2));
408
409        let schemas = schemas
410            .into_iter()
411            .map(|(_, schema)| schema)
412            .collect::<Vec<_>>();
413
414        let schema = if self.skip_metadata() {
415            Schema::try_merge(clear_metadata(schemas))
416        } else {
417            Schema::try_merge(schemas)
418        }?;
419
420        let schema = if self.binary_as_string() {
421            transform_binary_to_string(&schema)
422        } else {
423            schema
424        };
425
426        let schema = if self.force_view_types() {
427            transform_schema_to_view(&schema)
428        } else {
429            schema
430        };
431
432        Ok(Arc::new(schema))
433    }
434
435    async fn infer_stats(
436        &self,
437        state: &dyn Session,
438        store: &Arc<dyn ObjectStore>,
439        table_schema: SchemaRef,
440        object: &ObjectMeta,
441    ) -> Result<Statistics> {
442        let file_decryption_properties =
443            get_file_decryption_properties(state, &self.options, &object.location)
444                .await?;
445        let file_metadata_cache =
446            state.runtime_env().cache_manager.get_file_metadata_cache();
447        DFParquetMetadata::new(store, object)
448            .with_metadata_size_hint(self.metadata_size_hint())
449            .with_decryption_properties(file_decryption_properties.as_ref())
450            .with_file_metadata_cache(Some(file_metadata_cache))
451            .fetch_statistics(&table_schema)
452            .await
453    }
454
455    async fn create_physical_plan(
456        &self,
457        state: &dyn Session,
458        conf: FileScanConfig,
459    ) -> Result<Arc<dyn ExecutionPlan>> {
460        let mut metadata_size_hint = None;
461
462        if let Some(metadata) = self.metadata_size_hint() {
463            metadata_size_hint = Some(metadata);
464        }
465
466        let mut source = ParquetSource::new(self.options.clone());
467
468        // Use the CachedParquetFileReaderFactory
469        let metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
470        let store = state
471            .runtime_env()
472            .object_store(conf.object_store_url.clone())?;
473        let cached_parquet_read_factory =
474            Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
475        source = source.with_parquet_file_reader_factory(cached_parquet_read_factory);
476
477        if let Some(metadata_size_hint) = metadata_size_hint {
478            source = source.with_metadata_size_hint(metadata_size_hint)
479        }
480
481        source = self.set_source_encryption_factory(source, state)?;
482
483        // Apply schema adapter factory before building the new config
484        let file_source = source.apply_schema_adapter(&conf)?;
485
486        let conf = FileScanConfigBuilder::from(conf)
487            .with_source(file_source)
488            .build();
489        Ok(DataSourceExec::from_data_source(conf))
490    }
491
492    async fn create_writer_physical_plan(
493        &self,
494        input: Arc<dyn ExecutionPlan>,
495        _state: &dyn Session,
496        conf: FileSinkConfig,
497        order_requirements: Option<LexRequirement>,
498    ) -> Result<Arc<dyn ExecutionPlan>> {
499        if conf.insert_op != InsertOp::Append {
500            return not_impl_err!("Overwrites are not implemented yet for Parquet");
501        }
502
503        let sink = Arc::new(ParquetSink::new(conf, self.options.clone()));
504
505        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
506    }
507
508    fn file_source(&self) -> Arc<dyn FileSource> {
509        Arc::new(ParquetSource::default())
510    }
511}
512
513#[cfg(feature = "parquet_encryption")]
514impl ParquetFormat {
515    fn set_source_encryption_factory(
516        &self,
517        source: ParquetSource,
518        state: &dyn Session,
519    ) -> Result<ParquetSource> {
520        if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
521            Ok(source.with_encryption_factory(
522                state
523                    .runtime_env()
524                    .parquet_encryption_factory(encryption_factory_id)?,
525            ))
526        } else {
527            Ok(source)
528        }
529    }
530}
531
532#[cfg(not(feature = "parquet_encryption"))]
533impl ParquetFormat {
534    fn set_source_encryption_factory(
535        &self,
536        source: ParquetSource,
537        _state: &dyn Session,
538    ) -> Result<ParquetSource> {
539        if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
540            Err(DataFusionError::Configuration(
541                format!("Parquet encryption factory id is set to '{encryption_factory_id}' but the parquet_encryption feature is disabled")))
542        } else {
543            Ok(source)
544        }
545    }
546}
547
548/// Apply necessary schema type coercions to make file schema match table schema.
549///
550/// This function performs two main types of transformations in a single pass:
551/// 1. Binary types to string types conversion - Converts binary data types to their
552///    corresponding string types when the table schema expects string data
553/// 2. Regular to view types conversion - Converts standard string/binary types to
554///    view types when the table schema uses view types
555///
556/// # Arguments
557/// * `table_schema` - The table schema containing the desired types
558/// * `file_schema` - The file schema to be transformed
559///
560/// # Returns
561/// * `Some(Schema)` - If any transformations were applied, returns the transformed schema
562/// * `None` - If no transformations were needed
563pub fn apply_file_schema_type_coercions(
564    table_schema: &Schema,
565    file_schema: &Schema,
566) -> Option<Schema> {
567    let mut needs_view_transform = false;
568    let mut needs_string_transform = false;
569
570    // Create a mapping of table field names to their data types for fast lookup
571    // and simultaneously check if we need any transformations
572    let table_fields: HashMap<_, _> = table_schema
573        .fields()
574        .iter()
575        .map(|f| {
576            let dt = f.data_type();
577            // Check if we need view type transformation
578            if matches!(dt, &DataType::Utf8View | &DataType::BinaryView) {
579                needs_view_transform = true;
580            }
581            // Check if we need string type transformation
582            if matches!(
583                dt,
584                &DataType::Utf8 | &DataType::LargeUtf8 | &DataType::Utf8View
585            ) {
586                needs_string_transform = true;
587            }
588
589            (f.name(), dt)
590        })
591        .collect();
592
593    // Early return if no transformation needed
594    if !needs_view_transform && !needs_string_transform {
595        return None;
596    }
597
598    let transformed_fields: Vec<Arc<Field>> = file_schema
599        .fields()
600        .iter()
601        .map(|field| {
602            let field_name = field.name();
603            let field_type = field.data_type();
604
605            // Look up the corresponding field type in the table schema
606            if let Some(table_type) = table_fields.get(field_name) {
607                match (table_type, field_type) {
608                    // table schema uses string type, coerce the file schema to use string type
609                    (
610                        &DataType::Utf8,
611                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
612                    ) => {
613                        return field_with_new_type(field, DataType::Utf8);
614                    }
615                    // table schema uses large string type, coerce the file schema to use large string type
616                    (
617                        &DataType::LargeUtf8,
618                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
619                    ) => {
620                        return field_with_new_type(field, DataType::LargeUtf8);
621                    }
622                    // table schema uses string view type, coerce the file schema to use view type
623                    (
624                        &DataType::Utf8View,
625                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
626                    ) => {
627                        return field_with_new_type(field, DataType::Utf8View);
628                    }
629                    // Handle view type conversions
630                    (&DataType::Utf8View, DataType::Utf8 | DataType::LargeUtf8) => {
631                        return field_with_new_type(field, DataType::Utf8View);
632                    }
633                    (&DataType::BinaryView, DataType::Binary | DataType::LargeBinary) => {
634                        return field_with_new_type(field, DataType::BinaryView);
635                    }
636                    _ => {}
637                }
638            }
639
640            // If no transformation is needed, keep the original field
641            Arc::clone(field)
642        })
643        .collect();
644
645    Some(Schema::new_with_metadata(
646        transformed_fields,
647        file_schema.metadata.clone(),
648    ))
649}
650
651/// Coerces the file schema's Timestamps to the provided TimeUnit if Parquet schema contains INT96.
652pub fn coerce_int96_to_resolution(
653    parquet_schema: &SchemaDescriptor,
654    file_schema: &Schema,
655    time_unit: &TimeUnit,
656) -> Option<Schema> {
657    // Traverse the parquet_schema columns looking for int96 physical types. If encountered, insert
658    // the field's full path into a set.
659    let int96_fields: HashSet<_> = parquet_schema
660        .columns()
661        .iter()
662        .filter(|f| f.physical_type() == Type::INT96)
663        .map(|f| f.path().string())
664        .collect();
665
666    if int96_fields.is_empty() {
667        // The schema doesn't contain any int96 fields, so skip the remaining logic.
668        return None;
669    }
670
671    // Do a DFS into the schema using a stack, looking for timestamp(nanos) fields that originated
672    // as int96 to coerce to the provided time_unit.
673
674    type NestedFields = Rc<RefCell<Vec<FieldRef>>>;
675    type StackContext<'a> = (
676        Vec<&'a str>, // The Parquet column path (e.g., "c0.list.element.c1") for the current field.
677        &'a FieldRef, // The current field to be processed.
678        NestedFields, // The parent's fields that this field will be (possibly) type-coerced and
679        // inserted into. All fields have a parent, so this is not an Option type.
680        Option<NestedFields>, // Nested types need to create their own vector of fields for their
681                              // children. For primitive types this will remain None. For nested
682                              // types it is None the first time they are processed. Then, we
683                              // instantiate a vector for its children, push the field back onto the
684                              // stack to be processed again, and DFS into its children. The next
685                              // time we process the field, we know we have DFS'd into the children
686                              // because this field is Some.
687    );
688
689    // This is our top-level fields from which we will construct our schema. We pass this into our
690    // initial stack context as the parent fields, and the DFS populates it.
691    let fields = Rc::new(RefCell::new(Vec::with_capacity(file_schema.fields.len())));
692
693    // TODO: It might be possible to only DFS into nested fields that we know contain an int96 if we
694    // use some sort of LPM data structure to check if we're currently DFS'ing nested types that are
695    // in a column path that contains an int96. That can be a future optimization for large schemas.
696    let transformed_schema = {
697        // Populate the stack with our top-level fields.
698        let mut stack: Vec<StackContext> = file_schema
699            .fields()
700            .iter()
701            .rev()
702            .map(|f| (vec![f.name().as_str()], f, Rc::clone(&fields), None))
703            .collect();
704
705        // Pop fields to DFS into until we have exhausted the stack.
706        while let Some((parquet_path, current_field, parent_fields, child_fields)) =
707            stack.pop()
708        {
709            match (current_field.data_type(), child_fields) {
710                (DataType::Struct(unprocessed_children), None) => {
711                    // This is the first time popping off this struct. We don't yet know the
712                    // correct types of its children (i.e., if they need coercing) so we create
713                    // a vector for child_fields, push the struct node back onto the stack to be
714                    // processed again (see below) after processing all its children.
715                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(
716                        unprocessed_children.len(),
717                    )));
718                    // Note that here we push the struct back onto the stack with its
719                    // parent_fields in the same position, now with Some(child_fields).
720                    stack.push((
721                        parquet_path.clone(),
722                        current_field,
723                        parent_fields,
724                        Some(Rc::clone(&child_fields)),
725                    ));
726                    // Push all the children in reverse to maintain original schema order due to
727                    // stack processing.
728                    for child in unprocessed_children.into_iter().rev() {
729                        let mut child_path = parquet_path.clone();
730                        // Build up a normalized path that we'll use as a key into the original
731                        // int96_fields set above to test if this originated as int96.
732                        child_path.push(".");
733                        child_path.push(child.name());
734                        // Note that here we push the field onto the stack using the struct's
735                        // new child_fields vector as the field's parent_fields.
736                        stack.push((child_path, child, Rc::clone(&child_fields), None));
737                    }
738                }
739                (DataType::Struct(unprocessed_children), Some(processed_children)) => {
740                    // This is the second time popping off this struct. The child_fields vector
741                    // now contains each field that has been DFS'd into, and we can construct
742                    // the resulting struct with correct child types.
743                    let processed_children = processed_children.borrow();
744                    assert_eq!(processed_children.len(), unprocessed_children.len());
745                    let processed_struct = Field::new_struct(
746                        current_field.name(),
747                        processed_children.as_slice(),
748                        current_field.is_nullable(),
749                    );
750                    parent_fields.borrow_mut().push(Arc::new(processed_struct));
751                }
752                (DataType::List(unprocessed_child), None) => {
753                    // This is the first time popping off this list. See struct docs above.
754                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
755                    stack.push((
756                        parquet_path.clone(),
757                        current_field,
758                        parent_fields,
759                        Some(Rc::clone(&child_fields)),
760                    ));
761                    let mut child_path = parquet_path.clone();
762                    // Spark uses a definition for arrays/lists that results in a group
763                    // named "list" that is not maintained when parsing to Arrow. We just push
764                    // this name into the path.
765                    child_path.push(".list.");
766                    child_path.push(unprocessed_child.name());
767                    stack.push((
768                        child_path.clone(),
769                        unprocessed_child,
770                        Rc::clone(&child_fields),
771                        None,
772                    ));
773                }
774                (DataType::List(_), Some(processed_children)) => {
775                    // This is the second time popping off this list. See struct docs above.
776                    let processed_children = processed_children.borrow();
777                    assert_eq!(processed_children.len(), 1);
778                    let processed_list = Field::new_list(
779                        current_field.name(),
780                        Arc::clone(&processed_children[0]),
781                        current_field.is_nullable(),
782                    );
783                    parent_fields.borrow_mut().push(Arc::new(processed_list));
784                }
785                (DataType::Map(unprocessed_child, _), None) => {
786                    // This is the first time popping off this map. See struct docs above.
787                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
788                    stack.push((
789                        parquet_path.clone(),
790                        current_field,
791                        parent_fields,
792                        Some(Rc::clone(&child_fields)),
793                    ));
794                    let mut child_path = parquet_path.clone();
795                    child_path.push(".");
796                    child_path.push(unprocessed_child.name());
797                    stack.push((
798                        child_path.clone(),
799                        unprocessed_child,
800                        Rc::clone(&child_fields),
801                        None,
802                    ));
803                }
804                (DataType::Map(_, sorted), Some(processed_children)) => {
805                    // This is the second time popping off this map. See struct docs above.
806                    let processed_children = processed_children.borrow();
807                    assert_eq!(processed_children.len(), 1);
808                    let processed_map = Field::new(
809                        current_field.name(),
810                        DataType::Map(Arc::clone(&processed_children[0]), *sorted),
811                        current_field.is_nullable(),
812                    );
813                    parent_fields.borrow_mut().push(Arc::new(processed_map));
814                }
815                (DataType::Timestamp(TimeUnit::Nanosecond, None), None)
816                    if int96_fields.contains(parquet_path.concat().as_str()) =>
817                // We found a timestamp(nanos) and it originated as int96. Coerce it to the correct
818                // time_unit.
819                {
820                    parent_fields.borrow_mut().push(field_with_new_type(
821                        current_field,
822                        DataType::Timestamp(*time_unit, None),
823                    ));
824                }
825                // Other types can be cloned as they are.
826                _ => parent_fields.borrow_mut().push(Arc::clone(current_field)),
827            }
828        }
829        assert_eq!(fields.borrow().len(), file_schema.fields.len());
830        Schema::new_with_metadata(
831            fields.borrow_mut().clone(),
832            file_schema.metadata.clone(),
833        )
834    };
835
836    Some(transformed_schema)
837}
838
839/// Coerces the file schema if the table schema uses a view type.
840#[deprecated(
841    since = "47.0.0",
842    note = "Use `apply_file_schema_type_coercions` instead"
843)]
844pub fn coerce_file_schema_to_view_type(
845    table_schema: &Schema,
846    file_schema: &Schema,
847) -> Option<Schema> {
848    let mut transform = false;
849    let table_fields: HashMap<_, _> = table_schema
850        .fields
851        .iter()
852        .map(|f| {
853            let dt = f.data_type();
854            if dt.equals_datatype(&DataType::Utf8View)
855                || dt.equals_datatype(&DataType::BinaryView)
856            {
857                transform = true;
858            }
859            (f.name(), dt)
860        })
861        .collect();
862
863    if !transform {
864        return None;
865    }
866
867    let transformed_fields: Vec<Arc<Field>> = file_schema
868        .fields
869        .iter()
870        .map(
871            |field| match (table_fields.get(field.name()), field.data_type()) {
872                (Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => {
873                    field_with_new_type(field, DataType::Utf8View)
874                }
875                (
876                    Some(DataType::BinaryView),
877                    DataType::Binary | DataType::LargeBinary,
878                ) => field_with_new_type(field, DataType::BinaryView),
879                _ => Arc::clone(field),
880            },
881        )
882        .collect();
883
884    Some(Schema::new_with_metadata(
885        transformed_fields,
886        file_schema.metadata.clone(),
887    ))
888}
889
890/// If the table schema uses a string type, coerce the file schema to use a string type.
891///
892/// See [ParquetFormat::binary_as_string] for details
893#[deprecated(
894    since = "47.0.0",
895    note = "Use `apply_file_schema_type_coercions` instead"
896)]
897pub fn coerce_file_schema_to_string_type(
898    table_schema: &Schema,
899    file_schema: &Schema,
900) -> Option<Schema> {
901    let mut transform = false;
902    let table_fields: HashMap<_, _> = table_schema
903        .fields
904        .iter()
905        .map(|f| (f.name(), f.data_type()))
906        .collect();
907    let transformed_fields: Vec<Arc<Field>> = file_schema
908        .fields
909        .iter()
910        .map(
911            |field| match (table_fields.get(field.name()), field.data_type()) {
912                // table schema uses string type, coerce the file schema to use string type
913                (
914                    Some(DataType::Utf8),
915                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
916                ) => {
917                    transform = true;
918                    field_with_new_type(field, DataType::Utf8)
919                }
920                // table schema uses large string type, coerce the file schema to use large string type
921                (
922                    Some(DataType::LargeUtf8),
923                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
924                ) => {
925                    transform = true;
926                    field_with_new_type(field, DataType::LargeUtf8)
927                }
928                // table schema uses string view type, coerce the file schema to use view type
929                (
930                    Some(DataType::Utf8View),
931                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
932                ) => {
933                    transform = true;
934                    field_with_new_type(field, DataType::Utf8View)
935                }
936                _ => Arc::clone(field),
937            },
938        )
939        .collect();
940
941    if !transform {
942        None
943    } else {
944        Some(Schema::new_with_metadata(
945            transformed_fields,
946            file_schema.metadata.clone(),
947        ))
948    }
949}
950
951/// Create a new field with the specified data type, copying the other
952/// properties from the input field
953fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
954    Arc::new(field.as_ref().clone().with_data_type(new_type))
955}
956
957/// Transform a schema to use view types for Utf8 and Binary
958///
959/// See [ParquetFormat::force_view_types] for details
960pub fn transform_schema_to_view(schema: &Schema) -> Schema {
961    let transformed_fields: Vec<Arc<Field>> = schema
962        .fields
963        .iter()
964        .map(|field| match field.data_type() {
965            DataType::Utf8 | DataType::LargeUtf8 => {
966                field_with_new_type(field, DataType::Utf8View)
967            }
968            DataType::Binary | DataType::LargeBinary => {
969                field_with_new_type(field, DataType::BinaryView)
970            }
971            _ => Arc::clone(field),
972        })
973        .collect();
974    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
975}
976
977/// Transform a schema so that any binary types are strings
978pub fn transform_binary_to_string(schema: &Schema) -> Schema {
979    let transformed_fields: Vec<Arc<Field>> = schema
980        .fields
981        .iter()
982        .map(|field| match field.data_type() {
983            DataType::Binary => field_with_new_type(field, DataType::Utf8),
984            DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8),
985            DataType::BinaryView => field_with_new_type(field, DataType::Utf8View),
986            _ => Arc::clone(field),
987        })
988        .collect();
989    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
990}
991
992/// [`MetadataFetch`] adapter for reading bytes from an [`ObjectStore`]
993pub struct ObjectStoreFetch<'a> {
994    store: &'a dyn ObjectStore,
995    meta: &'a ObjectMeta,
996}
997
998impl<'a> ObjectStoreFetch<'a> {
999    pub fn new(store: &'a dyn ObjectStore, meta: &'a ObjectMeta) -> Self {
1000        Self { store, meta }
1001    }
1002}
1003
1004impl MetadataFetch for ObjectStoreFetch<'_> {
1005    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, ParquetError>> {
1006        async {
1007            self.store
1008                .get_range(&self.meta.location, range)
1009                .await
1010                .map_err(ParquetError::from)
1011        }
1012        .boxed()
1013    }
1014}
1015
1016/// Fetches parquet metadata from ObjectStore for given object
1017///
1018/// This component is a subject to **change** in near future and is exposed for low level integrations
1019/// through [`ParquetFileReaderFactory`].
1020///
1021/// [`ParquetFileReaderFactory`]: crate::ParquetFileReaderFactory
1022#[deprecated(
1023    since = "50.0.0",
1024    note = "Use `DFParquetMetadata::fetch_metadata` instead"
1025)]
1026pub async fn fetch_parquet_metadata(
1027    store: &dyn ObjectStore,
1028    object_meta: &ObjectMeta,
1029    size_hint: Option<usize>,
1030    #[allow(unused)] decryption_properties: Option<&FileDecryptionProperties>,
1031    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
1032) -> Result<Arc<ParquetMetaData>> {
1033    DFParquetMetadata::new(store, object_meta)
1034        .with_metadata_size_hint(size_hint)
1035        .with_decryption_properties(decryption_properties)
1036        .with_file_metadata_cache(file_metadata_cache)
1037        .fetch_metadata()
1038        .await
1039}
1040
1041/// Read and parse the statistics of the Parquet file at location `path`
1042///
1043/// See [`statistics_from_parquet_meta_calc`] for more details
1044#[deprecated(
1045    since = "50.0.0",
1046    note = "Use `DFParquetMetadata::fetch_statistics` instead"
1047)]
1048pub async fn fetch_statistics(
1049    store: &dyn ObjectStore,
1050    table_schema: SchemaRef,
1051    file: &ObjectMeta,
1052    metadata_size_hint: Option<usize>,
1053    decryption_properties: Option<&FileDecryptionProperties>,
1054    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
1055) -> Result<Statistics> {
1056    DFParquetMetadata::new(store, file)
1057        .with_metadata_size_hint(metadata_size_hint)
1058        .with_decryption_properties(decryption_properties)
1059        .with_file_metadata_cache(file_metadata_cache)
1060        .fetch_statistics(&table_schema)
1061        .await
1062}
1063
1064#[deprecated(
1065    since = "50.0.0",
1066    note = "Use `DFParquetMetadata::statistics_from_parquet_metadata` instead"
1067)]
1068pub fn statistics_from_parquet_meta_calc(
1069    metadata: &ParquetMetaData,
1070    table_schema: SchemaRef,
1071) -> Result<Statistics> {
1072    DFParquetMetadata::statistics_from_parquet_metadata(metadata, &table_schema)
1073}
1074
1075/// Implements [`DataSink`] for writing to a parquet file.
1076pub struct ParquetSink {
1077    /// Config options for writing data
1078    config: FileSinkConfig,
1079    /// Underlying parquet options
1080    parquet_options: TableParquetOptions,
1081    /// File metadata from successfully produced parquet files. The Mutex is only used
1082    /// to allow inserting to HashMap from behind borrowed reference in DataSink::write_all.
1083    written: Arc<parking_lot::Mutex<HashMap<Path, FileMetaData>>>,
1084}
1085
1086impl Debug for ParquetSink {
1087    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1088        f.debug_struct("ParquetSink").finish()
1089    }
1090}
1091
1092impl DisplayAs for ParquetSink {
1093    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1094        match t {
1095            DisplayFormatType::Default | DisplayFormatType::Verbose => {
1096                write!(f, "ParquetSink(file_groups=",)?;
1097                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
1098                write!(f, ")")
1099            }
1100            DisplayFormatType::TreeRender => {
1101                // TODO: collect info
1102                write!(f, "")
1103            }
1104        }
1105    }
1106}
1107
1108impl ParquetSink {
1109    /// Create from config.
1110    pub fn new(config: FileSinkConfig, parquet_options: TableParquetOptions) -> Self {
1111        Self {
1112            config,
1113            parquet_options,
1114            written: Default::default(),
1115        }
1116    }
1117
1118    /// Retrieve the file metadata for the written files, keyed to the path
1119    /// which may be partitioned (in the case of hive style partitioning).
1120    pub fn written(&self) -> HashMap<Path, FileMetaData> {
1121        self.written.lock().clone()
1122    }
1123
1124    /// Create writer properties based upon configuration settings,
1125    /// including partitioning and the inclusion of arrow schema metadata.
1126    async fn create_writer_props(
1127        &self,
1128        runtime: &Arc<RuntimeEnv>,
1129        path: &Path,
1130    ) -> Result<WriterProperties> {
1131        let schema = if self.parquet_options.global.allow_single_file_parallelism {
1132            // If parallelizing writes, we may be also be doing hive style partitioning
1133            // into multiple files which impacts the schema per file.
1134            // Refer to `get_writer_schema()`
1135            &get_writer_schema(&self.config)
1136        } else {
1137            self.config.output_schema()
1138        };
1139
1140        // TODO: avoid this clone in follow up PR, where the writer properties & schema
1141        // are calculated once on `ParquetSink::new`
1142        let mut parquet_opts = self.parquet_options.clone();
1143        if !self.parquet_options.global.skip_arrow_metadata {
1144            parquet_opts.arrow_schema(schema);
1145        }
1146
1147        let mut builder = WriterPropertiesBuilder::try_from(&parquet_opts)?;
1148        builder = set_writer_encryption_properties(
1149            builder,
1150            runtime,
1151            &parquet_opts,
1152            schema,
1153            path,
1154        )
1155        .await?;
1156        Ok(builder.build())
1157    }
1158
1159    /// Creates an AsyncArrowWriter which serializes a parquet file to an ObjectStore
1160    /// AsyncArrowWriters are used when individual parquet file serialization is not parallelized
1161    async fn create_async_arrow_writer(
1162        &self,
1163        location: &Path,
1164        object_store: Arc<dyn ObjectStore>,
1165        context: &Arc<TaskContext>,
1166        parquet_props: WriterProperties,
1167    ) -> Result<AsyncArrowWriter<BufWriter>> {
1168        let buf_writer = BufWriter::with_capacity(
1169            object_store,
1170            location.clone(),
1171            context
1172                .session_config()
1173                .options()
1174                .execution
1175                .objectstore_writer_buffer_size,
1176        );
1177        let options = ArrowWriterOptions::new()
1178            .with_properties(parquet_props)
1179            .with_skip_arrow_metadata(self.parquet_options.global.skip_arrow_metadata);
1180
1181        let writer = AsyncArrowWriter::try_new_with_options(
1182            buf_writer,
1183            get_writer_schema(&self.config),
1184            options,
1185        )?;
1186        Ok(writer)
1187    }
1188
1189    /// Parquet options
1190    pub fn parquet_options(&self) -> &TableParquetOptions {
1191        &self.parquet_options
1192    }
1193}
1194
1195#[cfg(feature = "parquet_encryption")]
1196async fn set_writer_encryption_properties(
1197    builder: WriterPropertiesBuilder,
1198    runtime: &Arc<RuntimeEnv>,
1199    parquet_opts: &TableParquetOptions,
1200    schema: &Arc<Schema>,
1201    path: &Path,
1202) -> Result<WriterPropertiesBuilder> {
1203    if let Some(file_encryption_properties) = &parquet_opts.crypto.file_encryption {
1204        // Encryption properties have been specified directly
1205        return Ok(builder
1206            .with_file_encryption_properties(file_encryption_properties.clone().into()));
1207    } else if let Some(encryption_factory_id) = &parquet_opts.crypto.factory_id.as_ref() {
1208        // Encryption properties will be generated by an encryption factory
1209        let encryption_factory =
1210            runtime.parquet_encryption_factory(encryption_factory_id)?;
1211        let file_encryption_properties = encryption_factory
1212            .get_file_encryption_properties(
1213                &parquet_opts.crypto.factory_options,
1214                schema,
1215                path,
1216            )
1217            .await?;
1218        if let Some(file_encryption_properties) = file_encryption_properties {
1219            return Ok(
1220                builder.with_file_encryption_properties(file_encryption_properties)
1221            );
1222        }
1223    }
1224    Ok(builder)
1225}
1226
1227#[cfg(not(feature = "parquet_encryption"))]
1228async fn set_writer_encryption_properties(
1229    builder: WriterPropertiesBuilder,
1230    _runtime: &Arc<RuntimeEnv>,
1231    _parquet_opts: &TableParquetOptions,
1232    _schema: &Arc<Schema>,
1233    _path: &Path,
1234) -> Result<WriterPropertiesBuilder> {
1235    Ok(builder)
1236}
1237
1238#[async_trait]
1239impl FileSink for ParquetSink {
1240    fn config(&self) -> &FileSinkConfig {
1241        &self.config
1242    }
1243
1244    async fn spawn_writer_tasks_and_join(
1245        &self,
1246        context: &Arc<TaskContext>,
1247        demux_task: SpawnedTask<Result<()>>,
1248        mut file_stream_rx: DemuxedStreamReceiver,
1249        object_store: Arc<dyn ObjectStore>,
1250    ) -> Result<u64> {
1251        let parquet_opts = &self.parquet_options;
1252        let mut allow_single_file_parallelism =
1253            parquet_opts.global.allow_single_file_parallelism;
1254
1255        if parquet_opts.crypto.file_encryption.is_some()
1256            || parquet_opts.crypto.factory_id.is_some()
1257        {
1258            // For now, arrow-rs does not support parallel writes with encryption
1259            // See https://github.com/apache/arrow-rs/issues/7359
1260            allow_single_file_parallelism = false;
1261        }
1262
1263        let mut file_write_tasks: JoinSet<
1264            std::result::Result<(Path, FileMetaData), DataFusionError>,
1265        > = JoinSet::new();
1266
1267        let runtime = context.runtime_env();
1268        let parallel_options = ParallelParquetWriterOptions {
1269            max_parallel_row_groups: parquet_opts
1270                .global
1271                .maximum_parallel_row_group_writers,
1272            max_buffered_record_batches_per_stream: parquet_opts
1273                .global
1274                .maximum_buffered_record_batches_per_stream,
1275        };
1276
1277        while let Some((path, mut rx)) = file_stream_rx.recv().await {
1278            let parquet_props = self.create_writer_props(&runtime, &path).await?;
1279            if !allow_single_file_parallelism {
1280                let mut writer = self
1281                    .create_async_arrow_writer(
1282                        &path,
1283                        Arc::clone(&object_store),
1284                        context,
1285                        parquet_props.clone(),
1286                    )
1287                    .await?;
1288                let mut reservation = MemoryConsumer::new(format!("ParquetSink[{path}]"))
1289                    .register(context.memory_pool());
1290                file_write_tasks.spawn(async move {
1291                    while let Some(batch) = rx.recv().await {
1292                        writer.write(&batch).await?;
1293                        reservation.try_resize(writer.memory_size())?;
1294                    }
1295                    let file_metadata = writer
1296                        .close()
1297                        .await
1298                        .map_err(|e| DataFusionError::ParquetError(Box::new(e)))?;
1299                    Ok((path, file_metadata))
1300                });
1301            } else {
1302                let writer = ObjectWriterBuilder::new(
1303                    // Parquet files as a whole are never compressed, since they
1304                    // manage compressed blocks themselves.
1305                    FileCompressionType::UNCOMPRESSED,
1306                    &path,
1307                    Arc::clone(&object_store),
1308                )
1309                .with_buffer_size(Some(
1310                    context
1311                        .session_config()
1312                        .options()
1313                        .execution
1314                        .objectstore_writer_buffer_size,
1315                ))
1316                .build()?;
1317                let schema = get_writer_schema(&self.config);
1318                let props = parquet_props.clone();
1319                let parallel_options_clone = parallel_options.clone();
1320                let pool = Arc::clone(context.memory_pool());
1321                file_write_tasks.spawn(async move {
1322                    let file_metadata = output_single_parquet_file_parallelized(
1323                        writer,
1324                        rx,
1325                        schema,
1326                        &props,
1327                        parallel_options_clone,
1328                        pool,
1329                    )
1330                    .await?;
1331                    Ok((path, file_metadata))
1332                });
1333            }
1334        }
1335
1336        let mut row_count = 0;
1337        while let Some(result) = file_write_tasks.join_next().await {
1338            match result {
1339                Ok(r) => {
1340                    let (path, file_metadata) = r?;
1341                    row_count += file_metadata.num_rows;
1342                    let mut written_files = self.written.lock();
1343                    written_files
1344                        .try_insert(path.clone(), file_metadata)
1345                        .map_err(|e| internal_datafusion_err!("duplicate entry detected for partitioned file {path}: {e}"))?;
1346                    drop(written_files);
1347                }
1348                Err(e) => {
1349                    if e.is_panic() {
1350                        std::panic::resume_unwind(e.into_panic());
1351                    } else {
1352                        unreachable!();
1353                    }
1354                }
1355            }
1356        }
1357
1358        demux_task
1359            .join_unwind()
1360            .await
1361            .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
1362
1363        Ok(row_count as u64)
1364    }
1365}
1366
1367#[async_trait]
1368impl DataSink for ParquetSink {
1369    fn as_any(&self) -> &dyn Any {
1370        self
1371    }
1372
1373    fn schema(&self) -> &SchemaRef {
1374        self.config.output_schema()
1375    }
1376
1377    async fn write_all(
1378        &self,
1379        data: SendableRecordBatchStream,
1380        context: &Arc<TaskContext>,
1381    ) -> Result<u64> {
1382        FileSink::write_all(self, data, context).await
1383    }
1384}
1385
1386/// Consumes a stream of [ArrowLeafColumn] via a channel and serializes them using an [ArrowColumnWriter].
1387/// Once the channel is exhausted, returns the writer along with its [MemoryReservation].
1388async fn column_serializer_task(
1389    mut rx: Receiver<ArrowLeafColumn>,
1390    mut writer: ArrowColumnWriter,
1391    mut reservation: MemoryReservation,
1392) -> Result<(ArrowColumnWriter, MemoryReservation)> {
1393    while let Some(col) = rx.recv().await {
1394        writer.write(&col)?;
1395        reservation.try_resize(writer.memory_size())?;
1396    }
1397    Ok((writer, reservation))
1398}
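// A minimal sketch (not part of this module's API) of how one column serializer task
// is driven; it mirrors the wiring done by `spawn_column_parallel_row_group_writer`
// and `spawn_rg_join_and_finalize_task` below, with `capacity`, `writer`, `field`,
// `array`, and `reservation` assumed to be supplied by the caller:
//
//     let (tx, rx) = tokio::sync::mpsc::channel::<ArrowLeafColumn>(capacity);
//     let task = SpawnedTask::spawn(column_serializer_task(rx, writer, reservation));
//     for leaf in compute_leaves(&field, &array)? {
//         if tx.send(leaf).await.is_err() {
//             break; // receiver dropped: the serializer task has already stopped
//         }
//     }
//     drop(tx); // closing the channel lets the task return its writer
//     let (writer, reservation) = task
//         .join_unwind()
//         .await
//         .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;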
1399
1400type ColumnWriterTask = SpawnedTask<Result<(ArrowColumnWriter, MemoryReservation)>>;
1401type ColSender = Sender<ArrowLeafColumn>;
1402
1403/// Spawns a parallel serialization task for each column.
1404/// Returns a join handle for each column's serialization task along with a send channel
1405/// for feeding arrow arrays to that task.
1406fn spawn_column_parallel_row_group_writer(
1407    schema: Arc<Schema>,
1408    parquet_props: Arc<WriterProperties>,
1409    max_buffer_size: usize,
1410    pool: &Arc<dyn MemoryPool>,
1411) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
1412    let schema_desc = ArrowSchemaConverter::new().convert(&schema)?;
1413    let col_writers = get_column_writers(&schema_desc, &parquet_props, &schema)?;
1414    let num_columns = col_writers.len();
1415
1416    let mut col_writer_tasks = Vec::with_capacity(num_columns);
1417    let mut col_array_channels = Vec::with_capacity(num_columns);
1418    for writer in col_writers.into_iter() {
1419        // The buffer size of this channel limits the number of arrays queued up for column-level serialization
1420        let (send_array, receive_array) =
1421            mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
1422        col_array_channels.push(send_array);
1423
1424        let reservation =
1425            MemoryConsumer::new("ParquetSink(ArrowColumnWriter)").register(pool);
1426        let task = SpawnedTask::spawn(column_serializer_task(
1427            receive_array,
1428            writer,
1429            reservation,
1430        ));
1431        col_writer_tasks.push(task);
1432    }
1433
1434    Ok((col_writer_tasks, col_array_channels))
1435}
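// Note that writers and channels are created per Parquet *leaf* column, not per
// top-level Arrow field: a struct field such as `s: Struct<a: Int64, b: Utf8>`
// (hypothetical example) yields two leaf columns and therefore two serializer tasks.
// `compute_leaves` performs the matching expansion on the sending side in
// `send_arrays_to_col_writers`.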
1436
1437/// Settings related to writing parquet files in parallel
1438#[derive(Clone)]
1439struct ParallelParquetWriterOptions {
1440    max_parallel_row_groups: usize,
1441    max_buffered_record_batches_per_stream: usize,
1442}
1443
1444/// The result of calling [ArrowColumnWriter].close() on each column of a row group:
1445/// the Vec of encoded column chunks to append to the row group, their [MemoryReservation], and the row count.
1446type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, MemoryReservation, usize)>;
1447
1448/// Sends the Arrow arrays in the passed [RecordBatch] through the channels to their respective
1449/// parallel column serializers.
1450async fn send_arrays_to_col_writers(
1451    col_array_channels: &[ColSender],
1452    rb: &RecordBatch,
1453    schema: Arc<Schema>,
1454) -> Result<()> {
1455    // Each leaf column has its own channel; increment next_channel for each leaf column sent.
1456    let mut next_channel = 0;
1457    for (array, field) in rb.columns().iter().zip(schema.fields()) {
1458        for c in compute_leaves(field, array)? {
1459            // Do not surface error from closed channel (means something
1460            // else hit an error, and the plan is shutting down).
1461            if col_array_channels[next_channel].send(c).await.is_err() {
1462                return Ok(());
1463            }
1464
1465            next_channel += 1;
1466        }
1467    }
1468
1469    Ok(())
1470}
1471
1472/// Spawns a tokio task which joins the parallel column writer tasks
1473/// and finalizes the row group.
1474fn spawn_rg_join_and_finalize_task(
1475    column_writer_tasks: Vec<ColumnWriterTask>,
1476    rg_rows: usize,
1477    pool: &Arc<dyn MemoryPool>,
1478) -> SpawnedTask<RBStreamSerializeResult> {
1479    let mut rg_reservation =
1480        MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(pool);
1481
1482    SpawnedTask::spawn(async move {
1483        let num_cols = column_writer_tasks.len();
1484        let mut finalized_rg = Vec::with_capacity(num_cols);
1485        for task in column_writer_tasks.into_iter() {
1486            let (writer, _col_reservation) = task
1487                .join_unwind()
1488                .await
1489                .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
1490            let encoded_size = writer.get_estimated_total_bytes();
1491            rg_reservation.grow(encoded_size);
1492            finalized_rg.push(writer.close()?);
1493        }
1494
1495        Ok((finalized_rg, rg_reservation, rg_rows))
1496    })
1497}
1498
1499/// This task coordinates the serialization of a parquet file in parallel.
1500/// As the query produces RecordBatches, these are written to a RowGroup
1501/// via parallel [ArrowColumnWriter] tasks. Once the desired max rows per
1502/// row group is reached, the column tasks are joined and finalized on a separate
1503/// task whose handle is sent to a concatenation task, while this task immediately
1504/// continues working on the next row group. Parquet serialization is therefore
1505/// parallelized across both columns and row groups, with a theoretical max number
1506/// of parallel tasks given by n_columns * num_row_groups.
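/// For example (illustrative numbers, not defaults): with 8 leaf columns and up to
/// 2 row groups being serialized concurrently, as many as 8 * 2 = 16 column
/// serialization tasks can be in flight at once, in addition to the row-group
/// finalization and concatenation tasks.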
1507fn spawn_parquet_parallel_serialization_task(
1508    mut data: Receiver<RecordBatch>,
1509    serialize_tx: Sender<SpawnedTask<RBStreamSerializeResult>>,
1510    schema: Arc<Schema>,
1511    writer_props: Arc<WriterProperties>,
1512    parallel_options: ParallelParquetWriterOptions,
1513    pool: Arc<dyn MemoryPool>,
1514) -> SpawnedTask<Result<(), DataFusionError>> {
1515    SpawnedTask::spawn(async move {
1516        let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream;
1517        let max_row_group_rows = writer_props.max_row_group_size();
1518        let (mut column_writer_handles, mut col_array_channels) =
1519            spawn_column_parallel_row_group_writer(
1520                Arc::clone(&schema),
1521                Arc::clone(&writer_props),
1522                max_buffer_rb,
1523                &pool,
1524            )?;
1525        let mut current_rg_rows = 0;
1526
1527        while let Some(mut rb) = data.recv().await {
1528            // This loop lets the "else" block repeatedly split the RecordBatch to handle the case
1529            // when max_row_group_rows < execution.batch_size, as an alternative to a recursive async
1530            // function.
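            // For example (illustrative numbers): with max_row_group_rows = 100 and an
            // incoming batch of 250 rows, the "else" branch below runs twice (producing
            // two 100-row row groups) and the remaining 50 rows stay buffered until more
            // input arrives or the leftover-rows handling after this loop finalizes them.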
1531            loop {
1532                if current_rg_rows + rb.num_rows() < max_row_group_rows {
1533                    send_arrays_to_col_writers(
1534                        &col_array_channels,
1535                        &rb,
1536                        Arc::clone(&schema),
1537                    )
1538                    .await?;
1539                    current_rg_rows += rb.num_rows();
1540                    break;
1541                } else {
1542                    let rows_left = max_row_group_rows - current_rg_rows;
1543                    let a = rb.slice(0, rows_left);
1544                    send_arrays_to_col_writers(
1545                        &col_array_channels,
1546                        &a,
1547                        Arc::clone(&schema),
1548                    )
1549                    .await?;
1550
1551                    // Dropping the senders signals the parallel column writers that the RowGroup is done;
1552                    // join and finalize the RowGroup on a separate task so that we can immediately start
1553                    // on the next RG without waiting for the current one to finish.
1554                    drop(col_array_channels);
1555                    let finalize_rg_task = spawn_rg_join_and_finalize_task(
1556                        column_writer_handles,
1557                        max_row_group_rows,
1558                        &pool,
1559                    );
1560
1561                    // Do not surface error from closed channel (means something
1562                    // else hit an error, and the plan is shutting down).
1563                    if serialize_tx.send(finalize_rg_task).await.is_err() {
1564                        return Ok(());
1565                    }
1566
1567                    current_rg_rows = 0;
1568                    rb = rb.slice(rows_left, rb.num_rows() - rows_left);
1569
1570                    (column_writer_handles, col_array_channels) =
1571                        spawn_column_parallel_row_group_writer(
1572                            Arc::clone(&schema),
1573                            Arc::clone(&writer_props),
1574                            max_buffer_rb,
1575                            &pool,
1576                        )?;
1577                }
1578            }
1579        }
1580
1581        drop(col_array_channels);
1582        // Handle leftover rows as the final row group, which may be smaller than max_row_group_rows
1583        if current_rg_rows > 0 {
1584            let finalize_rg_task = spawn_rg_join_and_finalize_task(
1585                column_writer_handles,
1586                current_rg_rows,
1587                &pool,
1588            );
1589
1590            // Do not surface error from closed channel (means something
1591            // else hit an error, and the plan is shutting down).
1592            if serialize_tx.send(finalize_rg_task).await.is_err() {
1593                return Ok(());
1594            }
1595        }
1596
1597        Ok(())
1598    })
1599}
1600
1601/// Consumes RowGroups serialized by other parallel tasks and concatenates them into
1602/// the final parquet file, while flushing finalized bytes to an [ObjectStore].
1603async fn concatenate_parallel_row_groups(
1604    mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
1605    schema: Arc<Schema>,
1606    writer_props: Arc<WriterProperties>,
1607    mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
1608    pool: Arc<dyn MemoryPool>,
1609) -> Result<FileMetaData> {
1610    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
1611
1612    let mut file_reservation =
1613        MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);
1614
1615    let schema_desc = ArrowSchemaConverter::new().convert(schema.as_ref())?;
1616    let mut parquet_writer = SerializedFileWriter::new(
1617        merged_buff.clone(),
1618        schema_desc.root_schema_ptr(),
1619        writer_props,
1620    )?;
1621
1622    while let Some(task) = serialize_rx.recv().await {
1623        let result = task.join_unwind().await;
1624        let mut rg_out = parquet_writer.next_row_group()?;
1625        let (serialized_columns, mut rg_reservation, _cnt) =
1626            result.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
1627        for chunk in serialized_columns {
1628            chunk.append_to_row_group(&mut rg_out)?;
1629            rg_reservation.free();
1630
1631            let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
1632            file_reservation.try_resize(buff_to_flush.len())?;
1633
1634            if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
1635                object_store_writer
1636                    .write_all(buff_to_flush.as_slice())
1637                    .await?;
1638                buff_to_flush.clear();
1639                file_reservation.try_resize(buff_to_flush.len())?; // will set to zero
1640            }
1641        }
1642        rg_out.close()?;
1643    }
1644
1645    let file_metadata = parquet_writer.close()?;
1646    let final_buff = merged_buff.buffer.try_lock().unwrap();
1647
1648    object_store_writer.write_all(final_buff.as_slice()).await?;
1649    object_store_writer.shutdown().await?;
1650    file_reservation.free();
1651
1652    Ok(file_metadata)
1653}
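// Note on buffering: encoded bytes accumulate in `merged_buff` and are only written
// to the object store once they exceed BUFFER_FLUSH_BYTES (plus one final write when
// the file is closed); `file_reservation` tracks the currently buffered bytes and
// shrinks back to zero after each flush.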
1654
1655/// Parallelizes the serialization of a single parquet file by first serializing N
1656/// independent RecordBatch streams in parallel into RowGroups in memory. Another
1657/// task then stitches these independent RowGroups together and streams the resulting
1658/// single parquet file to an ObjectStore in multiple parts.
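/// The overall flow is: the incoming RecordBatch receiver feeds
/// [spawn_parquet_parallel_serialization_task], which fans batches out to
/// per-leaf-column serializer tasks and sends a finalization task per row group to
/// [concatenate_parallel_row_groups], which appends the finished row groups and
/// streams the resulting bytes to the object store writer.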
1659async fn output_single_parquet_file_parallelized(
1660    object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
1661    data: Receiver<RecordBatch>,
1662    output_schema: Arc<Schema>,
1663    parquet_props: &WriterProperties,
1664    parallel_options: ParallelParquetWriterOptions,
1665    pool: Arc<dyn MemoryPool>,
1666) -> Result<FileMetaData> {
1667    let max_rowgroups = parallel_options.max_parallel_row_groups;
1668    // The buffer size of this channel limits the maximum number of RowGroups being worked on in parallel
1669    let (serialize_tx, serialize_rx) =
1670        mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups);
1671
1672    let arc_props = Arc::new(parquet_props.clone());
1673    let launch_serialization_task = spawn_parquet_parallel_serialization_task(
1674        data,
1675        serialize_tx,
1676        Arc::clone(&output_schema),
1677        Arc::clone(&arc_props),
1678        parallel_options,
1679        Arc::clone(&pool),
1680    );
1681    let file_metadata = concatenate_parallel_row_groups(
1682        serialize_rx,
1683        Arc::clone(&output_schema),
1684        Arc::clone(&arc_props),
1685        object_store_writer,
1686        pool,
1687    )
1688    .await?;
1689
1690    launch_serialization_task
1691        .join_unwind()
1692        .await
1693        .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
1694    Ok(file_metadata)
1695}
1696
1697#[cfg(test)]
1698mod tests {
1699    use parquet::arrow::parquet_to_arrow_schema;
1700    use std::sync::Arc;
1701
1702    use super::*;
1703
1704    use arrow::datatypes::DataType;
1705    use parquet::schema::parser::parse_message_type;
1706
1707    #[test]
1708    fn coerce_int96_to_resolution_with_mixed_timestamps() {
1709        // It is unclear whether Spark (or another writer) could generate a file with mixed timestamps
1710        // like this, but we test the scenario just in case, since it is at least a valid schema as far
1711        // as the Parquet spec is concerned.
1712        let spark_schema = "
1713        message spark_schema {
1714          optional int96 c0;
1715          optional int64 c1 (TIMESTAMP(NANOS,true));
1716          optional int64 c2 (TIMESTAMP(NANOS,false));
1717          optional int64 c3 (TIMESTAMP(MILLIS,true));
1718          optional int64 c4 (TIMESTAMP(MILLIS,false));
1719          optional int64 c5 (TIMESTAMP(MICROS,true));
1720          optional int64 c6 (TIMESTAMP(MICROS,false));
1721        }
1722        ";
1723
1724        let schema = parse_message_type(spark_schema).expect("should parse schema");
1725        let descr = SchemaDescriptor::new(Arc::new(schema));
1726
1727        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();
1728
1729        let result =
1730            coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond)
1731                .unwrap();
1732
1733        // Only the first field (c0) should be converted to a microsecond timestamp because it's the
1734        // only timestamp that originated from an INT96.
1735        let expected_schema = Schema::new(vec![
1736            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
1737            Field::new(
1738                "c1",
1739                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
1740                true,
1741            ),
1742            Field::new("c2", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
1743            Field::new(
1744                "c3",
1745                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1746                true,
1747            ),
1748            Field::new("c4", DataType::Timestamp(TimeUnit::Millisecond, None), true),
1749            Field::new(
1750                "c5",
1751                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1752                true,
1753            ),
1754            Field::new("c6", DataType::Timestamp(TimeUnit::Microsecond, None), true),
1755        ]);
1756
1757        assert_eq!(result, expected_schema);
1758    }
1759
1760    #[test]
1761    fn coerce_int96_to_resolution_with_nested_types() {
1762        // This schema is derived from Comet's CometFuzzTestSuite ParquetGenerator, using only int96
1763        // primitive types with generateStruct, generateArray, and generateMap set to true, plus one
1764        // additional field added to c4's struct to make sure all fields in a struct get modified.
1765        // https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala
1766        let spark_schema = "
1767        message spark_schema {
1768          optional int96 c0;
1769          optional group c1 {
1770            optional int96 c0;
1771          }
1772          optional group c2 {
1773            optional group c0 (LIST) {
1774              repeated group list {
1775                optional int96 element;
1776              }
1777            }
1778          }
1779          optional group c3 (LIST) {
1780            repeated group list {
1781              optional int96 element;
1782            }
1783          }
1784          optional group c4 (LIST) {
1785            repeated group list {
1786              optional group element {
1787                optional int96 c0;
1788                optional int96 c1;
1789              }
1790            }
1791          }
1792          optional group c5 (MAP) {
1793            repeated group key_value {
1794              required int96 key;
1795              optional int96 value;
1796            }
1797          }
1798          optional group c6 (LIST) {
1799            repeated group list {
1800              optional group element (MAP) {
1801                repeated group key_value {
1802                  required int96 key;
1803                  optional int96 value;
1804                }
1805              }
1806            }
1807          }
1808        }
1809        ";
1810
1811        let schema = parse_message_type(spark_schema).expect("should parse schema");
1812        let descr = SchemaDescriptor::new(Arc::new(schema));
1813
1814        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();
1815
1816        let result =
1817            coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond)
1818                .unwrap();
1819
1820        let expected_schema = Schema::new(vec![
1821            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
1822            Field::new_struct(
1823                "c1",
1824                vec![Field::new(
1825                    "c0",
1826                    DataType::Timestamp(TimeUnit::Microsecond, None),
1827                    true,
1828                )],
1829                true,
1830            ),
1831            Field::new_struct(
1832                "c2",
1833                vec![Field::new_list(
1834                    "c0",
1835                    Field::new(
1836                        "element",
1837                        DataType::Timestamp(TimeUnit::Microsecond, None),
1838                        true,
1839                    ),
1840                    true,
1841                )],
1842                true,
1843            ),
1844            Field::new_list(
1845                "c3",
1846                Field::new(
1847                    "element",
1848                    DataType::Timestamp(TimeUnit::Microsecond, None),
1849                    true,
1850                ),
1851                true,
1852            ),
1853            Field::new_list(
1854                "c4",
1855                Field::new_struct(
1856                    "element",
1857                    vec![
1858                        Field::new(
1859                            "c0",
1860                            DataType::Timestamp(TimeUnit::Microsecond, None),
1861                            true,
1862                        ),
1863                        Field::new(
1864                            "c1",
1865                            DataType::Timestamp(TimeUnit::Microsecond, None),
1866                            true,
1867                        ),
1868                    ],
1869                    true,
1870                ),
1871                true,
1872            ),
1873            Field::new_map(
1874                "c5",
1875                "key_value",
1876                Field::new(
1877                    "key",
1878                    DataType::Timestamp(TimeUnit::Microsecond, None),
1879                    false,
1880                ),
1881                Field::new(
1882                    "value",
1883                    DataType::Timestamp(TimeUnit::Microsecond, None),
1884                    true,
1885                ),
1886                false,
1887                true,
1888            ),
1889            Field::new_list(
1890                "c6",
1891                Field::new_map(
1892                    "element",
1893                    "key_value",
1894                    Field::new(
1895                        "key",
1896                        DataType::Timestamp(TimeUnit::Microsecond, None),
1897                        false,
1898                    ),
1899                    Field::new(
1900                        "value",
1901                        DataType::Timestamp(TimeUnit::Microsecond, None),
1902                        true,
1903                    ),
1904                    false,
1905                    true,
1906                ),
1907                true,
1908            ),
1909        ]);
1910
1911        assert_eq!(result, expected_schema);
1912    }
1913}