datafusion_datasource_parquet/
file_format.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions
19
20use std::any::Any;
21use std::fmt;
22use std::fmt::Debug;
23use std::ops::Range;
24use std::sync::Arc;
25
26use arrow::array::RecordBatch;
27use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
28use datafusion_datasource::file_compression_type::FileCompressionType;
29use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
30use datafusion_datasource::write::{create_writer, get_writer_schema, SharedBuffer};
31
32use datafusion_datasource::file_format::{
33    FileFormat, FileFormatFactory, FilePushdownSupport,
34};
35use datafusion_datasource::write::demux::DemuxedStreamReceiver;
36
37use arrow::compute::sum;
38use arrow::datatypes::{DataType, Field, FieldRef};
39use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
40use datafusion_common::parsers::CompressionTypeVariant;
41use datafusion_common::stats::Precision;
42use datafusion_common::{
43    internal_datafusion_err, internal_err, not_impl_err, ColumnStatistics,
44    DataFusionError, GetExt, Result, DEFAULT_PARQUET_EXTENSION,
45};
46use datafusion_common::{HashMap, Statistics};
47use datafusion_common_runtime::{JoinSet, SpawnedTask};
48use datafusion_datasource::display::FileGroupDisplay;
49use datafusion_datasource::file::FileSource;
50use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
51use datafusion_datasource::sink::{DataSink, DataSinkExec};
52use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
53use datafusion_execution::{SendableRecordBatchStream, TaskContext};
54use datafusion_expr::dml::InsertOp;
55use datafusion_expr::Expr;
56use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
57use datafusion_physical_expr::PhysicalExpr;
58use datafusion_physical_expr_common::sort_expr::LexRequirement;
59use datafusion_physical_plan::Accumulator;
60use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
61use datafusion_session::Session;
62
63use crate::can_expr_be_pushed_down_with_schemas;
64use crate::source::ParquetSource;
65use async_trait::async_trait;
66use bytes::Bytes;
67use datafusion_datasource::source::DataSourceExec;
68use futures::future::BoxFuture;
69use futures::{FutureExt, StreamExt, TryStreamExt};
70use log::debug;
71use object_store::buffered::BufWriter;
72use object_store::path::Path;
73use object_store::{ObjectMeta, ObjectStore};
74use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
75use parquet::arrow::arrow_writer::{
76    compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
77    ArrowLeafColumn, ArrowWriterOptions,
78};
79use parquet::arrow::async_reader::MetadataFetch;
80use parquet::arrow::{parquet_to_arrow_schema, ArrowSchemaConverter, AsyncArrowWriter};
81use parquet::basic::Type;
82use parquet::errors::ParquetError;
83use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
84use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
85use parquet::file::writer::SerializedFileWriter;
86use parquet::format::FileMetaData;
87use parquet::schema::types::SchemaDescriptor;
88use tokio::io::{AsyncWrite, AsyncWriteExt};
89use tokio::sync::mpsc::{self, Receiver, Sender};
90
/// Starting capacity of the write buffer, in bytes (1 MiB).
///
/// This is only a sizing hint for efficiency: the buffer grows past this
/// value whenever more room is needed.
const INITIAL_BUFFER_BYTES: usize = 1024 * 1024;

