use std::any::Any;
use std::cell::RefCell;
use std::fmt::Debug;
use std::ops::Range;
use std::rc::Rc;
use std::sync::Arc;
use std::fmt;
use arrow::array::RecordBatch;
use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
use datafusion_datasource::TableSchema;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
use datafusion_datasource::write::{
ObjectWriterBuilder, SharedBuffer, get_writer_schema,
};
use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
use datafusion_datasource::write::demux::DemuxedStreamReceiver;
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
use datafusion_common::encryption::FileDecryptionProperties;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
DEFAULT_PARQUET_EXTENSION, DataFusionError, GetExt, HashSet, Result,
internal_datafusion_err, internal_err, not_impl_err,
};
use datafusion_common::{HashMap, Statistics};
use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_session::Session;
use crate::metadata::{DFParquetMetadata, lex_ordering_to_sorting_columns};
use crate::reader::CachedParquetFileReaderFactory;
use crate::source::{ParquetSource, parse_coerce_int96_string};
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_datasource::source::DataSourceExec;
use datafusion_execution::cache::cache_manager::FileMetadataCache;
use datafusion_execution::runtime_env::RuntimeEnv;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt};
use parquet::arrow::arrow_writer::{
ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, ArrowRowGroupWriterFactory,
ArrowWriterOptions, compute_leaves,
};
use parquet::arrow::async_reader::MetadataFetch;
use parquet::arrow::{ArrowWriter, AsyncArrowWriter};
use parquet::basic::Type;
#[cfg(feature = "parquet_encryption")]
use parquet::encryption::encrypt::FileEncryptionProperties;
use parquet::errors::ParquetError;
use parquet::file::metadata::{ParquetMetaData, SortingColumn};
use parquet::file::properties::{
DEFAULT_MAX_ROW_GROUP_ROW_COUNT, WriterProperties, WriterPropertiesBuilder,
};
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::types::SchemaDescriptor;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};
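/// Initial capacity of the in-memory buffer used when assembling a single
/// Parquet file from parallel serialization tasks (1 MiB).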
const INITIAL_BUFFER_BYTES: usize = 1048576;
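/// When writing in parallel, the shared buffer is flushed to the object
/// store once its size exceeds this threshold.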
const BUFFER_FLUSH_BYTES: usize = 1024000;
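/// Factory used to create [`ParquetFormat`] instances, optionally carrying
/// explicit [`TableParquetOptions`] that override the session defaults.
///
/// A minimal usage sketch; `session` is an assumed, caller-provided value
/// implementing [`Session`]:
///
/// ```ignore
/// let factory = ParquetFormatFactory::new();
/// let format = factory.create(&session, &std::collections::HashMap::new())?;
/// assert_eq!(format.get_ext(), "parquet");
/// ```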
#[derive(Default)]
pub struct ParquetFormatFactory {
pub options: Option<TableParquetOptions>,
}
impl ParquetFormatFactory {
pub fn new() -> Self {
Self { options: None }
}
pub fn new_with_options(options: TableParquetOptions) -> Self {
Self {
options: Some(options),
}
}
}
impl FileFormatFactory for ParquetFormatFactory {
fn create(
&self,
state: &dyn Session,
format_options: &std::collections::HashMap<String, String>,
) -> Result<Arc<dyn FileFormat>> {
let parquet_options = match &self.options {
None => {
let mut table_options = state.default_table_options();
table_options.set_config_format(ConfigFileType::PARQUET);
table_options.alter_with_string_hash_map(format_options)?;
table_options.parquet
}
Some(parquet_options) => {
let mut parquet_options = parquet_options.clone();
for (k, v) in format_options {
parquet_options.set(k, v)?;
}
parquet_options
}
};
Ok(Arc::new(
ParquetFormat::default().with_options(parquet_options),
))
}
fn default(&self) -> Arc<dyn FileFormat> {
Arc::new(ParquetFormat::default())
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl GetExt for ParquetFormatFactory {
fn get_ext(&self) -> String {
DEFAULT_PARQUET_EXTENSION[1..].to_string()
}
}
impl Debug for ParquetFormatFactory {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ParquetFormatFactory")
.field("ParquetFormatFactory", &self.options)
.finish()
}
}
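/// The Apache Parquet [`FileFormat`] implementation: infers schemas and
/// statistics from Parquet metadata and builds scan and sink plans.
///
/// A minimal configuration sketch using the builder methods defined below:
///
/// ```ignore
/// let format = ParquetFormat::new()
///     .with_enable_pruning(true)
///     .with_metadata_size_hint(Some(64 * 1024))
///     .with_skip_metadata(true);
/// ```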
#[derive(Debug, Default)]
pub struct ParquetFormat {
options: TableParquetOptions,
}
impl ParquetFormat {
pub fn new() -> Self {
Self::default()
}
pub fn with_enable_pruning(mut self, enable: bool) -> Self {
self.options.global.pruning = enable;
self
}
pub fn enable_pruning(&self) -> bool {
self.options.global.pruning
}
pub fn with_metadata_size_hint(mut self, size_hint: Option<usize>) -> Self {
self.options.global.metadata_size_hint = size_hint;
self
}
pub fn metadata_size_hint(&self) -> Option<usize> {
self.options.global.metadata_size_hint
}
pub fn with_skip_metadata(mut self, skip_metadata: bool) -> Self {
self.options.global.skip_metadata = skip_metadata;
self
}
pub fn skip_metadata(&self) -> bool {
self.options.global.skip_metadata
}
pub fn with_options(mut self, options: TableParquetOptions) -> Self {
self.options = options;
self
}
pub fn options(&self) -> &TableParquetOptions {
&self.options
}
pub fn force_view_types(&self) -> bool {
self.options.global.schema_force_view_types
}
pub fn with_force_view_types(mut self, use_views: bool) -> Self {
self.options.global.schema_force_view_types = use_views;
self
}
pub fn binary_as_string(&self) -> bool {
self.options.global.binary_as_string
}
pub fn with_binary_as_string(mut self, binary_as_string: bool) -> Self {
self.options.global.binary_as_string = binary_as_string;
self
}
pub fn coerce_int96(&self) -> Option<String> {
self.options.global.coerce_int96.clone()
}
pub fn with_coerce_int96(mut self, time_unit: Option<String>) -> Self {
self.options.global.coerce_int96 = time_unit;
self
}
}
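/// Clears field-level metadata from each schema so that
/// [`Schema::try_merge`] does not fail on inter-file metadata conflicts
/// (applied when `skip_metadata` is enabled).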
fn clear_metadata(
schemas: impl IntoIterator<Item = Schema>,
) -> impl Iterator<Item = Schema> {
schemas.into_iter().map(|schema| {
let fields = schema
.fields()
.iter()
.map(|field| {
                field.as_ref().clone().with_metadata(Default::default())
            })
.collect::<Fields>();
Schema::new(fields)
})
}
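/// Resolves the [`FileDecryptionProperties`] for a file, from explicit
/// decryption configuration if present, otherwise from a registered
/// encryption factory.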
#[cfg(feature = "parquet_encryption")]
async fn get_file_decryption_properties(
state: &dyn Session,
options: &TableParquetOptions,
file_path: &Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
Ok(match &options.crypto.file_decryption {
Some(cfd) => Some(Arc::new(FileDecryptionProperties::from(cfd.clone()))),
None => match &options.crypto.factory_id {
Some(factory_id) => {
let factory =
state.runtime_env().parquet_encryption_factory(factory_id)?;
factory
.get_file_decryption_properties(
&options.crypto.factory_options,
file_path,
)
.await?
}
None => None,
},
})
}
#[cfg(not(feature = "parquet_encryption"))]
async fn get_file_decryption_properties(
_state: &dyn Session,
_options: &TableParquetOptions,
_file_path: &Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
Ok(None)
}
#[async_trait]
impl FileFormat for ParquetFormat {
fn as_any(&self) -> &dyn Any {
self
}
fn get_ext(&self) -> String {
ParquetFormatFactory::new().get_ext()
}
fn get_ext_with_compression(
&self,
file_compression_type: &FileCompressionType,
) -> Result<String> {
let ext = self.get_ext();
match file_compression_type.get_variant() {
CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
_ => internal_err!("Parquet FileFormat does not support compression."),
}
}
fn compression_type(&self) -> Option<FileCompressionType> {
None
}
async fn infer_schema(
&self,
state: &dyn Session,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
let coerce_int96 = match self.coerce_int96() {
Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?),
None => None,
};
let file_metadata_cache =
state.runtime_env().cache_manager.get_file_metadata_cache();
let mut schemas: Vec<_> = futures::stream::iter(objects)
.map(|object| async {
let file_decryption_properties = get_file_decryption_properties(
state,
&self.options,
&object.location,
)
.await?;
let result = DFParquetMetadata::new(store.as_ref(), object)
.with_metadata_size_hint(self.metadata_size_hint())
.with_decryption_properties(file_decryption_properties)
.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
.with_coerce_int96(coerce_int96)
.fetch_schema_with_location()
.await?;
Ok::<_, DataFusionError>(result)
})
            // Fetch schemas concurrently, bounded by `meta_fetch_concurrency`
            .boxed()
            .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
.try_collect()
.await?;
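        // Sort by file path so the merged schema does not depend on the
        // order in which `buffer_unordered` resolved the fetches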
schemas
.sort_unstable_by(|(location1, _), (location2, _)| location1.cmp(location2));
let schemas = schemas.into_iter().map(|(_, schema)| schema);
let schema = if self.skip_metadata() {
Schema::try_merge(clear_metadata(schemas))
} else {
Schema::try_merge(schemas)
}?;
let schema = if self.binary_as_string() {
transform_binary_to_string(&schema)
} else {
schema
};
let schema = if self.force_view_types() {
transform_schema_to_view(&schema)
} else {
schema
};
Ok(Arc::new(schema))
}
async fn infer_stats(
&self,
state: &dyn Session,
store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
object: &ObjectMeta,
) -> Result<Statistics> {
let file_decryption_properties =
get_file_decryption_properties(state, &self.options, &object.location)
.await?;
let file_metadata_cache =
state.runtime_env().cache_manager.get_file_metadata_cache();
DFParquetMetadata::new(store, object)
.with_metadata_size_hint(self.metadata_size_hint())
.with_decryption_properties(file_decryption_properties)
.with_file_metadata_cache(Some(file_metadata_cache))
.fetch_statistics(&table_schema)
.await
}
async fn infer_ordering(
&self,
state: &dyn Session,
store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
object: &ObjectMeta,
) -> Result<Option<LexOrdering>> {
let file_decryption_properties =
get_file_decryption_properties(state, &self.options, &object.location)
.await?;
let file_metadata_cache =
state.runtime_env().cache_manager.get_file_metadata_cache();
let metadata = DFParquetMetadata::new(store, object)
.with_metadata_size_hint(self.metadata_size_hint())
.with_decryption_properties(file_decryption_properties)
.with_file_metadata_cache(Some(file_metadata_cache))
.fetch_metadata()
.await?;
crate::metadata::ordering_from_parquet_metadata(&metadata, &table_schema)
}
async fn infer_stats_and_ordering(
&self,
state: &dyn Session,
store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
object: &ObjectMeta,
) -> Result<datafusion_datasource::file_format::FileMeta> {
let file_decryption_properties =
get_file_decryption_properties(state, &self.options, &object.location)
.await?;
let file_metadata_cache =
state.runtime_env().cache_manager.get_file_metadata_cache();
let metadata = DFParquetMetadata::new(store, object)
.with_metadata_size_hint(self.metadata_size_hint())
.with_decryption_properties(file_decryption_properties)
.with_file_metadata_cache(Some(file_metadata_cache))
.fetch_metadata()
.await?;
let statistics = DFParquetMetadata::statistics_from_parquet_metadata(
&metadata,
&table_schema,
)?;
let ordering =
crate::metadata::ordering_from_parquet_metadata(&metadata, &table_schema)?;
Ok(
datafusion_datasource::file_format::FileMeta::new(statistics)
.with_ordering(ordering),
)
}
async fn create_physical_plan(
&self,
state: &dyn Session,
conf: FileScanConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
        let metadata_size_hint = self.metadata_size_hint();
let mut source = conf
.file_source()
.as_any()
.downcast_ref::<ParquetSource>()
.cloned()
.ok_or_else(|| internal_datafusion_err!("Expected ParquetSource"))?;
source = source.with_table_parquet_options(self.options.clone());
let metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
let store = state
.runtime_env()
.object_store(conf.object_store_url.clone())?;
let cached_parquet_read_factory =
Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
source = source.with_parquet_file_reader_factory(cached_parquet_read_factory);
if let Some(metadata_size_hint) = metadata_size_hint {
source = source.with_metadata_size_hint(metadata_size_hint)
}
source = self.set_source_encryption_factory(source, state)?;
let conf = FileScanConfigBuilder::from(conf)
.with_source(Arc::new(source))
.build();
Ok(DataSourceExec::from_data_source(conf))
}
async fn create_writer_physical_plan(
&self,
input: Arc<dyn ExecutionPlan>,
_state: &dyn Session,
conf: FileSinkConfig,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.insert_op != InsertOp::Append {
return not_impl_err!("Overwrites are not implemented yet for Parquet");
}
let sorting_columns = if let Some(ref requirements) = order_requirements {
let ordering: LexOrdering = requirements.clone().into();
lex_ordering_to_sorting_columns(&ordering).ok()
} else {
None
};
let sink = Arc::new(
ParquetSink::new(conf, self.options.clone())
.with_sorting_columns(sorting_columns),
);
Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
}
fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
Arc::new(
ParquetSource::new(table_schema)
.with_table_parquet_options(self.options.clone()),
)
}
}
#[cfg(feature = "parquet_encryption")]
impl ParquetFormat {
fn set_source_encryption_factory(
&self,
source: ParquetSource,
state: &dyn Session,
) -> Result<ParquetSource> {
if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
Ok(source.with_encryption_factory(
state
.runtime_env()
.parquet_encryption_factory(encryption_factory_id)?,
))
} else {
Ok(source)
}
}
}
#[cfg(not(feature = "parquet_encryption"))]
impl ParquetFormat {
fn set_source_encryption_factory(
&self,
source: ParquetSource,
_state: &dyn Session,
) -> Result<ParquetSource> {
if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
Err(DataFusionError::Configuration(format!(
"Parquet encryption factory id is set to '{encryption_factory_id}' but the parquet_encryption feature is disabled"
)))
} else {
Ok(source)
}
}
}
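/// Coerces the file schema toward the table schema's string/binary
/// representations where the physical data is compatible, e.g. reading a
/// `Binary` column as `Utf8`, or `Utf8` as `Utf8View`.
///
/// Returns `None` when no coercion is needed. A minimal sketch:
///
/// ```ignore
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let table_schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, true)]);
/// let file_schema = Schema::new(vec![Field::new("c1", DataType::Utf8, true)]);
/// let coerced = apply_file_schema_type_coercions(&table_schema, &file_schema).unwrap();
/// assert_eq!(coerced.field(0).data_type(), &DataType::Utf8View);
/// ```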
pub fn apply_file_schema_type_coercions(
table_schema: &Schema,
file_schema: &Schema,
) -> Option<Schema> {
let mut needs_view_transform = false;
let mut needs_string_transform = false;
let table_fields: HashMap<_, _> = table_schema
.fields()
.iter()
.map(|f| {
let dt = f.data_type();
if matches!(dt, &DataType::Utf8View | &DataType::BinaryView) {
needs_view_transform = true;
}
if matches!(
dt,
&DataType::Utf8 | &DataType::LargeUtf8 | &DataType::Utf8View
) {
needs_string_transform = true;
}
(f.name(), dt)
})
.collect();
if !needs_view_transform && !needs_string_transform {
return None;
}
let transformed_fields: Vec<Arc<Field>> = file_schema
.fields()
.iter()
.map(|field| {
let field_name = field.name();
let field_type = field.data_type();
if let Some(table_type) = table_fields.get(field_name) {
match (table_type, field_type) {
(
&DataType::Utf8,
DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
) => {
return field_with_new_type(field, DataType::Utf8);
}
(
&DataType::LargeUtf8,
DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
) => {
return field_with_new_type(field, DataType::LargeUtf8);
}
(
&DataType::Utf8View,
DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
) => {
return field_with_new_type(field, DataType::Utf8View);
}
(&DataType::Utf8View, DataType::Utf8 | DataType::LargeUtf8) => {
return field_with_new_type(field, DataType::Utf8View);
}
(&DataType::BinaryView, DataType::Binary | DataType::LargeBinary) => {
return field_with_new_type(field, DataType::BinaryView);
}
_ => {}
}
}
Arc::clone(field)
})
.collect();
Some(Schema::new_with_metadata(
transformed_fields,
file_schema.metadata.clone(),
))
}
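/// Rewrites INT96 timestamp columns (which Arrow reads as nanosecond
/// timestamps) to the requested [`TimeUnit`], traversing nested structs,
/// lists, and maps. Returns `None` if the file has no INT96 columns.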
pub fn coerce_int96_to_resolution(
parquet_schema: &SchemaDescriptor,
file_schema: &Schema,
time_unit: &TimeUnit,
) -> Option<Schema> {
let int96_fields: HashSet<_> = parquet_schema
.columns()
.iter()
.filter(|f| f.physical_type() == Type::INT96)
.map(|f| f.path().string())
.collect();
if int96_fields.is_empty() {
return None;
}
type NestedFields = Rc<RefCell<Vec<FieldRef>>>;
type StackContext<'a> = (
Vec<&'a str>, &'a FieldRef, NestedFields, Option<NestedFields>, );
let fields = Rc::new(RefCell::new(Vec::with_capacity(file_schema.fields.len())));
let transformed_schema = {
let mut stack: Vec<StackContext> = file_schema
.fields()
.iter()
.rev()
.map(|f| (vec![f.name().as_str()], f, Rc::clone(&fields), None))
.collect();
while let Some((parquet_path, current_field, parent_fields, child_fields)) =
stack.pop()
{
match (current_field.data_type(), child_fields) {
(DataType::Struct(unprocessed_children), None) => {
let child_fields = Rc::new(RefCell::new(Vec::with_capacity(
unprocessed_children.len(),
)));
stack.push((
parquet_path.clone(),
current_field,
parent_fields,
Some(Rc::clone(&child_fields)),
));
for child in unprocessed_children.into_iter().rev() {
let mut child_path = parquet_path.clone();
child_path.push(".");
child_path.push(child.name());
stack.push((child_path, child, Rc::clone(&child_fields), None));
}
}
(DataType::Struct(unprocessed_children), Some(processed_children)) => {
let processed_children = processed_children.borrow();
assert_eq!(processed_children.len(), unprocessed_children.len());
let processed_struct = Field::new_struct(
current_field.name(),
processed_children.as_slice(),
current_field.is_nullable(),
);
parent_fields.borrow_mut().push(Arc::new(processed_struct));
}
(DataType::List(unprocessed_child), None) => {
let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
stack.push((
parquet_path.clone(),
current_field,
parent_fields,
Some(Rc::clone(&child_fields)),
));
let mut child_path = parquet_path.clone();
child_path.push(".list.");
child_path.push(unprocessed_child.name());
stack.push((
child_path.clone(),
unprocessed_child,
Rc::clone(&child_fields),
None,
));
}
(DataType::List(_), Some(processed_children)) => {
let processed_children = processed_children.borrow();
assert_eq!(processed_children.len(), 1);
let processed_list = Field::new_list(
current_field.name(),
Arc::clone(&processed_children[0]),
current_field.is_nullable(),
);
parent_fields.borrow_mut().push(Arc::new(processed_list));
}
(DataType::Map(unprocessed_child, _), None) => {
let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
stack.push((
parquet_path.clone(),
current_field,
parent_fields,
Some(Rc::clone(&child_fields)),
));
let mut child_path = parquet_path.clone();
child_path.push(".");
child_path.push(unprocessed_child.name());
stack.push((
child_path.clone(),
unprocessed_child,
Rc::clone(&child_fields),
None,
));
}
(DataType::Map(_, sorted), Some(processed_children)) => {
let processed_children = processed_children.borrow();
assert_eq!(processed_children.len(), 1);
let processed_map = Field::new(
current_field.name(),
DataType::Map(Arc::clone(&processed_children[0]), *sorted),
current_field.is_nullable(),
);
parent_fields.borrow_mut().push(Arc::new(processed_map));
}
(DataType::Timestamp(TimeUnit::Nanosecond, None), None)
if int96_fields.contains(parquet_path.concat().as_str()) =>
{
parent_fields.borrow_mut().push(field_with_new_type(
current_field,
DataType::Timestamp(*time_unit, None),
));
}
_ => parent_fields.borrow_mut().push(Arc::clone(current_field)),
}
}
assert_eq!(fields.borrow().len(), file_schema.fields.len());
Schema::new_with_metadata(
fields.borrow_mut().clone(),
file_schema.metadata.clone(),
)
};
Some(transformed_schema)
}
#[deprecated(
since = "47.0.0",
note = "Use `apply_file_schema_type_coercions` instead"
)]
pub fn coerce_file_schema_to_view_type(
table_schema: &Schema,
file_schema: &Schema,
) -> Option<Schema> {
let mut transform = false;
let table_fields: HashMap<_, _> = table_schema
.fields
.iter()
.map(|f| {
let dt = f.data_type();
if dt.equals_datatype(&DataType::Utf8View)
|| dt.equals_datatype(&DataType::BinaryView)
{
transform = true;
}
(f.name(), dt)
})
.collect();
if !transform {
return None;
}
let transformed_fields: Vec<Arc<Field>> = file_schema
.fields
.iter()
.map(
|field| match (table_fields.get(field.name()), field.data_type()) {
(Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => {
field_with_new_type(field, DataType::Utf8View)
}
(
Some(DataType::BinaryView),
DataType::Binary | DataType::LargeBinary,
) => field_with_new_type(field, DataType::BinaryView),
_ => Arc::clone(field),
},
)
.collect();
Some(Schema::new_with_metadata(
transformed_fields,
file_schema.metadata.clone(),
))
}
#[deprecated(
since = "47.0.0",
note = "Use `apply_file_schema_type_coercions` instead"
)]
pub fn coerce_file_schema_to_string_type(
table_schema: &Schema,
file_schema: &Schema,
) -> Option<Schema> {
let mut transform = false;
let table_fields: HashMap<_, _> = table_schema
.fields
.iter()
.map(|f| (f.name(), f.data_type()))
.collect();
let transformed_fields: Vec<Arc<Field>> = file_schema
.fields
.iter()
.map(
|field| match (table_fields.get(field.name()), field.data_type()) {
(
Some(DataType::Utf8),
DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
) => {
transform = true;
field_with_new_type(field, DataType::Utf8)
}
(
Some(DataType::LargeUtf8),
DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
) => {
transform = true;
field_with_new_type(field, DataType::LargeUtf8)
}
(
Some(DataType::Utf8View),
DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
) => {
transform = true;
field_with_new_type(field, DataType::Utf8View)
}
_ => Arc::clone(field),
},
)
.collect();
if !transform {
None
} else {
Some(Schema::new_with_metadata(
transformed_fields,
file_schema.metadata.clone(),
))
}
}
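/// Returns a copy of `field` with its data type replaced by `new_type`.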
fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
Arc::new(field.as_ref().clone().with_data_type(new_type))
}
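/// Rewrites all string and binary fields to their view types (`Utf8View`,
/// `BinaryView`), which DataFusion scans more efficiently.
///
/// A minimal sketch:
///
/// ```ignore
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let schema = Schema::new(vec![Field::new("s", DataType::Utf8, true)]);
/// let transformed = transform_schema_to_view(&schema);
/// assert_eq!(transformed.field(0).data_type(), &DataType::Utf8View);
/// ```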
pub fn transform_schema_to_view(schema: &Schema) -> Schema {
let transformed_fields: Vec<Arc<Field>> = schema
.fields
.iter()
.map(|field| match field.data_type() {
DataType::Utf8 | DataType::LargeUtf8 => {
field_with_new_type(field, DataType::Utf8View)
}
DataType::Binary | DataType::LargeBinary => {
field_with_new_type(field, DataType::BinaryView)
}
_ => Arc::clone(field),
})
.collect();
Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}
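/// Rewrites all binary fields to the corresponding string type:
/// `Binary` to `Utf8`, `LargeBinary` to `LargeUtf8`, and `BinaryView` to
/// `Utf8View`.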
pub fn transform_binary_to_string(schema: &Schema) -> Schema {
let transformed_fields: Vec<Arc<Field>> = schema
.fields
.iter()
.map(|field| match field.data_type() {
DataType::Binary => field_with_new_type(field, DataType::Utf8),
DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8),
DataType::BinaryView => field_with_new_type(field, DataType::Utf8View),
_ => Arc::clone(field),
})
.collect();
Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}
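/// A [`MetadataFetch`] adapter that reads byte ranges of a Parquet footer
/// from an [`ObjectStore`].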
pub struct ObjectStoreFetch<'a> {
store: &'a dyn ObjectStore,
meta: &'a ObjectMeta,
}
impl<'a> ObjectStoreFetch<'a> {
pub fn new(store: &'a dyn ObjectStore, meta: &'a ObjectMeta) -> Self {
Self { store, meta }
}
}
impl MetadataFetch for ObjectStoreFetch<'_> {
fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, ParquetError>> {
async {
self.store
.get_range(&self.meta.location, range)
.await
.map_err(ParquetError::from)
}
.boxed()
}
}
#[deprecated(
since = "50.0.0",
note = "Use `DFParquetMetadata::fetch_metadata` instead"
)]
pub async fn fetch_parquet_metadata(
store: &dyn ObjectStore,
object_meta: &ObjectMeta,
size_hint: Option<usize>,
decryption_properties: Option<&FileDecryptionProperties>,
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
) -> Result<Arc<ParquetMetaData>> {
let decryption_properties = decryption_properties.cloned().map(Arc::new);
DFParquetMetadata::new(store, object_meta)
.with_metadata_size_hint(size_hint)
.with_decryption_properties(decryption_properties)
.with_file_metadata_cache(file_metadata_cache)
.fetch_metadata()
.await
}
#[deprecated(
since = "50.0.0",
note = "Use `DFParquetMetadata::fetch_statistics` instead"
)]
pub async fn fetch_statistics(
store: &dyn ObjectStore,
table_schema: SchemaRef,
file: &ObjectMeta,
metadata_size_hint: Option<usize>,
decryption_properties: Option<&FileDecryptionProperties>,
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
) -> Result<Statistics> {
let decryption_properties = decryption_properties.cloned().map(Arc::new);
DFParquetMetadata::new(store, file)
.with_metadata_size_hint(metadata_size_hint)
.with_decryption_properties(decryption_properties)
.with_file_metadata_cache(file_metadata_cache)
.fetch_statistics(&table_schema)
.await
}
#[deprecated(
since = "50.0.0",
note = "Use `DFParquetMetadata::statistics_from_parquet_metadata` instead"
)]
#[expect(clippy::needless_pass_by_value)]
pub fn statistics_from_parquet_meta_calc(
metadata: &ParquetMetaData,
table_schema: SchemaRef,
) -> Result<Statistics> {
DFParquetMetadata::statistics_from_parquet_metadata(metadata, &table_schema)
}
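/// A [`DataSink`] that writes record batches out as one or more Parquet
/// files, tracking the [`ParquetMetaData`] of every file written so far
/// (see [`ParquetSink::written`]).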
pub struct ParquetSink {
config: FileSinkConfig,
parquet_options: TableParquetOptions,
written: Arc<parking_lot::Mutex<HashMap<Path, ParquetMetaData>>>,
sorting_columns: Option<Vec<SortingColumn>>,
}
impl Debug for ParquetSink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ParquetSink").finish()
}
}
impl DisplayAs for ParquetSink {
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "ParquetSink(file_groups=",)?;
FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
write!(f, ")")
}
DisplayFormatType::TreeRender => {
write!(f, "")
}
}
}
}
impl ParquetSink {
pub fn new(config: FileSinkConfig, parquet_options: TableParquetOptions) -> Self {
Self {
config,
parquet_options,
written: Default::default(),
sorting_columns: None,
}
}
pub fn with_sorting_columns(
mut self,
sorting_columns: Option<Vec<SortingColumn>>,
) -> Self {
self.sorting_columns = sorting_columns;
self
}
pub fn written(&self) -> HashMap<Path, ParquetMetaData> {
self.written.lock().clone()
}
async fn create_writer_props(
&self,
runtime: &Arc<RuntimeEnv>,
path: &Path,
) -> Result<WriterProperties> {
let schema = self.config.output_schema();
let mut parquet_opts = self.parquet_options.clone();
if !self.parquet_options.global.skip_arrow_metadata {
parquet_opts.arrow_schema(schema);
}
let mut builder = WriterPropertiesBuilder::try_from(&parquet_opts)?;
if let Some(ref sorting_columns) = self.sorting_columns {
builder = builder.set_sorting_columns(Some(sorting_columns.clone()));
}
builder = set_writer_encryption_properties(
builder,
runtime,
parquet_opts,
schema,
path,
)
.await?;
Ok(builder.build())
}
async fn create_async_arrow_writer(
&self,
location: &Path,
object_store: Arc<dyn ObjectStore>,
context: &Arc<TaskContext>,
parquet_props: WriterProperties,
) -> Result<AsyncArrowWriter<BufWriter>> {
let buf_writer = BufWriter::with_capacity(
object_store,
location.clone(),
context
.session_config()
.options()
.execution
.objectstore_writer_buffer_size,
);
let options = ArrowWriterOptions::new()
.with_properties(parquet_props)
.with_skip_arrow_metadata(self.parquet_options.global.skip_arrow_metadata);
let writer = AsyncArrowWriter::try_new_with_options(
buf_writer,
get_writer_schema(&self.config),
options,
)?;
Ok(writer)
}
pub fn parquet_options(&self) -> &TableParquetOptions {
&self.parquet_options
}
}
#[cfg(feature = "parquet_encryption")]
async fn set_writer_encryption_properties(
builder: WriterPropertiesBuilder,
runtime: &Arc<RuntimeEnv>,
parquet_opts: TableParquetOptions,
schema: &Arc<Schema>,
path: &Path,
) -> Result<WriterPropertiesBuilder> {
if let Some(file_encryption_properties) = parquet_opts.crypto.file_encryption {
return Ok(builder.with_file_encryption_properties(Arc::new(
FileEncryptionProperties::from(file_encryption_properties),
)));
    } else if let Some(encryption_factory_id) = parquet_opts.crypto.factory_id.as_ref() {
let encryption_factory =
runtime.parquet_encryption_factory(encryption_factory_id)?;
let file_encryption_properties = encryption_factory
.get_file_encryption_properties(
&parquet_opts.crypto.factory_options,
schema,
path,
)
.await?;
if let Some(file_encryption_properties) = file_encryption_properties {
return Ok(
builder.with_file_encryption_properties(file_encryption_properties)
);
}
}
Ok(builder)
}
#[cfg(not(feature = "parquet_encryption"))]
async fn set_writer_encryption_properties(
builder: WriterPropertiesBuilder,
_runtime: &Arc<RuntimeEnv>,
_parquet_opts: TableParquetOptions,
_schema: &Arc<Schema>,
_path: &Path,
) -> Result<WriterPropertiesBuilder> {
Ok(builder)
}
#[async_trait]
impl FileSink for ParquetSink {
fn config(&self) -> &FileSinkConfig {
&self.config
}
async fn spawn_writer_tasks_and_join(
&self,
context: &Arc<TaskContext>,
demux_task: SpawnedTask<Result<()>>,
mut file_stream_rx: DemuxedStreamReceiver,
object_store: Arc<dyn ObjectStore>,
) -> Result<u64> {
let parquet_opts = &self.parquet_options;
let mut file_write_tasks: JoinSet<
std::result::Result<(Path, ParquetMetaData), DataFusionError>,
> = JoinSet::new();
let runtime = context.runtime_env();
let parallel_options = ParallelParquetWriterOptions {
max_parallel_row_groups: parquet_opts
.global
.maximum_parallel_row_group_writers,
max_buffered_record_batches_per_stream: parquet_opts
.global
.maximum_buffered_record_batches_per_stream,
};
while let Some((path, mut rx)) = file_stream_rx.recv().await {
let parquet_props = self.create_writer_props(&runtime, &path).await?;
if !parquet_opts.global.allow_single_file_parallelism {
let mut writer = self
.create_async_arrow_writer(
&path,
Arc::clone(&object_store),
context,
parquet_props.clone(),
)
.await?;
let reservation = MemoryConsumer::new(format!("ParquetSink[{path}]"))
.register(context.memory_pool());
file_write_tasks.spawn(async move {
while let Some(batch) = rx.recv().await {
writer.write(&batch).await?;
reservation.try_resize(writer.memory_size())?;
}
let parquet_meta_data = writer
.close()
.await
.map_err(|e| DataFusionError::ParquetError(Box::new(e)))?;
Ok((path, parquet_meta_data))
});
} else {
let writer = ObjectWriterBuilder::new(
FileCompressionType::UNCOMPRESSED,
&path,
Arc::clone(&object_store),
)
.with_buffer_size(Some(
context
.session_config()
.options()
.execution
.objectstore_writer_buffer_size,
))
.build()?;
let schema = get_writer_schema(&self.config);
let props = parquet_props.clone();
let skip_arrow_metadata = self.parquet_options.global.skip_arrow_metadata;
let parallel_options_clone = parallel_options.clone();
let pool = Arc::clone(context.memory_pool());
file_write_tasks.spawn(async move {
let parquet_meta_data = output_single_parquet_file_parallelized(
writer,
rx,
schema,
&props,
skip_arrow_metadata,
parallel_options_clone,
pool,
)
.await?;
Ok((path, parquet_meta_data))
});
}
}
let mut row_count = 0;
while let Some(result) = file_write_tasks.join_next().await {
match result {
Ok(r) => {
let (path, parquet_meta_data) = r?;
row_count += parquet_meta_data.file_metadata().num_rows();
let mut written_files = self.written.lock();
written_files
.try_insert(path.clone(), parquet_meta_data)
.map_err(|e| internal_datafusion_err!("duplicate entry detected for partitioned file {path}: {e}"))?;
drop(written_files);
}
Err(e) => {
if e.is_panic() {
std::panic::resume_unwind(e.into_panic());
} else {
                        // Tasks in this JoinSet are never cancelled, so a
                        // non-panic join error is impossible
                        unreachable!();
}
}
}
}
demux_task
.join_unwind()
.await
.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
Ok(row_count as u64)
}
}
#[async_trait]
impl DataSink for ParquetSink {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> &SchemaRef {
self.config.output_schema()
}
async fn write_all(
&self,
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
FileSink::write_all(self, data, context).await
}
}
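/// Consumes a stream of [`ArrowLeafColumn`]s for a single leaf column,
/// encoding them with `writer` while keeping `reservation` sized to the
/// writer's in-memory footprint.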
async fn column_serializer_task(
mut rx: Receiver<ArrowLeafColumn>,
mut writer: ArrowColumnWriter,
reservation: MemoryReservation,
) -> Result<(ArrowColumnWriter, MemoryReservation)> {
while let Some(col) = rx.recv().await {
writer.write(&col)?;
reservation.try_resize(writer.memory_size())?;
}
Ok((writer, reservation))
}
type ColumnWriterTask = SpawnedTask<Result<(ArrowColumnWriter, MemoryReservation)>>;
type ColSender = Sender<ArrowLeafColumn>;
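/// Spawns one serialization task per leaf column writer, returning the task
/// handles together with the channels used to feed each column its data.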
fn spawn_column_parallel_row_group_writer(
col_writers: Vec<ArrowColumnWriter>,
max_buffer_size: usize,
pool: &Arc<dyn MemoryPool>,
) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
let num_columns = col_writers.len();
let mut col_writer_tasks = Vec::with_capacity(num_columns);
let mut col_array_channels = Vec::with_capacity(num_columns);
for writer in col_writers.into_iter() {
let (send_array, receive_array) =
mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
col_array_channels.push(send_array);
let reservation =
MemoryConsumer::new("ParquetSink(ArrowColumnWriter)").register(pool);
let task = SpawnedTask::spawn(column_serializer_task(
receive_array,
writer,
reservation,
));
col_writer_tasks.push(task);
}
Ok((col_writer_tasks, col_array_channels))
}
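/// Settings that control parallel Parquet writing, derived from the
/// corresponding `TableParquetOptions` fields.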
#[derive(Clone)]
struct ParallelParquetWriterOptions {
max_parallel_row_groups: usize,
max_buffered_record_batches_per_stream: usize,
}
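/// The result of serializing one row group: the finalized column chunks,
/// the row group's memory reservation, and its row count.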
type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, MemoryReservation, usize)>;
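/// Splits a [`RecordBatch`] into leaf columns and dispatches each leaf to
/// its column writer channel. Returns early with `Ok(())` if a receiver has
/// hung up, since that means the write was aborted elsewhere.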
async fn send_arrays_to_col_writers(
col_array_channels: &[ColSender],
rb: &RecordBatch,
schema: Arc<Schema>,
) -> Result<()> {
let mut next_channel = 0;
for (array, field) in rb.columns().iter().zip(schema.fields()) {
for c in compute_leaves(field, array)? {
if col_array_channels[next_channel].send(c).await.is_err() {
return Ok(());
}
next_channel += 1;
}
}
Ok(())
}
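/// Waits for all column writer tasks of a row group to finish, then closes
/// each writer to produce the serialized column chunks for that row group.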
fn spawn_rg_join_and_finalize_task(
column_writer_tasks: Vec<ColumnWriterTask>,
rg_rows: usize,
pool: &Arc<dyn MemoryPool>,
) -> SpawnedTask<RBStreamSerializeResult> {
let rg_reservation =
MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(pool);
SpawnedTask::spawn(async move {
let num_cols = column_writer_tasks.len();
let mut finalized_rg = Vec::with_capacity(num_cols);
for task in column_writer_tasks.into_iter() {
let (writer, _col_reservation) = task
.join_unwind()
.await
.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
let encoded_size = writer.get_estimated_total_bytes();
rg_reservation.grow(encoded_size);
finalized_rg.push(writer.close()?);
}
Ok((finalized_rg, rg_reservation, rg_rows))
})
}
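/// Drives the parallel serialization pipeline: slices incoming record
/// batches into row groups of at most `max_row_group_rows` rows and fans
/// each row group's columns out to parallel column writer tasks.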
fn spawn_parquet_parallel_serialization_task(
row_group_writer_factory: ArrowRowGroupWriterFactory,
mut data: Receiver<RecordBatch>,
serialize_tx: Sender<SpawnedTask<RBStreamSerializeResult>>,
schema: Arc<Schema>,
writer_props: Arc<WriterProperties>,
parallel_options: Arc<ParallelParquetWriterOptions>,
pool: Arc<dyn MemoryPool>,
) -> SpawnedTask<Result<(), DataFusionError>> {
SpawnedTask::spawn(async move {
let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream;
let max_row_group_rows = writer_props
.max_row_group_row_count()
.unwrap_or(DEFAULT_MAX_ROW_GROUP_ROW_COUNT);
let mut row_group_index = 0;
let col_writers =
row_group_writer_factory.create_column_writers(row_group_index)?;
let (mut column_writer_handles, mut col_array_channels) =
spawn_column_parallel_row_group_writer(col_writers, max_buffer_rb, &pool)?;
let mut current_rg_rows = 0;
while let Some(mut rb) = data.recv().await {
loop {
if current_rg_rows + rb.num_rows() < max_row_group_rows {
send_arrays_to_col_writers(
&col_array_channels,
&rb,
Arc::clone(&schema),
)
.await?;
current_rg_rows += rb.num_rows();
break;
} else {
let rows_left = max_row_group_rows - current_rg_rows;
let a = rb.slice(0, rows_left);
send_arrays_to_col_writers(
&col_array_channels,
&a,
Arc::clone(&schema),
)
.await?;
                    // Dropping the senders signals the column writer tasks
                    // that this row group is complete
                    drop(col_array_channels);
let finalize_rg_task = spawn_rg_join_and_finalize_task(
column_writer_handles,
max_row_group_rows,
&pool,
);
if serialize_tx.send(finalize_rg_task).await.is_err() {
return Ok(());
}
current_rg_rows = 0;
rb = rb.slice(rows_left, rb.num_rows() - rows_left);
row_group_index += 1;
let col_writers = row_group_writer_factory
.create_column_writers(row_group_index)?;
(column_writer_handles, col_array_channels) =
spawn_column_parallel_row_group_writer(
col_writers,
max_buffer_rb,
&pool,
)?;
}
}
}
drop(col_array_channels);
if current_rg_rows > 0 {
let finalize_rg_task = spawn_rg_join_and_finalize_task(
column_writer_handles,
current_rg_rows,
&pool,
);
if serialize_tx.send(finalize_rg_task).await.is_err() {
return Ok(());
}
}
Ok(())
})
}
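/// Receives finalized row groups in order and appends them to the single
/// output file, flushing the shared buffer to the object store writer
/// whenever it grows beyond [`BUFFER_FLUSH_BYTES`].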
async fn concatenate_parallel_row_groups(
mut parquet_writer: SerializedFileWriter<SharedBuffer>,
merged_buff: SharedBuffer,
mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
pool: Arc<dyn MemoryPool>,
) -> Result<ParquetMetaData> {
let file_reservation =
MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);
while let Some(task) = serialize_rx.recv().await {
let result = task.join_unwind().await;
let (serialized_columns, rg_reservation, _cnt) =
result.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
let mut rg_out = parquet_writer.next_row_group()?;
for chunk in serialized_columns {
chunk.append_to_row_group(&mut rg_out)?;
rg_reservation.free();
let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
file_reservation.try_resize(buff_to_flush.len())?;
if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
object_store_writer
.write_all(buff_to_flush.as_slice())
.await?;
buff_to_flush.clear();
                    file_reservation.try_resize(buff_to_flush.len())?; // now zero
                }
}
rg_out.close()?;
}
let parquet_meta_data = parquet_writer.close()?;
let final_buff = merged_buff.buffer.try_lock().unwrap();
object_store_writer.write_all(final_buff.as_slice()).await?;
object_store_writer.shutdown().await?;
file_reservation.free();
Ok(parquet_meta_data)
}
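/// Writes a single Parquet file by encoding column chunks and row groups on
/// parallel tasks and stitching the results together with
/// [`concatenate_parallel_row_groups`].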
async fn output_single_parquet_file_parallelized(
object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
data: Receiver<RecordBatch>,
output_schema: Arc<Schema>,
parquet_props: &WriterProperties,
skip_arrow_metadata: bool,
parallel_options: ParallelParquetWriterOptions,
pool: Arc<dyn MemoryPool>,
) -> Result<ParquetMetaData> {
let max_rowgroups = parallel_options.max_parallel_row_groups;
let (serialize_tx, serialize_rx) =
mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups);
let arc_props = Arc::new(parquet_props.clone());
let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
let options = ArrowWriterOptions::new()
.with_properties(parquet_props.clone())
.with_skip_arrow_metadata(skip_arrow_metadata);
let writer = ArrowWriter::try_new_with_options(
merged_buff.clone(),
Arc::clone(&output_schema),
options,
)?;
let (writer, row_group_writer_factory) = writer.into_serialized_writer()?;
let launch_serialization_task = spawn_parquet_parallel_serialization_task(
row_group_writer_factory,
data,
serialize_tx,
Arc::clone(&output_schema),
Arc::clone(&arc_props),
parallel_options.into(),
Arc::clone(&pool),
);
let parquet_meta_data = concatenate_parallel_row_groups(
writer,
merged_buff,
serialize_rx,
object_store_writer,
pool,
)
.await?;
launch_serialization_task
.join_unwind()
.await
.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
Ok(parquet_meta_data)
}
#[cfg(test)]
mod tests {
use parquet::arrow::parquet_to_arrow_schema;
use std::sync::Arc;
use super::*;
use arrow::datatypes::DataType;
use parquet::schema::parser::parse_message_type;
#[test]
fn coerce_int96_to_resolution_with_mixed_timestamps() {
let spark_schema = "
message spark_schema {
optional int96 c0;
optional int64 c1 (TIMESTAMP(NANOS,true));
optional int64 c2 (TIMESTAMP(NANOS,false));
optional int64 c3 (TIMESTAMP(MILLIS,true));
optional int64 c4 (TIMESTAMP(MILLIS,false));
optional int64 c5 (TIMESTAMP(MICROS,true));
optional int64 c6 (TIMESTAMP(MICROS,false));
}
";
let schema = parse_message_type(spark_schema).expect("should parse schema");
let descr = SchemaDescriptor::new(Arc::new(schema));
let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();
let result =
coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond)
.unwrap();
let expected_schema = Schema::new(vec![
Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
Field::new(
"c1",
DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
true,
),
Field::new("c2", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
Field::new(
"c3",
DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
true,
),
Field::new("c4", DataType::Timestamp(TimeUnit::Millisecond, None), true),
Field::new(
"c5",
DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
true,
),
Field::new("c6", DataType::Timestamp(TimeUnit::Microsecond, None), true),
]);
assert_eq!(result, expected_schema);
}
#[test]
fn coerce_int96_to_resolution_with_nested_types() {
let spark_schema = "
message spark_schema {
optional int96 c0;
optional group c1 {
optional int96 c0;
}
optional group c2 {
optional group c0 (LIST) {
repeated group list {
optional int96 element;
}
}
}
optional group c3 (LIST) {
repeated group list {
optional int96 element;
}
}
optional group c4 (LIST) {
repeated group list {
optional group element {
optional int96 c0;
optional int96 c1;
}
}
}
optional group c5 (MAP) {
repeated group key_value {
required int96 key;
optional int96 value;
}
}
optional group c6 (LIST) {
repeated group list {
optional group element (MAP) {
repeated group key_value {
required int96 key;
optional int96 value;
}
}
}
}
}
";
let schema = parse_message_type(spark_schema).expect("should parse schema");
let descr = SchemaDescriptor::new(Arc::new(schema));
let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();
let result =
coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond)
.unwrap();
let expected_schema = Schema::new(vec![
Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
Field::new_struct(
"c1",
vec![Field::new(
"c0",
DataType::Timestamp(TimeUnit::Microsecond, None),
true,
)],
true,
),
Field::new_struct(
"c2",
vec![Field::new_list(
"c0",
Field::new(
"element",
DataType::Timestamp(TimeUnit::Microsecond, None),
true,
),
true,
)],
true,
),
Field::new_list(
"c3",
Field::new(
"element",
DataType::Timestamp(TimeUnit::Microsecond, None),
true,
),
true,
),
Field::new_list(
"c4",
Field::new_struct(
"element",
vec![
Field::new(
"c0",
DataType::Timestamp(TimeUnit::Microsecond, None),
true,
),
Field::new(
"c1",
DataType::Timestamp(TimeUnit::Microsecond, None),
true,
),
],
true,
),
true,
),
Field::new_map(
"c5",
"key_value",
Field::new(
"key",
DataType::Timestamp(TimeUnit::Microsecond, None),
false,
),
Field::new(
"value",
DataType::Timestamp(TimeUnit::Microsecond, None),
true,
),
false,
true,
),
Field::new_list(
"c6",
Field::new_map(
"element",
"key_value",
Field::new(
"key",
DataType::Timestamp(TimeUnit::Microsecond, None),
false,
),
Field::new(
"value",
DataType::Timestamp(TimeUnit::Microsecond, None),
true,
),
false,
true,
),
true,
),
]);
assert_eq!(result, expected_schema);
}
}