use std::any::Any;
use std::fmt;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;

use arrow::array::RecordBatch;
use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
use datafusion_datasource::write::{create_writer, get_writer_schema, SharedBuffer};

use datafusion_datasource::file_format::{
    FileFormat, FileFormatFactory, FilePushdownSupport,
};
use datafusion_datasource::write::demux::DemuxedStreamReceiver;

use arrow::compute::sum;
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
use datafusion_common::{
    internal_datafusion_err, internal_err, not_impl_err, ColumnStatistics,
    DataFusionError, GetExt, Result, DEFAULT_PARQUET_EXTENSION,
};
use datafusion_common::{HashMap, Statistics};
use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::Expr;
use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_plan::Accumulator;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_session::Session;

use crate::can_expr_be_pushed_down_with_schemas;
use crate::source::ParquetSource;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_datasource::source::DataSourceExec;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use log::debug;
use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::arrow_writer::{
    compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
    ArrowLeafColumn, ArrowWriterOptions,
};
use parquet::arrow::async_reader::MetadataFetch;
use parquet::arrow::{parquet_to_arrow_schema, ArrowSchemaConverter, AsyncArrowWriter};
use parquet::basic::Type;
use parquet::errors::ParquetError;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::file::writer::SerializedFileWriter;
use parquet::format::FileMetaData;
use parquet::schema::types::SchemaDescriptor;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};

/// Initial capacity of the shared in-memory buffer that row groups are
/// serialized into before being flushed to the object store.
const INITIAL_BUFFER_BYTES: usize = 1048576;

/// Once the shared serialization buffer exceeds this many bytes it is flushed
/// to the underlying writer and cleared.
const BUFFER_FLUSH_BYTES: usize = 1024000;
/// Factory used to create [`ParquetFormat`] instances, optionally carrying a
/// pre-configured set of [`TableParquetOptions`].
#[derive(Default)]
pub struct ParquetFormatFactory {
    /// Options to apply to every format produced by this factory, if set.
    pub options: Option<TableParquetOptions>,
}

impl ParquetFormatFactory {
    /// Creates a factory with no explicit options; defaults are resolved from
    /// the session when [`FileFormatFactory::create`] is called.
    pub fn new() -> Self {
        Self { options: None }
    }

    /// Creates a factory with the provided options.
    pub fn new_with_options(options: TableParquetOptions) -> Self {
        Self {
            options: Some(options),
        }
    }
}
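// A minimal usage sketch (not part of the original file): constructing the
// factory with explicit options versus deferring to session defaults. The
// helper name is illustrative only.
#[allow(dead_code)]
fn example_parquet_format_factory() -> (ParquetFormatFactory, ParquetFormatFactory) {
    // Defer all option resolution to the session that later calls `create`.
    let from_session_defaults = ParquetFormatFactory::new();
    // Pin a specific set of options up front; `create` will still apply any
    // per-statement `format_options` on top of these.
    let with_options =
        ParquetFormatFactory::new_with_options(TableParquetOptions::default());
    (from_session_defaults, with_options)
}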
impl FileFormatFactory for ParquetFormatFactory {
    fn create(
        &self,
        state: &dyn Session,
        format_options: &std::collections::HashMap<String, String>,
    ) -> Result<Arc<dyn FileFormat>> {
        let parquet_options = match &self.options {
            None => {
                let mut table_options = state.default_table_options();
                table_options.set_config_format(ConfigFileType::PARQUET);
                table_options.alter_with_string_hash_map(format_options)?;
                table_options.parquet
            }
            Some(parquet_options) => {
                let mut parquet_options = parquet_options.clone();
                for (k, v) in format_options {
                    parquet_options.set(k, v)?;
                }
                parquet_options
            }
        };

        Ok(Arc::new(
            ParquetFormat::default().with_options(parquet_options),
        ))
    }

    fn default(&self) -> Arc<dyn FileFormat> {
        Arc::new(ParquetFormat::default())
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl GetExt for ParquetFormatFactory {
    fn get_ext(&self) -> String {
        // Strip the leading dot, i.e. ".parquet" -> "parquet"
        DEFAULT_PARQUET_EXTENSION[1..].to_string()
    }
}

impl Debug for ParquetFormatFactory {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ParquetFormatFactory")
            .field("ParquetFormatFactory", &self.options)
            .finish()
    }
}
/// The Apache Parquet [`FileFormat`] implementation.
#[derive(Debug, Default)]
pub struct ParquetFormat {
    options: TableParquetOptions,
}

impl ParquetFormat {
    /// Construct a new format with default options.
    pub fn new() -> Self {
        Self::default()
    }

    /// Enable or disable statistics-based pruning.
    pub fn with_enable_pruning(mut self, enable: bool) -> Self {
        self.options.global.pruning = enable;
        self
    }

    /// Return `true` if pruning is enabled.
    pub fn enable_pruning(&self) -> bool {
        self.options.global.pruning
    }

    /// Provide a hint of how many bytes to prefetch when reading the Parquet
    /// footer, which can avoid a second range request for the metadata.
    pub fn with_metadata_size_hint(mut self, size_hint: Option<usize>) -> Self {
        self.options.global.metadata_size_hint = size_hint;
        self
    }

    /// Return the metadata size hint, if any.
    pub fn metadata_size_hint(&self) -> Option<usize> {
        self.options.global.metadata_size_hint
    }

    /// When enabled, field-level metadata from the files is dropped during
    /// schema inference.
    pub fn with_skip_metadata(mut self, skip_metadata: bool) -> Self {
        self.options.global.skip_metadata = skip_metadata;
        self
    }

    /// Return `true` if field metadata is skipped during schema inference.
    pub fn skip_metadata(&self) -> bool {
        self.options.global.skip_metadata
    }

    /// Set the Parquet options for this format.
    pub fn with_options(mut self, options: TableParquetOptions) -> Self {
        self.options = options;
        self
    }

    /// Return the Parquet options for this format.
    pub fn options(&self) -> &TableParquetOptions {
        &self.options
    }

    /// Return `true` if string/binary columns are inferred as their view types.
    pub fn force_view_types(&self) -> bool {
        self.options.global.schema_force_view_types
    }

    /// Infer `Utf8`/`Binary` columns as `Utf8View`/`BinaryView` when enabled.
    pub fn with_force_view_types(mut self, use_views: bool) -> Self {
        self.options.global.schema_force_view_types = use_views;
        self
    }

    /// Return `true` if binary columns are inferred as strings.
    pub fn binary_as_string(&self) -> bool {
        self.options.global.binary_as_string
    }

    /// Infer binary columns as the corresponding string types when enabled.
    pub fn with_binary_as_string(mut self, binary_as_string: bool) -> Self {
        self.options.global.binary_as_string = binary_as_string;
        self
    }

    /// Return the resolution INT96 timestamps are coerced to, if configured.
    pub fn coerce_int96(&self) -> Option<String> {
        self.options.global.coerce_int96.clone()
    }

    /// Coerce INT96 timestamps to the given time unit (e.g. `"ms"`).
    pub fn with_coerce_int96(mut self, time_unit: Option<String>) -> Self {
        self.options.global.coerce_int96 = time_unit;
        self
    }
}
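// A minimal builder sketch (illustrative, not from the original file): each
// setter above simply forwards into `TableParquetOptions::global`.
#[allow(dead_code)]
fn example_parquet_format() -> ParquetFormat {
    ParquetFormat::new()
        .with_enable_pruning(true)
        .with_metadata_size_hint(Some(64 * 1024))
        .with_force_view_types(true)
        .with_binary_as_string(false)
}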
/// Clear all field-level metadata from the given schemas.
fn clear_metadata(
    schemas: impl IntoIterator<Item = Schema>,
) -> impl Iterator<Item = Schema> {
    schemas.into_iter().map(|schema| {
        let fields = schema
            .fields()
            .iter()
            .map(|field| {
                // Keep the field, drop its metadata
                field.as_ref().clone().with_metadata(Default::default())
            })
            .collect::<Fields>();
        Schema::new(fields)
    })
}

async fn fetch_schema_with_location(
    store: &dyn ObjectStore,
    file: &ObjectMeta,
    metadata_size_hint: Option<usize>,
) -> Result<(Path, Schema)> {
    let loc_path = file.location.clone();
    let schema = fetch_schema(store, file, metadata_size_hint).await?;
    Ok((loc_path, schema))
}
#[async_trait]
impl FileFormat for ParquetFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_ext(&self) -> String {
        ParquetFormatFactory::new().get_ext()
    }

    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> Result<String> {
        let ext = self.get_ext();
        match file_compression_type.get_variant() {
            CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
            _ => internal_err!("Parquet FileFormat does not support compression."),
        }
    }

    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef> {
        let mut schemas: Vec<_> = futures::stream::iter(objects)
            .map(|object| {
                fetch_schema_with_location(
                    store.as_ref(),
                    object,
                    self.metadata_size_hint(),
                )
            })
            .boxed()
            .buffered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect()
            .await?;

        // Sort by file location so the merged schema does not depend on the
        // order in which individual footers happen to be fetched.
        schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2));

        let schemas = schemas
            .into_iter()
            .map(|(_, schema)| schema)
            .collect::<Vec<_>>();

        let schema = if self.skip_metadata() {
            Schema::try_merge(clear_metadata(schemas))
        } else {
            Schema::try_merge(schemas)
        }?;

        let schema = if self.binary_as_string() {
            transform_binary_to_string(&schema)
        } else {
            schema
        };

        let schema = if self.force_view_types() {
            transform_schema_to_view(&schema)
        } else {
            schema
        };

        Ok(Arc::new(schema))
    }

    async fn infer_stats(
        &self,
        _state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> Result<Statistics> {
        let stats = fetch_statistics(
            store.as_ref(),
            table_schema,
            object,
            self.metadata_size_hint(),
        )
        .await?;
        Ok(stats)
    }

    async fn create_physical_plan(
        &self,
        _state: &dyn Session,
        conf: FileScanConfig,
        filters: Option<&Arc<dyn PhysicalExpr>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut predicate = None;
        let mut metadata_size_hint = None;

        // If pruning is enabled, forward the filters so the source can use
        // them against row group and page statistics.
        if self.enable_pruning() {
            if let Some(pred) = filters.cloned() {
                predicate = Some(pred);
            }
        }
        if let Some(metadata) = self.metadata_size_hint() {
            metadata_size_hint = Some(metadata);
        }

        let mut source = ParquetSource::new(self.options.clone());

        if let Some(predicate) = predicate {
            source = source.with_predicate(Arc::clone(&conf.file_schema), predicate);
        }
        if let Some(metadata_size_hint) = metadata_size_hint {
            source = source.with_metadata_size_hint(metadata_size_hint)
        }

        let conf = FileScanConfigBuilder::from(conf)
            .with_source(Arc::new(source))
            .build();
        Ok(DataSourceExec::from_data_source(conf))
    }

    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Parquet");
        }

        let sink = Arc::new(ParquetSink::new(conf, self.options.clone()));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    fn supports_filters_pushdown(
        &self,
        file_schema: &Schema,
        table_schema: &Schema,
        filters: &[&Expr],
    ) -> Result<FilePushdownSupport> {
        if !self.options().global.pushdown_filters {
            return Ok(FilePushdownSupport::NoSupport);
        }

        let all_supported = filters.iter().all(|filter| {
            can_expr_be_pushed_down_with_schemas(filter, file_schema, table_schema)
        });

        Ok(if all_supported {
            FilePushdownSupport::Supported
        } else {
            FilePushdownSupport::NotSupportedForFilter
        })
    }

    fn file_source(&self) -> Arc<dyn FileSource> {
        Arc::new(ParquetSource::default())
    }
}
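// Small illustrative check (not in the original file): the format's extension
// is "parquet", and because Parquet handles compression internally only the
// UNCOMPRESSED "file compression" variant is accepted by
// `get_ext_with_compression`.
#[allow(dead_code)]
fn example_parquet_extension() {
    let format = ParquetFormat::default();
    assert_eq!(format.get_ext(), "parquet");
    assert!(format
        .get_ext_with_compression(&FileCompressionType::GZIP)
        .is_err());
}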
/// Coerce the file schema to match the table schema's string/binary view and
/// binary-as-string expectations.
///
/// Returns `Some(coerced_schema)` if any field needed to change, or `None` if
/// the file schema can be used as-is.
pub fn apply_file_schema_type_coercions(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut needs_view_transform = false;
    let mut needs_string_transform = false;

    // Collect the table types and note whether any view or string coercion
    // could be required at all.
    let table_fields: HashMap<_, _> = table_schema
        .fields()
        .iter()
        .map(|f| {
            let dt = f.data_type();
            if matches!(dt, &DataType::Utf8View | &DataType::BinaryView) {
                needs_view_transform = true;
            }
            if matches!(
                dt,
                &DataType::Utf8 | &DataType::LargeUtf8 | &DataType::Utf8View
            ) {
                needs_string_transform = true;
            }

            (f.name(), dt)
        })
        .collect();

    if !needs_view_transform && !needs_string_transform {
        return None;
    }

    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields()
        .iter()
        .map(|field| {
            let field_name = field.name();
            let field_type = field.data_type();

            if let Some(table_type) = table_fields.get(field_name) {
                match (table_type, field_type) {
                    // Table expects a string type, file has a binary type
                    (
                        &DataType::Utf8,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::Utf8);
                    }
                    (
                        &DataType::LargeUtf8,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::LargeUtf8);
                    }
                    (
                        &DataType::Utf8View,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::Utf8View);
                    }
                    // Table expects a view type, file has a non-view type
                    (&DataType::Utf8View, DataType::Utf8 | DataType::LargeUtf8) => {
                        return field_with_new_type(field, DataType::Utf8View);
                    }
                    (&DataType::BinaryView, DataType::Binary | DataType::LargeBinary) => {
                        return field_with_new_type(field, DataType::BinaryView);
                    }
                    _ => {}
                }
            }

            // No coercion needed, keep the original field
            Arc::clone(field)
        })
        .collect();

    Some(Schema::new_with_metadata(
        transformed_fields,
        file_schema.metadata.clone(),
    ))
}
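// Hedged example (not from the original file): a table declared with view
// types reads a file whose footer reports plain Utf8/Binary columns.
#[allow(dead_code)]
fn example_apply_type_coercions() {
    let table_schema = Schema::new(vec![
        Field::new("name", DataType::Utf8View, true),
        Field::new("blob", DataType::BinaryView, true),
    ]);
    let file_schema = Schema::new(vec![
        Field::new("name", DataType::Utf8, true),
        Field::new("blob", DataType::Binary, true),
    ]);
    // Both fields differ from the table's expectation, so a coerced schema
    // using the view types is returned.
    let coerced = apply_file_schema_type_coercions(&table_schema, &file_schema)
        .expect("coercion should be needed here");
    assert_eq!(coerced.field(0).data_type(), &DataType::Utf8View);
    assert_eq!(coerced.field(1).data_type(), &DataType::BinaryView);
}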
/// Coerce INT96 timestamp columns in the file schema to `Timestamp` with the
/// given [`TimeUnit`].
///
/// Returns `Some(coerced_schema)` if the Parquet schema contains any INT96
/// columns, or `None` otherwise.
pub fn coerce_int96_to_resolution(
    parquet_schema: &SchemaDescriptor,
    file_schema: &Schema,
    time_unit: &TimeUnit,
) -> Option<Schema> {
    let mut transform = false;
    let parquet_fields: HashMap<_, _> = parquet_schema
        .columns()
        .iter()
        .map(|f| {
            let dt = f.physical_type();
            if dt.eq(&Type::INT96) {
                transform = true;
            }
            (f.name(), dt)
        })
        .collect();

    if !transform {
        return None;
    }

    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields
        .iter()
        .map(|field| match parquet_fields.get(field.name().as_str()) {
            Some(Type::INT96) => {
                field_with_new_type(field, DataType::Timestamp(*time_unit, None))
            }
            _ => Arc::clone(field),
        })
        .collect();

    Some(Schema::new_with_metadata(
        transformed_fields,
        file_schema.metadata.clone(),
    ))
}
#[deprecated(
    since = "47.0.0",
    note = "Use `apply_file_schema_type_coercions` instead"
)]
pub fn coerce_file_schema_to_view_type(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut transform = false;
    let table_fields: HashMap<_, _> = table_schema
        .fields
        .iter()
        .map(|f| {
            let dt = f.data_type();
            if dt.equals_datatype(&DataType::Utf8View)
                || dt.equals_datatype(&DataType::BinaryView)
            {
                transform = true;
            }
            (f.name(), dt)
        })
        .collect();

    if !transform {
        return None;
    }

    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields
        .iter()
        .map(
            |field| match (table_fields.get(field.name()), field.data_type()) {
                (Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => {
                    field_with_new_type(field, DataType::Utf8View)
                }
                (
                    Some(DataType::BinaryView),
                    DataType::Binary | DataType::LargeBinary,
                ) => field_with_new_type(field, DataType::BinaryView),
                _ => Arc::clone(field),
            },
        )
        .collect();

    Some(Schema::new_with_metadata(
        transformed_fields,
        file_schema.metadata.clone(),
    ))
}

#[deprecated(
    since = "47.0.0",
    note = "Use `apply_file_schema_type_coercions` instead"
)]
pub fn coerce_file_schema_to_string_type(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut transform = false;
    let table_fields: HashMap<_, _> = table_schema
        .fields
        .iter()
        .map(|f| (f.name(), f.data_type()))
        .collect();
    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields
        .iter()
        .map(
            |field| match (table_fields.get(field.name()), field.data_type()) {
                (
                    Some(DataType::Utf8),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::Utf8)
                }
                (
                    Some(DataType::LargeUtf8),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::LargeUtf8)
                }
                (
                    Some(DataType::Utf8View),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::Utf8View)
                }
                _ => Arc::clone(field),
            },
        )
        .collect();

    if !transform {
        None
    } else {
        Some(Schema::new_with_metadata(
            transformed_fields,
            file_schema.metadata.clone(),
        ))
    }
}
/// Create a new field with the specified data type, copying the other
/// properties from the input field.
fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
    Arc::new(field.as_ref().clone().with_data_type(new_type))
}

/// Transform a schema so that any `Utf8`/`LargeUtf8` fields become `Utf8View`
/// and any `Binary`/`LargeBinary` fields become `BinaryView`.
pub fn transform_schema_to_view(schema: &Schema) -> Schema {
    let transformed_fields: Vec<Arc<Field>> = schema
        .fields
        .iter()
        .map(|field| match field.data_type() {
            DataType::Utf8 | DataType::LargeUtf8 => {
                field_with_new_type(field, DataType::Utf8View)
            }
            DataType::Binary | DataType::LargeBinary => {
                field_with_new_type(field, DataType::BinaryView)
            }
            _ => Arc::clone(field),
        })
        .collect();
    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}
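// Hedged example (not from the original file): string and binary fields are
// rewritten to their view equivalents, everything else is left untouched.
#[allow(dead_code)]
fn example_transform_schema_to_view() {
    let schema = Schema::new(vec![
        Field::new("s", DataType::Utf8, true),
        Field::new("b", DataType::LargeBinary, true),
        Field::new("i", DataType::Int64, true),
    ]);
    let viewed = transform_schema_to_view(&schema);
    assert_eq!(viewed.field(0).data_type(), &DataType::Utf8View);
    assert_eq!(viewed.field(1).data_type(), &DataType::BinaryView);
    assert_eq!(viewed.field(2).data_type(), &DataType::Int64);
}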
/// Transform a schema so that binary fields are read as the corresponding
/// string types (`Binary` -> `Utf8`, `LargeBinary` -> `LargeUtf8`,
/// `BinaryView` -> `Utf8View`).
pub fn transform_binary_to_string(schema: &Schema) -> Schema {
    let transformed_fields: Vec<Arc<Field>> = schema
        .fields
        .iter()
        .map(|field| match field.data_type() {
            DataType::Binary => field_with_new_type(field, DataType::Utf8),
            DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8),
            DataType::BinaryView => field_with_new_type(field, DataType::Utf8View),
            _ => Arc::clone(field),
        })
        .collect();
    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}
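// Hedged example (not from the original file): each binary flavor maps to the
// string type of the same "size class".
#[allow(dead_code)]
fn example_transform_binary_to_string() {
    let schema = Schema::new(vec![Field::new("payload", DataType::Binary, true)]);
    let as_string = transform_binary_to_string(&schema);
    assert_eq!(as_string.field(0).data_type(), &DataType::Utf8);
}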
/// [`MetadataFetch`] adapter that reads byte ranges from an [`ObjectStore`].
struct ObjectStoreFetch<'a> {
    store: &'a dyn ObjectStore,
    meta: &'a ObjectMeta,
}

impl<'a> ObjectStoreFetch<'a> {
    fn new(store: &'a dyn ObjectStore, meta: &'a ObjectMeta) -> Self {
        Self { store, meta }
    }
}

impl MetadataFetch for ObjectStoreFetch<'_> {
    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, ParquetError>> {
        async {
            self.store
                .get_range(&self.meta.location, range)
                .await
                .map_err(ParquetError::from)
        }
        .boxed()
    }
}

/// Fetch the Parquet metadata (footer) for the given object.
///
/// `size_hint` is the number of bytes to prefetch from the end of the file;
/// a hint that covers the whole footer avoids a second range request.
pub async fn fetch_parquet_metadata(
    store: &dyn ObjectStore,
    meta: &ObjectMeta,
    size_hint: Option<usize>,
) -> Result<ParquetMetaData> {
    let file_size = meta.size;
    let fetch = ObjectStoreFetch::new(store, meta);

    ParquetMetaDataReader::new()
        .with_prefetch_hint(size_hint)
        .load_and_finish(fetch, file_size)
        .await
        .map_err(DataFusionError::from)
}
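// Hedged sketch (not from the original file): fetching only the footer of a
// known object. The 64 KiB hint is an illustrative value, not a project
// default; pick it based on the expected footer size.
#[allow(dead_code)]
async fn example_fetch_footer(
    store: &dyn ObjectStore,
    meta: &ObjectMeta,
) -> Result<ParquetMetaData> {
    let metadata = fetch_parquet_metadata(store, meta, Some(64 * 1024)).await?;
    // The row group count is available without touching any data pages.
    debug!("object has {} row groups", metadata.num_row_groups());
    Ok(metadata)
}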
/// Read and parse the Arrow schema of the given Parquet file.
async fn fetch_schema(
    store: &dyn ObjectStore,
    file: &ObjectMeta,
    metadata_size_hint: Option<usize>,
) -> Result<Schema> {
    let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
    let file_metadata = metadata.file_metadata();
    let schema = parquet_to_arrow_schema(
        file_metadata.schema_descr(),
        file_metadata.key_value_metadata(),
    )?;
    Ok(schema)
}

/// Read and parse the statistics of the given Parquet file.
///
/// See [`statistics_from_parquet_meta_calc`] for how counts and min/max values
/// are derived from the footer metadata.
pub async fn fetch_statistics(
    store: &dyn ObjectStore,
    table_schema: SchemaRef,
    file: &ObjectMeta,
    metadata_size_hint: Option<usize>,
) -> Result<Statistics> {
    let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
    statistics_from_parquet_meta_calc(&metadata, table_schema)
}
/// Convert the row group statistics in Parquet metadata to table-level
/// [`Statistics`] for the given `table_schema`.
///
/// Row counts and byte sizes are always exact; per-column null counts and
/// min/max values are derived from row group statistics when present.
pub fn statistics_from_parquet_meta_calc(
    metadata: &ParquetMetaData,
    table_schema: SchemaRef,
) -> Result<Statistics> {
    let row_groups_metadata = metadata.row_groups();

    let mut statistics = Statistics::new_unknown(&table_schema);
    let mut has_statistics = false;
    let mut num_rows = 0_usize;
    let mut total_byte_size = 0_usize;
    for row_group_meta in row_groups_metadata {
        num_rows += row_group_meta.num_rows() as usize;
        total_byte_size += row_group_meta.total_byte_size() as usize;

        if !has_statistics {
            has_statistics = row_group_meta
                .columns()
                .iter()
                .any(|column| column.statistics().is_some());
        }
    }
    statistics.num_rows = Precision::Exact(num_rows);
    statistics.total_byte_size = Precision::Exact(total_byte_size);

    let file_metadata = metadata.file_metadata();
    let mut file_schema = parquet_to_arrow_schema(
        file_metadata.schema_descr(),
        file_metadata.key_value_metadata(),
    )?;

    if let Some(merged) = apply_file_schema_type_coercions(&table_schema, &file_schema) {
        file_schema = merged;
    }

    statistics.column_statistics = if has_statistics {
        let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema);
        let mut null_counts_array =
            vec![Precision::Exact(0); table_schema.fields().len()];

        table_schema
            .fields()
            .iter()
            .enumerate()
            .for_each(|(idx, field)| {
                match StatisticsConverter::try_new(
                    field.name(),
                    &file_schema,
                    file_metadata.schema_descr(),
                ) {
                    Ok(stats_converter) => {
                        summarize_min_max_null_counts(
                            &mut min_accs,
                            &mut max_accs,
                            &mut null_counts_array,
                            idx,
                            num_rows,
                            &stats_converter,
                            row_groups_metadata,
                        )
                        .ok();
                    }
                    Err(e) => {
                        debug!("Failed to create statistics converter: {}", e);
                        null_counts_array[idx] = Precision::Exact(num_rows);
                    }
                }
            });

        get_col_stats(
            &table_schema,
            null_counts_array,
            &mut max_accs,
            &mut min_accs,
        )
    } else {
        Statistics::unknown_column(&table_schema)
    };

    Ok(statistics)
}
fn get_col_stats(
    schema: &Schema,
    null_counts: Vec<Precision<usize>>,
    max_values: &mut [Option<MaxAccumulator>],
    min_values: &mut [Option<MinAccumulator>],
) -> Vec<ColumnStatistics> {
    (0..schema.fields().len())
        .map(|i| {
            let max_value = match max_values.get_mut(i).unwrap() {
                Some(max_value) => max_value.evaluate().ok(),
                None => None,
            };
            let min_value = match min_values.get_mut(i).unwrap() {
                Some(min_value) => min_value.evaluate().ok(),
                None => None,
            };
            ColumnStatistics {
                null_count: null_counts[i],
                max_value: max_value.map(Precision::Exact).unwrap_or(Precision::Absent),
                min_value: min_value.map(Precision::Exact).unwrap_or(Precision::Absent),
                sum_value: Precision::Absent,
                distinct_count: Precision::Absent,
            }
        })
        .collect()
}

fn summarize_min_max_null_counts(
    min_accs: &mut [Option<MinAccumulator>],
    max_accs: &mut [Option<MaxAccumulator>],
    null_counts_array: &mut [Precision<usize>],
    arrow_schema_index: usize,
    num_rows: usize,
    stats_converter: &StatisticsConverter,
    row_groups_metadata: &[RowGroupMetaData],
) -> Result<()> {
    let max_values = stats_converter.row_group_maxes(row_groups_metadata)?;
    let min_values = stats_converter.row_group_mins(row_groups_metadata)?;
    let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?;

    if let Some(max_acc) = &mut max_accs[arrow_schema_index] {
        max_acc.update_batch(&[max_values])?;
    }

    if let Some(min_acc) = &mut min_accs[arrow_schema_index] {
        min_acc.update_batch(&[min_values])?;
    }

    null_counts_array[arrow_schema_index] = Precision::Exact(match sum(&null_counts) {
        Some(null_count) => null_count as usize,
        None => num_rows,
    });

    Ok(())
}
/// Implements [`DataSink`] for writing Parquet files.
pub struct ParquetSink {
    /// Config options for writing data
    config: FileSinkConfig,
    /// Underlying Parquet options
    parquet_options: TableParquetOptions,
    /// File metadata from successfully produced Parquet files, keyed by
    /// output path.
    written: Arc<parking_lot::Mutex<HashMap<Path, FileMetaData>>>,
}

impl Debug for ParquetSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ParquetSink").finish()
    }
}

impl DisplayAs for ParquetSink {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "ParquetSink(file_groups=",)?;
                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
                write!(f, ")")
            }
            DisplayFormatType::TreeRender => {
                write!(f, "")
            }
        }
    }
}

impl ParquetSink {
    /// Create a sink from the given config and Parquet options.
    pub fn new(config: FileSinkConfig, parquet_options: TableParquetOptions) -> Self {
        Self {
            config,
            parquet_options,
            written: Default::default(),
        }
    }

    /// Retrieve the file metadata for the written files, keyed by output
    /// path. Partitioned writes produce multiple entries.
    pub fn written(&self) -> HashMap<Path, FileMetaData> {
        self.written.lock().clone()
    }

    /// Create writer properties from the sink's Parquet options, optionally
    /// embedding the Arrow schema in the file metadata.
    fn create_writer_props(&self) -> Result<WriterProperties> {
        let schema = if self.parquet_options.global.allow_single_file_parallelism {
            // Parallelized writes go through the demuxer, so the per-file
            // writer schema may differ from the table's output schema.
            &get_writer_schema(&self.config)
        } else {
            self.config.output_schema()
        };

        let mut parquet_opts = self.parquet_options.clone();
        if !self.parquet_options.global.skip_arrow_metadata {
            parquet_opts.arrow_schema(schema);
        }

        Ok(WriterPropertiesBuilder::try_from(&parquet_opts)?.build())
    }

    /// Create an [`AsyncArrowWriter`] that serializes a single Parquet file
    /// directly to the object store; used when single-file parallelism is
    /// disabled.
    async fn create_async_arrow_writer(
        &self,
        location: &Path,
        object_store: Arc<dyn ObjectStore>,
        parquet_props: WriterProperties,
    ) -> Result<AsyncArrowWriter<BufWriter>> {
        let buf_writer = BufWriter::new(object_store, location.clone());
        let options = ArrowWriterOptions::new()
            .with_properties(parquet_props)
            .with_skip_arrow_metadata(self.parquet_options.global.skip_arrow_metadata);

        let writer = AsyncArrowWriter::try_new_with_options(
            buf_writer,
            get_writer_schema(&self.config),
            options,
        )?;
        Ok(writer)
    }

    /// Return the Parquet options used by this sink.
    pub fn parquet_options(&self) -> &TableParquetOptions {
        &self.parquet_options
    }
}
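// Hedged sketch (not from the original file): wiring a `ParquetSink` into a
// `DataSinkExec` by hand; in practice `create_writer_physical_plan` above
// does exactly this for INSERT queries. Default options are used purely for
// illustration.
#[allow(dead_code)]
fn example_parquet_sink(
    input: Arc<dyn ExecutionPlan>,
    conf: FileSinkConfig,
) -> Arc<dyn ExecutionPlan> {
    let sink = Arc::new(ParquetSink::new(conf, TableParquetOptions::default()));
    Arc::new(DataSinkExec::new(input, sink, None)) as _
}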
#[async_trait]
impl FileSink for ParquetSink {
    fn config(&self) -> &FileSinkConfig {
        &self.config
    }

    async fn spawn_writer_tasks_and_join(
        &self,
        context: &Arc<TaskContext>,
        demux_task: SpawnedTask<Result<()>>,
        mut file_stream_rx: DemuxedStreamReceiver,
        object_store: Arc<dyn ObjectStore>,
    ) -> Result<u64> {
        let parquet_opts = &self.parquet_options;
        let allow_single_file_parallelism =
            parquet_opts.global.allow_single_file_parallelism;

        let mut file_write_tasks: JoinSet<
            std::result::Result<(Path, FileMetaData), DataFusionError>,
        > = JoinSet::new();

        let parquet_props = self.create_writer_props()?;
        let parallel_options = ParallelParquetWriterOptions {
            max_parallel_row_groups: parquet_opts
                .global
                .maximum_parallel_row_group_writers,
            max_buffered_record_batches_per_stream: parquet_opts
                .global
                .maximum_buffered_record_batches_per_stream,
        };

        while let Some((path, mut rx)) = file_stream_rx.recv().await {
            if !allow_single_file_parallelism {
                let mut writer = self
                    .create_async_arrow_writer(
                        &path,
                        Arc::clone(&object_store),
                        parquet_props.clone(),
                    )
                    .await?;
                let mut reservation =
                    MemoryConsumer::new(format!("ParquetSink[{}]", path))
                        .register(context.memory_pool());
                file_write_tasks.spawn(async move {
                    while let Some(batch) = rx.recv().await {
                        writer.write(&batch).await?;
                        reservation.try_resize(writer.memory_size())?;
                    }
                    let file_metadata = writer
                        .close()
                        .await
                        .map_err(DataFusionError::ParquetError)?;
                    Ok((path, file_metadata))
                });
            } else {
                let writer = create_writer(
                    // Parquet files as a whole are never compressed here,
                    // since Parquet manages compression internally.
                    FileCompressionType::UNCOMPRESSED,
                    &path,
                    Arc::clone(&object_store),
                )
                .await?;
                let schema = get_writer_schema(&self.config);
                let props = parquet_props.clone();
                let parallel_options_clone = parallel_options.clone();
                let pool = Arc::clone(context.memory_pool());
                file_write_tasks.spawn(async move {
                    let file_metadata = output_single_parquet_file_parallelized(
                        writer,
                        rx,
                        schema,
                        &props,
                        parallel_options_clone,
                        pool,
                    )
                    .await?;
                    Ok((path, file_metadata))
                });
            }
        }

        let mut row_count = 0;
        while let Some(result) = file_write_tasks.join_next().await {
            match result {
                Ok(r) => {
                    let (path, file_metadata) = r?;
                    row_count += file_metadata.num_rows;
                    let mut written_files = self.written.lock();
                    written_files
                        .try_insert(path.clone(), file_metadata)
                        .map_err(|e| internal_datafusion_err!("duplicate entry detected for partitioned file {path}: {e}"))?;
                    drop(written_files);
                }
                Err(e) => {
                    if e.is_panic() {
                        std::panic::resume_unwind(e.into_panic());
                    } else {
                        unreachable!();
                    }
                }
            }
        }

        demux_task
            .join_unwind()
            .await
            .map_err(DataFusionError::ExecutionJoin)??;

        Ok(row_count as u64)
    }
}

#[async_trait]
impl DataSink for ParquetSink {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> &SchemaRef {
        self.config.output_schema()
    }

    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64> {
        FileSink::write_all(self, data, context).await
    }
}
/// Consumes a stream of [`ArrowLeafColumn`]s and writes them into a single
/// [`ArrowColumnWriter`], tracking memory use via the reservation.
async fn column_serializer_task(
    mut rx: Receiver<ArrowLeafColumn>,
    mut writer: ArrowColumnWriter,
    mut reservation: MemoryReservation,
) -> Result<(ArrowColumnWriter, MemoryReservation)> {
    while let Some(col) = rx.recv().await {
        writer.write(&col)?;
        reservation.try_resize(writer.memory_size())?;
    }
    Ok((writer, reservation))
}

type ColumnWriterTask = SpawnedTask<Result<(ArrowColumnWriter, MemoryReservation)>>;
type ColSender = Sender<ArrowLeafColumn>;

/// Spawn one serialization task per leaf column of the schema, returning the
/// task handles together with the channels used to feed each column its data.
fn spawn_column_parallel_row_group_writer(
    schema: Arc<Schema>,
    parquet_props: Arc<WriterProperties>,
    max_buffer_size: usize,
    pool: &Arc<dyn MemoryPool>,
) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
    let schema_desc = ArrowSchemaConverter::new().convert(&schema)?;
    let col_writers = get_column_writers(&schema_desc, &parquet_props, &schema)?;
    let num_columns = col_writers.len();

    let mut col_writer_tasks = Vec::with_capacity(num_columns);
    let mut col_array_channels = Vec::with_capacity(num_columns);
    for writer in col_writers.into_iter() {
        // The channel capacity bounds how many arrays can be queued up for
        // column-level serialization.
        let (send_array, receive_array) =
            mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
        col_array_channels.push(send_array);

        let reservation =
            MemoryConsumer::new("ParquetSink(ArrowColumnWriter)").register(pool);
        let task = SpawnedTask::spawn(column_serializer_task(
            receive_array,
            writer,
            reservation,
        ));
        col_writer_tasks.push(task);
    }

    Ok((col_writer_tasks, col_array_channels))
}

/// Settings related to writing Parquet files in parallel.
#[derive(Clone)]
struct ParallelParquetWriterOptions {
    max_parallel_row_groups: usize,
    max_buffered_record_batches_per_stream: usize,
}
/// Result of serializing one row group in parallel: the closed column chunks,
/// the memory reservation backing them, and the number of rows in the group.
type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, MemoryReservation, usize)>;

/// Fan a [`RecordBatch`] out to the per-column serialization tasks, one leaf
/// array at a time.
async fn send_arrays_to_col_writers(
    col_array_channels: &[ColSender],
    rb: &RecordBatch,
    schema: Arc<Schema>,
) -> Result<()> {
    // Each leaf column has its own channel; compute_leaves() may produce
    // several leaves per top-level field (e.g. for nested types).
    let mut next_channel = 0;
    for (array, field) in rb.columns().iter().zip(schema.fields()) {
        for c in compute_leaves(field, array)? {
            // A closed channel means another task hit an error and the plan
            // is shutting down, so do not surface an error here.
            if col_array_channels[next_channel].send(c).await.is_err() {
                return Ok(());
            }

            next_channel += 1;
        }
    }

    Ok(())
}

/// Spawns a task that joins the per-column serialization tasks and closes
/// their writers, producing the finalized column chunks for one row group.
fn spawn_rg_join_and_finalize_task(
    column_writer_tasks: Vec<ColumnWriterTask>,
    rg_rows: usize,
    pool: &Arc<dyn MemoryPool>,
) -> SpawnedTask<RBStreamSerializeResult> {
    let mut rg_reservation =
        MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(pool);

    SpawnedTask::spawn(async move {
        let num_cols = column_writer_tasks.len();
        let mut finalized_rg = Vec::with_capacity(num_cols);
        for task in column_writer_tasks.into_iter() {
            let (writer, _col_reservation) = task
                .join_unwind()
                .await
                .map_err(DataFusionError::ExecutionJoin)??;
            let encoded_size = writer.get_estimated_total_bytes();
            rg_reservation.grow(encoded_size);
            finalized_rg.push(writer.close()?);
        }

        Ok((finalized_rg, rg_reservation, rg_rows))
    })
}

/// Spawns the driver task that splits the incoming stream of [`RecordBatch`]es
/// into row groups of at most `max_row_group_size` rows, feeding each row
/// group to a fresh set of parallel column writers and forwarding the
/// finalization tasks over `serialize_tx`.
fn spawn_parquet_parallel_serialization_task(
    mut data: Receiver<RecordBatch>,
    serialize_tx: Sender<SpawnedTask<RBStreamSerializeResult>>,
    schema: Arc<Schema>,
    writer_props: Arc<WriterProperties>,
    parallel_options: ParallelParquetWriterOptions,
    pool: Arc<dyn MemoryPool>,
) -> SpawnedTask<Result<(), DataFusionError>> {
    SpawnedTask::spawn(async move {
        let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream;
        let max_row_group_rows = writer_props.max_row_group_size();
        let (mut column_writer_handles, mut col_array_channels) =
            spawn_column_parallel_row_group_writer(
                Arc::clone(&schema),
                Arc::clone(&writer_props),
                max_buffer_rb,
                &pool,
            )?;
        let mut current_rg_rows = 0;

        while let Some(mut rb) = data.recv().await {
            // Limit each row group to `max_row_group_rows`, slicing batches
            // that straddle the boundary.
            loop {
                if current_rg_rows + rb.num_rows() < max_row_group_rows {
                    send_arrays_to_col_writers(
                        &col_array_channels,
                        &rb,
                        Arc::clone(&schema),
                    )
                    .await?;
                    current_rg_rows += rb.num_rows();
                    break;
                } else {
                    let rows_left = max_row_group_rows - current_rg_rows;
                    let a = rb.slice(0, rows_left);
                    send_arrays_to_col_writers(
                        &col_array_channels,
                        &a,
                        Arc::clone(&schema),
                    )
                    .await?;

                    // Signal the column writers that the row group is done by
                    // closing their channels, then spawn a task to finalize it.
                    drop(col_array_channels);
                    let finalize_rg_task = spawn_rg_join_and_finalize_task(
                        column_writer_handles,
                        max_row_group_rows,
                        &pool,
                    );

                    // A closed channel means the receiver side failed and the
                    // plan is shutting down, so do not surface an error here.
                    if serialize_tx.send(finalize_rg_task).await.is_err() {
                        return Ok(());
                    }

                    current_rg_rows = 0;
                    rb = rb.slice(rows_left, rb.num_rows() - rows_left);

                    (column_writer_handles, col_array_channels) =
                        spawn_column_parallel_row_group_writer(
                            Arc::clone(&schema),
                            Arc::clone(&writer_props),
                            max_buffer_rb,
                            &pool,
                        )?;
                }
            }
        }

        drop(col_array_channels);
        // Finalize any rows buffered in a partially filled final row group.
        if current_rg_rows > 0 {
            let finalize_rg_task = spawn_rg_join_and_finalize_task(
                column_writer_handles,
                current_rg_rows,
                &pool,
            );

            if serialize_tx.send(finalize_rg_task).await.is_err() {
                return Ok(());
            }
        }

        Ok(())
    })
}
/// Consume finalized row groups in order and append them to a single
/// [`SerializedFileWriter`], flushing the shared buffer to the object store
/// writer as it fills.
async fn concatenate_parallel_row_groups(
    mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
    schema: Arc<Schema>,
    writer_props: Arc<WriterProperties>,
    mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
    pool: Arc<dyn MemoryPool>,
) -> Result<FileMetaData> {
    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);

    let mut file_reservation =
        MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);

    let schema_desc = ArrowSchemaConverter::new().convert(schema.as_ref())?;
    let mut parquet_writer = SerializedFileWriter::new(
        merged_buff.clone(),
        schema_desc.root_schema_ptr(),
        writer_props,
    )?;

    while let Some(task) = serialize_rx.recv().await {
        let result = task.join_unwind().await;
        let mut rg_out = parquet_writer.next_row_group()?;
        let (serialized_columns, mut rg_reservation, _cnt) =
            result.map_err(DataFusionError::ExecutionJoin)??;
        for chunk in serialized_columns {
            chunk.append_to_row_group(&mut rg_out)?;
            rg_reservation.free();

            let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
            file_reservation.try_resize(buff_to_flush.len())?;

            if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
                object_store_writer
                    .write_all(buff_to_flush.as_slice())
                    .await?;
                buff_to_flush.clear();
                // The buffer was just cleared, so this resets the reservation to zero
                file_reservation.try_resize(buff_to_flush.len())?;
            }
        }
        rg_out.close()?;
    }

    let file_metadata = parquet_writer.close()?;
    let final_buff = merged_buff.buffer.try_lock().unwrap();

    object_store_writer.write_all(final_buff.as_slice()).await?;
    object_store_writer.shutdown().await?;
    file_reservation.free();

    Ok(file_metadata)
}
/// Serialize a single Parquet file in parallel: one task splits the incoming
/// stream into row groups and fans columns out to per-column writers, while
/// this function stitches the finished row groups back together in order and
/// streams the bytes to the object store.
async fn output_single_parquet_file_parallelized(
    object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
    data: Receiver<RecordBatch>,
    output_schema: Arc<Schema>,
    parquet_props: &WriterProperties,
    parallel_options: ParallelParquetWriterOptions,
    pool: Arc<dyn MemoryPool>,
) -> Result<FileMetaData> {
    let max_rowgroups = parallel_options.max_parallel_row_groups;
    // The channel capacity bounds how many row groups can be in flight at once.
    let (serialize_tx, serialize_rx) =
        mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups);

    let arc_props = Arc::new(parquet_props.clone());
    let launch_serialization_task = spawn_parquet_parallel_serialization_task(
        data,
        serialize_tx,
        Arc::clone(&output_schema),
        Arc::clone(&arc_props),
        parallel_options,
        Arc::clone(&pool),
    );
    let file_metadata = concatenate_parallel_row_groups(
        serialize_rx,
        Arc::clone(&output_schema),
        Arc::clone(&arc_props),
        object_store_writer,
        pool,
    )
    .await?;

    launch_serialization_task
        .join_unwind()
        .await
        .map_err(DataFusionError::ExecutionJoin)??;
    Ok(file_metadata)
}
/// Min/max accumulators for dictionary-encoded columns operate on the
/// dictionary values, so unwrap to the underlying value type.
fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
    if let DataType::Dictionary(_, value_type) = input_type {
        value_type.as_ref()
    } else {
        input_type
    }
}
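// Hedged example (not from the original file): dictionary-encoded columns are
// unwrapped to their value type before building min/max accumulators.
#[allow(dead_code)]
fn example_min_max_data_type() {
    let dict = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    assert_eq!(min_max_aggregate_data_type(&dict), &DataType::Utf8);
    assert_eq!(min_max_aggregate_data_type(&DataType::Int64), &DataType::Int64);
}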
fn create_max_min_accs(
    schema: &Schema,
) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
    let max_values: Vec<Option<MaxAccumulator>> = schema
        .fields()
        .iter()
        .map(|field| {
            MaxAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok()
        })
        .collect();
    let min_values: Vec<Option<MinAccumulator>> = schema
        .fields()
        .iter()
        .map(|field| {
            MinAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok()
        })
        .collect();
    (max_values, min_values)
}
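// Hedged example (not from the original file): one accumulator slot is created
// per field; fields whose types do not support min/max simply yield `None`.
#[allow(dead_code)]
fn example_create_max_min_accs() {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Utf8, true),
    ]);
    let (max_accs, min_accs) = create_max_min_accs(&schema);
    assert_eq!(max_accs.len(), schema.fields().len());
    assert_eq!(min_accs.len(), schema.fields().len());
}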