/// Flush threshold, in bytes, used when Parquet files are written in
/// parallel: once the buffered Parquet data exceeds this size it is flushed
/// to the object store.
const BUFFER_FLUSH_BYTES: usize = 1000 * 1024;
98
99#[derive(Default)]
100/// Factory struct used to create [ParquetFormat]
101pub struct ParquetFormatFactory {
102    /// inner options for parquet
103    pub options: Option<TableParquetOptions>,
104}
105
106impl ParquetFormatFactory {
107    /// Creates an instance of [ParquetFormatFactory]
108    pub fn new() -> Self {
109        Self { options: None }
110    }
111
112    /// Creates an instance of [ParquetFormatFactory] with customized default options
113    pub fn new_with_options(options: TableParquetOptions) -> Self {
114        Self {
115            options: Some(options),
116        }
117    }
118}
119
120impl FileFormatFactory for ParquetFormatFactory {
121    fn create(
122        &self,
123        state: &dyn Session,
124        format_options: &std::collections::HashMap<String, String>,
125    ) -> Result<Arc<dyn FileFormat>> {
126        let parquet_options = match &self.options {
127            None => {
128                let mut table_options = state.default_table_options();
129                table_options.set_config_format(ConfigFileType::PARQUET);
130                table_options.alter_with_string_hash_map(format_options)?;
131                table_options.parquet
132            }
133            Some(parquet_options) => {
134                let mut parquet_options = parquet_options.clone();
135                for (k, v) in format_options {
136                    parquet_options.set(k, v)?;
137                }
138                parquet_options
139            }
140        };
141
142        Ok(Arc::new(
143            ParquetFormat::default().with_options(parquet_options),
144        ))
145    }
146
147    fn default(&self) -> Arc<dyn FileFormat> {
148        Arc::new(ParquetFormat::default())
149    }
150
151    fn as_any(&self) -> &dyn Any {
152        self
153    }
154}
155
156impl GetExt for ParquetFormatFactory {
157    fn get_ext(&self) -> String {
158        // Removes the dot, i.e. ".parquet" -> "parquet"
159        DEFAULT_PARQUET_EXTENSION[1..].to_string()
160    }
161}
162
163impl Debug for ParquetFormatFactory {
164    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
165        f.debug_struct("ParquetFormatFactory")
166            .field("ParquetFormatFactory", &self.options)
167            .finish()
168    }
169}
170/// The Apache Parquet `FileFormat` implementation
171#[derive(Debug, Default)]
172pub struct ParquetFormat {
173    options: TableParquetOptions,
174}
175
176impl ParquetFormat {
177    /// Construct a new Format with no local overrides
178    pub fn new() -> Self {
179        Self::default()
180    }
181
182    /// Activate statistics based row group level pruning
183    /// - If `None`, defaults to value on `config_options`
184    pub fn with_enable_pruning(mut self, enable: bool) -> Self {
185        self.options.global.pruning = enable;
186        self
187    }
188
189    /// Return `true` if pruning is enabled
190    pub fn enable_pruning(&self) -> bool {
191        self.options.global.pruning
192    }
193
194    /// Provide a hint to the size of the file metadata. If a hint is provided
195    /// the reader will try and fetch the last `size_hint` bytes of the parquet file optimistically.
196    /// Without a hint, two read are required. One read to fetch the 8-byte parquet footer and then
197    /// another read to fetch the metadata length encoded in the footer.
198    ///
199    /// - If `None`, defaults to value on `config_options`
200    pub fn with_metadata_size_hint(mut self, size_hint: Option<usize>) -> Self {
201        self.options.global.metadata_size_hint = size_hint;
202        self
203    }
204
205    /// Return the metadata size hint if set
206    pub fn metadata_size_hint(&self) -> Option<usize> {
207        self.options.global.metadata_size_hint
208    }
209
210    /// Tell the parquet reader to skip any metadata that may be in
211    /// the file Schema. This can help avoid schema conflicts due to
212    /// metadata.
213    ///
214    /// - If `None`, defaults to value on `config_options`
215    pub fn with_skip_metadata(mut self, skip_metadata: bool) -> Self {
216        self.options.global.skip_metadata = skip_metadata;
217        self
218    }
219
220    /// Returns `true` if schema metadata will be cleared prior to
221    /// schema merging.
222    pub fn skip_metadata(&self) -> bool {
223        self.options.global.skip_metadata
224    }
225
226    /// Set Parquet options for the ParquetFormat
227    pub fn with_options(mut self, options: TableParquetOptions) -> Self {
228        self.options = options;
229        self
230    }
231
232    /// Parquet options
233    pub fn options(&self) -> &TableParquetOptions {
234        &self.options
235    }
236
237    /// Return `true` if should use view types.
238    ///
239    /// If this returns true, DataFusion will instruct the parquet reader
240    /// to read string / binary columns using view `StringView` or `BinaryView`
241    /// if the table schema specifies those types, regardless of any embedded metadata
242    /// that may specify an alternate Arrow type. The parquet reader is optimized
243    /// for reading `StringView` and `BinaryView` and such queries are significantly faster.
244    ///
245    /// If this returns false, the parquet reader will read the columns according to the
246    /// defaults or any embedded Arrow type information. This may result in reading
247    /// `StringArrays` and then casting to `StringViewArray` which is less efficient.
248    pub fn force_view_types(&self) -> bool {
249        self.options.global.schema_force_view_types
250    }
251
252    /// If true, will use view types. See [`Self::force_view_types`] for details
253    pub fn with_force_view_types(mut self, use_views: bool) -> Self {
254        self.options.global.schema_force_view_types = use_views;
255        self
256    }
257
258    /// Return `true` if binary types will be read as strings.
259    ///
260    /// If this returns true, DataFusion will instruct the parquet reader
261    /// to read binary columns such as `Binary` or `BinaryView` as the
262    /// corresponding string type such as `Utf8` or `LargeUtf8`.
263    /// The parquet reader has special optimizations for `Utf8` and `LargeUtf8`
264    /// validation, and such queries are significantly faster than reading
265    /// binary columns and then casting to string columns.
266    pub fn binary_as_string(&self) -> bool {
267        self.options.global.binary_as_string
268    }
269
270    /// If true, will read binary types as strings. See [`Self::binary_as_string`] for details
271    pub fn with_binary_as_string(mut self, binary_as_string: bool) -> Self {
272        self.options.global.binary_as_string = binary_as_string;
273        self
274    }
275
276    pub fn coerce_int96(&self) -> Option<String> {
277        self.options.global.coerce_int96.clone()
278    }
279
280    pub fn with_coerce_int96(mut self, time_unit: Option<String>) -> Self {
281        self.options.global.coerce_int96 = time_unit;
282        self
283    }
284}
285
286/// Clears all metadata (Schema level and field level) on an iterator
287/// of Schemas
288fn clear_metadata(
289    schemas: impl IntoIterator<Item = Schema>,
290) -> impl Iterator<Item = Schema> {
291    schemas.into_iter().map(|schema| {
292        let fields = schema
293            .fields()
294            .iter()
295            .map(|field| {
296                field.as_ref().clone().with_metadata(Default::default()) // clear meta
297            })
298            .collect::<Fields>();
299        Schema::new(fields)
300    })
301}
302
303async fn fetch_schema_with_location(
304    store: &dyn ObjectStore,
305    file: &ObjectMeta,
306    metadata_size_hint: Option<usize>,
307) -> Result<(Path, Schema)> {
308    let loc_path = file.location.clone();
309    let schema = fetch_schema(store, file, metadata_size_hint).await?;
310    Ok((loc_path, schema))
311}
312
313#[async_trait]
314impl FileFormat for ParquetFormat {
315    fn as_any(&self) -> &dyn Any {
316        self
317    }
318
319    fn get_ext(&self) -> String {
320        ParquetFormatFactory::new().get_ext()
321    }
322
323    fn get_ext_with_compression(
324        &self,
325        file_compression_type: &FileCompressionType,
326    ) -> Result<String> {
327        let ext = self.get_ext();
328        match file_compression_type.get_variant() {
329            CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
330            _ => internal_err!("Parquet FileFormat does not support compression."),
331        }
332    }
333
334    async fn infer_schema(
335        &self,
336        state: &dyn Session,
337        store: &Arc<dyn ObjectStore>,
338        objects: &[ObjectMeta],
339    ) -> Result<SchemaRef> {
340        let mut schemas: Vec<_> = futures::stream::iter(objects)
341            .map(|object| {
342                fetch_schema_with_location(
343                    store.as_ref(),
344                    object,
345                    self.metadata_size_hint(),
346                )
347            })
348            .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
349            .buffered(state.config_options().execution.meta_fetch_concurrency)
350            .try_collect()
351            .await?;
352
353        // Schema inference adds fields based the order they are seen
354        // which depends on the order the files are processed. For some
355        // object stores (like local file systems) the order returned from list
356        // is not deterministic. Thus, to ensure deterministic schema inference
357        // sort the files first.
358        // https://github.com/apache/datafusion/pull/6629
359        schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2));
360
361        let schemas = schemas
362            .into_iter()
363            .map(|(_, schema)| schema)
364            .collect::<Vec<_>>();
365
366        let schema = if self.skip_metadata() {
367            Schema::try_merge(clear_metadata(schemas))
368        } else {
369            Schema::try_merge(schemas)
370        }?;
371
372        let schema = if self.binary_as_string() {
373            transform_binary_to_string(&schema)
374        } else {
375            schema
376        };
377
378        let schema = if self.force_view_types() {
379            transform_schema_to_view(&schema)
380        } else {
381            schema
382        };
383
384        Ok(Arc::new(schema))
385    }
386
387    async fn infer_stats(
388        &self,
389        _state: &dyn Session,
390        store: &Arc<dyn ObjectStore>,
391        table_schema: SchemaRef,
392        object: &ObjectMeta,
393    ) -> Result<Statistics> {
394        let stats = fetch_statistics(
395            store.as_ref(),
396            table_schema,
397            object,
398            self.metadata_size_hint(),
399        )
400        .await?;
401        Ok(stats)
402    }
403
404    async fn create_physical_plan(
405        &self,
406        _state: &dyn Session,
407        conf: FileScanConfig,
408        filters: Option<&Arc<dyn PhysicalExpr>>,
409    ) -> Result<Arc<dyn ExecutionPlan>> {
410        let mut predicate = None;
411        let mut metadata_size_hint = None;
412
413        // If enable pruning then combine the filters to build the predicate.
414        // If disable pruning then set the predicate to None, thus readers
415        // will not prune data based on the statistics.
416        if self.enable_pruning() {
417            if let Some(pred) = filters.cloned() {
418                predicate = Some(pred);
419            }
420        }
421        if let Some(metadata) = self.metadata_size_hint() {
422            metadata_size_hint = Some(metadata);
423        }
424
425        let mut source = ParquetSource::new(self.options.clone());
426
427        if let Some(predicate) = predicate {
428            source = source.with_predicate(Arc::clone(&conf.file_schema), predicate);
429        }
430        if let Some(metadata_size_hint) = metadata_size_hint {
431            source = source.with_metadata_size_hint(metadata_size_hint)
432        }
433
434        let conf = FileScanConfigBuilder::from(conf)
435            .with_source(Arc::new(source))
436            .build();
437        Ok(DataSourceExec::from_data_source(conf))
438    }
439
440    async fn create_writer_physical_plan(
441        &self,
442        input: Arc<dyn ExecutionPlan>,
443        _state: &dyn Session,
444        conf: FileSinkConfig,
445        order_requirements: Option<LexRequirement>,
446    ) -> Result<Arc<dyn ExecutionPlan>> {
447        if conf.insert_op != InsertOp::Append {
448            return not_impl_err!("Overwrites are not implemented yet for Parquet");
449        }
450
451        let sink = Arc::new(ParquetSink::new(conf, self.options.clone()));
452
453        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
454    }
455
456    fn supports_filters_pushdown(
457        &self,
458        file_schema: &Schema,
459        table_schema: &Schema,
460        filters: &[&Expr],
461    ) -> Result<FilePushdownSupport> {
462        if !self.options().global.pushdown_filters {
463            return Ok(FilePushdownSupport::NoSupport);
464        }
465
466        let all_supported = filters.iter().all(|filter| {
467            can_expr_be_pushed_down_with_schemas(filter, file_schema, table_schema)
468        });
469
470        Ok(if all_supported {
471            FilePushdownSupport::Supported
472        } else {
473            FilePushdownSupport::NotSupportedForFilter
474        })
475    }
476
477    fn file_source(&self) -> Arc<dyn FileSource> {
478        Arc::new(ParquetSource::default())
479    }
480}
481
482/// Apply necessary schema type coercions to make file schema match table schema.
483///
484/// This function performs two main types of transformations in a single pass:
485/// 1. Binary types to string types conversion - Converts binary data types to their
486///    corresponding string types when the table schema expects string data
487/// 2. Regular to view types conversion - Converts standard string/binary types to
488///    view types when the table schema uses view types
489///
490/// # Arguments
491/// * `table_schema` - The table schema containing the desired types
492/// * `file_schema` - The file schema to be transformed
493///
494/// # Returns
495/// * `Some(Schema)` - If any transformations were applied, returns the transformed schema
496/// * `None` - If no transformations were needed
497pub fn apply_file_schema_type_coercions(
498    table_schema: &Schema,
499    file_schema: &Schema,
500) -> Option<Schema> {
501    let mut needs_view_transform = false;
502    let mut needs_string_transform = false;
503
504    // Create a mapping of table field names to their data types for fast lookup
505    // and simultaneously check if we need any transformations
506    let table_fields: HashMap<_, _> = table_schema
507        .fields()
508        .iter()
509        .map(|f| {
510            let dt = f.data_type();
511            // Check if we need view type transformation
512            if matches!(dt, &DataType::Utf8View | &DataType::BinaryView) {
513                needs_view_transform = true;
514            }
515            // Check if we need string type transformation
516            if matches!(
517                dt,
518                &DataType::Utf8 | &DataType::LargeUtf8 | &DataType::Utf8View
519            ) {
520                needs_string_transform = true;
521            }
522
523            (f.name(), dt)
524        })
525        .collect();
526
527    // Early return if no transformation needed
528    if !needs_view_transform && !needs_string_transform {
529        return None;
530    }
531
532    let transformed_fields: Vec<Arc<Field>> = file_schema
533        .fields()
534        .iter()
535        .map(|field| {
536            let field_name = field.name();
537            let field_type = field.data_type();
538
539            // Look up the corresponding field type in the table schema
540            if let Some(table_type) = table_fields.get(field_name) {
541                match (table_type, field_type) {
542                    // table schema uses string type, coerce the file schema to use string type
543                    (
544                        &DataType::Utf8,
545                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
546                    ) => {
547                        return field_with_new_type(field, DataType::Utf8);
548                    }
549                    // table schema uses large string type, coerce the file schema to use large string type
550                    (
551                        &DataType::LargeUtf8,
552                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
553                    ) => {
554                        return field_with_new_type(field, DataType::LargeUtf8);
555                    }
556                    // table schema uses string view type, coerce the file schema to use view type
557                    (
558                        &DataType::Utf8View,
559                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
560                    ) => {
561                        return field_with_new_type(field, DataType::Utf8View);
562                    }
563                    // Handle view type conversions
564                    (&DataType::Utf8View, DataType::Utf8 | DataType::LargeUtf8) => {
565                        return field_with_new_type(field, DataType::Utf8View);
566                    }
567                    (&DataType::BinaryView, DataType::Binary | DataType::LargeBinary) => {
568                        return field_with_new_type(field, DataType::BinaryView);
569                    }
570                    _ => {}
571                }
572            }
573
574            // If no transformation is needed, keep the original field
575            Arc::clone(field)
576        })
577        .collect();
578
579    Some(Schema::new_with_metadata(
580        transformed_fields,
581        file_schema.metadata.clone(),
582    ))
583}
584
585/// Coerces the file schema's Timestamps to the provided TimeUnit if Parquet schema contains INT96.
586pub fn coerce_int96_to_resolution(
587    parquet_schema: &SchemaDescriptor,
588    file_schema: &Schema,
589    time_unit: &TimeUnit,
590) -> Option<Schema> {
591    let mut transform = false;
592    let parquet_fields: HashMap<_, _> = parquet_schema
593        .columns()
594        .iter()
595        .map(|f| {
596            let dt = f.physical_type();
597            if dt.eq(&Type::INT96) {
598                transform = true;
599            }
600            (f.name(), dt)
601        })
602        .collect();
603
604    if !transform {
605        return None;
606    }
607
608    let transformed_fields: Vec<Arc<Field>> = file_schema
609        .fields
610        .iter()
611        .map(|field| match parquet_fields.get(field.name().as_str()) {
612            Some(Type::INT96) => {
613                field_with_new_type(field, DataType::Timestamp(*time_unit, None))
614            }
615            _ => Arc::clone(field),
616        })
617        .collect();
618
619    Some(Schema::new_with_metadata(
620        transformed_fields,
621        file_schema.metadata.clone(),
622    ))
623}
624
625/// Coerces the file schema if the table schema uses a view type.
626#[deprecated(
627    since = "47.0.0",
628    note = "Use `apply_file_schema_type_coercions` instead"
629)]
630pub fn coerce_file_schema_to_view_type(
631    table_schema: &Schema,
632    file_schema: &Schema,
633) -> Option<Schema> {
634    let mut transform = false;
635    let table_fields: HashMap<_, _> = table_schema
636        .fields
637        .iter()
638        .map(|f| {
639            let dt = f.data_type();
640            if dt.equals_datatype(&DataType::Utf8View)
641                || dt.equals_datatype(&DataType::BinaryView)
642            {
643                transform = true;
644            }
645            (f.name(), dt)
646        })
647        .collect();
648
649    if !transform {
650        return None;
651    }
652
653    let transformed_fields: Vec<Arc<Field>> = file_schema
654        .fields
655        .iter()
656        .map(
657            |field| match (table_fields.get(field.name()), field.data_type()) {
658                (Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => {
659                    field_with_new_type(field, DataType::Utf8View)
660                }
661                (
662                    Some(DataType::BinaryView),
663                    DataType::Binary | DataType::LargeBinary,
664                ) => field_with_new_type(field, DataType::BinaryView),
665                _ => Arc::clone(field),
666            },
667        )
668        .collect();
669
670    Some(Schema::new_with_metadata(
671        transformed_fields,
672        file_schema.metadata.clone(),
673    ))
674}
675
676/// If the table schema uses a string type, coerce the file schema to use a string type.
677///
678/// See [ParquetFormat::binary_as_string] for details
679#[deprecated(
680    since = "47.0.0",
681    note = "Use `apply_file_schema_type_coercions` instead"
682)]
683pub fn coerce_file_schema_to_string_type(
684    table_schema: &Schema,
685    file_schema: &Schema,
686) -> Option<Schema> {
687    let mut transform = false;
688    let table_fields: HashMap<_, _> = table_schema
689        .fields
690        .iter()
691        .map(|f| (f.name(), f.data_type()))
692        .collect();
693    let transformed_fields: Vec<Arc<Field>> = file_schema
694        .fields
695        .iter()
696        .map(
697            |field| match (table_fields.get(field.name()), field.data_type()) {
698                // table schema uses string type, coerce the file schema to use string type
699                (
700                    Some(DataType::Utf8),
701                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
702                ) => {
703                    transform = true;
704                    field_with_new_type(field, DataType::Utf8)
705                }
706                // table schema uses large string type, coerce the file schema to use large string type
707                (
708                    Some(DataType::LargeUtf8),
709                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
710                ) => {
711                    transform = true;
712                    field_with_new_type(field, DataType::LargeUtf8)
713                }
714                // table schema uses string view type, coerce the file schema to use view type
715                (
716                    Some(DataType::Utf8View),
717                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
718                ) => {
719                    transform = true;
720                    field_with_new_type(field, DataType::Utf8View)
721                }
722                _ => Arc::clone(field),
723            },
724        )
725        .collect();
726
727    if !transform {
728        None
729    } else {
730        Some(Schema::new_with_metadata(
731            transformed_fields,
732            file_schema.metadata.clone(),
733        ))
734    }
735}
736
737/// Create a new field with the specified data type, copying the other
738/// properties from the input field
739fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
740    Arc::new(field.as_ref().clone().with_data_type(new_type))
741}
742
743/// Transform a schema to use view types for Utf8 and Binary
744///
745/// See [ParquetFormat::force_view_types] for details
746pub fn transform_schema_to_view(schema: &Schema) -> Schema {
747    let transformed_fields: Vec<Arc<Field>> = schema
748        .fields
749        .iter()
750        .map(|field| match field.data_type() {
751            DataType::Utf8 | DataType::LargeUtf8 => {
752                field_with_new_type(field, DataType::Utf8View)
753            }
754            DataType::Binary | DataType::LargeBinary => {
755                field_with_new_type(field, DataType::BinaryView)
756            }
757            _ => Arc::clone(field),
758        })
759        .collect();
760    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
761}
762
763/// Transform a schema so that any binary types are strings
764pub fn transform_binary_to_string(schema: &Schema) -> Schema {
765    let transformed_fields: Vec<Arc<Field>> = schema
766        .fields
767        .iter()
768        .map(|field| match field.data_type() {
769            DataType::Binary => field_with_new_type(field, DataType::Utf8),
770            DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8),
771            DataType::BinaryView => field_with_new_type(field, DataType::Utf8View),
772            _ => Arc::clone(field),
773        })
774        .collect();
775    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
776}
777
778/// [`MetadataFetch`] adapter for reading bytes from an [`ObjectStore`]
779struct ObjectStoreFetch<'a> {
780    store: &'a dyn ObjectStore,
781    meta: &'a ObjectMeta,
782}
783
784impl<'a> ObjectStoreFetch<'a> {
785    fn new(store: &'a dyn ObjectStore, meta: &'a ObjectMeta) -> Self {
786        Self { store, meta }
787    }
788}
789
790impl MetadataFetch for ObjectStoreFetch<'_> {
791    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, ParquetError>> {
792        async {
793            self.store
794                .get_range(&self.meta.location, range)
795                .await
796                .map_err(ParquetError::from)
797        }
798        .boxed()
799    }
800}
801
802/// Fetches parquet metadata from ObjectStore for given object
803///
804/// This component is a subject to **change** in near future and is exposed for low level integrations
805/// through [`ParquetFileReaderFactory`].
806///
807/// [`ParquetFileReaderFactory`]: crate::ParquetFileReaderFactory
808pub async fn fetch_parquet_metadata(
809    store: &dyn ObjectStore,
810    meta: &ObjectMeta,
811    size_hint: Option<usize>,
812) -> Result<ParquetMetaData> {
813    let file_size = meta.size;
814    let fetch = ObjectStoreFetch::new(store, meta);
815
816    ParquetMetaDataReader::new()
817        .with_prefetch_hint(size_hint)
818        .load_and_finish(fetch, file_size)
819        .await
820        .map_err(DataFusionError::from)
821}
822
823/// Read and parse the schema of the Parquet file at location `path`
824async fn fetch_schema(
825    store: &dyn ObjectStore,
826    file: &ObjectMeta,
827    metadata_size_hint: Option<usize>,
828) -> Result<Schema> {
829    let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
830    let file_metadata = metadata.file_metadata();
831    let schema = parquet_to_arrow_schema(
832        file_metadata.schema_descr(),
833        file_metadata.key_value_metadata(),
834    )?;
835    Ok(schema)
836}
837
838/// Read and parse the statistics of the Parquet file at location `path`
839///
840/// See [`statistics_from_parquet_meta_calc`] for more details
841pub async fn fetch_statistics(
842    store: &dyn ObjectStore,
843    table_schema: SchemaRef,
844    file: &ObjectMeta,
845    metadata_size_hint: Option<usize>,
846) -> Result<Statistics> {
847    let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
848    statistics_from_parquet_meta_calc(&metadata, table_schema)
849}
850
851/// Convert statistics in [`ParquetMetaData`] into [`Statistics`] using [`StatisticsConverter`]
852///
853/// The statistics are calculated for each column in the table schema
854/// using the row group statistics in the parquet metadata.
855///
856/// # Key behaviors:
857///
858/// 1. Extracts row counts and byte sizes from all row groups
859/// 2. Applies schema type coercions to align file schema with table schema
860/// 3. Collects and aggregates statistics across row groups when available
861///
862/// # When there are no statistics:
863///
864/// If the Parquet file doesn't contain any statistics (has_statistics is false), the function returns a Statistics object with:
865/// - Exact row count
866/// - Exact byte size
867/// - All column statistics marked as unknown via Statistics::unknown_column(&table_schema)
868/// # When only some columns have statistics:
869///
870/// For columns with statistics:
871/// - Min/max values are properly extracted and represented as Precision::Exact
872/// - Null counts are calculated by summing across row groups
873///
874/// For columns without statistics,
875/// - For min/max, there are two situations:
876///     1. The column isn't in arrow schema, then min/max values are set to Precision::Absent
877///     2. The column is in arrow schema, but not in parquet schema due to schema revolution, min/max values are set to Precision::Exact(null)
878/// - Null counts are set to Precision::Exact(num_rows) (conservatively assuming all values could be null)
879pub fn statistics_from_parquet_meta_calc(
880    metadata: &ParquetMetaData,
881    table_schema: SchemaRef,
882) -> Result<Statistics> {
883    let row_groups_metadata = metadata.row_groups();
884
885    let mut statistics = Statistics::new_unknown(&table_schema);
886    let mut has_statistics = false;
887    let mut num_rows = 0_usize;
888    let mut total_byte_size = 0_usize;
889    for row_group_meta in row_groups_metadata {
890        num_rows += row_group_meta.num_rows() as usize;
891        total_byte_size += row_group_meta.total_byte_size() as usize;
892
893        if !has_statistics {
894            has_statistics = row_group_meta
895                .columns()
896                .iter()
897                .any(|column| column.statistics().is_some());
898        }
899    }
900    statistics.num_rows = Precision::Exact(num_rows);
901    statistics.total_byte_size = Precision::Exact(total_byte_size);
902
903    let file_metadata = metadata.file_metadata();
904    let mut file_schema = parquet_to_arrow_schema(
905        file_metadata.schema_descr(),
906        file_metadata.key_value_metadata(),
907    )?;
908
909    if let Some(merged) = apply_file_schema_type_coercions(&table_schema, &file_schema) {
910        file_schema = merged;
911    }
912
913    statistics.column_statistics = if has_statistics {
914        let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema);
915        let mut null_counts_array =
916            vec![Precision::Exact(0); table_schema.fields().len()];
917
918        table_schema
919            .fields()
920            .iter()
921            .enumerate()
922            .for_each(|(idx, field)| {
923                match StatisticsConverter::try_new(
924                    field.name(),
925                    &file_schema,
926                    file_metadata.schema_descr(),
927                ) {
928                    Ok(stats_converter) => {
929                        summarize_min_max_null_counts(
930                            &mut min_accs,
931                            &mut max_accs,
932                            &mut null_counts_array,
933                            idx,
934                            num_rows,
935                            &stats_converter,
936                            row_groups_metadata,
937                        )
938                        .ok();
939                    }
940                    Err(e) => {
941                        debug!("Failed to create statistics converter: {}", e);
942                        null_counts_array[idx] = Precision::Exact(num_rows);
943                    }
944                }
945            });
946
947        get_col_stats(
948            &table_schema,
949            null_counts_array,
950            &mut max_accs,
951            &mut min_accs,
952        )
953    } else {
954        Statistics::unknown_column(&table_schema)
955    };
956
957    Ok(statistics)
958}
959
960fn get_col_stats(
961    schema: &Schema,
962    null_counts: Vec<Precision<usize>>,
963    max_values: &mut [Option<MaxAccumulator>],
964    min_values: &mut [Option<MinAccumulator>],
965) -> Vec<ColumnStatistics> {
966    (0..schema.fields().len())
967        .map(|i| {
968            let max_value = match max_values.get_mut(i).unwrap() {
969                Some(max_value) => max_value.evaluate().ok(),
970                None => None,
971            };
972            let min_value = match min_values.get_mut(i).unwrap() {
973                Some(min_value) => min_value.evaluate().ok(),
974                None => None,
975            };
976            ColumnStatistics {
977                null_count: null_counts[i],
978                max_value: max_value.map(Precision::Exact).unwrap_or(Precision::Absent),
979                min_value: min_value.map(Precision::Exact).unwrap_or(Precision::Absent),
980                sum_value: Precision::Absent,
981                distinct_count: Precision::Absent,
982            }
983        })
984        .collect()
985}
986
987fn summarize_min_max_null_counts(
988    min_accs: &mut [Option<MinAccumulator>],
989    max_accs: &mut [Option<MaxAccumulator>],
990    null_counts_array: &mut [Precision<usize>],
991    arrow_schema_index: usize,
992    num_rows: usize,
993    stats_converter: &StatisticsConverter,
994    row_groups_metadata: &[RowGroupMetaData],
995) -> Result<()> {
996    let max_values = stats_converter.row_group_maxes(row_groups_metadata)?;
997    let min_values = stats_converter.row_group_mins(row_groups_metadata)?;
998    let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?;
999
1000    if let Some(max_acc) = &mut max_accs[arrow_schema_index] {
1001        max_acc.update_batch(&[max_values])?;
1002    }
1003
1004    if let Some(min_acc) = &mut min_accs[arrow_schema_index] {
1005        min_acc.update_batch(&[min_values])?;
1006    }
1007
1008    null_counts_array[arrow_schema_index] = Precision::Exact(match sum(&null_counts) {
1009        Some(null_count) => null_count as usize,
1010        None => num_rows,
1011    });
1012
1013    Ok(())
1014}
1015
1016/// Implements [`DataSink`] for writing to a parquet file.
1017pub struct ParquetSink {
1018    /// Config options for writing data
1019    config: FileSinkConfig,
1020    /// Underlying parquet options
1021    parquet_options: TableParquetOptions,
1022    /// File metadata from successfully produced parquet files. The Mutex is only used
1023    /// to allow inserting to HashMap from behind borrowed reference in DataSink::write_all.
1024    written: Arc<parking_lot::Mutex<HashMap<Path, FileMetaData>>>,
1025}
1026
1027impl Debug for ParquetSink {
1028    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1029        f.debug_struct("ParquetSink").finish()
1030    }
1031}
1032
1033impl DisplayAs for ParquetSink {
1034    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1035        match t {
1036            DisplayFormatType::Default | DisplayFormatType::Verbose => {
1037                write!(f, "ParquetSink(file_groups=",)?;
1038                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
1039                write!(f, ")")
1040            }
1041            DisplayFormatType::TreeRender => {
1042                // TODO: collect info
1043                write!(f, "")
1044            }
1045        }
1046    }
1047}
1048
1049impl ParquetSink {
1050    /// Create from config.
1051    pub fn new(config: FileSinkConfig, parquet_options: TableParquetOptions) -> Self {
1052        Self {
1053            config,
1054            parquet_options,
1055            written: Default::default(),
1056        }
1057    }
1058
1059    /// Retrieve the file metadata for the written files, keyed to the path
1060    /// which may be partitioned (in the case of hive style partitioning).
1061    pub fn written(&self) -> HashMap<Path, FileMetaData> {
1062        self.written.lock().clone()
1063    }
1064
1065    /// Create writer properties based upon configuration settings,
1066    /// including partitioning and the inclusion of arrow schema metadata.
1067    fn create_writer_props(&self) -> Result<WriterProperties> {
1068        let schema = if self.parquet_options.global.allow_single_file_parallelism {
1069            // If parallelizing writes, we may be also be doing hive style partitioning
1070            // into multiple files which impacts the schema per file.
1071            // Refer to `get_writer_schema()`
1072            &get_writer_schema(&self.config)
1073        } else {
1074            self.config.output_schema()
1075        };
1076
1077        // TODO: avoid this clone in follow up PR, where the writer properties & schema
1078        // are calculated once on `ParquetSink::new`
1079        let mut parquet_opts = self.parquet_options.clone();
1080        if !self.parquet_options.global.skip_arrow_metadata {
1081            parquet_opts.arrow_schema(schema);
1082        }
1083
1084        Ok(WriterPropertiesBuilder::try_from(&parquet_opts)?.build())
1085    }
1086
1087    /// Creates an AsyncArrowWriter which serializes a parquet file to an ObjectStore
1088    /// AsyncArrowWriters are used when individual parquet file serialization is not parallelized
1089    async fn create_async_arrow_writer(
1090        &self,
1091        location: &Path,
1092        object_store: Arc<dyn ObjectStore>,
1093        parquet_props: WriterProperties,
1094    ) -> Result<AsyncArrowWriter<BufWriter>> {
1095        let buf_writer = BufWriter::new(object_store, location.clone());
1096        let options = ArrowWriterOptions::new()
1097            .with_properties(parquet_props)
1098            .with_skip_arrow_metadata(self.parquet_options.global.skip_arrow_metadata);
1099
1100        let writer = AsyncArrowWriter::try_new_with_options(
1101            buf_writer,
1102            get_writer_schema(&self.config),
1103            options,
1104        )?;
1105        Ok(writer)
1106    }
1107
1108    /// Parquet options
1109    pub fn parquet_options(&self) -> &TableParquetOptions {
1110        &self.parquet_options
1111    }
1112}
1113
1114#[async_trait]
1115impl FileSink for ParquetSink {
1116    fn config(&self) -> &FileSinkConfig {
1117        &self.config
1118    }
1119
1120    async fn spawn_writer_tasks_and_join(
1121        &self,
1122        context: &Arc<TaskContext>,
1123        demux_task: SpawnedTask<Result<()>>,
1124        mut file_stream_rx: DemuxedStreamReceiver,
1125        object_store: Arc<dyn ObjectStore>,
1126    ) -> Result<u64> {
1127        let parquet_opts = &self.parquet_options;
1128        let allow_single_file_parallelism =
1129            parquet_opts.global.allow_single_file_parallelism;
1130
1131        let mut file_write_tasks: JoinSet<
1132            std::result::Result<(Path, FileMetaData), DataFusionError>,
1133        > = JoinSet::new();
1134
1135        let parquet_props = self.create_writer_props()?;
1136        let parallel_options = ParallelParquetWriterOptions {
1137            max_parallel_row_groups: parquet_opts
1138                .global
1139                .maximum_parallel_row_group_writers,
1140            max_buffered_record_batches_per_stream: parquet_opts
1141                .global
1142                .maximum_buffered_record_batches_per_stream,
1143        };
1144
1145        while let Some((path, mut rx)) = file_stream_rx.recv().await {
1146            if !allow_single_file_parallelism {
1147                let mut writer = self
1148                    .create_async_arrow_writer(
1149                        &path,
1150                        Arc::clone(&object_store),
1151                        parquet_props.clone(),
1152                    )
1153                    .await?;
1154                let mut reservation =
1155                    MemoryConsumer::new(format!("ParquetSink[{}]", path))
1156                        .register(context.memory_pool());
1157                file_write_tasks.spawn(async move {
1158                    while let Some(batch) = rx.recv().await {
1159                        writer.write(&batch).await?;
1160                        reservation.try_resize(writer.memory_size())?;
1161                    }
1162                    let file_metadata = writer
1163                        .close()
1164                        .await
1165                        .map_err(DataFusionError::ParquetError)?;
1166                    Ok((path, file_metadata))
1167                });
1168            } else {
1169                let writer = create_writer(
1170                    // Parquet files as a whole are never compressed, since they
1171                    // manage compressed blocks themselves.
1172                    FileCompressionType::UNCOMPRESSED,
1173                    &path,
1174                    Arc::clone(&object_store),
1175                )
1176                .await?;
1177                let schema = get_writer_schema(&self.config);
1178                let props = parquet_props.clone();
1179                let parallel_options_clone = parallel_options.clone();
1180                let pool = Arc::clone(context.memory_pool());
1181                file_write_tasks.spawn(async move {
1182                    let file_metadata = output_single_parquet_file_parallelized(
1183                        writer,
1184                        rx,
1185                        schema,
1186                        &props,
1187                        parallel_options_clone,
1188                        pool,
1189                    )
1190                    .await?;
1191                    Ok((path, file_metadata))
1192                });
1193            }
1194        }
1195
1196        let mut row_count = 0;
1197        while let Some(result) = file_write_tasks.join_next().await {
1198            match result {
1199                Ok(r) => {
1200                    let (path, file_metadata) = r?;
1201                    row_count += file_metadata.num_rows;
1202                    let mut written_files = self.written.lock();
1203                    written_files
1204                        .try_insert(path.clone(), file_metadata)
1205                        .map_err(|e| internal_datafusion_err!("duplicate entry detected for partitioned file {path}: {e}"))?;
1206                    drop(written_files);
1207                }
1208                Err(e) => {
1209                    if e.is_panic() {
1210                        std::panic::resume_unwind(e.into_panic());
1211                    } else {
1212                        unreachable!();
1213                    }
1214                }
1215            }
1216        }
1217
1218        demux_task
1219            .join_unwind()
1220            .await
1221            .map_err(DataFusionError::ExecutionJoin)??;
1222
1223        Ok(row_count as u64)
1224    }
1225}
1226
1227#[async_trait]
1228impl DataSink for ParquetSink {
1229    fn as_any(&self) -> &dyn Any {
1230        self
1231    }
1232
1233    fn schema(&self) -> &SchemaRef {
1234        self.config.output_schema()
1235    }
1236
1237    async fn write_all(
1238        &self,
1239        data: SendableRecordBatchStream,
1240        context: &Arc<TaskContext>,
1241    ) -> Result<u64> {
1242        FileSink::write_all(self, data, context).await
1243    }
1244}
1245
1246/// Consumes a stream of [ArrowLeafColumn] via a channel and serializes them using an [ArrowColumnWriter]
1247/// Once the channel is exhausted, returns the ArrowColumnWriter.
1248async fn column_serializer_task(
1249    mut rx: Receiver<ArrowLeafColumn>,
1250    mut writer: ArrowColumnWriter,
1251    mut reservation: MemoryReservation,
1252) -> Result<(ArrowColumnWriter, MemoryReservation)> {
1253    while let Some(col) = rx.recv().await {
1254        writer.write(&col)?;
1255        reservation.try_resize(writer.memory_size())?;
1256    }
1257    Ok((writer, reservation))
1258}
1259
1260type ColumnWriterTask = SpawnedTask<Result<(ArrowColumnWriter, MemoryReservation)>>;
1261type ColSender = Sender<ArrowLeafColumn>;
1262
1263/// Spawns a parallel serialization task for each column
1264/// Returns join handles for each columns serialization task along with a send channel
1265/// to send arrow arrays to each serialization task.
1266fn spawn_column_parallel_row_group_writer(
1267    schema: Arc<Schema>,
1268    parquet_props: Arc<WriterProperties>,
1269    max_buffer_size: usize,
1270    pool: &Arc<dyn MemoryPool>,
1271) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
1272    let schema_desc = ArrowSchemaConverter::new().convert(&schema)?;
1273    let col_writers = get_column_writers(&schema_desc, &parquet_props, &schema)?;
1274    let num_columns = col_writers.len();
1275
1276    let mut col_writer_tasks = Vec::with_capacity(num_columns);
1277    let mut col_array_channels = Vec::with_capacity(num_columns);
1278    for writer in col_writers.into_iter() {
1279        // Buffer size of this channel limits the number of arrays queued up for column level serialization
1280        let (send_array, receive_array) =
1281            mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
1282        col_array_channels.push(send_array);
1283
1284        let reservation =
1285            MemoryConsumer::new("ParquetSink(ArrowColumnWriter)").register(pool);
1286        let task = SpawnedTask::spawn(column_serializer_task(
1287            receive_array,
1288            writer,
1289            reservation,
1290        ));
1291        col_writer_tasks.push(task);
1292    }
1293
1294    Ok((col_writer_tasks, col_array_channels))
1295}
1296
1297/// Settings related to writing parquet files in parallel
1298#[derive(Clone)]
1299struct ParallelParquetWriterOptions {
1300    max_parallel_row_groups: usize,
1301    max_buffered_record_batches_per_stream: usize,
1302}
1303
1304/// This is the return type of calling [ArrowColumnWriter].close() on each column
1305/// i.e. the Vec of encoded columns which can be appended to a row group
1306type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, MemoryReservation, usize)>;
1307
1308/// Sends the ArrowArrays in passed [RecordBatch] through the channels to their respective
1309/// parallel column serializers.
1310async fn send_arrays_to_col_writers(
1311    col_array_channels: &[ColSender],
1312    rb: &RecordBatch,
1313    schema: Arc<Schema>,
1314) -> Result<()> {
1315    // Each leaf column has its own channel, increment next_channel for each leaf column sent.
1316    let mut next_channel = 0;
1317    for (array, field) in rb.columns().iter().zip(schema.fields()) {
1318        for c in compute_leaves(field, array)? {
1319            // Do not surface error from closed channel (means something
1320            // else hit an error, and the plan is shutting down).
1321            if col_array_channels[next_channel].send(c).await.is_err() {
1322                return Ok(());
1323            }
1324
1325            next_channel += 1;
1326        }
1327    }
1328
1329    Ok(())
1330}
1331
1332/// Spawns a tokio task which joins the parallel column writer tasks,
1333/// and finalizes the row group
1334fn spawn_rg_join_and_finalize_task(
1335    column_writer_tasks: Vec<ColumnWriterTask>,
1336    rg_rows: usize,
1337    pool: &Arc<dyn MemoryPool>,
1338) -> SpawnedTask<RBStreamSerializeResult> {
1339    let mut rg_reservation =
1340        MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(pool);
1341
1342    SpawnedTask::spawn(async move {
1343        let num_cols = column_writer_tasks.len();
1344        let mut finalized_rg = Vec::with_capacity(num_cols);
1345        for task in column_writer_tasks.into_iter() {
1346            let (writer, _col_reservation) = task
1347                .join_unwind()
1348                .await
1349                .map_err(DataFusionError::ExecutionJoin)??;
1350            let encoded_size = writer.get_estimated_total_bytes();
1351            rg_reservation.grow(encoded_size);
1352            finalized_rg.push(writer.close()?);
1353        }
1354
1355        Ok((finalized_rg, rg_reservation, rg_rows))
1356    })
1357}
1358
1359/// This task coordinates the serialization of a parquet file in parallel.
1360/// As the query produces RecordBatches, these are written to a RowGroup
1361/// via parallel [ArrowColumnWriter] tasks. Once the desired max rows per
1362/// row group is reached, the parallel tasks are joined on another separate task
1363/// and sent to a concatenation task. This task immediately continues to work
1364/// on the next row group in parallel. So, parquet serialization is parallelized
1365/// across both columns and row_groups, with a theoretical max number of parallel tasks
1366/// given by n_columns * num_row_groups.
1367fn spawn_parquet_parallel_serialization_task(
1368    mut data: Receiver<RecordBatch>,
1369    serialize_tx: Sender<SpawnedTask<RBStreamSerializeResult>>,
1370    schema: Arc<Schema>,
1371    writer_props: Arc<WriterProperties>,
1372    parallel_options: ParallelParquetWriterOptions,
1373    pool: Arc<dyn MemoryPool>,
1374) -> SpawnedTask<Result<(), DataFusionError>> {
1375    SpawnedTask::spawn(async move {
1376        let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream;
1377        let max_row_group_rows = writer_props.max_row_group_size();
1378        let (mut column_writer_handles, mut col_array_channels) =
1379            spawn_column_parallel_row_group_writer(
1380                Arc::clone(&schema),
1381                Arc::clone(&writer_props),
1382                max_buffer_rb,
1383                &pool,
1384            )?;
1385        let mut current_rg_rows = 0;
1386
1387        while let Some(mut rb) = data.recv().await {
1388            // This loop allows the "else" block to repeatedly split the RecordBatch to handle the case
1389            // when max_row_group_rows < execution.batch_size as an alternative to a recursive async
1390            // function.
1391            loop {
1392                if current_rg_rows + rb.num_rows() < max_row_group_rows {
1393                    send_arrays_to_col_writers(
1394                        &col_array_channels,
1395                        &rb,
1396                        Arc::clone(&schema),
1397                    )
1398                    .await?;
1399                    current_rg_rows += rb.num_rows();
1400                    break;
1401                } else {
1402                    let rows_left = max_row_group_rows - current_rg_rows;
1403                    let a = rb.slice(0, rows_left);
1404                    send_arrays_to_col_writers(
1405                        &col_array_channels,
1406                        &a,
1407                        Arc::clone(&schema),
1408                    )
1409                    .await?;
1410
1411                    // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup
1412                    // on a separate task, so that we can immediately start on the next RG before waiting
1413                    // for the current one to finish.
1414                    drop(col_array_channels);
1415                    let finalize_rg_task = spawn_rg_join_and_finalize_task(
1416                        column_writer_handles,
1417                        max_row_group_rows,
1418                        &pool,
1419                    );
1420
1421                    // Do not surface error from closed channel (means something
1422                    // else hit an error, and the plan is shutting down).
1423                    if serialize_tx.send(finalize_rg_task).await.is_err() {
1424                        return Ok(());
1425                    }
1426
1427                    current_rg_rows = 0;
1428                    rb = rb.slice(rows_left, rb.num_rows() - rows_left);
1429
1430                    (column_writer_handles, col_array_channels) =
1431                        spawn_column_parallel_row_group_writer(
1432                            Arc::clone(&schema),
1433                            Arc::clone(&writer_props),
1434                            max_buffer_rb,
1435                            &pool,
1436                        )?;
1437                }
1438            }
1439        }
1440
1441        drop(col_array_channels);
1442        // Handle leftover rows as final rowgroup, which may be smaller than max_row_group_rows
1443        if current_rg_rows > 0 {
1444            let finalize_rg_task = spawn_rg_join_and_finalize_task(
1445                column_writer_handles,
1446                current_rg_rows,
1447                &pool,
1448            );
1449
1450            // Do not surface error from closed channel (means something
1451            // else hit an error, and the plan is shutting down).
1452            if serialize_tx.send(finalize_rg_task).await.is_err() {
1453                return Ok(());
1454            }
1455        }
1456
1457        Ok(())
1458    })
1459}
1460
1461/// Consume RowGroups serialized by other parallel tasks and concatenate them in
1462/// to the final parquet file, while flushing finalized bytes to an [ObjectStore]
1463async fn concatenate_parallel_row_groups(
1464    mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
1465    schema: Arc<Schema>,
1466    writer_props: Arc<WriterProperties>,
1467    mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
1468    pool: Arc<dyn MemoryPool>,
1469) -> Result<FileMetaData> {
1470    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
1471
1472    let mut file_reservation =
1473        MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);
1474
1475    let schema_desc = ArrowSchemaConverter::new().convert(schema.as_ref())?;
1476    let mut parquet_writer = SerializedFileWriter::new(
1477        merged_buff.clone(),
1478        schema_desc.root_schema_ptr(),
1479        writer_props,
1480    )?;
1481
1482    while let Some(task) = serialize_rx.recv().await {
1483        let result = task.join_unwind().await;
1484        let mut rg_out = parquet_writer.next_row_group()?;
1485        let (serialized_columns, mut rg_reservation, _cnt) =
1486            result.map_err(DataFusionError::ExecutionJoin)??;
1487        for chunk in serialized_columns {
1488            chunk.append_to_row_group(&mut rg_out)?;
1489            rg_reservation.free();
1490
1491            let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
1492            file_reservation.try_resize(buff_to_flush.len())?;
1493
1494            if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
1495                object_store_writer
1496                    .write_all(buff_to_flush.as_slice())
1497                    .await?;
1498                buff_to_flush.clear();
1499                file_reservation.try_resize(buff_to_flush.len())?; // will set to zero
1500            }
1501        }
1502        rg_out.close()?;
1503    }
1504
1505    let file_metadata = parquet_writer.close()?;
1506    let final_buff = merged_buff.buffer.try_lock().unwrap();
1507
1508    object_store_writer.write_all(final_buff.as_slice()).await?;
1509    object_store_writer.shutdown().await?;
1510    file_reservation.free();
1511
1512    Ok(file_metadata)
1513}
1514
1515/// Parallelizes the serialization of a single parquet file, by first serializing N
1516/// independent RecordBatch streams in parallel to RowGroups in memory. Another
1517/// task then stitches these independent RowGroups together and streams this large
1518/// single parquet file to an ObjectStore in multiple parts.
1519async fn output_single_parquet_file_parallelized(
1520    object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
1521    data: Receiver<RecordBatch>,
1522    output_schema: Arc<Schema>,
1523    parquet_props: &WriterProperties,
1524    parallel_options: ParallelParquetWriterOptions,
1525    pool: Arc<dyn MemoryPool>,
1526) -> Result<FileMetaData> {
1527    let max_rowgroups = parallel_options.max_parallel_row_groups;
1528    // Buffer size of this channel limits maximum number of RowGroups being worked on in parallel
1529    let (serialize_tx, serialize_rx) =
1530        mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups);
1531
1532    let arc_props = Arc::new(parquet_props.clone());
1533    let launch_serialization_task = spawn_parquet_parallel_serialization_task(
1534        data,
1535        serialize_tx,
1536        Arc::clone(&output_schema),
1537        Arc::clone(&arc_props),
1538        parallel_options,
1539        Arc::clone(&pool),
1540    );
1541    let file_metadata = concatenate_parallel_row_groups(
1542        serialize_rx,
1543        Arc::clone(&output_schema),
1544        Arc::clone(&arc_props),
1545        object_store_writer,
1546        pool,
1547    )
1548    .await?;
1549
1550    launch_serialization_task
1551        .join_unwind()
1552        .await
1553        .map_err(DataFusionError::ExecutionJoin)??;
1554    Ok(file_metadata)
1555}
1556
1557/// Min/max aggregation can take Dictionary encode input but always produces unpacked
1558/// (aka non Dictionary) output. We need to adjust the output data type to reflect this.
1559/// The reason min/max aggregate produces unpacked output because there is only one
1560/// min/max value per group; there is no needs to keep them Dictionary encode
1561fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
1562    if let DataType::Dictionary(_, value_type) = input_type {
1563        value_type.as_ref()
1564    } else {
1565        input_type
1566    }
1567}
1568
1569fn create_max_min_accs(
1570    schema: &Schema,
1571) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
1572    let max_values: Vec<Option<MaxAccumulator>> = schema
1573        .fields()
1574        .iter()
1575        .map(|field| {
1576            MaxAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok()
1577        })
1578        .collect();
1579    let min_values: Vec<Option<MinAccumulator>> = schema
1580        .fields()
1581        .iter()
1582        .map(|field| {
1583            MinAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok()
1584        })
1585        .collect();
1586    (max_values, min_values)
1587}