use std::any::Any;
use std::cell::RefCell;
use std::fmt::Debug;
use std::ops::Range;
use std::rc::Rc;
use std::sync::Arc;
use std::{fmt, vec};

use arrow::array::RecordBatch;
use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
use datafusion_datasource::write::{
    get_writer_schema, ObjectWriterBuilder, SharedBuffer,
};

use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
use datafusion_datasource::write::demux::DemuxedStreamReceiver;

use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
#[cfg(feature = "parquet_encryption")]
use datafusion_common::encryption::map_config_decryption_to_decryption;
use datafusion_common::encryption::FileDecryptionProperties;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
    internal_datafusion_err, internal_err, not_impl_err, DataFusionError, GetExt,
    HashSet, Result, DEFAULT_PARQUET_EXTENSION,
};
use datafusion_common::{HashMap, Statistics};
use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_session::Session;

use crate::reader::CachedParquetFileReaderFactory;
use crate::source::{parse_coerce_int96_string, ParquetSource};
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_datasource::source::DataSourceExec;
use datafusion_execution::runtime_env::RuntimeEnv;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::arrow_writer::{
    compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
    ArrowLeafColumn, ArrowWriterOptions,
};
use parquet::arrow::async_reader::MetadataFetch;
use parquet::arrow::{ArrowSchemaConverter, AsyncArrowWriter};
use parquet::basic::Type;

use crate::metadata::DFParquetMetadata;
use datafusion_execution::cache::cache_manager::FileMetadataCache;
use parquet::errors::ParquetError;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::file::writer::SerializedFileWriter;
use parquet::format::FileMetaData;
use parquet::schema::types::SchemaDescriptor;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};

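/// Initial size of the in-memory write buffer. This is only a hint; the buffer
/// grows beyond this size if needed.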
const INITIAL_BUFFER_BYTES: usize = 1048576;

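/// When writing parquet files in parallel, the buffered parquet data is flushed
/// to the object store once it exceeds this size.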
const BUFFER_FLUSH_BYTES: usize = 1024000;

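/// Factory that creates [`ParquetFormat`] instances, optionally pre-configured
/// with [`TableParquetOptions`].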
#[derive(Default)]
pub struct ParquetFormatFactory {
    pub options: Option<TableParquetOptions>,
}

impl ParquetFormatFactory {
    pub fn new() -> Self {
        Self { options: None }
    }

    pub fn new_with_options(options: TableParquetOptions) -> Self {
        Self {
            options: Some(options),
        }
    }
}

impl FileFormatFactory for ParquetFormatFactory {
    fn create(
        &self,
        state: &dyn Session,
        format_options: &std::collections::HashMap<String, String>,
    ) -> Result<Arc<dyn FileFormat>> {
        let parquet_options = match &self.options {
            None => {
                let mut table_options = state.default_table_options();
                table_options.set_config_format(ConfigFileType::PARQUET);
                table_options.alter_with_string_hash_map(format_options)?;
                table_options.parquet
            }
            Some(parquet_options) => {
                let mut parquet_options = parquet_options.clone();
                for (k, v) in format_options {
                    parquet_options.set(k, v)?;
                }
                parquet_options
            }
        };

        Ok(Arc::new(
            ParquetFormat::default().with_options(parquet_options),
        ))
    }

    fn default(&self) -> Arc<dyn FileFormat> {
        Arc::new(ParquetFormat::default())
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl GetExt for ParquetFormatFactory {
    fn get_ext(&self) -> String {
        DEFAULT_PARQUET_EXTENSION[1..].to_string()
    }
}

impl Debug for ParquetFormatFactory {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ParquetFormatFactory")
            .field("ParquetFormatFactory", &self.options)
            .finish()
    }
}
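/// The Apache Parquet [`FileFormat`] implementation.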
#[derive(Debug, Default)]
pub struct ParquetFormat {
    options: TableParquetOptions,
}

impl ParquetFormat {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_enable_pruning(mut self, enable: bool) -> Self {
        self.options.global.pruning = enable;
        self
    }

    pub fn enable_pruning(&self) -> bool {
        self.options.global.pruning
    }

    pub fn with_metadata_size_hint(mut self, size_hint: Option<usize>) -> Self {
        self.options.global.metadata_size_hint = size_hint;
        self
    }

    pub fn metadata_size_hint(&self) -> Option<usize> {
        self.options.global.metadata_size_hint
    }

    pub fn with_skip_metadata(mut self, skip_metadata: bool) -> Self {
        self.options.global.skip_metadata = skip_metadata;
        self
    }

    pub fn skip_metadata(&self) -> bool {
        self.options.global.skip_metadata
    }

    pub fn with_options(mut self, options: TableParquetOptions) -> Self {
        self.options = options;
        self
    }

    pub fn options(&self) -> &TableParquetOptions {
        &self.options
    }

    pub fn force_view_types(&self) -> bool {
        self.options.global.schema_force_view_types
    }

    pub fn with_force_view_types(mut self, use_views: bool) -> Self {
        self.options.global.schema_force_view_types = use_views;
        self
    }

    pub fn binary_as_string(&self) -> bool {
        self.options.global.binary_as_string
    }

    pub fn with_binary_as_string(mut self, binary_as_string: bool) -> Self {
        self.options.global.binary_as_string = binary_as_string;
        self
    }

    pub fn coerce_int96(&self) -> Option<String> {
        self.options.global.coerce_int96.clone()
    }

    pub fn with_coerce_int96(mut self, time_unit: Option<String>) -> Self {
        self.options.global.coerce_int96 = time_unit;
        self
    }
}

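/// Clear the metadata on every field of the given schemas so that otherwise
/// identical schemas can be merged.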
fn clear_metadata(
    schemas: impl IntoIterator<Item = Schema>,
) -> impl Iterator<Item = Schema> {
    schemas.into_iter().map(|schema| {
        let fields = schema
            .fields()
            .iter()
            .map(|field| {
                field.as_ref().clone().with_metadata(Default::default())
            })
            .collect::<Fields>();
        Schema::new(fields)
    })
}

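/// Resolve the [`FileDecryptionProperties`] for a file, either from explicitly
/// configured decryption options or from a registered encryption factory.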
#[cfg(feature = "parquet_encryption")]
async fn get_file_decryption_properties(
    state: &dyn Session,
    options: &TableParquetOptions,
    file_path: &Path,
) -> Result<Option<FileDecryptionProperties>> {
    let file_decryption_properties: Option<FileDecryptionProperties> =
        match &options.crypto.file_decryption {
            Some(cfd) => Some(map_config_decryption_to_decryption(cfd)),
            None => match &options.crypto.factory_id {
                Some(factory_id) => {
                    let factory =
                        state.runtime_env().parquet_encryption_factory(factory_id)?;
                    factory
                        .get_file_decryption_properties(
                            &options.crypto.factory_options,
                            file_path,
                        )
                        .await?
                }
                None => None,
            },
        };
    Ok(file_decryption_properties)
}

#[cfg(not(feature = "parquet_encryption"))]
async fn get_file_decryption_properties(
    _state: &dyn Session,
    _options: &TableParquetOptions,
    _file_path: &Path,
) -> Result<Option<FileDecryptionProperties>> {
    Ok(None)
}

#[async_trait]
impl FileFormat for ParquetFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_ext(&self) -> String {
        ParquetFormatFactory::new().get_ext()
    }

    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> Result<String> {
        let ext = self.get_ext();
        match file_compression_type.get_variant() {
            CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
            _ => internal_err!("Parquet FileFormat does not support compression."),
        }
    }

    fn compression_type(&self) -> Option<FileCompressionType> {
        None
    }

    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef> {
        let coerce_int96 = match self.coerce_int96() {
            Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?),
            None => None,
        };

        let file_metadata_cache =
            state.runtime_env().cache_manager.get_file_metadata_cache();

        let mut schemas: Vec<_> = futures::stream::iter(objects)
            .map(|object| async {
                let file_decryption_properties = get_file_decryption_properties(
                    state,
                    &self.options,
                    &object.location,
                )
                .await?;
                let result = DFParquetMetadata::new(store.as_ref(), object)
                    .with_metadata_size_hint(self.metadata_size_hint())
                    .with_decryption_properties(file_decryption_properties.as_ref())
                    .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
                    .with_coerce_int96(coerce_int96)
                    .fetch_schema_with_location()
                    .await?;
                Ok::<_, DataFusionError>(result)
            })
            .boxed()
            .buffered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect()
            .await?;

        schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2));

        let schemas = schemas
            .into_iter()
            .map(|(_, schema)| schema)
            .collect::<Vec<_>>();

        let schema = if self.skip_metadata() {
            Schema::try_merge(clear_metadata(schemas))
        } else {
            Schema::try_merge(schemas)
        }?;

        let schema = if self.binary_as_string() {
            transform_binary_to_string(&schema)
        } else {
            schema
        };

        let schema = if self.force_view_types() {
            transform_schema_to_view(&schema)
        } else {
            schema
        };

        Ok(Arc::new(schema))
    }

    async fn infer_stats(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> Result<Statistics> {
        let file_decryption_properties =
            get_file_decryption_properties(state, &self.options, &object.location)
                .await?;
        let file_metadata_cache =
            state.runtime_env().cache_manager.get_file_metadata_cache();
        DFParquetMetadata::new(store, object)
            .with_metadata_size_hint(self.metadata_size_hint())
            .with_decryption_properties(file_decryption_properties.as_ref())
            .with_file_metadata_cache(Some(file_metadata_cache))
            .fetch_statistics(&table_schema)
            .await
    }

    async fn create_physical_plan(
        &self,
        state: &dyn Session,
        conf: FileScanConfig,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut metadata_size_hint = None;

        if let Some(metadata) = self.metadata_size_hint() {
            metadata_size_hint = Some(metadata);
        }

        let mut source = ParquetSource::new(self.options.clone());

        let metadata_cache =
            state.runtime_env().cache_manager.get_file_metadata_cache();
        let store = state
            .runtime_env()
            .object_store(conf.object_store_url.clone())?;
        let cached_parquet_read_factory =
            Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
        source = source.with_parquet_file_reader_factory(cached_parquet_read_factory);

        if let Some(metadata_size_hint) = metadata_size_hint {
            source = source.with_metadata_size_hint(metadata_size_hint)
        }

        source = self.set_source_encryption_factory(source, state)?;

        let file_source = source.apply_schema_adapter(&conf)?;

        let conf = FileScanConfigBuilder::from(conf)
            .with_source(file_source)
            .build();
        Ok(DataSourceExec::from_data_source(conf))
    }

    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Parquet");
        }

        let sink = Arc::new(ParquetSink::new(conf, self.options.clone()));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    fn file_source(&self) -> Arc<dyn FileSource> {
        Arc::new(ParquetSource::default())
    }
}

#[cfg(feature = "parquet_encryption")]
impl ParquetFormat {
    fn set_source_encryption_factory(
        &self,
        source: ParquetSource,
        state: &dyn Session,
    ) -> Result<ParquetSource> {
        if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
            Ok(source.with_encryption_factory(
                state
                    .runtime_env()
                    .parquet_encryption_factory(encryption_factory_id)?,
            ))
        } else {
            Ok(source)
        }
    }
}

#[cfg(not(feature = "parquet_encryption"))]
impl ParquetFormat {
    fn set_source_encryption_factory(
        &self,
        source: ParquetSource,
        _state: &dyn Session,
    ) -> Result<ParquetSource> {
        if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
            Err(DataFusionError::Configuration(
                format!("Parquet encryption factory id is set to '{encryption_factory_id}' but the parquet_encryption feature is disabled")))
        } else {
            Ok(source)
        }
    }
}

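/// Apply the binary-to-string and view-type coercions implied by the table schema
/// to `file_schema`, returning the adjusted schema, or `None` if no change is needed.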
pub fn apply_file_schema_type_coercions(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut needs_view_transform = false;
    let mut needs_string_transform = false;

    let table_fields: HashMap<_, _> = table_schema
        .fields()
        .iter()
        .map(|f| {
            let dt = f.data_type();
            if matches!(dt, &DataType::Utf8View | &DataType::BinaryView) {
                needs_view_transform = true;
            }
            if matches!(
                dt,
                &DataType::Utf8 | &DataType::LargeUtf8 | &DataType::Utf8View
            ) {
                needs_string_transform = true;
            }

            (f.name(), dt)
        })
        .collect();

    if !needs_view_transform && !needs_string_transform {
        return None;
    }

    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields()
        .iter()
        .map(|field| {
            let field_name = field.name();
            let field_type = field.data_type();

            if let Some(table_type) = table_fields.get(field_name) {
                match (table_type, field_type) {
                    (
                        &DataType::Utf8,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::Utf8);
                    }
                    (
                        &DataType::LargeUtf8,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::LargeUtf8);
                    }
                    (
                        &DataType::Utf8View,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::Utf8View);
                    }
                    (&DataType::Utf8View, DataType::Utf8 | DataType::LargeUtf8) => {
                        return field_with_new_type(field, DataType::Utf8View);
                    }
                    (&DataType::BinaryView, DataType::Binary | DataType::LargeBinary) => {
                        return field_with_new_type(field, DataType::BinaryView);
                    }
                    _ => {}
                }
            }

            Arc::clone(field)
        })
        .collect();

    Some(Schema::new_with_metadata(
        transformed_fields,
        file_schema.metadata.clone(),
    ))
}

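/// Coerce INT96 timestamp columns (read as nanosecond timestamps) to the given
/// [`TimeUnit`], including columns nested inside structs, lists, and maps.
/// Returns `None` if the file contains no INT96 columns.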
pub fn coerce_int96_to_resolution(
    parquet_schema: &SchemaDescriptor,
    file_schema: &Schema,
    time_unit: &TimeUnit,
) -> Option<Schema> {
    let int96_fields: HashSet<_> = parquet_schema
        .columns()
        .iter()
        .filter(|f| f.physical_type() == Type::INT96)
        .map(|f| f.path().string())
        .collect();

    if int96_fields.is_empty() {
        return None;
    }

    type NestedFields = Rc<RefCell<Vec<FieldRef>>>;
    type StackContext<'a> = (
        Vec<&'a str>,
        &'a FieldRef,
        NestedFields,
        Option<NestedFields>,
    );

    let fields = Rc::new(RefCell::new(Vec::with_capacity(file_schema.fields.len())));

    let transformed_schema = {
        let mut stack: Vec<StackContext> = file_schema
            .fields()
            .iter()
            .rev()
            .map(|f| (vec![f.name().as_str()], f, Rc::clone(&fields), None))
            .collect();

        while let Some((parquet_path, current_field, parent_fields, child_fields)) =
            stack.pop()
        {
            match (current_field.data_type(), child_fields) {
                (DataType::Struct(unprocessed_children), None) => {
                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(
                        unprocessed_children.len(),
                    )));
                    stack.push((
                        parquet_path.clone(),
                        current_field,
                        parent_fields,
                        Some(Rc::clone(&child_fields)),
                    ));
                    for child in unprocessed_children.into_iter().rev() {
                        let mut child_path = parquet_path.clone();
                        child_path.push(".");
                        child_path.push(child.name());
                        stack.push((child_path, child, Rc::clone(&child_fields), None));
                    }
                }
                (DataType::Struct(unprocessed_children), Some(processed_children)) => {
                    let processed_children = processed_children.borrow();
                    assert_eq!(processed_children.len(), unprocessed_children.len());
                    let processed_struct = Field::new_struct(
                        current_field.name(),
                        processed_children.as_slice(),
                        current_field.is_nullable(),
                    );
                    parent_fields.borrow_mut().push(Arc::new(processed_struct));
                }
                (DataType::List(unprocessed_child), None) => {
                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
                    stack.push((
                        parquet_path.clone(),
                        current_field,
                        parent_fields,
                        Some(Rc::clone(&child_fields)),
                    ));
                    let mut child_path = parquet_path.clone();
                    child_path.push(".list.");
                    child_path.push(unprocessed_child.name());
                    stack.push((
                        child_path.clone(),
                        unprocessed_child,
                        Rc::clone(&child_fields),
                        None,
                    ));
                }
                (DataType::List(_), Some(processed_children)) => {
                    let processed_children = processed_children.borrow();
                    assert_eq!(processed_children.len(), 1);
                    let processed_list = Field::new_list(
                        current_field.name(),
                        Arc::clone(&processed_children[0]),
                        current_field.is_nullable(),
                    );
                    parent_fields.borrow_mut().push(Arc::new(processed_list));
                }
                (DataType::Map(unprocessed_child, _), None) => {
                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
                    stack.push((
                        parquet_path.clone(),
                        current_field,
                        parent_fields,
                        Some(Rc::clone(&child_fields)),
                    ));
                    let mut child_path = parquet_path.clone();
                    child_path.push(".");
                    child_path.push(unprocessed_child.name());
                    stack.push((
                        child_path.clone(),
                        unprocessed_child,
                        Rc::clone(&child_fields),
                        None,
                    ));
                }
                (DataType::Map(_, sorted), Some(processed_children)) => {
                    let processed_children = processed_children.borrow();
                    assert_eq!(processed_children.len(), 1);
                    let processed_map = Field::new(
                        current_field.name(),
                        DataType::Map(Arc::clone(&processed_children[0]), *sorted),
                        current_field.is_nullable(),
                    );
                    parent_fields.borrow_mut().push(Arc::new(processed_map));
                }
                (DataType::Timestamp(TimeUnit::Nanosecond, None), None)
                    if int96_fields.contains(parquet_path.concat().as_str()) =>
                {
                    parent_fields.borrow_mut().push(field_with_new_type(
                        current_field,
                        DataType::Timestamp(*time_unit, None),
                    ));
                }
                _ => parent_fields.borrow_mut().push(Arc::clone(current_field)),
            }
        }
        assert_eq!(fields.borrow().len(), file_schema.fields.len());
        Schema::new_with_metadata(
            fields.borrow_mut().clone(),
            file_schema.metadata.clone(),
        )
    };

    Some(transformed_schema)
}

#[deprecated(
    since = "47.0.0",
    note = "Use `apply_file_schema_type_coercions` instead"
)]
pub fn coerce_file_schema_to_view_type(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut transform = false;
    let table_fields: HashMap<_, _> = table_schema
        .fields
        .iter()
        .map(|f| {
            let dt = f.data_type();
            if dt.equals_datatype(&DataType::Utf8View)
                || dt.equals_datatype(&DataType::BinaryView)
            {
                transform = true;
            }
            (f.name(), dt)
        })
        .collect();

    if !transform {
        return None;
    }

    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields
        .iter()
        .map(
            |field| match (table_fields.get(field.name()), field.data_type()) {
                (Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => {
                    field_with_new_type(field, DataType::Utf8View)
                }
                (
                    Some(DataType::BinaryView),
                    DataType::Binary | DataType::LargeBinary,
                ) => field_with_new_type(field, DataType::BinaryView),
                _ => Arc::clone(field),
            },
        )
        .collect();

    Some(Schema::new_with_metadata(
        transformed_fields,
        file_schema.metadata.clone(),
    ))
}

#[deprecated(
    since = "47.0.0",
    note = "Use `apply_file_schema_type_coercions` instead"
)]
pub fn coerce_file_schema_to_string_type(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut transform = false;
    let table_fields: HashMap<_, _> = table_schema
        .fields
        .iter()
        .map(|f| (f.name(), f.data_type()))
        .collect();
    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields
        .iter()
        .map(
            |field| match (table_fields.get(field.name()), field.data_type()) {
                (
                    Some(DataType::Utf8),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::Utf8)
                }
                (
                    Some(DataType::LargeUtf8),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::LargeUtf8)
                }
                (
                    Some(DataType::Utf8View),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::Utf8View)
                }
                _ => Arc::clone(field),
            },
        )
        .collect();

    if !transform {
        None
    } else {
        Some(Schema::new_with_metadata(
            transformed_fields,
            file_schema.metadata.clone(),
        ))
    }
}

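/// Create a new field with the specified data type, copying the other properties
/// from the input field.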
fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
    Arc::new(field.as_ref().clone().with_data_type(new_type))
}

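/// Transform a schema so that `Utf8`/`LargeUtf8` fields become `Utf8View` and
/// `Binary`/`LargeBinary` fields become `BinaryView`.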
pub fn transform_schema_to_view(schema: &Schema) -> Schema {
    let transformed_fields: Vec<Arc<Field>> = schema
        .fields
        .iter()
        .map(|field| match field.data_type() {
            DataType::Utf8 | DataType::LargeUtf8 => {
                field_with_new_type(field, DataType::Utf8View)
            }
            DataType::Binary | DataType::LargeBinary => {
                field_with_new_type(field, DataType::BinaryView)
            }
            _ => Arc::clone(field),
        })
        .collect();
    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

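/// Transform a schema so that binary fields become the corresponding string type
/// (`Binary` -> `Utf8`, `LargeBinary` -> `LargeUtf8`, `BinaryView` -> `Utf8View`).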
pub fn transform_binary_to_string(schema: &Schema) -> Schema {
    let transformed_fields: Vec<Arc<Field>> = schema
        .fields
        .iter()
        .map(|field| match field.data_type() {
            DataType::Binary => field_with_new_type(field, DataType::Utf8),
            DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8),
            DataType::BinaryView => field_with_new_type(field, DataType::Utf8View),
            _ => Arc::clone(field),
        })
        .collect();
    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

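/// [`MetadataFetch`] adapter that reads byte ranges of a single object from an
/// [`ObjectStore`].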
pub struct ObjectStoreFetch<'a> {
    store: &'a dyn ObjectStore,
    meta: &'a ObjectMeta,
}

impl<'a> ObjectStoreFetch<'a> {
    pub fn new(store: &'a dyn ObjectStore, meta: &'a ObjectMeta) -> Self {
        Self { store, meta }
    }
}

impl MetadataFetch for ObjectStoreFetch<'_> {
    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, ParquetError>> {
        async {
            self.store
                .get_range(&self.meta.location, range)
                .await
                .map_err(ParquetError::from)
        }
        .boxed()
    }
}

#[deprecated(
    since = "50.0.0",
    note = "Use `DFParquetMetadata::fetch_metadata` instead"
)]
pub async fn fetch_parquet_metadata(
    store: &dyn ObjectStore,
    object_meta: &ObjectMeta,
    size_hint: Option<usize>,
    #[allow(unused)] decryption_properties: Option<&FileDecryptionProperties>,
    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
) -> Result<Arc<ParquetMetaData>> {
    DFParquetMetadata::new(store, object_meta)
        .with_metadata_size_hint(size_hint)
        .with_decryption_properties(decryption_properties)
        .with_file_metadata_cache(file_metadata_cache)
        .fetch_metadata()
        .await
}

#[deprecated(
    since = "50.0.0",
    note = "Use `DFParquetMetadata::fetch_statistics` instead"
)]
pub async fn fetch_statistics(
    store: &dyn ObjectStore,
    table_schema: SchemaRef,
    file: &ObjectMeta,
    metadata_size_hint: Option<usize>,
    decryption_properties: Option<&FileDecryptionProperties>,
    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
) -> Result<Statistics> {
    DFParquetMetadata::new(store, file)
        .with_metadata_size_hint(metadata_size_hint)
        .with_decryption_properties(decryption_properties)
        .with_file_metadata_cache(file_metadata_cache)
        .fetch_statistics(&table_schema)
        .await
}

#[deprecated(
    since = "50.0.0",
    note = "Use `DFParquetMetadata::statistics_from_parquet_metadata` instead"
)]
pub fn statistics_from_parquet_meta_calc(
    metadata: &ParquetMetaData,
    table_schema: SchemaRef,
) -> Result<Statistics> {
    DFParquetMetadata::statistics_from_parquet_metadata(metadata, &table_schema)
}

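/// Implements [`DataSink`] for writing to parquet files.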
pub struct ParquetSink {
    config: FileSinkConfig,
    parquet_options: TableParquetOptions,
    written: Arc<parking_lot::Mutex<HashMap<Path, FileMetaData>>>,
}

impl Debug for ParquetSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ParquetSink").finish()
    }
}

impl DisplayAs for ParquetSink {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "ParquetSink(file_groups=",)?;
                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
                write!(f, ")")
            }
            DisplayFormatType::TreeRender => {
                write!(f, "")
            }
        }
    }
}

impl ParquetSink {
    pub fn new(config: FileSinkConfig, parquet_options: TableParquetOptions) -> Self {
        Self {
            config,
            parquet_options,
            written: Default::default(),
        }
    }

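    /// Retrieve the file metadata of the written files, keyed by file path.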
    pub fn written(&self) -> HashMap<Path, FileMetaData> {
        self.written.lock().clone()
    }

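    /// Create the [`WriterProperties`] used to write the parquet file at `path`,
    /// converting the table options, optionally embedding the Arrow schema, and
    /// applying any configured encryption properties.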
    async fn create_writer_props(
        &self,
        runtime: &Arc<RuntimeEnv>,
        path: &Path,
    ) -> Result<WriterProperties> {
        let schema = if self.parquet_options.global.allow_single_file_parallelism {
            &get_writer_schema(&self.config)
        } else {
            self.config.output_schema()
        };

        let mut parquet_opts = self.parquet_options.clone();
        if !self.parquet_options.global.skip_arrow_metadata {
            parquet_opts.arrow_schema(schema);
        }

        let mut builder = WriterPropertiesBuilder::try_from(&parquet_opts)?;
        builder = set_writer_encryption_properties(
            builder,
            runtime,
            &parquet_opts,
            schema,
            path,
        )
        .await?;
        Ok(builder.build())
    }

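    /// Create an [`AsyncArrowWriter`] that writes to the given object store location,
    /// buffered according to the session's object store writer buffer size.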
    async fn create_async_arrow_writer(
        &self,
        location: &Path,
        object_store: Arc<dyn ObjectStore>,
        context: &Arc<TaskContext>,
        parquet_props: WriterProperties,
    ) -> Result<AsyncArrowWriter<BufWriter>> {
        let buf_writer = BufWriter::with_capacity(
            object_store,
            location.clone(),
            context
                .session_config()
                .options()
                .execution
                .objectstore_writer_buffer_size,
        );
        let options = ArrowWriterOptions::new()
            .with_properties(parquet_props)
            .with_skip_arrow_metadata(self.parquet_options.global.skip_arrow_metadata);

        let writer = AsyncArrowWriter::try_new_with_options(
            buf_writer,
            get_writer_schema(&self.config),
            options,
        )?;
        Ok(writer)
    }

    pub fn parquet_options(&self) -> &TableParquetOptions {
        &self.parquet_options
    }
}

#[cfg(feature = "parquet_encryption")]
async fn set_writer_encryption_properties(
    builder: WriterPropertiesBuilder,
    runtime: &Arc<RuntimeEnv>,
    parquet_opts: &TableParquetOptions,
    schema: &Arc<Schema>,
    path: &Path,
) -> Result<WriterPropertiesBuilder> {
    if let Some(file_encryption_properties) = &parquet_opts.crypto.file_encryption {
        return Ok(builder
            .with_file_encryption_properties(file_encryption_properties.clone().into()));
    } else if let Some(encryption_factory_id) = &parquet_opts.crypto.factory_id.as_ref() {
        let encryption_factory =
            runtime.parquet_encryption_factory(encryption_factory_id)?;
        let file_encryption_properties = encryption_factory
            .get_file_encryption_properties(
                &parquet_opts.crypto.factory_options,
                schema,
                path,
            )
            .await?;
        if let Some(file_encryption_properties) = file_encryption_properties {
            return Ok(
                builder.with_file_encryption_properties(file_encryption_properties)
            );
        }
    }
    Ok(builder)
}

#[cfg(not(feature = "parquet_encryption"))]
async fn set_writer_encryption_properties(
    builder: WriterPropertiesBuilder,
    _runtime: &Arc<RuntimeEnv>,
    _parquet_opts: &TableParquetOptions,
    _schema: &Arc<Schema>,
    _path: &Path,
) -> Result<WriterPropertiesBuilder> {
    Ok(builder)
}

#[async_trait]
impl FileSink for ParquetSink {
    fn config(&self) -> &FileSinkConfig {
        &self.config
    }

    async fn spawn_writer_tasks_and_join(
        &self,
        context: &Arc<TaskContext>,
        demux_task: SpawnedTask<Result<()>>,
        mut file_stream_rx: DemuxedStreamReceiver,
        object_store: Arc<dyn ObjectStore>,
    ) -> Result<u64> {
        let parquet_opts = &self.parquet_options;
        let mut allow_single_file_parallelism =
            parquet_opts.global.allow_single_file_parallelism;

        if parquet_opts.crypto.file_encryption.is_some()
            || parquet_opts.crypto.factory_id.is_some()
        {
            allow_single_file_parallelism = false;
        }

        let mut file_write_tasks: JoinSet<
            std::result::Result<(Path, FileMetaData), DataFusionError>,
        > = JoinSet::new();

        let runtime = context.runtime_env();
        let parallel_options = ParallelParquetWriterOptions {
            max_parallel_row_groups: parquet_opts
                .global
                .maximum_parallel_row_group_writers,
            max_buffered_record_batches_per_stream: parquet_opts
                .global
                .maximum_buffered_record_batches_per_stream,
        };

        while let Some((path, mut rx)) = file_stream_rx.recv().await {
            let parquet_props = self.create_writer_props(&runtime, &path).await?;
            if !allow_single_file_parallelism {
                let mut writer = self
                    .create_async_arrow_writer(
                        &path,
                        Arc::clone(&object_store),
                        context,
                        parquet_props.clone(),
                    )
                    .await?;
                let mut reservation = MemoryConsumer::new(format!("ParquetSink[{path}]"))
                    .register(context.memory_pool());
                file_write_tasks.spawn(async move {
                    while let Some(batch) = rx.recv().await {
                        writer.write(&batch).await?;
                        reservation.try_resize(writer.memory_size())?;
                    }
                    let file_metadata = writer
                        .close()
                        .await
                        .map_err(|e| DataFusionError::ParquetError(Box::new(e)))?;
                    Ok((path, file_metadata))
                });
            } else {
                let writer = ObjectWriterBuilder::new(
                    FileCompressionType::UNCOMPRESSED,
                    &path,
                    Arc::clone(&object_store),
                )
                .with_buffer_size(Some(
                    context
                        .session_config()
                        .options()
                        .execution
                        .objectstore_writer_buffer_size,
                ))
                .build()?;
                let schema = get_writer_schema(&self.config);
                let props = parquet_props.clone();
                let parallel_options_clone = parallel_options.clone();
                let pool = Arc::clone(context.memory_pool());
                file_write_tasks.spawn(async move {
                    let file_metadata = output_single_parquet_file_parallelized(
                        writer,
                        rx,
                        schema,
                        &props,
                        parallel_options_clone,
                        pool,
                    )
                    .await?;
                    Ok((path, file_metadata))
                });
            }
        }

        let mut row_count = 0;
        while let Some(result) = file_write_tasks.join_next().await {
            match result {
                Ok(r) => {
                    let (path, file_metadata) = r?;
                    row_count += file_metadata.num_rows;
                    let mut written_files = self.written.lock();
                    written_files
                        .try_insert(path.clone(), file_metadata)
                        .map_err(|e| internal_datafusion_err!("duplicate entry detected for partitioned file {path}: {e}"))?;
                    drop(written_files);
                }
                Err(e) => {
                    if e.is_panic() {
                        std::panic::resume_unwind(e.into_panic());
                    } else {
                        unreachable!();
                    }
                }
            }
        }

        demux_task
            .join_unwind()
            .await
            .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;

        Ok(row_count as u64)
    }
}

#[async_trait]
impl DataSink for ParquetSink {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> &SchemaRef {
        self.config.output_schema()
    }

    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64> {
        FileSink::write_all(self, data, context).await
    }
}

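/// Consumes [`ArrowLeafColumn`]s from a channel and serializes them with the given
/// [`ArrowColumnWriter`]; once the channel closes, returns the writer and its
/// memory reservation.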
async fn column_serializer_task(
    mut rx: Receiver<ArrowLeafColumn>,
    mut writer: ArrowColumnWriter,
    mut reservation: MemoryReservation,
) -> Result<(ArrowColumnWriter, MemoryReservation)> {
    while let Some(col) = rx.recv().await {
        writer.write(&col)?;
        reservation.try_resize(writer.memory_size())?;
    }
    Ok((writer, reservation))
}

type ColumnWriterTask = SpawnedTask<Result<(ArrowColumnWriter, MemoryReservation)>>;
type ColSender = Sender<ArrowLeafColumn>;

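/// Spawns one serialization task per column, returning the spawned tasks together
/// with the channels used to send [`ArrowLeafColumn`] data to them.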
fn spawn_column_parallel_row_group_writer(
    schema: Arc<Schema>,
    parquet_props: Arc<WriterProperties>,
    max_buffer_size: usize,
    pool: &Arc<dyn MemoryPool>,
) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
    let schema_desc = ArrowSchemaConverter::new().convert(&schema)?;
    let col_writers = get_column_writers(&schema_desc, &parquet_props, &schema)?;
    let num_columns = col_writers.len();

    let mut col_writer_tasks = Vec::with_capacity(num_columns);
    let mut col_array_channels = Vec::with_capacity(num_columns);
    for writer in col_writers.into_iter() {
        let (send_array, receive_array) =
            mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
        col_array_channels.push(send_array);

        let reservation =
            MemoryConsumer::new("ParquetSink(ArrowColumnWriter)").register(pool);
        let task = SpawnedTask::spawn(column_serializer_task(
            receive_array,
            writer,
            reservation,
        ));
        col_writer_tasks.push(task);
    }

    Ok((col_writer_tasks, col_array_channels))
}

#[derive(Clone)]
struct ParallelParquetWriterOptions {
    max_parallel_row_groups: usize,
    max_buffered_record_batches_per_stream: usize,
}

type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, MemoryReservation, usize)>;

async fn send_arrays_to_col_writers(
    col_array_channels: &[ColSender],
    rb: &RecordBatch,
    schema: Arc<Schema>,
) -> Result<()> {
    let mut next_channel = 0;
    for (array, field) in rb.columns().iter().zip(schema.fields()) {
        for c in compute_leaves(field, array)? {
            if col_array_channels[next_channel].send(c).await.is_err() {
                return Ok(());
            }

            next_channel += 1;
        }
    }

    Ok(())
}

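/// Spawns a task that joins the column writer tasks for one row group and closes
/// each writer, producing the serialized column chunks for that row group.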
fn spawn_rg_join_and_finalize_task(
    column_writer_tasks: Vec<ColumnWriterTask>,
    rg_rows: usize,
    pool: &Arc<dyn MemoryPool>,
) -> SpawnedTask<RBStreamSerializeResult> {
    let mut rg_reservation =
        MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(pool);

    SpawnedTask::spawn(async move {
        let num_cols = column_writer_tasks.len();
        let mut finalized_rg = Vec::with_capacity(num_cols);
        for task in column_writer_tasks.into_iter() {
            let (writer, _col_reservation) = task
                .join_unwind()
                .await
                .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
            let encoded_size = writer.get_estimated_total_bytes();
            rg_reservation.grow(encoded_size);
            finalized_rg.push(writer.close()?);
        }

        Ok((finalized_rg, rg_reservation, rg_rows))
    })
}

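/// Spawns the task that routes incoming [`RecordBatch`]es to the parallel column
/// writers, starting a fresh set of column writers whenever the maximum row group
/// size is reached and forwarding each finalized row group task to `serialize_tx`.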
fn spawn_parquet_parallel_serialization_task(
    mut data: Receiver<RecordBatch>,
    serialize_tx: Sender<SpawnedTask<RBStreamSerializeResult>>,
    schema: Arc<Schema>,
    writer_props: Arc<WriterProperties>,
    parallel_options: ParallelParquetWriterOptions,
    pool: Arc<dyn MemoryPool>,
) -> SpawnedTask<Result<(), DataFusionError>> {
    SpawnedTask::spawn(async move {
        let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream;
        let max_row_group_rows = writer_props.max_row_group_size();
        let (mut column_writer_handles, mut col_array_channels) =
            spawn_column_parallel_row_group_writer(
                Arc::clone(&schema),
                Arc::clone(&writer_props),
                max_buffer_rb,
                &pool,
            )?;
        let mut current_rg_rows = 0;

        while let Some(mut rb) = data.recv().await {
            loop {
                if current_rg_rows + rb.num_rows() < max_row_group_rows {
                    send_arrays_to_col_writers(
                        &col_array_channels,
                        &rb,
                        Arc::clone(&schema),
                    )
                    .await?;
                    current_rg_rows += rb.num_rows();
                    break;
                } else {
                    let rows_left = max_row_group_rows - current_rg_rows;
                    let a = rb.slice(0, rows_left);
                    send_arrays_to_col_writers(
                        &col_array_channels,
                        &a,
                        Arc::clone(&schema),
                    )
                    .await?;

                    drop(col_array_channels);
                    let finalize_rg_task = spawn_rg_join_and_finalize_task(
                        column_writer_handles,
                        max_row_group_rows,
                        &pool,
                    );

                    if serialize_tx.send(finalize_rg_task).await.is_err() {
                        return Ok(());
                    }

                    current_rg_rows = 0;
                    rb = rb.slice(rows_left, rb.num_rows() - rows_left);

                    (column_writer_handles, col_array_channels) =
                        spawn_column_parallel_row_group_writer(
                            Arc::clone(&schema),
                            Arc::clone(&writer_props),
                            max_buffer_rb,
                            &pool,
                        )?;
                }
            }
        }

        drop(col_array_channels);
        if current_rg_rows > 0 {
            let finalize_rg_task = spawn_rg_join_and_finalize_task(
                column_writer_handles,
                current_rg_rows,
                &pool,
            );

            if serialize_tx.send(finalize_rg_task).await.is_err() {
                return Ok(());
            }
        }

        Ok(())
    })
}

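/// Receives finalized row group tasks and appends each serialized row group to a
/// single parquet file, flushing the in-memory buffer to the object store writer
/// whenever it grows beyond [`BUFFER_FLUSH_BYTES`].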
async fn concatenate_parallel_row_groups(
    mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
    schema: Arc<Schema>,
    writer_props: Arc<WriterProperties>,
    mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
    pool: Arc<dyn MemoryPool>,
) -> Result<FileMetaData> {
    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);

    let mut file_reservation =
        MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);

    let schema_desc = ArrowSchemaConverter::new().convert(schema.as_ref())?;
    let mut parquet_writer = SerializedFileWriter::new(
        merged_buff.clone(),
        schema_desc.root_schema_ptr(),
        writer_props,
    )?;

    while let Some(task) = serialize_rx.recv().await {
        let result = task.join_unwind().await;
        let mut rg_out = parquet_writer.next_row_group()?;
        let (serialized_columns, mut rg_reservation, _cnt) =
            result.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
        for chunk in serialized_columns {
            chunk.append_to_row_group(&mut rg_out)?;
            rg_reservation.free();

            let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
            file_reservation.try_resize(buff_to_flush.len())?;

            if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
                object_store_writer
                    .write_all(buff_to_flush.as_slice())
                    .await?;
                buff_to_flush.clear();
                file_reservation.try_resize(buff_to_flush.len())?;
            }
        }
        rg_out.close()?;
    }

    let file_metadata = parquet_writer.close()?;
    let final_buff = merged_buff.buffer.try_lock().unwrap();

    object_store_writer.write_all(final_buff.as_slice()).await?;
    object_store_writer.shutdown().await?;
    file_reservation.free();

    Ok(file_metadata)
}

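/// Writes a single parquet file in parallel: row groups are serialized concurrently
/// and then concatenated into the final file written to `object_store_writer`.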
async fn output_single_parquet_file_parallelized(
    object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
    data: Receiver<RecordBatch>,
    output_schema: Arc<Schema>,
    parquet_props: &WriterProperties,
    parallel_options: ParallelParquetWriterOptions,
    pool: Arc<dyn MemoryPool>,
) -> Result<FileMetaData> {
    let max_rowgroups = parallel_options.max_parallel_row_groups;
    let (serialize_tx, serialize_rx) =
        mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups);

    let arc_props = Arc::new(parquet_props.clone());
    let launch_serialization_task = spawn_parquet_parallel_serialization_task(
        data,
        serialize_tx,
        Arc::clone(&output_schema),
        Arc::clone(&arc_props),
        parallel_options,
        Arc::clone(&pool),
    );
    let file_metadata = concatenate_parallel_row_groups(
        serialize_rx,
        Arc::clone(&output_schema),
        Arc::clone(&arc_props),
        object_store_writer,
        pool,
    )
    .await?;

    launch_serialization_task
        .join_unwind()
        .await
        .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
    Ok(file_metadata)
}

#[cfg(test)]
mod tests {
    use parquet::arrow::parquet_to_arrow_schema;
    use std::sync::Arc;

    use super::*;

    use arrow::datatypes::DataType;
    use parquet::schema::parser::parse_message_type;

    #[test]
    fn coerce_int96_to_resolution_with_mixed_timestamps() {
        let spark_schema = "
        message spark_schema {
          optional int96 c0;
          optional int64 c1 (TIMESTAMP(NANOS,true));
          optional int64 c2 (TIMESTAMP(NANOS,false));
          optional int64 c3 (TIMESTAMP(MILLIS,true));
          optional int64 c4 (TIMESTAMP(MILLIS,false));
          optional int64 c5 (TIMESTAMP(MICROS,true));
          optional int64 c6 (TIMESTAMP(MICROS,false));
        }
        ";

        let schema = parse_message_type(spark_schema).expect("should parse schema");
        let descr = SchemaDescriptor::new(Arc::new(schema));

        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();

        let result =
            coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond)
                .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
            Field::new(
                "c1",
                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                true,
            ),
            Field::new("c2", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
            Field::new(
                "c3",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                true,
            ),
            Field::new("c4", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new(
                "c5",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                true,
            ),
            Field::new("c6", DataType::Timestamp(TimeUnit::Microsecond, None), true),
        ]);

        assert_eq!(result, expected_schema);
    }

    #[test]
    fn coerce_int96_to_resolution_with_nested_types() {
        let spark_schema = "
        message spark_schema {
          optional int96 c0;
          optional group c1 {
            optional int96 c0;
          }
          optional group c2 {
            optional group c0 (LIST) {
              repeated group list {
                optional int96 element;
              }
            }
          }
          optional group c3 (LIST) {
            repeated group list {
              optional int96 element;
            }
          }
          optional group c4 (LIST) {
            repeated group list {
              optional group element {
                optional int96 c0;
                optional int96 c1;
              }
            }
          }
          optional group c5 (MAP) {
            repeated group key_value {
              required int96 key;
              optional int96 value;
            }
          }
          optional group c6 (LIST) {
            repeated group list {
              optional group element (MAP) {
                repeated group key_value {
                  required int96 key;
                  optional int96 value;
                }
              }
            }
          }
        }
        ";

        let schema = parse_message_type(spark_schema).expect("should parse schema");
        let descr = SchemaDescriptor::new(Arc::new(schema));

        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();

        let result =
            coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond)
                .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
            Field::new_struct(
                "c1",
                vec![Field::new(
                    "c0",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    true,
                )],
                true,
            ),
            Field::new_struct(
                "c2",
                vec![Field::new_list(
                    "c0",
                    Field::new(
                        "element",
                        DataType::Timestamp(TimeUnit::Microsecond, None),
                        true,
                    ),
                    true,
                )],
                true,
            ),
            Field::new_list(
                "c3",
                Field::new(
                    "element",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    true,
                ),
                true,
            ),
            Field::new_list(
                "c4",
                Field::new_struct(
                    "element",
                    vec![
                        Field::new(
                            "c0",
                            DataType::Timestamp(TimeUnit::Microsecond, None),
                            true,
                        ),
                        Field::new(
                            "c1",
                            DataType::Timestamp(TimeUnit::Microsecond, None),
                            true,
                        ),
                    ],
                    true,
                ),
                true,
            ),
            Field::new_map(
                "c5",
                "key_value",
                Field::new(
                    "key",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    false,
                ),
                Field::new(
                    "value",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    true,
                ),
                false,
                true,
            ),
            Field::new_list(
                "c6",
                Field::new_map(
                    "element",
                    "key_value",
                    Field::new(
                        "key",
                        DataType::Timestamp(TimeUnit::Microsecond, None),
                        false,
                    ),
                    Field::new(
                        "value",
                        DataType::Timestamp(TimeUnit::Microsecond, None),
                        true,
                    ),
                    false,
                    true,
                ),
                true,
            ),
        ]);

        assert_eq!(result, expected_schema);
    }
}