use std::any::Any;
use std::cell::RefCell;
use std::fmt::Debug;
use std::ops::Range;
use std::rc::Rc;
use std::sync::Arc;
use std::{fmt, vec};

use arrow::array::RecordBatch;
use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
use datafusion_datasource::TableSchema;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
use datafusion_datasource::write::{
    ObjectWriterBuilder, SharedBuffer, get_writer_schema,
};

use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
use datafusion_datasource::write::demux::DemuxedStreamReceiver;

use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
use datafusion_common::encryption::FileDecryptionProperties;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
    DEFAULT_PARQUET_EXTENSION, DataFusionError, GetExt, HashSet, Result,
    internal_datafusion_err, internal_err, not_impl_err,
};
use datafusion_common::{HashMap, Statistics};
use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_session::Session;

use crate::metadata::DFParquetMetadata;
use crate::reader::CachedParquetFileReaderFactory;
use crate::source::{ParquetSource, parse_coerce_int96_string};
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_datasource::source::DataSourceExec;
use datafusion_execution::cache::cache_manager::FileMetadataCache;
use datafusion_execution::runtime_env::RuntimeEnv;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::arrow_writer::{
    ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, ArrowRowGroupWriterFactory,
    ArrowWriterOptions, compute_leaves,
};
use parquet::arrow::async_reader::MetadataFetch;
use parquet::arrow::{ArrowWriter, AsyncArrowWriter};
use parquet::basic::Type;
#[cfg(feature = "parquet_encryption")]
use parquet::encryption::encrypt::FileEncryptionProperties;
use parquet::errors::ParquetError;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::types::SchemaDescriptor;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};

/// Initial capacity for the in-memory buffer used when writing a parquet file
/// in parallel.
const INITIAL_BUFFER_BYTES: usize = 1048576;

/// When writing parquet files in parallel, if the buffered bytes exceed this
/// threshold they are flushed to the object store.
const BUFFER_FLUSH_BYTES: usize = 1024000;

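/// Factory used to create [`ParquetFormat`] instances, optionally carrying a
/// fixed set of [`TableParquetOptions`].
///
/// A minimal usage sketch (not a doc-test; import paths are assumed):
///
/// ```ignore
/// use datafusion_common::config::TableParquetOptions;
///
/// // Use the session's default options:
/// let factory = ParquetFormatFactory::new();
///
/// // Or pin explicit options, e.g. disabling row group pruning:
/// let mut options = TableParquetOptions::default();
/// options.global.pruning = false;
/// let factory = ParquetFormatFactory::new_with_options(options);
/// ```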
#[derive(Default)]
pub struct ParquetFormatFactory {
    /// Options applied to created [`ParquetFormat`]s, if set; otherwise the
    /// session's default table options are used.
    pub options: Option<TableParquetOptions>,
}

impl ParquetFormatFactory {
    /// Creates a factory that uses the session's default options.
    pub fn new() -> Self {
        Self { options: None }
    }

    /// Creates a factory with customized default options.
    pub fn new_with_options(options: TableParquetOptions) -> Self {
        Self {
            options: Some(options),
        }
    }
}

impl FileFormatFactory for ParquetFormatFactory {
    fn create(
        &self,
        state: &dyn Session,
        format_options: &std::collections::HashMap<String, String>,
    ) -> Result<Arc<dyn FileFormat>> {
        let parquet_options = match &self.options {
            None => {
                let mut table_options = state.default_table_options();
                table_options.set_config_format(ConfigFileType::PARQUET);
                table_options.alter_with_string_hash_map(format_options)?;
                table_options.parquet
            }
            Some(parquet_options) => {
                let mut parquet_options = parquet_options.clone();
                for (k, v) in format_options {
                    parquet_options.set(k, v)?;
                }
                parquet_options
            }
        };

        Ok(Arc::new(
            ParquetFormat::default().with_options(parquet_options),
        ))
    }

    fn default(&self) -> Arc<dyn FileFormat> {
        Arc::new(ParquetFormat::default())
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl GetExt for ParquetFormatFactory {
    fn get_ext(&self) -> String {
        // Remove the leading dot, i.e. ".parquet" -> "parquet"
        DEFAULT_PARQUET_EXTENSION[1..].to_string()
    }
}

impl Debug for ParquetFormatFactory {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ParquetFormatFactory")
            .field("options", &self.options)
            .finish()
    }
}
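
/// The Apache Parquet [`FileFormat`] implementation. The builder-style
/// `with_*` methods below mirror fields of [`TableParquetOptions`].
///
/// A minimal configuration sketch (not a doc-test; import paths are assumed):
///
/// ```ignore
/// let format = ParquetFormat::new()
///     .with_enable_pruning(true)
///     .with_metadata_size_hint(Some(64 * 1024))
///     .with_force_view_types(true);
/// assert!(format.enable_pruning());
/// ```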
#[derive(Debug, Default)]
pub struct ParquetFormat {
    options: TableParquetOptions,
}

impl ParquetFormat {
    /// Construct a new format with default options.
    pub fn new() -> Self {
        Self::default()
    }

    /// Enable or disable pruning of row groups and pages using available statistics.
    pub fn with_enable_pruning(mut self, enable: bool) -> Self {
        self.options.global.pruning = enable;
        self
    }

    /// Return `true` if pruning is enabled.
    pub fn enable_pruning(&self) -> bool {
        self.options.global.pruning
    }

    /// Provide a hint for the size of the parquet metadata. If set, that many
    /// bytes are fetched from the end of the file up front, which can avoid a
    /// second fetch for the footer.
    pub fn with_metadata_size_hint(mut self, size_hint: Option<usize>) -> Self {
        self.options.global.metadata_size_hint = size_hint;
        self
    }

    /// Return the metadata size hint, if set.
    pub fn metadata_size_hint(&self) -> Option<usize> {
        self.options.global.metadata_size_hint
    }

    /// Tell the reader to skip any metadata embedded in the file schema,
    /// which can avoid schema conflicts when merging schemas across files.
    pub fn with_skip_metadata(mut self, skip_metadata: bool) -> Self {
        self.options.global.skip_metadata = skip_metadata;
        self
    }

    /// Return `true` if schema metadata is skipped during schema merging.
    pub fn skip_metadata(&self) -> bool {
        self.options.global.skip_metadata
    }

    /// Set the options of this format.
    pub fn with_options(mut self, options: TableParquetOptions) -> Self {
        self.options = options;
        self
    }

    /// Return the options of this format.
    pub fn options(&self) -> &TableParquetOptions {
        &self.options
    }

    /// Return `true` if string and binary columns are read as the Arrow view
    /// types (`Utf8View` / `BinaryView`).
    pub fn force_view_types(&self) -> bool {
        self.options.global.schema_force_view_types
    }

    /// Set whether string and binary columns are read as view types.
    pub fn with_force_view_types(mut self, use_views: bool) -> Self {
        self.options.global.schema_force_view_types = use_views;
        self
    }

    /// Return `true` if binary columns are read as strings.
    pub fn binary_as_string(&self) -> bool {
        self.options.global.binary_as_string
    }

    /// Set whether binary columns are read as strings.
    pub fn with_binary_as_string(mut self, binary_as_string: bool) -> Self {
        self.options.global.binary_as_string = binary_as_string;
        self
    }

    /// Return the time unit that INT96 timestamps are coerced to, if set.
    pub fn coerce_int96(&self) -> Option<String> {
        self.options.global.coerce_int96.clone()
    }

    /// Set the time unit that INT96 timestamps are coerced to.
    pub fn with_coerce_int96(mut self, time_unit: Option<String>) -> Self {
        self.options.global.coerce_int96 = time_unit;
        self
    }
}

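/// Clears the metadata attached to each schema's fields, so that per-file
/// Arrow metadata does not prevent [`Schema::try_merge`] from succeeding when
/// `skip_metadata` is enabled.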
fn clear_metadata(
    schemas: impl IntoIterator<Item = Schema>,
) -> impl Iterator<Item = Schema> {
    schemas.into_iter().map(|schema| {
        let fields = schema
            .fields()
            .iter()
            .map(|field| {
                field.as_ref().clone().with_metadata(Default::default())
            })
            .collect::<Fields>();
        Schema::new(fields)
    })
}

#[cfg(feature = "parquet_encryption")]
async fn get_file_decryption_properties(
    state: &dyn Session,
    options: &TableParquetOptions,
    file_path: &Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
    Ok(match &options.crypto.file_decryption {
        Some(cfd) => Some(Arc::new(FileDecryptionProperties::from(cfd.clone()))),
        None => match &options.crypto.factory_id {
            Some(factory_id) => {
                let factory =
                    state.runtime_env().parquet_encryption_factory(factory_id)?;
                factory
                    .get_file_decryption_properties(
                        &options.crypto.factory_options,
                        file_path,
                    )
                    .await?
            }
            None => None,
        },
    })
}

#[cfg(not(feature = "parquet_encryption"))]
async fn get_file_decryption_properties(
    _state: &dyn Session,
    _options: &TableParquetOptions,
    _file_path: &Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
    Ok(None)
}

#[async_trait]
impl FileFormat for ParquetFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_ext(&self) -> String {
        ParquetFormatFactory::new().get_ext()
    }

    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> Result<String> {
        let ext = self.get_ext();
        match file_compression_type.get_variant() {
            CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
            _ => internal_err!("Parquet FileFormat does not support compression."),
        }
    }

    fn compression_type(&self) -> Option<FileCompressionType> {
        None
    }

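    /// Infers the table schema by fetching each file's footer, reading up to
    /// `meta_fetch_concurrency` footers concurrently, then merging the
    /// per-file schemas.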
    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef> {
        let coerce_int96 = match self.coerce_int96() {
            Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?),
            None => None,
        };

        let file_metadata_cache =
            state.runtime_env().cache_manager.get_file_metadata_cache();

        let mut schemas: Vec<_> = futures::stream::iter(objects)
            .map(|object| async {
                let file_decryption_properties = get_file_decryption_properties(
                    state,
                    &self.options,
                    &object.location,
                )
                .await?;
                let result = DFParquetMetadata::new(store.as_ref(), object)
                    .with_metadata_size_hint(self.metadata_size_hint())
                    .with_decryption_properties(file_decryption_properties)
                    .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
                    .with_coerce_int96(coerce_int96)
                    .fetch_schema_with_location()
                    .await?;
                Ok::<_, DataFusionError>(result)
            })
            .boxed()
            .buffered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect()
            .await?;

        // The fetched schemas can arrive in any order depending on which
        // footer fetch completes first, and the merged result is sensitive to
        // field order; sort by file path so inference is deterministic.
        schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2));

        let schemas = schemas
            .into_iter()
            .map(|(_, schema)| schema)
            .collect::<Vec<_>>();

        let schema = if self.skip_metadata() {
            Schema::try_merge(clear_metadata(schemas))
        } else {
            Schema::try_merge(schemas)
        }?;

        let schema = if self.binary_as_string() {
            transform_binary_to_string(&schema)
        } else {
            schema
        };

        let schema = if self.force_view_types() {
            transform_schema_to_view(&schema)
        } else {
            schema
        };

        Ok(Arc::new(schema))
    }

    async fn infer_stats(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> Result<Statistics> {
        let file_decryption_properties =
            get_file_decryption_properties(state, &self.options, &object.location)
                .await?;
        let file_metadata_cache =
            state.runtime_env().cache_manager.get_file_metadata_cache();
        DFParquetMetadata::new(store, object)
            .with_metadata_size_hint(self.metadata_size_hint())
            .with_decryption_properties(file_decryption_properties)
            .with_file_metadata_cache(Some(file_metadata_cache))
            .fetch_statistics(&table_schema)
            .await
    }

    async fn create_physical_plan(
        &self,
        state: &dyn Session,
        conf: FileScanConfig,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let metadata_size_hint = self.metadata_size_hint();

        let mut source = conf
            .file_source()
            .as_any()
            .downcast_ref::<ParquetSource>()
            .cloned()
            .ok_or_else(|| internal_datafusion_err!("Expected ParquetSource"))?;
        source = source.with_table_parquet_options(self.options.clone());

        let metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
        let store = state
            .runtime_env()
            .object_store(conf.object_store_url.clone())?;
        let cached_parquet_read_factory =
            Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
        source = source.with_parquet_file_reader_factory(cached_parquet_read_factory);

        if let Some(metadata_size_hint) = metadata_size_hint {
            source = source.with_metadata_size_hint(metadata_size_hint)
        }

        source = self.set_source_encryption_factory(source, state)?;

        let conf = FileScanConfigBuilder::from(conf)
            .with_source(Arc::new(source))
            .build();
        Ok(DataSourceExec::from_data_source(conf))
    }

    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Parquet");
        }

        let sink = Arc::new(ParquetSink::new(conf, self.options.clone()));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
        Arc::new(
            ParquetSource::new(table_schema)
                .with_table_parquet_options(self.options.clone()),
        )
    }
}

#[cfg(feature = "parquet_encryption")]
impl ParquetFormat {
    fn set_source_encryption_factory(
        &self,
        source: ParquetSource,
        state: &dyn Session,
    ) -> Result<ParquetSource> {
        if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
            Ok(source.with_encryption_factory(
                state
                    .runtime_env()
                    .parquet_encryption_factory(encryption_factory_id)?,
            ))
        } else {
            Ok(source)
        }
    }
}

#[cfg(not(feature = "parquet_encryption"))]
impl ParquetFormat {
    fn set_source_encryption_factory(
        &self,
        source: ParquetSource,
        _state: &dyn Session,
    ) -> Result<ParquetSource> {
        if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
            Err(DataFusionError::Configuration(format!(
                "Parquet encryption factory id is set to '{encryption_factory_id}' \
                 but the parquet_encryption feature is disabled"
            )))
        } else {
            Ok(source)
        }
    }
}

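/// Coerces the file schema's types to match the table schema wherever the two
/// differ only in string/binary representation:
///
/// * binary columns are read as the table's string type (`Utf8`, `LargeUtf8`,
///   or `Utf8View`), and
/// * `Utf8`/`Binary` columns are read as `Utf8View`/`BinaryView` when the
///   table schema uses the view types.
///
/// Returns `None` if the file schema requires no changes. A minimal sketch
/// (not a doc-test; import paths are assumed):
///
/// ```ignore
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let table = Schema::new(vec![Field::new("c", DataType::Utf8View, true)]);
/// let file = Schema::new(vec![Field::new("c", DataType::Utf8, true)]);
/// let coerced = apply_file_schema_type_coercions(&table, &file).unwrap();
/// assert_eq!(coerced.field(0).data_type(), &DataType::Utf8View);
/// ```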
pub fn apply_file_schema_type_coercions(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut needs_view_transform = false;
    let mut needs_string_transform = false;

    // Map field names to the table's data types, noting along the way whether
    // any view or string coercion could apply at all.
    let table_fields: HashMap<_, _> = table_schema
        .fields()
        .iter()
        .map(|f| {
            let dt = f.data_type();
            if matches!(dt, &DataType::Utf8View | &DataType::BinaryView) {
                needs_view_transform = true;
            }
            if matches!(
                dt,
                &DataType::Utf8 | &DataType::LargeUtf8 | &DataType::Utf8View
            ) {
                needs_string_transform = true;
            }

            (f.name(), dt)
        })
        .collect();

    // Early exit if no transformation is possible.
    if !needs_view_transform && !needs_string_transform {
        return None;
    }

    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields()
        .iter()
        .map(|field| {
            let field_name = field.name();
            let field_type = field.data_type();

            if let Some(table_type) = table_fields.get(field_name) {
                match (table_type, field_type) {
                    // Binary -> string coercions
                    (
                        &DataType::Utf8,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::Utf8);
                    }
                    (
                        &DataType::LargeUtf8,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::LargeUtf8);
                    }
                    (
                        &DataType::Utf8View,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::Utf8View);
                    }
                    // String/binary -> view type coercions
                    (&DataType::Utf8View, DataType::Utf8 | DataType::LargeUtf8) => {
                        return field_with_new_type(field, DataType::Utf8View);
                    }
                    (&DataType::BinaryView, DataType::Binary | DataType::LargeBinary) => {
                        return field_with_new_type(field, DataType::BinaryView);
                    }
                    _ => {}
                }
            }

            // No coercion applies; keep the field as-is.
            Arc::clone(field)
        })
        .collect();

    Some(Schema::new_with_metadata(
        transformed_fields,
        file_schema.metadata.clone(),
    ))
}

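/// Coerces INT96 timestamp columns, which Arrow otherwise reads as nanosecond
/// timestamps, to the given [`TimeUnit`], recursing through structs, lists,
/// and maps. Returns `None` when the file contains no INT96 columns.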
pub fn coerce_int96_to_resolution(
    parquet_schema: &SchemaDescriptor,
    file_schema: &Schema,
    time_unit: &TimeUnit,
) -> Option<Schema> {
    // Collect the dotted paths of all INT96 columns in the parquet schema.
    let int96_fields: HashSet<_> = parquet_schema
        .columns()
        .iter()
        .filter(|f| f.physical_type() == Type::INT96)
        .map(|f| f.path().string())
        .collect();

    if int96_fields.is_empty() {
        // The schema contains no INT96 fields, so skip the remaining logic.
        return None;
    }

    // Walk the schema depth-first using an explicit stack, rewriting INT96
    // timestamps to the desired resolution as they are encountered.
    type NestedFields = Rc<RefCell<Vec<FieldRef>>>;
    // Each stack entry tracks:
    // * the dotted parquet path segments to the current field,
    // * the field itself,
    // * the shared vector of the parent's processed fields, onto which the
    //   processed version of this field is pushed, and
    // * this field's already-processed children, present only on the second
    //   visit to a nested type.
    type StackContext<'a> = (
        Vec<&'a str>,
        &'a FieldRef,
        NestedFields,
        Option<NestedFields>,
    );

    let fields = Rc::new(RefCell::new(Vec::with_capacity(file_schema.fields.len())));

    let transformed_schema = {
        // Seed the stack with the top-level fields, reversed so they are
        // popped in their original order.
        let mut stack: Vec<StackContext> = file_schema
            .fields()
            .iter()
            .rev()
            .map(|f| (vec![f.name().as_str()], f, Rc::clone(&fields), None))
            .collect();

        while let Some((parquet_path, current_field, parent_fields, child_fields)) =
            stack.pop()
        {
            match (current_field.data_type(), child_fields) {
                (DataType::Struct(unprocessed_children), None) => {
                    // First visit: push the struct back with a buffer for its
                    // processed children, then push the children themselves.
                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(
                        unprocessed_children.len(),
                    )));
                    stack.push((
                        parquet_path.clone(),
                        current_field,
                        parent_fields,
                        Some(Rc::clone(&child_fields)),
                    ));
                    for child in unprocessed_children.into_iter().rev() {
                        // Build the dotted path used as a key into int96_fields.
                        let mut child_path = parquet_path.clone();
                        child_path.push(".");
                        child_path.push(child.name());
                        stack.push((child_path, child, Rc::clone(&child_fields), None));
                    }
                }
                (DataType::Struct(unprocessed_children), Some(processed_children)) => {
                    // Second visit: all children are processed, rebuild the struct.
                    let processed_children = processed_children.borrow();
                    assert_eq!(processed_children.len(), unprocessed_children.len());
                    let processed_struct = Field::new_struct(
                        current_field.name(),
                        processed_children.as_slice(),
                        current_field.is_nullable(),
                    );
                    parent_fields.borrow_mut().push(Arc::new(processed_struct));
                }
                (DataType::List(unprocessed_child), None) => {
                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
                    stack.push((
                        parquet_path.clone(),
                        current_field,
                        parent_fields,
                        Some(Rc::clone(&child_fields)),
                    ));
                    // Parquet lists introduce an extra `.list.` path segment.
                    let mut child_path = parquet_path.clone();
                    child_path.push(".list.");
                    child_path.push(unprocessed_child.name());
                    stack.push((
                        child_path.clone(),
                        unprocessed_child,
                        Rc::clone(&child_fields),
                        None,
                    ));
                }
                (DataType::List(_), Some(processed_children)) => {
                    let processed_children = processed_children.borrow();
                    assert_eq!(processed_children.len(), 1);
                    let processed_list = Field::new_list(
                        current_field.name(),
                        Arc::clone(&processed_children[0]),
                        current_field.is_nullable(),
                    );
                    parent_fields.borrow_mut().push(Arc::new(processed_list));
                }
                (DataType::Map(unprocessed_child, _), None) => {
                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
                    stack.push((
                        parquet_path.clone(),
                        current_field,
                        parent_fields,
                        Some(Rc::clone(&child_fields)),
                    ));
                    let mut child_path = parquet_path.clone();
                    child_path.push(".");
                    child_path.push(unprocessed_child.name());
                    stack.push((
                        child_path.clone(),
                        unprocessed_child,
                        Rc::clone(&child_fields),
                        None,
                    ));
                }
                (DataType::Map(_, sorted), Some(processed_children)) => {
                    let processed_children = processed_children.borrow();
                    assert_eq!(processed_children.len(), 1);
                    let processed_map = Field::new(
                        current_field.name(),
                        DataType::Map(Arc::clone(&processed_children[0]), *sorted),
                        current_field.is_nullable(),
                    );
                    parent_fields.borrow_mut().push(Arc::new(processed_map));
                }
                (DataType::Timestamp(TimeUnit::Nanosecond, None), None)
                    if int96_fields.contains(parquet_path.concat().as_str()) =>
                {
                    // An INT96 leaf: coerce it to the requested resolution.
                    parent_fields.borrow_mut().push(field_with_new_type(
                        current_field,
                        DataType::Timestamp(*time_unit, None),
                    ));
                }
                // Everything else is passed through unchanged.
                _ => parent_fields.borrow_mut().push(Arc::clone(current_field)),
            }
        }
        assert_eq!(fields.borrow().len(), file_schema.fields.len());
        Schema::new_with_metadata(
            fields.borrow_mut().clone(),
            file_schema.metadata.clone(),
        )
    };

    Some(transformed_schema)
}

#[deprecated(
    since = "47.0.0",
    note = "Use `apply_file_schema_type_coercions` instead"
)]
pub fn coerce_file_schema_to_view_type(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut transform = false;
    let table_fields: HashMap<_, _> = table_schema
        .fields
        .iter()
        .map(|f| {
            let dt = f.data_type();
            if dt.equals_datatype(&DataType::Utf8View)
                || dt.equals_datatype(&DataType::BinaryView)
            {
                transform = true;
            }
            (f.name(), dt)
        })
        .collect();

    if !transform {
        return None;
    }

    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields
        .iter()
        .map(
            |field| match (table_fields.get(field.name()), field.data_type()) {
                (Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => {
                    field_with_new_type(field, DataType::Utf8View)
                }
                (
                    Some(DataType::BinaryView),
                    DataType::Binary | DataType::LargeBinary,
                ) => field_with_new_type(field, DataType::BinaryView),
                _ => Arc::clone(field),
            },
        )
        .collect();

    Some(Schema::new_with_metadata(
        transformed_fields,
        file_schema.metadata.clone(),
    ))
}

#[deprecated(
    since = "47.0.0",
    note = "Use `apply_file_schema_type_coercions` instead"
)]
pub fn coerce_file_schema_to_string_type(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut transform = false;
    let table_fields: HashMap<_, _> = table_schema
        .fields
        .iter()
        .map(|f| (f.name(), f.data_type()))
        .collect();
    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields
        .iter()
        .map(
            |field| match (table_fields.get(field.name()), field.data_type()) {
                (
                    Some(DataType::Utf8),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::Utf8)
                }
                (
                    Some(DataType::LargeUtf8),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::LargeUtf8)
                }
                (
                    Some(DataType::Utf8View),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::Utf8View)
                }
                _ => Arc::clone(field),
            },
        )
        .collect();

    if !transform {
        None
    } else {
        Some(Schema::new_with_metadata(
            transformed_fields,
            file_schema.metadata.clone(),
        ))
    }
}

/// Create a new field with the same name and nullability, but the given data type.
fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
    Arc::new(field.as_ref().clone().with_data_type(new_type))
}

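/// Transform a schema so that string and binary columns are replaced by the
/// Arrow view types (`Utf8View` / `BinaryView`). A minimal sketch (not a
/// doc-test; import paths are assumed):
///
/// ```ignore
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let schema = Schema::new(vec![Field::new("s", DataType::Utf8, true)]);
/// let viewed = transform_schema_to_view(&schema);
/// assert_eq!(viewed.field(0).data_type(), &DataType::Utf8View);
/// ```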
pub fn transform_schema_to_view(schema: &Schema) -> Schema {
    let transformed_fields: Vec<Arc<Field>> = schema
        .fields
        .iter()
        .map(|field| match field.data_type() {
            DataType::Utf8 | DataType::LargeUtf8 => {
                field_with_new_type(field, DataType::Utf8View)
            }
            DataType::Binary | DataType::LargeBinary => {
                field_with_new_type(field, DataType::BinaryView)
            }
            _ => Arc::clone(field),
        })
        .collect();
    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

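/// Transform a schema so that binary columns are read as the corresponding
/// string type (`Binary` -> `Utf8`, `LargeBinary` -> `LargeUtf8`,
/// `BinaryView` -> `Utf8View`).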
pub fn transform_binary_to_string(schema: &Schema) -> Schema {
    let transformed_fields: Vec<Arc<Field>> = schema
        .fields
        .iter()
        .map(|field| match field.data_type() {
            DataType::Binary => field_with_new_type(field, DataType::Utf8),
            DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8),
            DataType::BinaryView => field_with_new_type(field, DataType::Utf8View),
            _ => Arc::clone(field),
        })
        .collect();
    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

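/// A [`MetadataFetch`] implementation that fetches byte ranges from an
/// [`ObjectStore`], used when reading parquet footers and metadata.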
pub struct ObjectStoreFetch<'a> {
    store: &'a dyn ObjectStore,
    meta: &'a ObjectMeta,
}

impl<'a> ObjectStoreFetch<'a> {
    pub fn new(store: &'a dyn ObjectStore, meta: &'a ObjectMeta) -> Self {
        Self { store, meta }
    }
}

impl MetadataFetch for ObjectStoreFetch<'_> {
    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, ParquetError>> {
        async {
            self.store
                .get_range(&self.meta.location, range)
                .await
                .map_err(ParquetError::from)
        }
        .boxed()
    }
}

#[deprecated(
    since = "50.0.0",
    note = "Use `DFParquetMetadata::fetch_metadata` instead"
)]
pub async fn fetch_parquet_metadata(
    store: &dyn ObjectStore,
    object_meta: &ObjectMeta,
    size_hint: Option<usize>,
    decryption_properties: Option<&FileDecryptionProperties>,
    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
) -> Result<Arc<ParquetMetaData>> {
    let decryption_properties = decryption_properties.cloned().map(Arc::new);
    DFParquetMetadata::new(store, object_meta)
        .with_metadata_size_hint(size_hint)
        .with_decryption_properties(decryption_properties)
        .with_file_metadata_cache(file_metadata_cache)
        .fetch_metadata()
        .await
}

#[deprecated(
    since = "50.0.0",
    note = "Use `DFParquetMetadata::fetch_statistics` instead"
)]
pub async fn fetch_statistics(
    store: &dyn ObjectStore,
    table_schema: SchemaRef,
    file: &ObjectMeta,
    metadata_size_hint: Option<usize>,
    decryption_properties: Option<&FileDecryptionProperties>,
    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
) -> Result<Statistics> {
    let decryption_properties = decryption_properties.cloned().map(Arc::new);
    DFParquetMetadata::new(store, file)
        .with_metadata_size_hint(metadata_size_hint)
        .with_decryption_properties(decryption_properties)
        .with_file_metadata_cache(file_metadata_cache)
        .fetch_statistics(&table_schema)
        .await
}

#[deprecated(
    since = "50.0.0",
    note = "Use `DFParquetMetadata::statistics_from_parquet_metadata` instead"
)]
#[expect(clippy::needless_pass_by_value)]
pub fn statistics_from_parquet_meta_calc(
    metadata: &ParquetMetaData,
    table_schema: SchemaRef,
) -> Result<Statistics> {
    DFParquetMetadata::statistics_from_parquet_metadata(metadata, &table_schema)
}

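/// Implements [`DataSink`] for writing data as parquet files to an object store.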
pub struct ParquetSink {
    /// Config options for writing data
    config: FileSinkConfig,
    /// Underlying parquet options
    parquet_options: TableParquetOptions,
    /// File metadata from successfully produced parquet files, keyed by the
    /// (possibly partitioned) output path.
    written: Arc<parking_lot::Mutex<HashMap<Path, ParquetMetaData>>>,
}

impl Debug for ParquetSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ParquetSink").finish()
    }
}

impl DisplayAs for ParquetSink {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "ParquetSink(file_groups=")?;
                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
                write!(f, ")")
            }
            DisplayFormatType::TreeRender => {
                write!(f, "")
            }
        }
    }
}

impl ParquetSink {
    /// Create a sink from the given config and parquet options.
    pub fn new(config: FileSinkConfig, parquet_options: TableParquetOptions) -> Self {
        Self {
            config,
            parquet_options,
            written: Default::default(),
        }
    }

    /// Retrieve the metadata for the files written so far, keyed by the
    /// (possibly partitioned) output path.
    pub fn written(&self) -> HashMap<Path, ParquetMetaData> {
        self.written.lock().clone()
    }

    /// Create writer properties from the configured parquet options,
    /// including the embedded arrow schema and any encryption settings.
    async fn create_writer_props(
        &self,
        runtime: &Arc<RuntimeEnv>,
        path: &Path,
    ) -> Result<WriterProperties> {
        let schema = self.config.output_schema();

        // Embed the arrow schema in the parquet metadata unless configured otherwise.
        let mut parquet_opts = self.parquet_options.clone();
        if !self.parquet_options.global.skip_arrow_metadata {
            parquet_opts.arrow_schema(schema);
        }

        let mut builder = WriterPropertiesBuilder::try_from(&parquet_opts)?;
        builder = set_writer_encryption_properties(
            builder,
            runtime,
            parquet_opts,
            schema,
            path,
        )
        .await?;
        Ok(builder.build())
    }

    /// Create an [`AsyncArrowWriter`] that serializes record batches to an
    /// object store location, buffered through a [`BufWriter`].
    async fn create_async_arrow_writer(
        &self,
        location: &Path,
        object_store: Arc<dyn ObjectStore>,
        context: &Arc<TaskContext>,
        parquet_props: WriterProperties,
    ) -> Result<AsyncArrowWriter<BufWriter>> {
        let buf_writer = BufWriter::with_capacity(
            object_store,
            location.clone(),
            context
                .session_config()
                .options()
                .execution
                .objectstore_writer_buffer_size,
        );
        let options = ArrowWriterOptions::new()
            .with_properties(parquet_props)
            .with_skip_arrow_metadata(self.parquet_options.global.skip_arrow_metadata);

        let writer = AsyncArrowWriter::try_new_with_options(
            buf_writer,
            get_writer_schema(&self.config),
            options,
        )?;
        Ok(writer)
    }

    /// Parquet options for this sink.
    pub fn parquet_options(&self) -> &TableParquetOptions {
        &self.parquet_options
    }
}

#[cfg(feature = "parquet_encryption")]
async fn set_writer_encryption_properties(
    builder: WriterPropertiesBuilder,
    runtime: &Arc<RuntimeEnv>,
    parquet_opts: TableParquetOptions,
    schema: &Arc<Schema>,
    path: &Path,
) -> Result<WriterPropertiesBuilder> {
    if let Some(file_encryption_properties) = parquet_opts.crypto.file_encryption {
        // Encryption properties are specified directly in the table options.
        return Ok(builder.with_file_encryption_properties(Arc::new(
            FileEncryptionProperties::from(file_encryption_properties),
        )));
    } else if let Some(encryption_factory_id) = &parquet_opts.crypto.factory_id.as_ref() {
        // Encryption properties are generated by a registered encryption factory.
        let encryption_factory =
            runtime.parquet_encryption_factory(encryption_factory_id)?;
        let file_encryption_properties = encryption_factory
            .get_file_encryption_properties(
                &parquet_opts.crypto.factory_options,
                schema,
                path,
            )
            .await?;
        if let Some(file_encryption_properties) = file_encryption_properties {
            return Ok(
                builder.with_file_encryption_properties(file_encryption_properties)
            );
        }
    }
    Ok(builder)
}

#[cfg(not(feature = "parquet_encryption"))]
async fn set_writer_encryption_properties(
    builder: WriterPropertiesBuilder,
    _runtime: &Arc<RuntimeEnv>,
    _parquet_opts: TableParquetOptions,
    _schema: &Arc<Schema>,
    _path: &Path,
) -> Result<WriterPropertiesBuilder> {
    Ok(builder)
}

#[async_trait]
impl FileSink for ParquetSink {
    fn config(&self) -> &FileSinkConfig {
        &self.config
    }

    async fn spawn_writer_tasks_and_join(
        &self,
        context: &Arc<TaskContext>,
        demux_task: SpawnedTask<Result<()>>,
        mut file_stream_rx: DemuxedStreamReceiver,
        object_store: Arc<dyn ObjectStore>,
    ) -> Result<u64> {
        let parquet_opts = &self.parquet_options;

        let mut file_write_tasks: JoinSet<
            std::result::Result<(Path, ParquetMetaData), DataFusionError>,
        > = JoinSet::new();

        let runtime = context.runtime_env();
        let parallel_options = ParallelParquetWriterOptions {
            max_parallel_row_groups: parquet_opts
                .global
                .maximum_parallel_row_group_writers,
            max_buffered_record_batches_per_stream: parquet_opts
                .global
                .maximum_buffered_record_batches_per_stream,
        };

        while let Some((path, mut rx)) = file_stream_rx.recv().await {
            let parquet_props = self.create_writer_props(&runtime, &path).await?;
            if !parquet_opts.global.allow_single_file_parallelism {
                // Serial path: one async writer per output file.
                let mut writer = self
                    .create_async_arrow_writer(
                        &path,
                        Arc::clone(&object_store),
                        context,
                        parquet_props.clone(),
                    )
                    .await?;
                let mut reservation = MemoryConsumer::new(format!("ParquetSink[{path}]"))
                    .register(context.memory_pool());
                file_write_tasks.spawn(async move {
                    while let Some(batch) = rx.recv().await {
                        writer.write(&batch).await?;
                        reservation.try_resize(writer.memory_size())?;
                    }
                    let parquet_meta_data = writer
                        .close()
                        .await
                        .map_err(|e| DataFusionError::ParquetError(Box::new(e)))?;
                    Ok((path, parquet_meta_data))
                });
            } else {
                let writer = ObjectWriterBuilder::new(
                    // Parquet data is compressed internally, so the outer
                    // object store writer stays uncompressed.
                    FileCompressionType::UNCOMPRESSED,
                    &path,
                    Arc::clone(&object_store),
                )
                .with_buffer_size(Some(
                    context
                        .session_config()
                        .options()
                        .execution
                        .objectstore_writer_buffer_size,
                ))
                .build()?;
                let schema = get_writer_schema(&self.config);
                let props = parquet_props.clone();
                let skip_arrow_metadata = self.parquet_options.global.skip_arrow_metadata;
                let parallel_options_clone = parallel_options.clone();
                let pool = Arc::clone(context.memory_pool());
                file_write_tasks.spawn(async move {
                    let parquet_meta_data = output_single_parquet_file_parallelized(
                        writer,
                        rx,
                        schema,
                        &props,
                        skip_arrow_metadata,
                        parallel_options_clone,
                        pool,
                    )
                    .await?;
                    Ok((path, parquet_meta_data))
                });
            }
        }

        let mut row_count = 0;
        while let Some(result) = file_write_tasks.join_next().await {
            match result {
                Ok(r) => {
                    let (path, parquet_meta_data) = r?;
                    row_count += parquet_meta_data.file_metadata().num_rows();
                    let mut written_files = self.written.lock();
                    written_files
                        .try_insert(path.clone(), parquet_meta_data)
                        .map_err(|e| {
                            internal_datafusion_err!(
                                "duplicate entry detected for partitioned file {path}: {e}"
                            )
                        })?;
                    drop(written_files);
                }
                Err(e) => {
                    if e.is_panic() {
                        std::panic::resume_unwind(e.into_panic());
                    } else {
                        // The tasks are never cancelled, so a non-panic join
                        // error should be impossible here.
                        unreachable!();
                    }
                }
            }
        }

        demux_task
            .join_unwind()
            .await
            .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;

        Ok(row_count as u64)
    }
}

#[async_trait]
impl DataSink for ParquetSink {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> &SchemaRef {
        self.config.output_schema()
    }

    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64> {
        FileSink::write_all(self, data, context).await
    }
}

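/// Consumes a stream of [`ArrowLeafColumn`]s and writes them with the given
/// [`ArrowColumnWriter`], resizing the memory reservation as the writer's
/// buffered size grows. Returns the writer and reservation for finalization.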
async fn column_serializer_task(
    mut rx: Receiver<ArrowLeafColumn>,
    mut writer: ArrowColumnWriter,
    mut reservation: MemoryReservation,
) -> Result<(ArrowColumnWriter, MemoryReservation)> {
    while let Some(col) = rx.recv().await {
        writer.write(&col)?;
        reservation.try_resize(writer.memory_size())?;
    }
    Ok((writer, reservation))
}

type ColumnWriterTask = SpawnedTask<Result<(ArrowColumnWriter, MemoryReservation)>>;
type ColSender = Sender<ArrowLeafColumn>;

/// Spawns a serialization task for each column. Returns the join handles for
/// the tasks along with the send channels used to feed each one arrow arrays.
fn spawn_column_parallel_row_group_writer(
    col_writers: Vec<ArrowColumnWriter>,
    max_buffer_size: usize,
    pool: &Arc<dyn MemoryPool>,
) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
    let num_columns = col_writers.len();

    let mut col_writer_tasks = Vec::with_capacity(num_columns);
    let mut col_array_channels = Vec::with_capacity(num_columns);
    for writer in col_writers.into_iter() {
        // The channel capacity bounds how many arrays can queue up for a
        // column's serialization task.
        let (send_array, receive_array) =
            mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
        col_array_channels.push(send_array);

        let reservation =
            MemoryConsumer::new("ParquetSink(ArrowColumnWriter)").register(pool);
        let task = SpawnedTask::spawn(column_serializer_task(
            receive_array,
            writer,
            reservation,
        ));
        col_writer_tasks.push(task);
    }

    Ok((col_writer_tasks, col_array_channels))
}

/// Settings related to writing parquet files in parallel.
#[derive(Clone)]
struct ParallelParquetWriterOptions {
    max_parallel_row_groups: usize,
    max_buffered_record_batches_per_stream: usize,
}

/// The result of serializing one row group: the finalized column chunks, the
/// memory reservation backing them, and the number of rows written.
type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, MemoryReservation, usize)>;

/// Sends the leaf columns of a [`RecordBatch`] to their respective column
/// writer channels.
async fn send_arrays_to_col_writers(
    col_array_channels: &[ColSender],
    rb: &RecordBatch,
    schema: Arc<Schema>,
) -> Result<()> {
    // Each leaf column has its own channel; leaves are visited in order.
    let mut next_channel = 0;
    for (array, field) in rb.columns().iter().zip(schema.fields()) {
        for c in compute_leaves(field, array)? {
            // Do not surface an error from a closed channel: it means the
            // writer task failed, and that error is reported when it is joined.
            if col_array_channels[next_channel].send(c).await.is_err() {
                return Ok(());
            }

            next_channel += 1;
        }
    }

    Ok(())
}

/// Spawns a task that joins the parallel column writer tasks for one row
/// group and finalizes their chunks.
fn spawn_rg_join_and_finalize_task(
    column_writer_tasks: Vec<ColumnWriterTask>,
    rg_rows: usize,
    pool: &Arc<dyn MemoryPool>,
) -> SpawnedTask<RBStreamSerializeResult> {
    let mut rg_reservation =
        MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(pool);

    SpawnedTask::spawn(async move {
        let num_cols = column_writer_tasks.len();
        let mut finalized_rg = Vec::with_capacity(num_cols);
        for task in column_writer_tasks.into_iter() {
            let (writer, _col_reservation) = task
                .join_unwind()
                .await
                .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
            let encoded_size = writer.get_estimated_total_bytes();
            rg_reservation.grow(encoded_size);
            finalized_rg.push(writer.close()?);
        }

        Ok((finalized_rg, rg_reservation, rg_rows))
    })
}

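/// Spawns the master serialization task: it pulls [`RecordBatch`]es from
/// `data`, fans their leaf columns out to parallel column writer tasks, and
/// cuts over to a fresh set of column writers every `max_row_group_size`
/// rows, sending each finalized row group task down `serialize_tx`.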
fn spawn_parquet_parallel_serialization_task(
    row_group_writer_factory: ArrowRowGroupWriterFactory,
    mut data: Receiver<RecordBatch>,
    serialize_tx: Sender<SpawnedTask<RBStreamSerializeResult>>,
    schema: Arc<Schema>,
    writer_props: Arc<WriterProperties>,
    parallel_options: Arc<ParallelParquetWriterOptions>,
    pool: Arc<dyn MemoryPool>,
) -> SpawnedTask<Result<(), DataFusionError>> {
    SpawnedTask::spawn(async move {
        let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream;
        let max_row_group_rows = writer_props.max_row_group_size();
        let mut row_group_index = 0;
        let col_writers =
            row_group_writer_factory.create_column_writers(row_group_index)?;
        let (mut column_writer_handles, mut col_array_channels) =
            spawn_column_parallel_row_group_writer(col_writers, max_buffer_rb, &pool)?;
        let mut current_rg_rows = 0;

        while let Some(mut rb) = data.recv().await {
            // Split the batch as needed so that no row group exceeds
            // `max_row_group_rows`.
            loop {
                if current_rg_rows + rb.num_rows() < max_row_group_rows {
                    send_arrays_to_col_writers(
                        &col_array_channels,
                        &rb,
                        Arc::clone(&schema),
                    )
                    .await?;
                    current_rg_rows += rb.num_rows();
                    break;
                } else {
                    let rows_left = max_row_group_rows - current_rg_rows;
                    let a = rb.slice(0, rows_left);
                    send_arrays_to_col_writers(
                        &col_array_channels,
                        &a,
                        Arc::clone(&schema),
                    )
                    .await?;

                    // Drop the senders to signal the column writer tasks that
                    // this row group is complete, then finalize it in a
                    // separate task.
                    drop(col_array_channels);
                    let finalize_rg_task = spawn_rg_join_and_finalize_task(
                        column_writer_handles,
                        max_row_group_rows,
                        &pool,
                    );

                    // Do not surface an error from a closed channel: it means
                    // the receiving side failed, and that error is reported
                    // elsewhere.
                    if serialize_tx.send(finalize_rg_task).await.is_err() {
                        return Ok(());
                    }

                    current_rg_rows = 0;
                    rb = rb.slice(rows_left, rb.num_rows() - rows_left);

                    row_group_index += 1;
                    let col_writers = row_group_writer_factory
                        .create_column_writers(row_group_index)?;
                    (column_writer_handles, col_array_channels) =
                        spawn_column_parallel_row_group_writer(
                            col_writers,
                            max_buffer_rb,
                            &pool,
                        )?;
                }
            }
        }

        drop(col_array_channels);
        // Finalize any remaining partial row group.
        if current_rg_rows > 0 {
            let finalize_rg_task = spawn_rg_join_and_finalize_task(
                column_writer_handles,
                current_rg_rows,
                &pool,
            );

            if serialize_tx.send(finalize_rg_task).await.is_err() {
                return Ok(());
            }
        }

        Ok(())
    })
}

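/// Consumes finalized row groups in order, appends them to the
/// [`SerializedFileWriter`], and flushes the shared in-memory buffer to the
/// object store whenever it grows beyond [`BUFFER_FLUSH_BYTES`].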
async fn concatenate_parallel_row_groups(
    mut parquet_writer: SerializedFileWriter<SharedBuffer>,
    merged_buff: SharedBuffer,
    mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
    mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
    pool: Arc<dyn MemoryPool>,
) -> Result<ParquetMetaData> {
    let mut file_reservation =
        MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);

    while let Some(task) = serialize_rx.recv().await {
        let result = task.join_unwind().await;
        let (serialized_columns, mut rg_reservation, _cnt) =
            result.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;

        let mut rg_out = parquet_writer.next_row_group()?;
        for chunk in serialized_columns {
            chunk.append_to_row_group(&mut rg_out)?;
            rg_reservation.free();

            let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
            file_reservation.try_resize(buff_to_flush.len())?;

            if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
                object_store_writer
                    .write_all(buff_to_flush.as_slice())
                    .await?;
                buff_to_flush.clear();
                file_reservation.try_resize(buff_to_flush.len())?; // now zero
            }
        }
        rg_out.close()?;
    }

    let parquet_meta_data = parquet_writer.close()?;
    let final_buff = merged_buff.buffer.try_lock().unwrap();

    object_store_writer.write_all(final_buff.as_slice()).await?;
    object_store_writer.shutdown().await?;
    file_reservation.free();

    Ok(parquet_meta_data)
}

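/// Writes a single parquet file in parallel: one serialization task per leaf
/// column, with up to `max_parallel_row_groups` row groups in flight at once,
/// whose finalized chunks are then concatenated into one file in order.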
async fn output_single_parquet_file_parallelized(
    object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
    data: Receiver<RecordBatch>,
    output_schema: Arc<Schema>,
    parquet_props: &WriterProperties,
    skip_arrow_metadata: bool,
    parallel_options: ParallelParquetWriterOptions,
    pool: Arc<dyn MemoryPool>,
) -> Result<ParquetMetaData> {
    // The channel capacity bounds how many row groups are serialized in parallel.
    let max_rowgroups = parallel_options.max_parallel_row_groups;
    let (serialize_tx, serialize_rx) =
        mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups);

    let arc_props = Arc::new(parquet_props.clone());
    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
    let options = ArrowWriterOptions::new()
        .with_properties(parquet_props.clone())
        .with_skip_arrow_metadata(skip_arrow_metadata);
    let writer = ArrowWriter::try_new_with_options(
        merged_buff.clone(),
        Arc::clone(&output_schema),
        options,
    )?;
    let (writer, row_group_writer_factory) = writer.into_serialized_writer()?;

    let launch_serialization_task = spawn_parquet_parallel_serialization_task(
        row_group_writer_factory,
        data,
        serialize_tx,
        Arc::clone(&output_schema),
        Arc::clone(&arc_props),
        parallel_options.into(),
        Arc::clone(&pool),
    );
    let parquet_meta_data = concatenate_parallel_row_groups(
        writer,
        merged_buff,
        serialize_rx,
        object_store_writer,
        pool,
    )
    .await?;

    launch_serialization_task
        .join_unwind()
        .await
        .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
    Ok(parquet_meta_data)
}

#[cfg(test)]
mod tests {
    use parquet::arrow::parquet_to_arrow_schema;
    use std::sync::Arc;

    use super::*;

    use arrow::datatypes::DataType;
    use parquet::schema::parser::parse_message_type;

    #[test]
    fn coerce_int96_to_resolution_with_mixed_timestamps() {
        let spark_schema = "
        message spark_schema {
          optional int96 c0;
          optional int64 c1 (TIMESTAMP(NANOS,true));
          optional int64 c2 (TIMESTAMP(NANOS,false));
          optional int64 c3 (TIMESTAMP(MILLIS,true));
          optional int64 c4 (TIMESTAMP(MILLIS,false));
          optional int64 c5 (TIMESTAMP(MICROS,true));
          optional int64 c6 (TIMESTAMP(MICROS,false));
        }
        ";

        let schema = parse_message_type(spark_schema).expect("should parse schema");
        let descr = SchemaDescriptor::new(Arc::new(schema));

        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();

        let result =
            coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond)
                .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
            Field::new(
                "c1",
                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                true,
            ),
            Field::new("c2", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
            Field::new(
                "c3",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                true,
            ),
            Field::new("c4", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new(
                "c5",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                true,
            ),
            Field::new("c6", DataType::Timestamp(TimeUnit::Microsecond, None), true),
        ]);

        assert_eq!(result, expected_schema);
    }

    #[test]
    fn coerce_int96_to_resolution_with_nested_types() {
        let spark_schema = "
        message spark_schema {
          optional int96 c0;
          optional group c1 {
            optional int96 c0;
          }
          optional group c2 {
            optional group c0 (LIST) {
              repeated group list {
                optional int96 element;
              }
            }
          }
          optional group c3 (LIST) {
            repeated group list {
              optional int96 element;
            }
          }
          optional group c4 (LIST) {
            repeated group list {
              optional group element {
                optional int96 c0;
                optional int96 c1;
              }
            }
          }
          optional group c5 (MAP) {
            repeated group key_value {
              required int96 key;
              optional int96 value;
            }
          }
          optional group c6 (LIST) {
            repeated group list {
              optional group element (MAP) {
                repeated group key_value {
                  required int96 key;
                  optional int96 value;
                }
              }
            }
          }
        }
        ";

        let schema = parse_message_type(spark_schema).expect("should parse schema");
        let descr = SchemaDescriptor::new(Arc::new(schema));

        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();

        let result =
            coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond)
                .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
            Field::new_struct(
                "c1",
                vec![Field::new(
                    "c0",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    true,
                )],
                true,
            ),
            Field::new_struct(
                "c2",
                vec![Field::new_list(
                    "c0",
                    Field::new(
                        "element",
                        DataType::Timestamp(TimeUnit::Microsecond, None),
                        true,
                    ),
                    true,
                )],
                true,
            ),
            Field::new_list(
                "c3",
                Field::new(
                    "element",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    true,
                ),
                true,
            ),
            Field::new_list(
                "c4",
                Field::new_struct(
                    "element",
                    vec![
                        Field::new(
                            "c0",
                            DataType::Timestamp(TimeUnit::Microsecond, None),
                            true,
                        ),
                        Field::new(
                            "c1",
                            DataType::Timestamp(TimeUnit::Microsecond, None),
                            true,
                        ),
                    ],
                    true,
                ),
                true,
            ),
            Field::new_map(
                "c5",
                "key_value",
                Field::new(
                    "key",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    false,
                ),
                Field::new(
                    "value",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    true,
                ),
                false,
                true,
            ),
            Field::new_list(
                "c6",
                Field::new_map(
                    "element",
                    "key_value",
                    Field::new(
                        "key",
                        DataType::Timestamp(TimeUnit::Microsecond, None),
                        false,
                    ),
                    Field::new(
                        "value",
                        DataType::Timestamp(TimeUnit::Microsecond, None),
                        true,
                    ),
                    false,
                    true,
                ),
                true,
            ),
        ]);

        assert_eq!(result, expected_schema);
    }
}