use std::any::Any;
use std::cell::RefCell;
use std::fmt::Debug;
use std::ops::Range;
use std::rc::Rc;
use std::sync::Arc;
use std::{fmt, vec};

use arrow::array::RecordBatch;
use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
use datafusion_datasource::write::{
    get_writer_schema, ObjectWriterBuilder, SharedBuffer,
};

use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
use datafusion_datasource::write::demux::DemuxedStreamReceiver;

use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
use datafusion_common::encryption::FileDecryptionProperties;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
    internal_datafusion_err, internal_err, not_impl_err, DataFusionError, GetExt,
    HashSet, Result, DEFAULT_PARQUET_EXTENSION,
};
use datafusion_common::{HashMap, Statistics};
use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_session::Session;

use crate::metadata::DFParquetMetadata;
use crate::reader::CachedParquetFileReaderFactory;
use crate::source::{parse_coerce_int96_string, ParquetSource};
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_datasource::source::DataSourceExec;
use datafusion_execution::cache::cache_manager::FileMetadataCache;
use datafusion_execution::runtime_env::RuntimeEnv;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::arrow_writer::{
    compute_leaves, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn,
    ArrowRowGroupWriterFactory, ArrowWriterOptions,
};
use parquet::arrow::async_reader::MetadataFetch;
use parquet::arrow::{ArrowWriter, AsyncArrowWriter};
use parquet::basic::Type;
#[cfg(feature = "parquet_encryption")]
use parquet::encryption::encrypt::FileEncryptionProperties;
use parquet::errors::ParquetError;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::types::SchemaDescriptor;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};

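/// Initial capacity, in bytes, of the shared in-memory buffer used when
/// writing a parquet file with the parallel writer.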
const INITIAL_BUFFER_BYTES: usize = 1048576;

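/// When writing a parquet file in parallel, flush the shared buffer to the
/// object store once it grows beyond this many bytes.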
const BUFFER_FLUSH_BYTES: usize = 1024000;

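/// Factory used to create [`ParquetFormat`] instances, optionally seeded with
/// a fixed set of [`TableParquetOptions`].
///
/// Illustrative sketch of constructing a factory with custom options (not
/// compiled here; assumes `TableParquetOptions::default()` is suitable as a
/// starting point):
///
/// ```rust,ignore
/// use datafusion_common::config::TableParquetOptions;
///
/// let mut options = TableParquetOptions::default();
/// options.global.pruning = false;
/// let factory = ParquetFormatFactory::new_with_options(options);
/// assert!(factory.options.is_some());
/// ```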
#[derive(Default)]
pub struct ParquetFormatFactory {
    pub options: Option<TableParquetOptions>,
}

impl ParquetFormatFactory {
    pub fn new() -> Self {
        Self { options: None }
    }

    pub fn new_with_options(options: TableParquetOptions) -> Self {
        Self {
            options: Some(options),
        }
    }
}

impl FileFormatFactory for ParquetFormatFactory {
    fn create(
        &self,
        state: &dyn Session,
        format_options: &std::collections::HashMap<String, String>,
    ) -> Result<Arc<dyn FileFormat>> {
        let parquet_options = match &self.options {
            None => {
                let mut table_options = state.default_table_options();
                table_options.set_config_format(ConfigFileType::PARQUET);
                table_options.alter_with_string_hash_map(format_options)?;
                table_options.parquet
            }
            Some(parquet_options) => {
                let mut parquet_options = parquet_options.clone();
                for (k, v) in format_options {
                    parquet_options.set(k, v)?;
                }
                parquet_options
            }
        };

        Ok(Arc::new(
            ParquetFormat::default().with_options(parquet_options),
        ))
    }

    fn default(&self) -> Arc<dyn FileFormat> {
        Arc::new(ParquetFormat::default())
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl GetExt for ParquetFormatFactory {
    fn get_ext(&self) -> String {
        DEFAULT_PARQUET_EXTENSION[1..].to_string()
    }
}

impl Debug for ParquetFormatFactory {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ParquetFormatFactory")
            .field("ParquetFormatFactory", &self.options)
            .finish()
    }
}
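/// The Apache Parquet [`FileFormat`] implementation.
///
/// Illustrative sketch of configuring the format via its builder-style
/// methods (not compiled here):
///
/// ```rust,ignore
/// let format = ParquetFormat::new()
///     .with_enable_pruning(true)
///     .with_metadata_size_hint(Some(64 * 1024))
///     .with_skip_metadata(true);
/// assert!(format.enable_pruning());
/// ```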
#[derive(Debug, Default)]
pub struct ParquetFormat {
    options: TableParquetOptions,
}

impl ParquetFormat {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_enable_pruning(mut self, enable: bool) -> Self {
        self.options.global.pruning = enable;
        self
    }

    pub fn enable_pruning(&self) -> bool {
        self.options.global.pruning
    }

    pub fn with_metadata_size_hint(mut self, size_hint: Option<usize>) -> Self {
        self.options.global.metadata_size_hint = size_hint;
        self
    }

    pub fn metadata_size_hint(&self) -> Option<usize> {
        self.options.global.metadata_size_hint
    }

    pub fn with_skip_metadata(mut self, skip_metadata: bool) -> Self {
        self.options.global.skip_metadata = skip_metadata;
        self
    }

    pub fn skip_metadata(&self) -> bool {
        self.options.global.skip_metadata
    }

    pub fn with_options(mut self, options: TableParquetOptions) -> Self {
        self.options = options;
        self
    }

    pub fn options(&self) -> &TableParquetOptions {
        &self.options
    }

    pub fn force_view_types(&self) -> bool {
        self.options.global.schema_force_view_types
    }

    pub fn with_force_view_types(mut self, use_views: bool) -> Self {
        self.options.global.schema_force_view_types = use_views;
        self
    }

    pub fn binary_as_string(&self) -> bool {
        self.options.global.binary_as_string
    }

    pub fn with_binary_as_string(mut self, binary_as_string: bool) -> Self {
        self.options.global.binary_as_string = binary_as_string;
        self
    }

    pub fn coerce_int96(&self) -> Option<String> {
        self.options.global.coerce_int96.clone()
    }

    pub fn with_coerce_int96(mut self, time_unit: Option<String>) -> Self {
        self.options.global.coerce_int96 = time_unit;
        self
    }
}

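/// Clears the field-level metadata on every field of the given schemas.
/// Used when `skip_metadata` is enabled so that conflicting metadata does not
/// prevent the schemas from being merged.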
fn clear_metadata(
    schemas: impl IntoIterator<Item = Schema>,
) -> impl Iterator<Item = Schema> {
    schemas.into_iter().map(|schema| {
        let fields = schema
            .fields()
            .iter()
            .map(|field| {
                field.as_ref().clone().with_metadata(Default::default())
            })
            .collect::<Fields>();
        Schema::new(fields)
    })
}

#[cfg(feature = "parquet_encryption")]
async fn get_file_decryption_properties(
    state: &dyn Session,
    options: &TableParquetOptions,
    file_path: &Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
    Ok(match &options.crypto.file_decryption {
        Some(cfd) => Some(Arc::new(FileDecryptionProperties::from(cfd.clone()))),
        None => match &options.crypto.factory_id {
            Some(factory_id) => {
                let factory =
                    state.runtime_env().parquet_encryption_factory(factory_id)?;
                factory
                    .get_file_decryption_properties(
                        &options.crypto.factory_options,
                        file_path,
                    )
                    .await?
            }
            None => None,
        },
    })
}

#[cfg(not(feature = "parquet_encryption"))]
async fn get_file_decryption_properties(
    _state: &dyn Session,
    _options: &TableParquetOptions,
    _file_path: &Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
    Ok(None)
}

#[async_trait]
impl FileFormat for ParquetFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_ext(&self) -> String {
        ParquetFormatFactory::new().get_ext()
    }

    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> Result<String> {
        let ext = self.get_ext();
        match file_compression_type.get_variant() {
            CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
            _ => internal_err!("Parquet FileFormat does not support compression."),
        }
    }

    fn compression_type(&self) -> Option<FileCompressionType> {
        None
    }

    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef> {
        let coerce_int96 = match self.coerce_int96() {
            Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?),
            None => None,
        };

        let file_metadata_cache =
            state.runtime_env().cache_manager.get_file_metadata_cache();

        let mut schemas: Vec<_> = futures::stream::iter(objects)
            .map(|object| async {
                let file_decryption_properties = get_file_decryption_properties(
                    state,
                    &self.options,
                    &object.location,
                )
                .await?;
                let result = DFParquetMetadata::new(store.as_ref(), object)
                    .with_metadata_size_hint(self.metadata_size_hint())
                    .with_decryption_properties(file_decryption_properties)
                    .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
                    .with_coerce_int96(coerce_int96)
                    .fetch_schema_with_location()
                    .await?;
                Ok::<_, DataFusionError>(result)
            })
            .boxed()
            .buffered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect()
            .await?;

        schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2));

        let schemas = schemas
            .into_iter()
            .map(|(_, schema)| schema)
            .collect::<Vec<_>>();

        let schema = if self.skip_metadata() {
            Schema::try_merge(clear_metadata(schemas))
        } else {
            Schema::try_merge(schemas)
        }?;

        let schema = if self.binary_as_string() {
            transform_binary_to_string(&schema)
        } else {
            schema
        };

        let schema = if self.force_view_types() {
            transform_schema_to_view(&schema)
        } else {
            schema
        };

        Ok(Arc::new(schema))
    }

    async fn infer_stats(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> Result<Statistics> {
        let file_decryption_properties =
            get_file_decryption_properties(state, &self.options, &object.location)
                .await?;
        let file_metadata_cache =
            state.runtime_env().cache_manager.get_file_metadata_cache();
        DFParquetMetadata::new(store, object)
            .with_metadata_size_hint(self.metadata_size_hint())
            .with_decryption_properties(file_decryption_properties)
            .with_file_metadata_cache(Some(file_metadata_cache))
            .fetch_statistics(&table_schema)
            .await
    }

    async fn create_physical_plan(
        &self,
        state: &dyn Session,
        conf: FileScanConfig,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut metadata_size_hint = None;

        if let Some(metadata) = self.metadata_size_hint() {
            metadata_size_hint = Some(metadata);
        }

        let mut source = ParquetSource::new(self.options.clone());

        let metadata_cache =
            state.runtime_env().cache_manager.get_file_metadata_cache();
        let store = state
            .runtime_env()
            .object_store(conf.object_store_url.clone())?;
        let cached_parquet_read_factory =
            Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
        source = source.with_parquet_file_reader_factory(cached_parquet_read_factory);

        if let Some(metadata_size_hint) = metadata_size_hint {
            source = source.with_metadata_size_hint(metadata_size_hint)
        }

        source = self.set_source_encryption_factory(source, state)?;

        let file_source = source.apply_schema_adapter(&conf)?;

        let conf = FileScanConfigBuilder::from(conf)
            .with_source(file_source)
            .build();
        Ok(DataSourceExec::from_data_source(conf))
    }

    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Parquet");
        }

        let sink = Arc::new(ParquetSink::new(conf, self.options.clone()));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    fn file_source(&self) -> Arc<dyn FileSource> {
        Arc::new(ParquetSource::default())
    }
}

#[cfg(feature = "parquet_encryption")]
impl ParquetFormat {
    fn set_source_encryption_factory(
        &self,
        source: ParquetSource,
        state: &dyn Session,
    ) -> Result<ParquetSource> {
        if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
            Ok(source.with_encryption_factory(
                state
                    .runtime_env()
                    .parquet_encryption_factory(encryption_factory_id)?,
            ))
        } else {
            Ok(source)
        }
    }
}

#[cfg(not(feature = "parquet_encryption"))]
impl ParquetFormat {
    fn set_source_encryption_factory(
        &self,
        source: ParquetSource,
        _state: &dyn Session,
    ) -> Result<ParquetSource> {
        if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
            Err(DataFusionError::Configuration(
                format!("Parquet encryption factory id is set to '{encryption_factory_id}' but the parquet_encryption feature is disabled")))
        } else {
            Ok(source)
        }
    }
}

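/// Coerces string and binary columns in the file schema so that they match the
/// string or view types declared in the table schema (for example `Utf8` ->
/// `Utf8View`, or `Binary` -> `Utf8` when the table declares a string column).
///
/// Returns `None` when the table schema contains no string or view columns,
/// i.e. when no coercion can be required.
///
/// Illustrative sketch (not compiled here):
///
/// ```rust,ignore
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let table_schema = Schema::new(vec![Field::new("c", DataType::Utf8View, true)]);
/// let file_schema = Schema::new(vec![Field::new("c", DataType::Utf8, true)]);
/// // The file's `Utf8` column is rewritten as `Utf8View` to match the table schema.
/// let coerced = apply_file_schema_type_coercions(&table_schema, &file_schema).unwrap();
/// assert_eq!(coerced.field(0).data_type(), &DataType::Utf8View);
/// ```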
pub fn apply_file_schema_type_coercions(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut needs_view_transform = false;
    let mut needs_string_transform = false;

    let table_fields: HashMap<_, _> = table_schema
        .fields()
        .iter()
        .map(|f| {
            let dt = f.data_type();
            if matches!(dt, &DataType::Utf8View | &DataType::BinaryView) {
                needs_view_transform = true;
            }
            if matches!(
                dt,
                &DataType::Utf8 | &DataType::LargeUtf8 | &DataType::Utf8View
            ) {
                needs_string_transform = true;
            }

            (f.name(), dt)
        })
        .collect();

    if !needs_view_transform && !needs_string_transform {
        return None;
    }

    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields()
        .iter()
        .map(|field| {
            let field_name = field.name();
            let field_type = field.data_type();

            if let Some(table_type) = table_fields.get(field_name) {
                match (table_type, field_type) {
                    (
                        &DataType::Utf8,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::Utf8);
                    }
                    (
                        &DataType::LargeUtf8,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::LargeUtf8);
                    }
                    (
                        &DataType::Utf8View,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::Utf8View);
                    }
                    (&DataType::Utf8View, DataType::Utf8 | DataType::LargeUtf8) => {
                        return field_with_new_type(field, DataType::Utf8View);
                    }
                    (&DataType::BinaryView, DataType::Binary | DataType::LargeBinary) => {
                        return field_with_new_type(field, DataType::BinaryView);
                    }
                    _ => {}
                }
            }

            Arc::clone(field)
        })
        .collect();

    Some(Schema::new_with_metadata(
        transformed_fields,
        file_schema.metadata.clone(),
    ))
}

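/// Coerces INT96 timestamp columns (as written by, e.g., legacy Spark versions)
/// from their default nanosecond resolution to the requested [`TimeUnit`],
/// including columns nested inside structs, lists and maps.
///
/// Returns `None` if the parquet schema contains no INT96 columns.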
pub fn coerce_int96_to_resolution(
    parquet_schema: &SchemaDescriptor,
    file_schema: &Schema,
    time_unit: &TimeUnit,
) -> Option<Schema> {
    // Collect the parquet column paths of all INT96 columns in the file.
    let int96_fields: HashSet<_> = parquet_schema
        .columns()
        .iter()
        .filter(|f| f.physical_type() == Type::INT96)
        .map(|f| f.path().string())
        .collect();

    if int96_fields.is_empty() {
        // The file contains no INT96 columns, so there is nothing to coerce.
        return None;
    }

    // Accumulator for the processed children of a nested field.
    type NestedFields = Rc<RefCell<Vec<FieldRef>>>;

    // One entry of the explicit traversal stack:
    type StackContext<'a> = (
        Vec<&'a str>,         // parquet column path components of this field
        &'a FieldRef,         // the field currently being processed
        NestedFields,         // the parent's accumulator, receives the rebuilt field
        Option<NestedFields>, // this field's already-processed children, if any
    );

    let fields = Rc::new(RefCell::new(Vec::with_capacity(file_schema.fields.len())));

    let transformed_schema = {
        // Seed the stack with the top-level fields, reversed so they are popped in order.
        let mut stack: Vec<StackContext> = file_schema
            .fields()
            .iter()
            .rev()
            .map(|f| (vec![f.name().as_str()], f, Rc::clone(&fields), None))
            .collect();

        // Iterative traversal: nested fields are pushed back once with an empty child
        // accumulator, revisited after their children have been processed, and rebuilt.
        while let Some((parquet_path, current_field, parent_fields, child_fields)) =
            stack.pop()
        {
            match (current_field.data_type(), child_fields) {
                (DataType::Struct(unprocessed_children), None) => {
                    // First visit: re-push the struct, then push its children.
                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(
                        unprocessed_children.len(),
                    )));
                    stack.push((
                        parquet_path.clone(),
                        current_field,
                        parent_fields,
                        Some(Rc::clone(&child_fields)),
                    ));
                    for child in unprocessed_children.into_iter().rev() {
                        let mut child_path = parquet_path.clone();
                        child_path.push(".");
                        child_path.push(child.name());
                        stack.push((child_path, child, Rc::clone(&child_fields), None));
                    }
                }
                (DataType::Struct(unprocessed_children), Some(processed_children)) => {
                    // Second visit: rebuild the struct from its processed children.
                    let processed_children = processed_children.borrow();
                    assert_eq!(processed_children.len(), unprocessed_children.len());
                    let processed_struct = Field::new_struct(
                        current_field.name(),
                        processed_children.as_slice(),
                        current_field.is_nullable(),
                    );
                    parent_fields.borrow_mut().push(Arc::new(processed_struct));
                }
                (DataType::List(unprocessed_child), None) => {
                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
                    stack.push((
                        parquet_path.clone(),
                        current_field,
                        parent_fields,
                        Some(Rc::clone(&child_fields)),
                    ));
                    let mut child_path = parquet_path.clone();
                    // Parquet lists insert an intermediate "list" group in the column path.
                    child_path.push(".list.");
                    child_path.push(unprocessed_child.name());
                    stack.push((
                        child_path.clone(),
                        unprocessed_child,
                        Rc::clone(&child_fields),
                        None,
                    ));
                }
                (DataType::List(_), Some(processed_children)) => {
                    let processed_children = processed_children.borrow();
                    assert_eq!(processed_children.len(), 1);
                    let processed_list = Field::new_list(
                        current_field.name(),
                        Arc::clone(&processed_children[0]),
                        current_field.is_nullable(),
                    );
                    parent_fields.borrow_mut().push(Arc::new(processed_list));
                }
                (DataType::Map(unprocessed_child, _), None) => {
                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
                    stack.push((
                        parquet_path.clone(),
                        current_field,
                        parent_fields,
                        Some(Rc::clone(&child_fields)),
                    ));
                    let mut child_path = parquet_path.clone();
                    child_path.push(".");
                    child_path.push(unprocessed_child.name());
                    stack.push((
                        child_path.clone(),
                        unprocessed_child,
                        Rc::clone(&child_fields),
                        None,
                    ));
                }
                (DataType::Map(_, sorted), Some(processed_children)) => {
                    let processed_children = processed_children.borrow();
                    assert_eq!(processed_children.len(), 1);
                    let processed_map = Field::new(
                        current_field.name(),
                        DataType::Map(Arc::clone(&processed_children[0]), *sorted),
                        current_field.is_nullable(),
                    );
                    parent_fields.borrow_mut().push(Arc::new(processed_map));
                }
                (DataType::Timestamp(TimeUnit::Nanosecond, None), None)
                    if int96_fields.contains(parquet_path.concat().as_str()) =>
                {
                    // This nanosecond timestamp column was stored as INT96:
                    // coerce it to the requested resolution.
                    parent_fields.borrow_mut().push(field_with_new_type(
                        current_field,
                        DataType::Timestamp(*time_unit, None),
                    ));
                }
                _ => parent_fields.borrow_mut().push(Arc::clone(current_field)),
            }
        }
        assert_eq!(fields.borrow().len(), file_schema.fields.len());
        Schema::new_with_metadata(
            fields.borrow_mut().clone(),
            file_schema.metadata.clone(),
        )
    };

    Some(transformed_schema)
}

#[deprecated(
    since = "47.0.0",
    note = "Use `apply_file_schema_type_coercions` instead"
)]
pub fn coerce_file_schema_to_view_type(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut transform = false;
    let table_fields: HashMap<_, _> = table_schema
        .fields
        .iter()
        .map(|f| {
            let dt = f.data_type();
            if dt.equals_datatype(&DataType::Utf8View)
                || dt.equals_datatype(&DataType::BinaryView)
            {
                transform = true;
            }
            (f.name(), dt)
        })
        .collect();

    if !transform {
        return None;
    }

    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields
        .iter()
        .map(
            |field| match (table_fields.get(field.name()), field.data_type()) {
                (Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => {
                    field_with_new_type(field, DataType::Utf8View)
                }
                (
                    Some(DataType::BinaryView),
                    DataType::Binary | DataType::LargeBinary,
                ) => field_with_new_type(field, DataType::BinaryView),
                _ => Arc::clone(field),
            },
        )
        .collect();

    Some(Schema::new_with_metadata(
        transformed_fields,
        file_schema.metadata.clone(),
    ))
}

#[deprecated(
    since = "47.0.0",
    note = "Use `apply_file_schema_type_coercions` instead"
)]
pub fn coerce_file_schema_to_string_type(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut transform = false;
    let table_fields: HashMap<_, _> = table_schema
        .fields
        .iter()
        .map(|f| (f.name(), f.data_type()))
        .collect();
    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields
        .iter()
        .map(
            |field| match (table_fields.get(field.name()), field.data_type()) {
                (
                    Some(DataType::Utf8),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::Utf8)
                }
                (
                    Some(DataType::LargeUtf8),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::LargeUtf8)
                }
                (
                    Some(DataType::Utf8View),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::Utf8View)
                }
                _ => Arc::clone(field),
            },
        )
        .collect();

    if !transform {
        None
    } else {
        Some(Schema::new_with_metadata(
            transformed_fields,
            file_schema.metadata.clone(),
        ))
    }
}

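/// Returns a copy of `field` with its data type replaced by `new_type`,
/// preserving the name, nullability and metadata.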
fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
    Arc::new(field.as_ref().clone().with_data_type(new_type))
}

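/// Transforms a schema to use view types: `Utf8`/`LargeUtf8` become `Utf8View`
/// and `Binary`/`LargeBinary` become `BinaryView`.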
pub fn transform_schema_to_view(schema: &Schema) -> Schema {
    let transformed_fields: Vec<Arc<Field>> = schema
        .fields
        .iter()
        .map(|field| match field.data_type() {
            DataType::Utf8 | DataType::LargeUtf8 => {
                field_with_new_type(field, DataType::Utf8View)
            }
            DataType::Binary | DataType::LargeBinary => {
                field_with_new_type(field, DataType::BinaryView)
            }
            _ => Arc::clone(field),
        })
        .collect();
    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

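/// Transforms binary columns into their string counterparts: `Binary` becomes
/// `Utf8`, `LargeBinary` becomes `LargeUtf8` and `BinaryView` becomes
/// `Utf8View`.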
pub fn transform_binary_to_string(schema: &Schema) -> Schema {
    let transformed_fields: Vec<Arc<Field>> = schema
        .fields
        .iter()
        .map(|field| match field.data_type() {
            DataType::Binary => field_with_new_type(field, DataType::Utf8),
            DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8),
            DataType::BinaryView => field_with_new_type(field, DataType::Utf8View),
            _ => Arc::clone(field),
        })
        .collect();
    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

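/// [`MetadataFetch`] adapter that reads byte ranges of an object from an
/// [`ObjectStore`].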
pub struct ObjectStoreFetch<'a> {
    store: &'a dyn ObjectStore,
    meta: &'a ObjectMeta,
}

impl<'a> ObjectStoreFetch<'a> {
    pub fn new(store: &'a dyn ObjectStore, meta: &'a ObjectMeta) -> Self {
        Self { store, meta }
    }
}

impl MetadataFetch for ObjectStoreFetch<'_> {
    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, ParquetError>> {
        async {
            self.store
                .get_range(&self.meta.location, range)
                .await
                .map_err(ParquetError::from)
        }
        .boxed()
    }
}

#[deprecated(
    since = "50.0.0",
    note = "Use `DFParquetMetadata::fetch_metadata` instead"
)]
pub async fn fetch_parquet_metadata(
    store: &dyn ObjectStore,
    object_meta: &ObjectMeta,
    size_hint: Option<usize>,
    decryption_properties: Option<&FileDecryptionProperties>,
    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
) -> Result<Arc<ParquetMetaData>> {
    let decryption_properties = decryption_properties.cloned().map(Arc::new);
    DFParquetMetadata::new(store, object_meta)
        .with_metadata_size_hint(size_hint)
        .with_decryption_properties(decryption_properties)
        .with_file_metadata_cache(file_metadata_cache)
        .fetch_metadata()
        .await
}

#[deprecated(
    since = "50.0.0",
    note = "Use `DFParquetMetadata::fetch_statistics` instead"
)]
pub async fn fetch_statistics(
    store: &dyn ObjectStore,
    table_schema: SchemaRef,
    file: &ObjectMeta,
    metadata_size_hint: Option<usize>,
    decryption_properties: Option<&FileDecryptionProperties>,
    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
) -> Result<Statistics> {
    let decryption_properties = decryption_properties.cloned().map(Arc::new);
    DFParquetMetadata::new(store, file)
        .with_metadata_size_hint(metadata_size_hint)
        .with_decryption_properties(decryption_properties)
        .with_file_metadata_cache(file_metadata_cache)
        .fetch_statistics(&table_schema)
        .await
}

#[deprecated(
    since = "50.0.0",
    note = "Use `DFParquetMetadata::statistics_from_parquet_metadata` instead"
)]
pub fn statistics_from_parquet_meta_calc(
    metadata: &ParquetMetaData,
    table_schema: SchemaRef,
) -> Result<Statistics> {
    DFParquetMetadata::statistics_from_parquet_metadata(metadata, &table_schema)
}

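/// Implements [`DataSink`] for writing [`RecordBatch`]es to parquet files on an
/// [`ObjectStore`], optionally serializing columns and row groups in parallel.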
pub struct ParquetSink {
    config: FileSinkConfig,
    parquet_options: TableParquetOptions,
    written: Arc<parking_lot::Mutex<HashMap<Path, ParquetMetaData>>>,
}

impl Debug for ParquetSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ParquetSink").finish()
    }
}

impl DisplayAs for ParquetSink {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "ParquetSink(file_groups=",)?;
                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
                write!(f, ")")
            }
            DisplayFormatType::TreeRender => {
                write!(f, "")
            }
        }
    }
}

impl ParquetSink {
    pub fn new(config: FileSinkConfig, parquet_options: TableParquetOptions) -> Self {
        Self {
            config,
            parquet_options,
            written: Default::default(),
        }
    }

    pub fn written(&self) -> HashMap<Path, ParquetMetaData> {
        self.written.lock().clone()
    }

    async fn create_writer_props(
        &self,
        runtime: &Arc<RuntimeEnv>,
        path: &Path,
    ) -> Result<WriterProperties> {
        let schema = self.config.output_schema();

        let mut parquet_opts = self.parquet_options.clone();
        if !self.parquet_options.global.skip_arrow_metadata {
            parquet_opts.arrow_schema(schema);
        }

        let mut builder = WriterPropertiesBuilder::try_from(&parquet_opts)?;
        builder = set_writer_encryption_properties(
            builder,
            runtime,
            parquet_opts,
            schema,
            path,
        )
        .await?;
        Ok(builder.build())
    }

    async fn create_async_arrow_writer(
        &self,
        location: &Path,
        object_store: Arc<dyn ObjectStore>,
        context: &Arc<TaskContext>,
        parquet_props: WriterProperties,
    ) -> Result<AsyncArrowWriter<BufWriter>> {
        let buf_writer = BufWriter::with_capacity(
            object_store,
            location.clone(),
            context
                .session_config()
                .options()
                .execution
                .objectstore_writer_buffer_size,
        );
        let options = ArrowWriterOptions::new()
            .with_properties(parquet_props)
            .with_skip_arrow_metadata(self.parquet_options.global.skip_arrow_metadata);

        let writer = AsyncArrowWriter::try_new_with_options(
            buf_writer,
            get_writer_schema(&self.config),
            options,
        )?;
        Ok(writer)
    }

    pub fn parquet_options(&self) -> &TableParquetOptions {
        &self.parquet_options
    }
}

#[cfg(feature = "parquet_encryption")]
async fn set_writer_encryption_properties(
    builder: WriterPropertiesBuilder,
    runtime: &Arc<RuntimeEnv>,
    parquet_opts: TableParquetOptions,
    schema: &Arc<Schema>,
    path: &Path,
) -> Result<WriterPropertiesBuilder> {
    if let Some(file_encryption_properties) = parquet_opts.crypto.file_encryption {
        return Ok(builder.with_file_encryption_properties(Arc::new(
            FileEncryptionProperties::from(file_encryption_properties),
        )));
    } else if let Some(encryption_factory_id) = &parquet_opts.crypto.factory_id.as_ref() {
        let encryption_factory =
            runtime.parquet_encryption_factory(encryption_factory_id)?;
        let file_encryption_properties = encryption_factory
            .get_file_encryption_properties(
                &parquet_opts.crypto.factory_options,
                schema,
                path,
            )
            .await?;
        if let Some(file_encryption_properties) = file_encryption_properties {
            return Ok(
                builder.with_file_encryption_properties(file_encryption_properties)
            );
        }
    }
    Ok(builder)
}

#[cfg(not(feature = "parquet_encryption"))]
async fn set_writer_encryption_properties(
    builder: WriterPropertiesBuilder,
    _runtime: &Arc<RuntimeEnv>,
    _parquet_opts: TableParquetOptions,
    _schema: &Arc<Schema>,
    _path: &Path,
) -> Result<WriterPropertiesBuilder> {
    Ok(builder)
}

#[async_trait]
impl FileSink for ParquetSink {
    fn config(&self) -> &FileSinkConfig {
        &self.config
    }

    async fn spawn_writer_tasks_and_join(
        &self,
        context: &Arc<TaskContext>,
        demux_task: SpawnedTask<Result<()>>,
        mut file_stream_rx: DemuxedStreamReceiver,
        object_store: Arc<dyn ObjectStore>,
    ) -> Result<u64> {
        let parquet_opts = &self.parquet_options;

        let mut file_write_tasks: JoinSet<
            std::result::Result<(Path, ParquetMetaData), DataFusionError>,
        > = JoinSet::new();

        let runtime = context.runtime_env();
        let parallel_options = ParallelParquetWriterOptions {
            max_parallel_row_groups: parquet_opts
                .global
                .maximum_parallel_row_group_writers,
            max_buffered_record_batches_per_stream: parquet_opts
                .global
                .maximum_buffered_record_batches_per_stream,
        };

        while let Some((path, mut rx)) = file_stream_rx.recv().await {
            let parquet_props = self.create_writer_props(&runtime, &path).await?;
            if !parquet_opts.global.allow_single_file_parallelism {
                let mut writer = self
                    .create_async_arrow_writer(
                        &path,
                        Arc::clone(&object_store),
                        context,
                        parquet_props.clone(),
                    )
                    .await?;
                let mut reservation = MemoryConsumer::new(format!("ParquetSink[{path}]"))
                    .register(context.memory_pool());
                file_write_tasks.spawn(async move {
                    while let Some(batch) = rx.recv().await {
                        writer.write(&batch).await?;
                        reservation.try_resize(writer.memory_size())?;
                    }
                    let parquet_meta_data = writer
                        .close()
                        .await
                        .map_err(|e| DataFusionError::ParquetError(Box::new(e)))?;
                    Ok((path, parquet_meta_data))
                });
            } else {
                let writer = ObjectWriterBuilder::new(
                    FileCompressionType::UNCOMPRESSED,
                    &path,
                    Arc::clone(&object_store),
                )
                .with_buffer_size(Some(
                    context
                        .session_config()
                        .options()
                        .execution
                        .objectstore_writer_buffer_size,
                ))
                .build()?;
                let schema = get_writer_schema(&self.config);
                let props = parquet_props.clone();
                let skip_arrow_metadata = self.parquet_options.global.skip_arrow_metadata;
                let parallel_options_clone = parallel_options.clone();
                let pool = Arc::clone(context.memory_pool());
                file_write_tasks.spawn(async move {
                    let parquet_meta_data = output_single_parquet_file_parallelized(
                        writer,
                        rx,
                        schema,
                        &props,
                        skip_arrow_metadata,
                        parallel_options_clone,
                        pool,
                    )
                    .await?;
                    Ok((path, parquet_meta_data))
                });
            }
        }

        let mut row_count = 0;
        while let Some(result) = file_write_tasks.join_next().await {
            match result {
                Ok(r) => {
                    let (path, parquet_meta_data) = r?;
                    row_count += parquet_meta_data.file_metadata().num_rows();
                    let mut written_files = self.written.lock();
                    written_files
                        .try_insert(path.clone(), parquet_meta_data)
                        .map_err(|e| internal_datafusion_err!("duplicate entry detected for partitioned file {path}: {e}"))?;
                    drop(written_files);
                }
                Err(e) => {
                    if e.is_panic() {
                        std::panic::resume_unwind(e.into_panic());
                    } else {
                        unreachable!();
                    }
                }
            }
        }

        demux_task
            .join_unwind()
            .await
            .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;

        Ok(row_count as u64)
    }
}

#[async_trait]
impl DataSink for ParquetSink {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> &SchemaRef {
        self.config.output_schema()
    }

    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64> {
        FileSink::write_all(self, data, context).await
    }
}

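/// Consumes [`ArrowLeafColumn`]s from the channel and feeds them to the given
/// [`ArrowColumnWriter`], resizing the memory reservation as the writer's
/// buffered size grows.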
async fn column_serializer_task(
    mut rx: Receiver<ArrowLeafColumn>,
    mut writer: ArrowColumnWriter,
    mut reservation: MemoryReservation,
) -> Result<(ArrowColumnWriter, MemoryReservation)> {
    while let Some(col) = rx.recv().await {
        writer.write(&col)?;
        reservation.try_resize(writer.memory_size())?;
    }
    Ok((writer, reservation))
}

type ColumnWriterTask = SpawnedTask<Result<(ArrowColumnWriter, MemoryReservation)>>;
type ColSender = Sender<ArrowLeafColumn>;

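/// Spawns one serialization task per column writer and returns the spawned
/// tasks together with the channels used to send [`ArrowLeafColumn`]s to them.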
fn spawn_column_parallel_row_group_writer(
    col_writers: Vec<ArrowColumnWriter>,
    max_buffer_size: usize,
    pool: &Arc<dyn MemoryPool>,
) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
    let num_columns = col_writers.len();

    let mut col_writer_tasks = Vec::with_capacity(num_columns);
    let mut col_array_channels = Vec::with_capacity(num_columns);
    for writer in col_writers.into_iter() {
        let (send_array, receive_array) =
            mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
        col_array_channels.push(send_array);

        let reservation =
            MemoryConsumer::new("ParquetSink(ArrowColumnWriter)").register(pool);
        let task = SpawnedTask::spawn(column_serializer_task(
            receive_array,
            writer,
            reservation,
        ));
        col_writer_tasks.push(task);
    }

    Ok((col_writer_tasks, col_array_channels))
}

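/// Settings for the parallel parquet writer: how many row groups may be
/// serialized concurrently and how many record batches may be buffered per
/// column stream.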
#[derive(Clone)]
struct ParallelParquetWriterOptions {
    max_parallel_row_groups: usize,
    max_buffered_record_batches_per_stream: usize,
}

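/// Result of serializing a single row group: the encoded column chunks, the
/// memory reservation tracking them, and the number of rows in the row group.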
type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, MemoryReservation, usize)>;

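/// Splits a [`RecordBatch`] into its leaf columns and dispatches them to the
/// corresponding per-column serialization channels.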
async fn send_arrays_to_col_writers(
    col_array_channels: &[ColSender],
    rb: &RecordBatch,
    schema: Arc<Schema>,
) -> Result<()> {
    let mut next_channel = 0;
    for (array, field) in rb.columns().iter().zip(schema.fields()) {
        for c in compute_leaves(field, array)? {
            // If the receiving end has hung up, the writer task has already
            // finished (or failed); stop sending instead of erroring here.
            if col_array_channels[next_channel].send(c).await.is_err() {
                return Ok(());
            }

            next_channel += 1;
        }
    }

    Ok(())
}

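/// Spawns a task that waits for all column serialization tasks of one row
/// group to finish, collects the encoded column chunks, and accounts for the
/// encoded size in a new memory reservation.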
fn spawn_rg_join_and_finalize_task(
    column_writer_tasks: Vec<ColumnWriterTask>,
    rg_rows: usize,
    pool: &Arc<dyn MemoryPool>,
) -> SpawnedTask<RBStreamSerializeResult> {
    let mut rg_reservation =
        MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(pool);

    SpawnedTask::spawn(async move {
        let num_cols = column_writer_tasks.len();
        let mut finalized_rg = Vec::with_capacity(num_cols);
        for task in column_writer_tasks.into_iter() {
            let (writer, _col_reservation) = task
                .join_unwind()
                .await
                .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
            let encoded_size = writer.get_estimated_total_bytes();
            rg_reservation.grow(encoded_size);
            finalized_rg.push(writer.close()?);
        }

        Ok((finalized_rg, rg_reservation, rg_rows))
    })
}

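/// Spawns a task that slices the incoming [`RecordBatch`] stream into row
/// groups of at most `max_row_group_size` rows, serializes the columns of each
/// row group in parallel, and forwards the per-row-group finalization tasks to
/// `serialize_tx`.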
fn spawn_parquet_parallel_serialization_task(
    row_group_writer_factory: ArrowRowGroupWriterFactory,
    mut data: Receiver<RecordBatch>,
    serialize_tx: Sender<SpawnedTask<RBStreamSerializeResult>>,
    schema: Arc<Schema>,
    writer_props: Arc<WriterProperties>,
    parallel_options: ParallelParquetWriterOptions,
    pool: Arc<dyn MemoryPool>,
) -> SpawnedTask<Result<(), DataFusionError>> {
    SpawnedTask::spawn(async move {
        let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream;
        let max_row_group_rows = writer_props.max_row_group_size();
        let mut row_group_index = 0;
        let col_writers =
            row_group_writer_factory.create_column_writers(row_group_index)?;
        let (mut column_writer_handles, mut col_array_channels) =
            spawn_column_parallel_row_group_writer(col_writers, max_buffer_rb, &pool)?;
        let mut current_rg_rows = 0;

        while let Some(mut rb) = data.recv().await {
            // Split the batch across row group boundaries as needed so that no
            // row group exceeds `max_row_group_rows`.
            loop {
                if current_rg_rows + rb.num_rows() < max_row_group_rows {
                    send_arrays_to_col_writers(
                        &col_array_channels,
                        &rb,
                        Arc::clone(&schema),
                    )
                    .await?;
                    current_rg_rows += rb.num_rows();
                    break;
                } else {
                    let rows_left = max_row_group_rows - current_rg_rows;
                    let a = rb.slice(0, rows_left);
                    send_arrays_to_col_writers(
                        &col_array_channels,
                        &a,
                        Arc::clone(&schema),
                    )
                    .await?;

                    // Dropping the senders signals the column writers that this
                    // row group is complete; then finalize it and start the next.
                    drop(col_array_channels);
                    let finalize_rg_task = spawn_rg_join_and_finalize_task(
                        column_writer_handles,
                        max_row_group_rows,
                        &pool,
                    );

                    // The receiver has hung up, so the writer side is shutting
                    // down; stop producing more row groups.
                    if serialize_tx.send(finalize_rg_task).await.is_err() {
                        return Ok(());
                    }

                    current_rg_rows = 0;
                    rb = rb.slice(rows_left, rb.num_rows() - rows_left);

                    row_group_index += 1;
                    let col_writers = row_group_writer_factory
                        .create_column_writers(row_group_index)?;
                    (column_writer_handles, col_array_channels) =
                        spawn_column_parallel_row_group_writer(
                            col_writers,
                            max_buffer_rb,
                            &pool,
                        )?;
                }
            }
        }

        drop(col_array_channels);
        if current_rg_rows > 0 {
            let finalize_rg_task = spawn_rg_join_and_finalize_task(
                column_writer_handles,
                current_rg_rows,
                &pool,
            );

            if serialize_tx.send(finalize_rg_task).await.is_err() {
                return Ok(());
            }
        }

        Ok(())
    })
}

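/// Appends the serialized row groups to the [`SerializedFileWriter`] in order,
/// flushing the shared buffer to the object store whenever it exceeds
/// [`BUFFER_FLUSH_BYTES`], and writes the remaining bytes when the file is
/// closed.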
async fn concatenate_parallel_row_groups(
    mut parquet_writer: SerializedFileWriter<SharedBuffer>,
    merged_buff: SharedBuffer,
    mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
    mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
    pool: Arc<dyn MemoryPool>,
) -> Result<ParquetMetaData> {
    let mut file_reservation =
        MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);

    while let Some(task) = serialize_rx.recv().await {
        let result = task.join_unwind().await;
        let (serialized_columns, mut rg_reservation, _cnt) =
            result.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;

        let mut rg_out = parquet_writer.next_row_group()?;
        for chunk in serialized_columns {
            chunk.append_to_row_group(&mut rg_out)?;
            rg_reservation.free();

            let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
            file_reservation.try_resize(buff_to_flush.len())?;

            if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
                object_store_writer
                    .write_all(buff_to_flush.as_slice())
                    .await?;
                buff_to_flush.clear();
                file_reservation.try_resize(buff_to_flush.len())?;
            }
        }
        rg_out.close()?;
    }

    let parquet_meta_data = parquet_writer.close()?;
    let final_buff = merged_buff.buffer.try_lock().unwrap();

    object_store_writer.write_all(final_buff.as_slice()).await?;
    object_store_writer.shutdown().await?;
    file_reservation.free();

    Ok(parquet_meta_data)
}

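/// Writes a single parquet file to the object store by serializing columns and
/// row groups in parallel (bounded by [`ParallelParquetWriterOptions`]) and
/// then concatenating the finished row groups into one file.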
async fn output_single_parquet_file_parallelized(
    object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
    data: Receiver<RecordBatch>,
    output_schema: Arc<Schema>,
    parquet_props: &WriterProperties,
    skip_arrow_metadata: bool,
    parallel_options: ParallelParquetWriterOptions,
    pool: Arc<dyn MemoryPool>,
) -> Result<ParquetMetaData> {
    let max_rowgroups = parallel_options.max_parallel_row_groups;
    let (serialize_tx, serialize_rx) =
        mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups);

    let arc_props = Arc::new(parquet_props.clone());
    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
    let options = ArrowWriterOptions::new()
        .with_properties(parquet_props.clone())
        .with_skip_arrow_metadata(skip_arrow_metadata);
    let writer = ArrowWriter::try_new_with_options(
        merged_buff.clone(),
        Arc::clone(&output_schema),
        options,
    )?;
    let (writer, row_group_writer_factory) = writer.into_serialized_writer()?;

    let launch_serialization_task = spawn_parquet_parallel_serialization_task(
        row_group_writer_factory,
        data,
        serialize_tx,
        Arc::clone(&output_schema),
        Arc::clone(&arc_props),
        parallel_options,
        Arc::clone(&pool),
    );
    let parquet_meta_data = concatenate_parallel_row_groups(
        writer,
        merged_buff,
        serialize_rx,
        object_store_writer,
        pool,
    )
    .await?;

    launch_serialization_task
        .join_unwind()
        .await
        .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
    Ok(parquet_meta_data)
}

#[cfg(test)]
mod tests {
    use parquet::arrow::parquet_to_arrow_schema;
    use std::sync::Arc;

    use super::*;

    use arrow::datatypes::DataType;
    use parquet::schema::parser::parse_message_type;

    #[test]
    fn coerce_int96_to_resolution_with_mixed_timestamps() {
        // INT96 alongside logical timestamp types at various resolutions; only
        // the INT96 column should be coerced to the requested resolution.
        let spark_schema = "
        message spark_schema {
          optional int96 c0;
          optional int64 c1 (TIMESTAMP(NANOS,true));
          optional int64 c2 (TIMESTAMP(NANOS,false));
          optional int64 c3 (TIMESTAMP(MILLIS,true));
          optional int64 c4 (TIMESTAMP(MILLIS,false));
          optional int64 c5 (TIMESTAMP(MICROS,true));
          optional int64 c6 (TIMESTAMP(MICROS,false));
        }
        ";

        let schema = parse_message_type(spark_schema).expect("should parse schema");
        let descr = SchemaDescriptor::new(Arc::new(schema));

        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();

        let result =
            coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond)
                .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
            Field::new(
                "c1",
                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                true,
            ),
            Field::new("c2", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
            Field::new(
                "c3",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                true,
            ),
            Field::new("c4", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new(
                "c5",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                true,
            ),
            Field::new("c6", DataType::Timestamp(TimeUnit::Microsecond, None), true),
        ]);

        assert_eq!(result, expected_schema);
    }

    #[test]
    fn coerce_int96_to_resolution_with_nested_types() {
        // INT96 columns nested inside structs, lists and maps should all be
        // coerced to the requested resolution.
        let spark_schema = "
        message spark_schema {
          optional int96 c0;
          optional group c1 {
            optional int96 c0;
          }
          optional group c2 {
            optional group c0 (LIST) {
              repeated group list {
                optional int96 element;
              }
            }
          }
          optional group c3 (LIST) {
            repeated group list {
              optional int96 element;
            }
          }
          optional group c4 (LIST) {
            repeated group list {
              optional group element {
                optional int96 c0;
                optional int96 c1;
              }
            }
          }
          optional group c5 (MAP) {
            repeated group key_value {
              required int96 key;
              optional int96 value;
            }
          }
          optional group c6 (LIST) {
            repeated group list {
              optional group element (MAP) {
                repeated group key_value {
                  required int96 key;
                  optional int96 value;
                }
              }
            }
          }
        }
        ";

        let schema = parse_message_type(spark_schema).expect("should parse schema");
        let descr = SchemaDescriptor::new(Arc::new(schema));

        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();

        let result =
            coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond)
                .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
            Field::new_struct(
                "c1",
                vec![Field::new(
                    "c0",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    true,
                )],
                true,
            ),
            Field::new_struct(
                "c2",
                vec![Field::new_list(
                    "c0",
                    Field::new(
                        "element",
                        DataType::Timestamp(TimeUnit::Microsecond, None),
                        true,
                    ),
                    true,
                )],
                true,
            ),
            Field::new_list(
                "c3",
                Field::new(
                    "element",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    true,
                ),
                true,
            ),
            Field::new_list(
                "c4",
                Field::new_struct(
                    "element",
                    vec![
                        Field::new(
                            "c0",
                            DataType::Timestamp(TimeUnit::Microsecond, None),
                            true,
                        ),
                        Field::new(
                            "c1",
                            DataType::Timestamp(TimeUnit::Microsecond, None),
                            true,
                        ),
                    ],
                    true,
                ),
                true,
            ),
            Field::new_map(
                "c5",
                "key_value",
                Field::new(
                    "key",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    false,
                ),
                Field::new(
                    "value",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    true,
                ),
                false,
                true,
            ),
            Field::new_list(
                "c6",
                Field::new_map(
                    "element",
                    "key_value",
                    Field::new(
                        "key",
                        DataType::Timestamp(TimeUnit::Microsecond, None),
                        false,
                    ),
                    Field::new(
                        "value",
                        DataType::Timestamp(TimeUnit::Microsecond, None),
                        true,
                    ),
                    false,
                    true,
                ),
                true,
            ),
        ]);

        assert_eq!(result, expected_schema);
    }
}