use std::any::Any;
use std::cell::RefCell;
use std::fmt::Debug;
use std::ops::Range;
use std::rc::Rc;
use std::sync::Arc;
use std::{fmt, vec};

use arrow::array::RecordBatch;
use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
use datafusion_datasource::TableSchema;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
use datafusion_datasource::write::{
    ObjectWriterBuilder, SharedBuffer, get_writer_schema,
};

use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
use datafusion_datasource::write::demux::DemuxedStreamReceiver;

use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
use datafusion_common::encryption::FileDecryptionProperties;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
    DEFAULT_PARQUET_EXTENSION, DataFusionError, GetExt, HashSet, Result,
    internal_datafusion_err, internal_err, not_impl_err,
};
use datafusion_common::{HashMap, Statistics};
use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_session::Session;

use crate::metadata::{DFParquetMetadata, lex_ordering_to_sorting_columns};
use crate::reader::CachedParquetFileReaderFactory;
use crate::source::{ParquetSource, parse_coerce_int96_string};
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_datasource::source::DataSourceExec;
use datafusion_execution::cache::cache_manager::FileMetadataCache;
use datafusion_execution::runtime_env::RuntimeEnv;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt};
use parquet::arrow::arrow_writer::{
    ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, ArrowRowGroupWriterFactory,
    ArrowWriterOptions, compute_leaves,
};
use parquet::arrow::async_reader::MetadataFetch;
use parquet::arrow::{ArrowWriter, AsyncArrowWriter};
use parquet::basic::Type;
#[cfg(feature = "parquet_encryption")]
use parquet::encryption::encrypt::FileEncryptionProperties;
use parquet::errors::ParquetError;
use parquet::file::metadata::{ParquetMetaData, SortingColumn};
use parquet::file::properties::{
    DEFAULT_MAX_ROW_GROUP_ROW_COUNT, WriterProperties, WriterPropertiesBuilder,
};
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::types::SchemaDescriptor;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};

/// Initial size of the in-memory write buffer. This is just a size hint, so the
/// underlying buffer may grow beyond it.
const INITIAL_BUFFER_BYTES: usize = 1048576;

/// When writing parquet files in parallel, if the buffered Parquet data exceeds
/// this size it is flushed to the object store.
const BUFFER_FLUSH_BYTES: usize = 1024000;

/// Factory used to create [`ParquetFormat`] instances, optionally carrying
/// pre-configured [`TableParquetOptions`].
#[derive(Default)]
pub struct ParquetFormatFactory {
    /// Options applied to created formats, if any
    pub options: Option<TableParquetOptions>,
}

impl ParquetFormatFactory {
    /// Creates an instance of [`ParquetFormatFactory`] with default options
    pub fn new() -> Self {
        Self { options: None }
    }

    /// Creates an instance of [`ParquetFormatFactory`] with customized default options
    pub fn new_with_options(options: TableParquetOptions) -> Self {
        Self {
            options: Some(options),
        }
    }
}

impl FileFormatFactory for ParquetFormatFactory {
    fn create(
        &self,
        state: &dyn Session,
        format_options: &std::collections::HashMap<String, String>,
    ) -> Result<Arc<dyn FileFormat>> {
        let parquet_options = match &self.options {
            None => {
                let mut table_options = state.default_table_options();
                table_options.set_config_format(ConfigFileType::PARQUET);
                table_options.alter_with_string_hash_map(format_options)?;
                table_options.parquet
            }
            Some(parquet_options) => {
                let mut parquet_options = parquet_options.clone();
                for (k, v) in format_options {
                    parquet_options.set(k, v)?;
                }
                parquet_options
            }
        };

        Ok(Arc::new(
            ParquetFormat::default().with_options(parquet_options),
        ))
    }

    fn default(&self) -> Arc<dyn FileFormat> {
        Arc::new(ParquetFormat::default())
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

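/// Reports the default file extension for Parquet files, without the leading dot.
///
/// A minimal sketch of the expected behavior (assuming `DEFAULT_PARQUET_EXTENSION`,
/// imported above, is `".parquet"`):
///
/// ```ignore
/// let factory = ParquetFormatFactory::new();
/// assert_eq!(factory.get_ext(), "parquet");
/// ```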
impl GetExt for ParquetFormatFactory {
    fn get_ext(&self) -> String {
        DEFAULT_PARQUET_EXTENSION[1..].to_string()
    }
}

impl Debug for ParquetFormatFactory {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ParquetFormatFactory")
            .field("ParquetFormatFactory", &self.options)
            .finish()
    }
}
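
/// The Apache Parquet [`FileFormat`] implementation.
///
/// A minimal configuration sketch using the builder-style setters defined below
/// (values are illustrative; registering the format with a session is outside the
/// scope of this example):
///
/// ```ignore
/// let format = ParquetFormat::default()
///     .with_enable_pruning(true)
///     .with_metadata_size_hint(Some(64 * 1024))
///     .with_skip_metadata(true);
/// assert!(format.enable_pruning());
/// assert_eq!(format.metadata_size_hint(), Some(64 * 1024));
/// ```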
#[derive(Debug, Default)]
pub struct ParquetFormat {
    options: TableParquetOptions,
}

impl ParquetFormat {
    /// Construct a new format with default options
    pub fn new() -> Self {
        Self::default()
    }

    /// Enable/disable pruning based on Parquet statistics
    pub fn with_enable_pruning(mut self, enable: bool) -> Self {
        self.options.global.pruning = enable;
        self
    }

    /// Return `true` if pruning is enabled
    pub fn enable_pruning(&self) -> bool {
        self.options.global.pruning
    }

    /// Provide a hint of the size of the serialized file metadata. If set, the reader
    /// can fetch the metadata in fewer requests instead of probing for the footer first.
    pub fn with_metadata_size_hint(mut self, size_hint: Option<usize>) -> Self {
        self.options.global.metadata_size_hint = size_hint;
        self
    }

    /// Return the metadata size hint, if set
    pub fn metadata_size_hint(&self) -> Option<usize> {
        self.options.global.metadata_size_hint
    }

    /// If `true`, skip any metadata embedded in the file schemas, which can help
    /// avoid schema conflicts when merging schemas across files.
    pub fn with_skip_metadata(mut self, skip_metadata: bool) -> Self {
        self.options.global.skip_metadata = skip_metadata;
        self
    }

    /// Return `true` if schema metadata is skipped when merging schemas
    pub fn skip_metadata(&self) -> bool {
        self.options.global.skip_metadata
    }

    /// Set Parquet options for this format
    pub fn with_options(mut self, options: TableParquetOptions) -> Self {
        self.options = options;
        self
    }

    /// Parquet options for this format
    pub fn options(&self) -> &TableParquetOptions {
        &self.options
    }

    /// Return `true` if string and binary columns are read as view types
    /// (`Utf8View` / `BinaryView`)
    pub fn force_view_types(&self) -> bool {
        self.options.global.schema_force_view_types
    }

    /// If `true`, read string and binary columns as view types
    pub fn with_force_view_types(mut self, use_views: bool) -> Self {
        self.options.global.schema_force_view_types = use_views;
        self
    }

    /// Return `true` if binary columns are read as strings
    pub fn binary_as_string(&self) -> bool {
        self.options.global.binary_as_string
    }

    /// If `true`, read binary columns as the corresponding string types
    pub fn with_binary_as_string(mut self, binary_as_string: bool) -> Self {
        self.options.global.binary_as_string = binary_as_string;
        self
    }

    /// Return the time unit INT96 timestamps are coerced to, if configured
    pub fn coerce_int96(&self) -> Option<String> {
        self.options.global.coerce_int96.clone()
    }

    /// Coerce INT96 timestamps to the given time unit (e.g. "us")
    pub fn with_coerce_int96(mut self, time_unit: Option<String>) -> Self {
        self.options.global.coerce_int96 = time_unit;
        self
    }
}

fn clear_metadata(
    schemas: impl IntoIterator<Item = Schema>,
) -> impl Iterator<Item = Schema> {
    schemas.into_iter().map(|schema| {
        let fields = schema
            .fields()
            .iter()
            .map(|field| {
                field.as_ref().clone().with_metadata(Default::default())
            })
            .collect::<Fields>();
        Schema::new(fields)
    })
}

#[cfg(feature = "parquet_encryption")]
async fn get_file_decryption_properties(
    state: &dyn Session,
    options: &TableParquetOptions,
    file_path: &Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
    Ok(match &options.crypto.file_decryption {
        Some(cfd) => Some(Arc::new(FileDecryptionProperties::from(cfd.clone()))),
        None => match &options.crypto.factory_id {
            Some(factory_id) => {
                let factory =
                    state.runtime_env().parquet_encryption_factory(factory_id)?;
                factory
                    .get_file_decryption_properties(
                        &options.crypto.factory_options,
                        file_path,
                    )
                    .await?
            }
            None => None,
        },
    })
}

#[cfg(not(feature = "parquet_encryption"))]
async fn get_file_decryption_properties(
    _state: &dyn Session,
    _options: &TableParquetOptions,
    _file_path: &Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
    Ok(None)
}

#[async_trait]
impl FileFormat for ParquetFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_ext(&self) -> String {
        ParquetFormatFactory::new().get_ext()
    }

    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> Result<String> {
        let ext = self.get_ext();
        match file_compression_type.get_variant() {
            CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
            _ => internal_err!("Parquet FileFormat does not support compression."),
        }
    }

    fn compression_type(&self) -> Option<FileCompressionType> {
        None
    }

    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef> {
        let coerce_int96 = match self.coerce_int96() {
            Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?),
            None => None,
        };

        let file_metadata_cache =
            state.runtime_env().cache_manager.get_file_metadata_cache();

        let mut schemas: Vec<_> = futures::stream::iter(objects)
            .map(|object| async {
                let file_decryption_properties = get_file_decryption_properties(
                    state,
                    &self.options,
                    &object.location,
                )
                .await?;
                let result = DFParquetMetadata::new(store.as_ref(), object)
                    .with_metadata_size_hint(self.metadata_size_hint())
                    .with_decryption_properties(file_decryption_properties)
                    .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
                    .with_coerce_int96(coerce_int96)
                    .fetch_schema_with_location()
                    .await?;
                Ok::<_, DataFusionError>(result)
            })
            .boxed()
            .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect()
            .await?;

        // Sort by location so the schema merge (and the resulting merged schema) is
        // deterministic regardless of the order the fetches complete in.
        schemas
            .sort_unstable_by(|(location1, _), (location2, _)| location1.cmp(location2));

        let schemas = schemas.into_iter().map(|(_, schema)| schema);

        let schema = if self.skip_metadata() {
            Schema::try_merge(clear_metadata(schemas))
        } else {
            Schema::try_merge(schemas)
        }?;

        let schema = if self.binary_as_string() {
            transform_binary_to_string(&schema)
        } else {
            schema
        };

        let schema = if self.force_view_types() {
            transform_schema_to_view(&schema)
        } else {
            schema
        };

        Ok(Arc::new(schema))
    }

    async fn infer_stats(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> Result<Statistics> {
        let file_decryption_properties =
            get_file_decryption_properties(state, &self.options, &object.location)
                .await?;
        let file_metadata_cache =
            state.runtime_env().cache_manager.get_file_metadata_cache();
        DFParquetMetadata::new(store, object)
            .with_metadata_size_hint(self.metadata_size_hint())
            .with_decryption_properties(file_decryption_properties)
            .with_file_metadata_cache(Some(file_metadata_cache))
            .fetch_statistics(&table_schema)
            .await
    }

    async fn infer_ordering(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> Result<Option<LexOrdering>> {
        let file_decryption_properties =
            get_file_decryption_properties(state, &self.options, &object.location)
                .await?;
        let file_metadata_cache =
            state.runtime_env().cache_manager.get_file_metadata_cache();
        let metadata = DFParquetMetadata::new(store, object)
            .with_metadata_size_hint(self.metadata_size_hint())
            .with_decryption_properties(file_decryption_properties)
            .with_file_metadata_cache(Some(file_metadata_cache))
            .fetch_metadata()
            .await?;
        crate::metadata::ordering_from_parquet_metadata(&metadata, &table_schema)
    }

    async fn infer_stats_and_ordering(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> Result<datafusion_datasource::file_format::FileMeta> {
        let file_decryption_properties =
            get_file_decryption_properties(state, &self.options, &object.location)
                .await?;
        let file_metadata_cache =
            state.runtime_env().cache_manager.get_file_metadata_cache();
        let metadata = DFParquetMetadata::new(store, object)
            .with_metadata_size_hint(self.metadata_size_hint())
            .with_decryption_properties(file_decryption_properties)
            .with_file_metadata_cache(Some(file_metadata_cache))
            .fetch_metadata()
            .await?;
        let statistics = DFParquetMetadata::statistics_from_parquet_metadata(
            &metadata,
            &table_schema,
        )?;
        let ordering =
            crate::metadata::ordering_from_parquet_metadata(&metadata, &table_schema)?;
        Ok(
            datafusion_datasource::file_format::FileMeta::new(statistics)
                .with_ordering(ordering),
        )
    }

    async fn create_physical_plan(
        &self,
        state: &dyn Session,
        conf: FileScanConfig,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut metadata_size_hint = None;

        if let Some(metadata) = self.metadata_size_hint() {
            metadata_size_hint = Some(metadata);
        }

        let mut source = conf
            .file_source()
            .as_any()
            .downcast_ref::<ParquetSource>()
            .cloned()
            .ok_or_else(|| internal_datafusion_err!("Expected ParquetSource"))?;
        source = source.with_table_parquet_options(self.options.clone());

        let metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
        let store = state
            .runtime_env()
            .object_store(conf.object_store_url.clone())?;
        let cached_parquet_read_factory =
            Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
        source = source.with_parquet_file_reader_factory(cached_parquet_read_factory);

        if let Some(metadata_size_hint) = metadata_size_hint {
            source = source.with_metadata_size_hint(metadata_size_hint)
        }

        source = self.set_source_encryption_factory(source, state)?;

        let conf = FileScanConfigBuilder::from(conf)
            .with_source(Arc::new(source))
            .build();
        Ok(DataSourceExec::from_data_source(conf))
    }

    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Parquet");
        }

        let sorting_columns = if let Some(ref requirements) = order_requirements {
            let ordering: LexOrdering = requirements.clone().into();
            lex_ordering_to_sorting_columns(&ordering).ok()
        } else {
            None
        };

        let sink = Arc::new(
            ParquetSink::new(conf, self.options.clone())
                .with_sorting_columns(sorting_columns),
        );

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
        Arc::new(
            ParquetSource::new(table_schema)
                .with_table_parquet_options(self.options.clone()),
        )
    }
}

#[cfg(feature = "parquet_encryption")]
impl ParquetFormat {
    fn set_source_encryption_factory(
        &self,
        source: ParquetSource,
        state: &dyn Session,
    ) -> Result<ParquetSource> {
        if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
            Ok(source.with_encryption_factory(
                state
                    .runtime_env()
                    .parquet_encryption_factory(encryption_factory_id)?,
            ))
        } else {
            Ok(source)
        }
    }
}

#[cfg(not(feature = "parquet_encryption"))]
impl ParquetFormat {
    fn set_source_encryption_factory(
        &self,
        source: ParquetSource,
        _state: &dyn Session,
    ) -> Result<ParquetSource> {
        if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
            Err(DataFusionError::Configuration(format!(
                "Parquet encryption factory id is set to '{encryption_factory_id}' but the parquet_encryption feature is disabled"
            )))
        } else {
            Ok(source)
        }
    }
}

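/// Coerces the file schema when the table schema uses view types (`Utf8View`,
/// `BinaryView`) or string types where the file stores binary data.
///
/// Returns `None` when no coercion is necessary. A minimal sketch of the intended
/// behavior (field name and types are illustrative):
///
/// ```ignore
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// // The table declares a Utf8View column, but the file stores plain Utf8
/// let table_schema = Schema::new(vec![Field::new("s", DataType::Utf8View, true)]);
/// let file_schema = Schema::new(vec![Field::new("s", DataType::Utf8, true)]);
///
/// let coerced = apply_file_schema_type_coercions(&table_schema, &file_schema).unwrap();
/// assert_eq!(coerced.field(0).data_type(), &DataType::Utf8View);
/// ```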
pub fn apply_file_schema_type_coercions(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut needs_view_transform = false;
    let mut needs_string_transform = false;

    let table_fields: HashMap<_, _> = table_schema
        .fields()
        .iter()
        .map(|f| {
            let dt = f.data_type();
            if matches!(dt, &DataType::Utf8View | &DataType::BinaryView) {
                needs_view_transform = true;
            }
            if matches!(
                dt,
                &DataType::Utf8 | &DataType::LargeUtf8 | &DataType::Utf8View
            ) {
                needs_string_transform = true;
            }

            (f.name(), dt)
        })
        .collect();

    if !needs_view_transform && !needs_string_transform {
        return None;
    }

    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields()
        .iter()
        .map(|field| {
            let field_name = field.name();
            let field_type = field.data_type();

            if let Some(table_type) = table_fields.get(field_name) {
                match (table_type, field_type) {
                    (
                        &DataType::Utf8,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::Utf8);
                    }
                    (
                        &DataType::LargeUtf8,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::LargeUtf8);
                    }
                    (
                        &DataType::Utf8View,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::Utf8View);
                    }
                    (&DataType::Utf8View, DataType::Utf8 | DataType::LargeUtf8) => {
                        return field_with_new_type(field, DataType::Utf8View);
                    }
                    (&DataType::BinaryView, DataType::Binary | DataType::LargeBinary) => {
                        return field_with_new_type(field, DataType::BinaryView);
                    }
                    _ => {}
                }
            }

            Arc::clone(field)
        })
        .collect();

    Some(Schema::new_with_metadata(
        transformed_fields,
        file_schema.metadata.clone(),
    ))
}

/// Coerces INT96 timestamp columns (which Arrow reads as nanosecond timestamps) to the
/// given [`TimeUnit`], including columns nested inside structs, lists, and maps.
///
/// Returns `None` if the file contains no INT96 columns.
pub fn coerce_int96_to_resolution(
    parquet_schema: &SchemaDescriptor,
    file_schema: &Schema,
    time_unit: &TimeUnit,
) -> Option<Schema> {
    // Collect the dotted Parquet paths of all INT96 leaf columns.
    let int96_fields: HashSet<_> = parquet_schema
        .columns()
        .iter()
        .filter(|f| f.physical_type() == Type::INT96)
        .map(|f| f.path().string())
        .collect();

    if int96_fields.is_empty() {
        // The schema doesn't contain any INT96 fields, so skip the remaining logic.
        return None;
    }

    // Do a depth-first traversal of the schema using an explicit stack, rewriting INT96
    // leaves along the way. Nested fields are visited twice: first their children are
    // pushed onto the stack, then the parent is rebuilt once all children are processed.
    type NestedFields = Rc<RefCell<Vec<FieldRef>>>;
    type StackContext<'a> = (
        // Parquet path segments accumulated so far (concatenated to match entries in
        // `int96_fields`, e.g. "c0.list.element").
        Vec<&'a str>,
        // The current field to process.
        &'a FieldRef,
        // The parent's field collection this field will eventually be pushed into.
        NestedFields,
        // For nested types, the collection their processed children are gathered in.
        Option<NestedFields>,
    );

    let fields = Rc::new(RefCell::new(Vec::with_capacity(file_schema.fields.len())));

    let transformed_schema = {
        // Seed the stack with the top-level fields, reversed so they pop in order.
        let mut stack: Vec<StackContext> = file_schema
            .fields()
            .iter()
            .rev()
            .map(|f| (vec![f.name().as_str()], f, Rc::clone(&fields), None))
            .collect();

        while let Some((parquet_path, current_field, parent_fields, child_fields)) =
            stack.pop()
        {
            match (current_field.data_type(), child_fields) {
                (DataType::Struct(unprocessed_children), None) => {
                    // First visit: re-push the struct with a fresh child collection,
                    // then push its children so they are processed before the second visit.
                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(
                        unprocessed_children.len(),
                    )));
                    stack.push((
                        parquet_path.clone(),
                        current_field,
                        parent_fields,
                        Some(Rc::clone(&child_fields)),
                    ));
                    for child in unprocessed_children.into_iter().rev() {
                        let mut child_path = parquet_path.clone();
                        child_path.push(".");
                        child_path.push(child.name());
                        stack.push((child_path, child, Rc::clone(&child_fields), None));
                    }
                }
                (DataType::Struct(unprocessed_children), Some(processed_children)) => {
                    // Second visit: all children are processed, rebuild the struct.
                    let processed_children = processed_children.borrow();
                    assert_eq!(processed_children.len(), unprocessed_children.len());
                    let processed_struct = Field::new_struct(
                        current_field.name(),
                        processed_children.as_slice(),
                        current_field.is_nullable(),
                    );
                    parent_fields.borrow_mut().push(Arc::new(processed_struct));
                }
                (DataType::List(unprocessed_child), None) => {
                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
                    stack.push((
                        parquet_path.clone(),
                        current_field,
                        parent_fields,
                        Some(Rc::clone(&child_fields)),
                    ));
                    let mut child_path = parquet_path.clone();
                    // The Parquet path for list elements includes the repeated "list"
                    // group, which is not kept in the Arrow schema, so insert it here.
                    child_path.push(".list.");
                    child_path.push(unprocessed_child.name());
                    stack.push((
                        child_path.clone(),
                        unprocessed_child,
                        Rc::clone(&child_fields),
                        None,
                    ));
                }
                (DataType::List(_), Some(processed_children)) => {
                    let processed_children = processed_children.borrow();
                    assert_eq!(processed_children.len(), 1);
                    let processed_list = Field::new_list(
                        current_field.name(),
                        Arc::clone(&processed_children[0]),
                        current_field.is_nullable(),
                    );
                    parent_fields.borrow_mut().push(Arc::new(processed_list));
                }
                (DataType::Map(unprocessed_child, _), None) => {
                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
                    stack.push((
                        parquet_path.clone(),
                        current_field,
                        parent_fields,
                        Some(Rc::clone(&child_fields)),
                    ));
                    let mut child_path = parquet_path.clone();
                    child_path.push(".");
                    child_path.push(unprocessed_child.name());
                    stack.push((
                        child_path.clone(),
                        unprocessed_child,
                        Rc::clone(&child_fields),
                        None,
                    ));
                }
                (DataType::Map(_, sorted), Some(processed_children)) => {
                    let processed_children = processed_children.borrow();
                    assert_eq!(processed_children.len(), 1);
                    let processed_map = Field::new(
                        current_field.name(),
                        DataType::Map(Arc::clone(&processed_children[0]), *sorted),
                        current_field.is_nullable(),
                    );
                    parent_fields.borrow_mut().push(Arc::new(processed_map));
                }
                (DataType::Timestamp(TimeUnit::Nanosecond, None), None)
                    if int96_fields.contains(parquet_path.concat().as_str()) =>
                {
                    // This leaf came from an INT96 column: rewrite it to the requested
                    // resolution.
                    parent_fields.borrow_mut().push(field_with_new_type(
                        current_field,
                        DataType::Timestamp(*time_unit, None),
                    ));
                }
                // All other field types are passed through unchanged.
                _ => parent_fields.borrow_mut().push(Arc::clone(current_field)),
            }
        }
        assert_eq!(fields.borrow().len(), file_schema.fields.len());
        Schema::new_with_metadata(
            fields.borrow_mut().clone(),
            file_schema.metadata.clone(),
        )
    };

    Some(transformed_schema)
}

#[deprecated(
    since = "47.0.0",
    note = "Use `apply_file_schema_type_coercions` instead"
)]
pub fn coerce_file_schema_to_view_type(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut transform = false;
    let table_fields: HashMap<_, _> = table_schema
        .fields
        .iter()
        .map(|f| {
            let dt = f.data_type();
            if dt.equals_datatype(&DataType::Utf8View)
                || dt.equals_datatype(&DataType::BinaryView)
            {
                transform = true;
            }
            (f.name(), dt)
        })
        .collect();

    if !transform {
        return None;
    }

    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields
        .iter()
        .map(
            |field| match (table_fields.get(field.name()), field.data_type()) {
                (Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => {
                    field_with_new_type(field, DataType::Utf8View)
                }
                (
                    Some(DataType::BinaryView),
                    DataType::Binary | DataType::LargeBinary,
                ) => field_with_new_type(field, DataType::BinaryView),
                _ => Arc::clone(field),
            },
        )
        .collect();

    Some(Schema::new_with_metadata(
        transformed_fields,
        file_schema.metadata.clone(),
    ))
}

#[deprecated(
    since = "47.0.0",
    note = "Use `apply_file_schema_type_coercions` instead"
)]
pub fn coerce_file_schema_to_string_type(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut transform = false;
    let table_fields: HashMap<_, _> = table_schema
        .fields
        .iter()
        .map(|f| (f.name(), f.data_type()))
        .collect();
    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields
        .iter()
        .map(
            |field| match (table_fields.get(field.name()), field.data_type()) {
                (
                    Some(DataType::Utf8),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::Utf8)
                }
                (
                    Some(DataType::LargeUtf8),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::LargeUtf8)
                }
                (
                    Some(DataType::Utf8View),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::Utf8View)
                }
                _ => Arc::clone(field),
            },
        )
        .collect();

    if !transform {
        None
    } else {
        Some(Schema::new_with_metadata(
            transformed_fields,
            file_schema.metadata.clone(),
        ))
    }
}

fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
    Arc::new(field.as_ref().clone().with_data_type(new_type))
}

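/// Transform a schema so that any `Utf8`/`LargeUtf8` columns become `Utf8View` and any
/// `Binary`/`LargeBinary` columns become `BinaryView`.
///
/// A minimal sketch of the expected mapping (field names are illustrative):
///
/// ```ignore
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let schema = Schema::new(vec![
///     Field::new("s", DataType::Utf8, true),
///     Field::new("b", DataType::Binary, true),
/// ]);
/// let viewed = transform_schema_to_view(&schema);
/// assert_eq!(viewed.field(0).data_type(), &DataType::Utf8View);
/// assert_eq!(viewed.field(1).data_type(), &DataType::BinaryView);
/// ```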
pub fn transform_schema_to_view(schema: &Schema) -> Schema {
    let transformed_fields: Vec<Arc<Field>> = schema
        .fields
        .iter()
        .map(|field| match field.data_type() {
            DataType::Utf8 | DataType::LargeUtf8 => {
                field_with_new_type(field, DataType::Utf8View)
            }
            DataType::Binary | DataType::LargeBinary => {
                field_with_new_type(field, DataType::BinaryView)
            }
            _ => Arc::clone(field),
        })
        .collect();
    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

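/// Transform a schema so that binary columns are read as the corresponding string types
/// (`Binary` -> `Utf8`, `LargeBinary` -> `LargeUtf8`, `BinaryView` -> `Utf8View`).
///
/// A minimal sketch of the expected mapping (field name is illustrative):
///
/// ```ignore
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let schema = Schema::new(vec![Field::new("payload", DataType::Binary, true)]);
/// let stringly = transform_binary_to_string(&schema);
/// assert_eq!(stringly.field(0).data_type(), &DataType::Utf8);
/// ```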
pub fn transform_binary_to_string(schema: &Schema) -> Schema {
    let transformed_fields: Vec<Arc<Field>> = schema
        .fields
        .iter()
        .map(|field| match field.data_type() {
            DataType::Binary => field_with_new_type(field, DataType::Utf8),
            DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8),
            DataType::BinaryView => field_with_new_type(field, DataType::Utf8View),
            _ => Arc::clone(field),
        })
        .collect();
    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

/// [`MetadataFetch`] adapter that reads byte ranges from an [`ObjectStore`].
pub struct ObjectStoreFetch<'a> {
    store: &'a dyn ObjectStore,
    meta: &'a ObjectMeta,
}

impl<'a> ObjectStoreFetch<'a> {
    pub fn new(store: &'a dyn ObjectStore, meta: &'a ObjectMeta) -> Self {
        Self { store, meta }
    }
}

impl MetadataFetch for ObjectStoreFetch<'_> {
    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, ParquetError>> {
        async {
            self.store
                .get_range(&self.meta.location, range)
                .await
                .map_err(ParquetError::from)
        }
        .boxed()
    }
}

#[deprecated(
    since = "50.0.0",
    note = "Use `DFParquetMetadata::fetch_metadata` instead"
)]
pub async fn fetch_parquet_metadata(
    store: &dyn ObjectStore,
    object_meta: &ObjectMeta,
    size_hint: Option<usize>,
    decryption_properties: Option<&FileDecryptionProperties>,
    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
) -> Result<Arc<ParquetMetaData>> {
    let decryption_properties = decryption_properties.cloned().map(Arc::new);
    DFParquetMetadata::new(store, object_meta)
        .with_metadata_size_hint(size_hint)
        .with_decryption_properties(decryption_properties)
        .with_file_metadata_cache(file_metadata_cache)
        .fetch_metadata()
        .await
}

#[deprecated(
    since = "50.0.0",
    note = "Use `DFParquetMetadata::fetch_statistics` instead"
)]
pub async fn fetch_statistics(
    store: &dyn ObjectStore,
    table_schema: SchemaRef,
    file: &ObjectMeta,
    metadata_size_hint: Option<usize>,
    decryption_properties: Option<&FileDecryptionProperties>,
    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
) -> Result<Statistics> {
    let decryption_properties = decryption_properties.cloned().map(Arc::new);
    DFParquetMetadata::new(store, file)
        .with_metadata_size_hint(metadata_size_hint)
        .with_decryption_properties(decryption_properties)
        .with_file_metadata_cache(file_metadata_cache)
        .fetch_statistics(&table_schema)
        .await
}

#[deprecated(
    since = "50.0.0",
    note = "Use `DFParquetMetadata::statistics_from_parquet_metadata` instead"
)]
#[expect(clippy::needless_pass_by_value)]
pub fn statistics_from_parquet_meta_calc(
    metadata: &ParquetMetaData,
    table_schema: SchemaRef,
) -> Result<Statistics> {
    DFParquetMetadata::statistics_from_parquet_metadata(metadata, &table_schema)
}

/// Implements [`DataSink`] for writing Parquet files.
pub struct ParquetSink {
    /// Config options for writing data
    config: FileSinkConfig,
    /// Underlying parquet options
    parquet_options: TableParquetOptions,
    /// File metadata from successfully produced parquet files. The Mutex allows
    /// inserting into the HashMap from behind a shared reference during `write_all`.
    written: Arc<parking_lot::Mutex<HashMap<Path, ParquetMetaData>>>,
    /// Parquet sorting columns derived from the sink's ordering requirements, if any
    sorting_columns: Option<Vec<SortingColumn>>,
}

impl Debug for ParquetSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ParquetSink").finish()
    }
}

impl DisplayAs for ParquetSink {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "ParquetSink(file_groups=")?;
                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
                write!(f, ")")
            }
            DisplayFormatType::TreeRender => {
                write!(f, "")
            }
        }
    }
}

impl ParquetSink {
    /// Create a sink from the given config and parquet options
    pub fn new(config: FileSinkConfig, parquet_options: TableParquetOptions) -> Self {
        Self {
            config,
            parquet_options,
            written: Default::default(),
            sorting_columns: None,
        }
    }

    /// Set the sorting columns recorded in the written files' metadata
    pub fn with_sorting_columns(
        mut self,
        sorting_columns: Option<Vec<SortingColumn>>,
    ) -> Self {
        self.sorting_columns = sorting_columns;
        self
    }

    /// Return a snapshot of the files written by this sink so far
    pub fn written(&self) -> HashMap<Path, ParquetMetaData> {
        self.written.lock().clone()
    }

    /// Create writer properties from the configured options, including sorting
    /// columns, arrow schema metadata, and encryption settings.
    async fn create_writer_props(
        &self,
        runtime: &Arc<RuntimeEnv>,
        path: &Path,
    ) -> Result<WriterProperties> {
        let schema = self.config.output_schema();

        let mut parquet_opts = self.parquet_options.clone();
        if !self.parquet_options.global.skip_arrow_metadata {
            parquet_opts.arrow_schema(schema);
        }

        let mut builder = WriterPropertiesBuilder::try_from(&parquet_opts)?;

        if let Some(ref sorting_columns) = self.sorting_columns {
            builder = builder.set_sorting_columns(Some(sorting_columns.clone()));
        }

        builder = set_writer_encryption_properties(
            builder,
            runtime,
            parquet_opts,
            schema,
            path,
        )
        .await?;
        Ok(builder.build())
    }

    /// Creates an [`AsyncArrowWriter`] that serializes a parquet file to an object store.
    /// Used when per-file serialization is not parallelized.
    async fn create_async_arrow_writer(
        &self,
        location: &Path,
        object_store: Arc<dyn ObjectStore>,
        context: &Arc<TaskContext>,
        parquet_props: WriterProperties,
    ) -> Result<AsyncArrowWriter<BufWriter>> {
        let buf_writer = BufWriter::with_capacity(
            object_store,
            location.clone(),
            context
                .session_config()
                .options()
                .execution
                .objectstore_writer_buffer_size,
        );
        let options = ArrowWriterOptions::new()
            .with_properties(parquet_props)
            .with_skip_arrow_metadata(self.parquet_options.global.skip_arrow_metadata);

        let writer = AsyncArrowWriter::try_new_with_options(
            buf_writer,
            get_writer_schema(&self.config),
            options,
        )?;
        Ok(writer)
    }

    /// Parquet options
    pub fn parquet_options(&self) -> &TableParquetOptions {
        &self.parquet_options
    }
}

#[cfg(feature = "parquet_encryption")]
async fn set_writer_encryption_properties(
    builder: WriterPropertiesBuilder,
    runtime: &Arc<RuntimeEnv>,
    parquet_opts: TableParquetOptions,
    schema: &Arc<Schema>,
    path: &Path,
) -> Result<WriterPropertiesBuilder> {
    if let Some(file_encryption_properties) = parquet_opts.crypto.file_encryption {
        // Encryption properties were specified directly in the options
        return Ok(builder.with_file_encryption_properties(Arc::new(
            FileEncryptionProperties::from(file_encryption_properties),
        )));
    } else if let Some(encryption_factory_id) = &parquet_opts.crypto.factory_id.as_ref() {
        // Encryption properties are generated by a registered encryption factory
        let encryption_factory =
            runtime.parquet_encryption_factory(encryption_factory_id)?;
        let file_encryption_properties = encryption_factory
            .get_file_encryption_properties(
                &parquet_opts.crypto.factory_options,
                schema,
                path,
            )
            .await?;
        if let Some(file_encryption_properties) = file_encryption_properties {
            return Ok(
                builder.with_file_encryption_properties(file_encryption_properties)
            );
        }
    }
    Ok(builder)
}

#[cfg(not(feature = "parquet_encryption"))]
async fn set_writer_encryption_properties(
    builder: WriterPropertiesBuilder,
    _runtime: &Arc<RuntimeEnv>,
    _parquet_opts: TableParquetOptions,
    _schema: &Arc<Schema>,
    _path: &Path,
) -> Result<WriterPropertiesBuilder> {
    Ok(builder)
}

#[async_trait]
impl FileSink for ParquetSink {
    fn config(&self) -> &FileSinkConfig {
        &self.config
    }

    async fn spawn_writer_tasks_and_join(
        &self,
        context: &Arc<TaskContext>,
        demux_task: SpawnedTask<Result<()>>,
        mut file_stream_rx: DemuxedStreamReceiver,
        object_store: Arc<dyn ObjectStore>,
    ) -> Result<u64> {
        let parquet_opts = &self.parquet_options;

        let mut file_write_tasks: JoinSet<
            std::result::Result<(Path, ParquetMetaData), DataFusionError>,
        > = JoinSet::new();

        let runtime = context.runtime_env();
        let parallel_options = ParallelParquetWriterOptions {
            max_parallel_row_groups: parquet_opts
                .global
                .maximum_parallel_row_group_writers,
            max_buffered_record_batches_per_stream: parquet_opts
                .global
                .maximum_buffered_record_batches_per_stream,
        };

        while let Some((path, mut rx)) = file_stream_rx.recv().await {
            let parquet_props = self.create_writer_props(&runtime, &path).await?;
            if !parquet_opts.global.allow_single_file_parallelism {
                let mut writer = self
                    .create_async_arrow_writer(
                        &path,
                        Arc::clone(&object_store),
                        context,
                        parquet_props.clone(),
                    )
                    .await?;
                let mut reservation = MemoryConsumer::new(format!("ParquetSink[{path}]"))
                    .register(context.memory_pool());
                file_write_tasks.spawn(async move {
                    while let Some(batch) = rx.recv().await {
                        writer.write(&batch).await?;
                        reservation.try_resize(writer.memory_size())?;
                    }
                    let parquet_meta_data = writer
                        .close()
                        .await
                        .map_err(|e| DataFusionError::ParquetError(Box::new(e)))?;
                    Ok((path, parquet_meta_data))
                });
            } else {
                let writer = ObjectWriterBuilder::new(
                    // Parquet files as a whole are never compressed, since they
                    // manage compressed blocks themselves.
                    FileCompressionType::UNCOMPRESSED,
                    &path,
                    Arc::clone(&object_store),
                )
                .with_buffer_size(Some(
                    context
                        .session_config()
                        .options()
                        .execution
                        .objectstore_writer_buffer_size,
                ))
                .build()?;
                let schema = get_writer_schema(&self.config);
                let props = parquet_props.clone();
                let skip_arrow_metadata = self.parquet_options.global.skip_arrow_metadata;
                let parallel_options_clone = parallel_options.clone();
                let pool = Arc::clone(context.memory_pool());
                file_write_tasks.spawn(async move {
                    let parquet_meta_data = output_single_parquet_file_parallelized(
                        writer,
                        rx,
                        schema,
                        &props,
                        skip_arrow_metadata,
                        parallel_options_clone,
                        pool,
                    )
                    .await?;
                    Ok((path, parquet_meta_data))
                });
            }
        }

        let mut row_count = 0;
        while let Some(result) = file_write_tasks.join_next().await {
            match result {
                Ok(r) => {
                    let (path, parquet_meta_data) = r?;
                    row_count += parquet_meta_data.file_metadata().num_rows();
                    let mut written_files = self.written.lock();
                    written_files
                        .try_insert(path.clone(), parquet_meta_data)
                        .map_err(|e| internal_datafusion_err!("duplicate entry detected for partitioned file {path}: {e}"))?;
                    drop(written_files);
                }
                Err(e) => {
                    if e.is_panic() {
                        std::panic::resume_unwind(e.into_panic());
                    } else {
                        unreachable!();
                    }
                }
            }
        }

        demux_task
            .join_unwind()
            .await
            .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;

        Ok(row_count as u64)
    }
}

#[async_trait]
impl DataSink for ParquetSink {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> &SchemaRef {
        self.config.output_schema()
    }

    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64> {
        FileSink::write_all(self, data, context).await
    }
}

/// Consumes a stream of [`ArrowLeafColumn`]s via a channel and serializes them using an
/// [`ArrowColumnWriter`]. Once the channel is exhausted, returns the writer and its
/// memory reservation.
async fn column_serializer_task(
    mut rx: Receiver<ArrowLeafColumn>,
    mut writer: ArrowColumnWriter,
    mut reservation: MemoryReservation,
) -> Result<(ArrowColumnWriter, MemoryReservation)> {
    while let Some(col) = rx.recv().await {
        writer.write(&col)?;
        reservation.try_resize(writer.memory_size())?;
    }
    Ok((writer, reservation))
}

type ColumnWriterTask = SpawnedTask<Result<(ArrowColumnWriter, MemoryReservation)>>;
type ColSender = Sender<ArrowLeafColumn>;

/// Spawns one serialization task per column writer and returns the tasks together with
/// the channels used to send [`ArrowLeafColumn`]s to them.
fn spawn_column_parallel_row_group_writer(
    col_writers: Vec<ArrowColumnWriter>,
    max_buffer_size: usize,
    pool: &Arc<dyn MemoryPool>,
) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
    let num_columns = col_writers.len();

    let mut col_writer_tasks = Vec::with_capacity(num_columns);
    let mut col_array_channels = Vec::with_capacity(num_columns);
    for writer in col_writers.into_iter() {
        // The channel capacity limits how many arrays can queue up for column-level
        // serialization.
        let (send_array, receive_array) =
            mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
        col_array_channels.push(send_array);

        let reservation =
            MemoryConsumer::new("ParquetSink(ArrowColumnWriter)").register(pool);
        let task = SpawnedTask::spawn(column_serializer_task(
            receive_array,
            writer,
            reservation,
        ));
        col_writer_tasks.push(task);
    }

    Ok((col_writer_tasks, col_array_channels))
}

/// Settings related to writing parquet files in parallel
#[derive(Clone)]
struct ParallelParquetWriterOptions {
    max_parallel_row_groups: usize,
    max_buffered_record_batches_per_stream: usize,
}

/// Result of serializing one row group: the finalized column chunks, the row group's
/// memory reservation, and its row count.
type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, MemoryReservation, usize)>;

/// Sends the [`ArrowLeafColumn`]s of a [`RecordBatch`] to the matching per-column
/// serializer channels.
async fn send_arrays_to_col_writers(
    col_array_channels: &[ColSender],
    rb: &RecordBatch,
    schema: Arc<Schema>,
) -> Result<()> {
    // Each leaf column has its own channel; compute_leaves() may yield more than one
    // leaf per top-level field for nested types.
    let mut next_channel = 0;
    for (array, field) in rb.columns().iter().zip(schema.fields()) {
        for c in compute_leaves(field, array)? {
            // Do not surface an error from a closed channel: it means something else
            // hit an error and the plan is shutting down.
            if col_array_channels[next_channel].send(c).await.is_err() {
                return Ok(());
            }

            next_channel += 1;
        }
    }

    Ok(())
}

/// Spawns a task that joins the per-column serializer tasks of a row group and collects
/// their finalized column chunks.
fn spawn_rg_join_and_finalize_task(
    column_writer_tasks: Vec<ColumnWriterTask>,
    rg_rows: usize,
    pool: &Arc<dyn MemoryPool>,
) -> SpawnedTask<RBStreamSerializeResult> {
    let mut rg_reservation =
        MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(pool);

    SpawnedTask::spawn(async move {
        let num_cols = column_writer_tasks.len();
        let mut finalized_rg = Vec::with_capacity(num_cols);
        for task in column_writer_tasks.into_iter() {
            let (writer, _col_reservation) = task
                .join_unwind()
                .await
                .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
            let encoded_size = writer.get_estimated_total_bytes();
            rg_reservation.grow(encoded_size);
            finalized_rg.push(writer.close()?);
        }

        Ok((finalized_rg, rg_reservation, rg_rows))
    })
}

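/// Spawns the task which drives the parallel serialization of a single parquet file:
/// it pulls [`RecordBatch`]es from `data`, fans their leaf columns out to the per-column
/// serializer tasks, and cuts a new row group whenever `max_row_group_rows` is reached
/// (slicing a batch across row groups when it straddles the boundary). Each finalized
/// row-group task is forwarded on `serialize_tx` for the writer side to consume in order.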
fn spawn_parquet_parallel_serialization_task(
    row_group_writer_factory: ArrowRowGroupWriterFactory,
    mut data: Receiver<RecordBatch>,
    serialize_tx: Sender<SpawnedTask<RBStreamSerializeResult>>,
    schema: Arc<Schema>,
    writer_props: Arc<WriterProperties>,
    parallel_options: Arc<ParallelParquetWriterOptions>,
    pool: Arc<dyn MemoryPool>,
) -> SpawnedTask<Result<(), DataFusionError>> {
    SpawnedTask::spawn(async move {
        let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream;
        let max_row_group_rows = writer_props
            .max_row_group_row_count()
            .unwrap_or(DEFAULT_MAX_ROW_GROUP_ROW_COUNT);
        let mut row_group_index = 0;
        let col_writers =
            row_group_writer_factory.create_column_writers(row_group_index)?;
        let (mut column_writer_handles, mut col_array_channels) =
            spawn_column_parallel_row_group_writer(col_writers, max_buffer_rb, &pool)?;
        let mut current_rg_rows = 0;

        while let Some(mut rb) = data.recv().await {
            // This inner loop repeatedly splits the RecordBatch so that a single batch
            // larger than the maximum row group size is spread over several row groups.
            loop {
                if current_rg_rows + rb.num_rows() < max_row_group_rows {
                    // The whole batch fits in the current row group.
                    send_arrays_to_col_writers(
                        &col_array_channels,
                        &rb,
                        Arc::clone(&schema),
                    )
                    .await?;
                    current_rg_rows += rb.num_rows();
                    break;
                } else {
                    // Fill up the current row group with the leading rows of the batch.
                    let rows_left = max_row_group_rows - current_rg_rows;
                    let a = rb.slice(0, rows_left);
                    send_arrays_to_col_writers(
                        &col_array_channels,
                        &a,
                        Arc::clone(&schema),
                    )
                    .await?;

                    // Closing the channels signals the column writers that this row
                    // group is done; join and finalize it in its own task.
                    drop(col_array_channels);
                    let finalize_rg_task = spawn_rg_join_and_finalize_task(
                        column_writer_handles,
                        max_row_group_rows,
                        &pool,
                    );

                    // Do not surface an error from a closed channel: it means something
                    // else hit an error and the plan is shutting down.
                    if serialize_tx.send(finalize_rg_task).await.is_err() {
                        return Ok(());
                    }

                    // Start a fresh row group and keep processing the remainder.
                    current_rg_rows = 0;
                    rb = rb.slice(rows_left, rb.num_rows() - rows_left);

                    row_group_index += 1;
                    let col_writers = row_group_writer_factory
                        .create_column_writers(row_group_index)?;
                    (column_writer_handles, col_array_channels) =
                        spawn_column_parallel_row_group_writer(
                            col_writers,
                            max_buffer_rb,
                            &pool,
                        )?;
                }
            }
        }

        drop(col_array_channels);
        // Finalize the trailing, partially filled row group, if any.
        if current_rg_rows > 0 {
            let finalize_rg_task = spawn_rg_join_and_finalize_task(
                column_writer_handles,
                current_rg_rows,
                &pool,
            );

            // Do not surface an error from a closed channel: it means something
            // else hit an error and the plan is shutting down.
            if serialize_tx.send(finalize_rg_task).await.is_err() {
                return Ok(());
            }
        }

        Ok(())
    })
}

/// Consumes the row groups serialized by the parallel tasks and concatenates them into
/// the final parquet file, flushing the in-memory buffer to the object store writer
/// whenever it grows past `BUFFER_FLUSH_BYTES`.
async fn concatenate_parallel_row_groups(
    mut parquet_writer: SerializedFileWriter<SharedBuffer>,
    merged_buff: SharedBuffer,
    mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
    mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
    pool: Arc<dyn MemoryPool>,
) -> Result<ParquetMetaData> {
    let mut file_reservation =
        MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);

    while let Some(task) = serialize_rx.recv().await {
        let result = task.join_unwind().await;
        let (serialized_columns, mut rg_reservation, _cnt) =
            result.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;

        let mut rg_out = parquet_writer.next_row_group()?;
        for chunk in serialized_columns {
            chunk.append_to_row_group(&mut rg_out)?;
            rg_reservation.free();

            let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
            file_reservation.try_resize(buff_to_flush.len())?;

            if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
                object_store_writer
                    .write_all(buff_to_flush.as_slice())
                    .await?;
                buff_to_flush.clear();
                file_reservation.try_resize(buff_to_flush.len())?; // will set to zero
            }
        }
        rg_out.close()?;
    }

    let parquet_meta_data = parquet_writer.close()?;
    let final_buff = merged_buff.buffer.try_lock().unwrap();

    object_store_writer.write_all(final_buff.as_slice()).await?;
    object_store_writer.shutdown().await?;
    file_reservation.free();

    Ok(parquet_meta_data)
}

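/// Parallelizes the serialization of a single parquet file: record batches arriving on
/// `data` are serialized by per-column tasks (fanned out by
/// [`spawn_parquet_parallel_serialization_task`]), and the finalized row groups are then
/// stitched together and streamed to the object store by
/// [`concatenate_parallel_row_groups`].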
async fn output_single_parquet_file_parallelized(
    object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
    data: Receiver<RecordBatch>,
    output_schema: Arc<Schema>,
    parquet_props: &WriterProperties,
    skip_arrow_metadata: bool,
    parallel_options: ParallelParquetWriterOptions,
    pool: Arc<dyn MemoryPool>,
) -> Result<ParquetMetaData> {
    let max_rowgroups = parallel_options.max_parallel_row_groups;
    let (serialize_tx, serialize_rx) =
        mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups);

    let arc_props = Arc::new(parquet_props.clone());
    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
    let options = ArrowWriterOptions::new()
        .with_properties(parquet_props.clone())
        .with_skip_arrow_metadata(skip_arrow_metadata);
    let writer = ArrowWriter::try_new_with_options(
        merged_buff.clone(),
        Arc::clone(&output_schema),
        options,
    )?;
    let (writer, row_group_writer_factory) = writer.into_serialized_writer()?;

    let launch_serialization_task = spawn_parquet_parallel_serialization_task(
        row_group_writer_factory,
        data,
        serialize_tx,
        Arc::clone(&output_schema),
        Arc::clone(&arc_props),
        parallel_options.into(),
        Arc::clone(&pool),
    );
    let parquet_meta_data = concatenate_parallel_row_groups(
        writer,
        merged_buff,
        serialize_rx,
        object_store_writer,
        pool,
    )
    .await?;

    launch_serialization_task
        .join_unwind()
        .await
        .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
    Ok(parquet_meta_data)
}

#[cfg(test)]
mod tests {
    use parquet::arrow::parquet_to_arrow_schema;
    use std::sync::Arc;

    use super::*;

    use arrow::datatypes::DataType;
    use parquet::schema::parser::parse_message_type;

    #[test]
    fn coerce_int96_to_resolution_with_mixed_timestamps() {
        let spark_schema = "
        message spark_schema {
          optional int96 c0;
          optional int64 c1 (TIMESTAMP(NANOS,true));
          optional int64 c2 (TIMESTAMP(NANOS,false));
          optional int64 c3 (TIMESTAMP(MILLIS,true));
          optional int64 c4 (TIMESTAMP(MILLIS,false));
          optional int64 c5 (TIMESTAMP(MICROS,true));
          optional int64 c6 (TIMESTAMP(MICROS,false));
        }
        ";

        let schema = parse_message_type(spark_schema).expect("should parse schema");
        let descr = SchemaDescriptor::new(Arc::new(schema));

        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();

        let result =
            coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond)
                .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
            Field::new(
                "c1",
                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                true,
            ),
            Field::new("c2", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
            Field::new(
                "c3",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                true,
            ),
            Field::new("c4", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new(
                "c5",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                true,
            ),
            Field::new("c6", DataType::Timestamp(TimeUnit::Microsecond, None), true),
        ]);

        assert_eq!(result, expected_schema);
    }

    #[test]
    fn coerce_int96_to_resolution_with_nested_types() {
        let spark_schema = "
        message spark_schema {
          optional int96 c0;
          optional group c1 {
            optional int96 c0;
          }
          optional group c2 {
            optional group c0 (LIST) {
              repeated group list {
                optional int96 element;
              }
            }
          }
          optional group c3 (LIST) {
            repeated group list {
              optional int96 element;
            }
          }
          optional group c4 (LIST) {
            repeated group list {
              optional group element {
                optional int96 c0;
                optional int96 c1;
              }
            }
          }
          optional group c5 (MAP) {
            repeated group key_value {
              required int96 key;
              optional int96 value;
            }
          }
          optional group c6 (LIST) {
            repeated group list {
              optional group element (MAP) {
                repeated group key_value {
                  required int96 key;
                  optional int96 value;
                }
              }
            }
          }
        }
        ";

        let schema = parse_message_type(spark_schema).expect("should parse schema");
        let descr = SchemaDescriptor::new(Arc::new(schema));

        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();

        let result =
            coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond)
                .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
            Field::new_struct(
                "c1",
                vec![Field::new(
                    "c0",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    true,
                )],
                true,
            ),
            Field::new_struct(
                "c2",
                vec![Field::new_list(
                    "c0",
                    Field::new(
                        "element",
                        DataType::Timestamp(TimeUnit::Microsecond, None),
                        true,
                    ),
                    true,
                )],
                true,
            ),
            Field::new_list(
                "c3",
                Field::new(
                    "element",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    true,
                ),
                true,
            ),
            Field::new_list(
                "c4",
                Field::new_struct(
                    "element",
                    vec![
                        Field::new(
                            "c0",
                            DataType::Timestamp(TimeUnit::Microsecond, None),
                            true,
                        ),
                        Field::new(
                            "c1",
                            DataType::Timestamp(TimeUnit::Microsecond, None),
                            true,
                        ),
                    ],
                    true,
                ),
                true,
            ),
            Field::new_map(
                "c5",
                "key_value",
                Field::new(
                    "key",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    false,
                ),
                Field::new(
                    "value",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    true,
                ),
                false,
                true,
            ),
            Field::new_list(
                "c6",
                Field::new_map(
                    "element",
                    "key_value",
                    Field::new(
                        "key",
                        DataType::Timestamp(TimeUnit::Microsecond, None),
                        false,
                    ),
                    Field::new(
                        "value",
                        DataType::Timestamp(TimeUnit::Microsecond, None),
                        true,
                    ),
                    false,
                    true,
                ),
                true,
            ),
        ]);

        assert_eq!(result, expected_schema);
    }
}