use std::any::Any;
use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;
use super::write::demux::start_demuxer_task;
use super::write::{create_writer, SharedBuffer};
use super::{FileFormat, FileScanConfig};
use crate::arrow::array::{
BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch,
};
use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig};
use crate::datasource::schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};
use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::insert::{DataSink, DataSinkExec};
use crate::physical_plan::{
Accumulator, DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream,
Statistics,
};
use datafusion_common::config::TableParquetOptions;
use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
use datafusion_common::stats::Precision;
use datafusion_common::{
exec_err, internal_datafusion_err, not_impl_err, DataFusionError,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_plan::metrics::MetricsSet;
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use object_store::buffered::BufWriter;
use parquet::arrow::arrow_writer::{
compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
ArrowLeafColumn,
};
use parquet::arrow::{
arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter,
};
use parquet::file::footer::{decode_footer, decode_metadata};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::file::writer::SerializedFileWriter;
use parquet::format::FileMetaData;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinSet;
use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
use futures::{StreamExt, TryStreamExt};
use hashbrown::HashMap;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
/// Size, in bytes, passed to `SharedBuffer::new` when the parallel writer
/// creates its in-memory staging buffer (1048576 = 1024 * 1024).
/// NOTE(review): presumably this is the buffer's initial capacity — confirm
/// against `SharedBuffer::new`.
const INITIAL_BUFFER_BYTES: usize = 1048576;
/// Threshold, in bytes, above which the staging buffer is written to the
/// object store writer and cleared while row groups are being concatenated.
const BUFFER_FLUSH_BYTES: usize = 1024000;
/// Parquet implementation of the `FileFormat` trait.
///
/// All behavior is driven by the wrapped [`TableParquetOptions`]; the builder
/// methods on this type read and write fields of that options struct.
#[derive(Debug, Default)]
pub struct ParquetFormat {
// Options consulted when inferring schemas/statistics and when building
// scan and sink plans.
options: TableParquetOptions,
}
impl ParquetFormat {
pub fn new() -> Self {
Self::default()
}
pub fn with_enable_pruning(mut self, enable: bool) -> Self {
self.options.global.pruning = enable;
self
}
pub fn enable_pruning(&self) -> bool {
self.options.global.pruning
}
pub fn with_metadata_size_hint(mut self, size_hint: Option<usize>) -> Self {
self.options.global.metadata_size_hint = size_hint;
self
}
pub fn metadata_size_hint(&self) -> Option<usize> {
self.options.global.metadata_size_hint
}
pub fn with_skip_metadata(mut self, skip_metadata: bool) -> Self {
self.options.global.skip_metadata = skip_metadata;
self
}
pub fn skip_metadata(&self) -> bool {
self.options.global.skip_metadata
}
pub fn with_options(mut self, options: TableParquetOptions) -> Self {
self.options = options;
self
}
pub fn options(&self) -> &TableParquetOptions {
&self.options
}
}
/// Rebuilds every schema from its fields with each field's metadata reset to
/// the default (empty) value.
///
/// Because the result is produced by `Schema::new`, it carries only the
/// rebuilt fields.
fn clear_metadata(
    schemas: impl IntoIterator<Item = Schema>,
) -> impl Iterator<Item = Schema> {
    let strip_field_metadata = |schema: Schema| {
        let mut stripped = Vec::with_capacity(schema.fields().len());
        for field in schema.fields().iter() {
            stripped.push(field.as_ref().clone().with_metadata(Default::default()));
        }
        Schema::new(stripped)
    };
    schemas.into_iter().map(strip_field_metadata)
}
/// Fetches the Arrow schema of `file` and pairs it with the file's location.
///
/// The location is returned alongside the schema so callers can order the
/// results by path.
///
/// # Errors
/// Propagates any error returned by [`fetch_schema`].
async fn fetch_schema_with_location(
    store: &dyn ObjectStore,
    file: &ObjectMeta,
    metadata_size_hint: Option<usize>,
) -> Result<(Path, Schema)> {
    fetch_schema(store, file, metadata_size_hint)
        .await
        .map(|schema| (file.location.clone(), schema))
}
/// `FileFormat` implementation for Parquet: schema and statistics inference
/// from file footers, plus construction of scan and sink execution plans.
#[async_trait]
impl FileFormat for ParquetFormat {
/// Exposes this format as `&dyn Any`.
fn as_any(&self) -> &dyn Any {
self
}
/// Infers a single schema for `objects` by fetching each file's schema and
/// merging them with `Schema::try_merge`.
///
/// Up to `execution.meta_fetch_concurrency` fetches are buffered at once.
/// The fetched schemas are sorted by object location before merging, and
/// field metadata is stripped first when `skip_metadata` is enabled.
///
/// # Errors
/// Returns an error if any schema fetch fails or the schemas cannot be merged.
async fn infer_schema(
&self,
state: &SessionState,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef> {
let mut schemas: Vec<_> = futures::stream::iter(objects)
.map(|object| {
fetch_schema_with_location(
store.as_ref(),
object,
self.metadata_size_hint(),
)
})
// NOTE(review): `.boxed()` looks like a compiler workaround for this
// stream chain — confirm before removing it.
.boxed() .buffered(state.config_options().execution.meta_fetch_concurrency)
.try_collect()
.await?;
// Order by location so the merge order is determined by the file paths.
schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2));
let schemas = schemas
.into_iter()
.map(|(_, schema)| schema)
.collect::<Vec<_>>();
let schema = if self.skip_metadata() {
Schema::try_merge(clear_metadata(schemas))
} else {
Schema::try_merge(schemas)
}?;
Ok(Arc::new(schema))
}
/// Computes statistics for a single file by delegating to
/// [`fetch_statistics`] with this format's metadata size hint.
async fn infer_stats(
&self,
_state: &SessionState,
store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
object: &ObjectMeta,
) -> Result<Statistics> {
let stats = fetch_statistics(
store.as_ref(),
table_schema,
object,
self.metadata_size_hint(),
)
.await?;
Ok(stats)
}
/// Builds the Parquet scan plan for `conf`.
///
/// The filter predicate is only attached when pruning is enabled, and the
/// metadata size hint is only attached when one is configured.
async fn create_physical_plan(
&self,
_state: &SessionState,
conf: FileScanConfig,
filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut builder =
ParquetExecBuilder::new_with_options(conf, self.options.clone());
if self.enable_pruning() {
if let Some(predicate) = filters.cloned() {
builder = builder.with_predicate(predicate);
}
}
if let Some(metadata_size_hint) = self.metadata_size_hint() {
builder = builder.with_metadata_size_hint(metadata_size_hint);
}
Ok(builder.build_arc())
}
/// Builds a `DataSinkExec` that writes `input` through a [`ParquetSink`].
///
/// # Errors
/// Returns a not-implemented error when `conf.overwrite` is set.
async fn create_writer_physical_plan(
&self,
input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
conf: FileSinkConfig,
order_requirements: Option<Vec<PhysicalSortRequirement>>,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
return not_impl_err!("Overwrites are not implemented yet for Parquet");
}
// Capture the schema before `conf` is moved into the sink.
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(ParquetSink::new(conf, self.options.clone()));
Ok(Arc::new(DataSinkExec::new(
input,
sink,
sink_schema,
order_requirements,
)) as _)
}
}
/// Folds the min/max values of one Parquet column-chunk statistic into the
/// accumulators stored at index `i`.
///
/// The accumulators at `i` are reset to `None` when the statistic has no
/// min/max set, when the statistic's physical type does not pair with the
/// data type of `fields[i]` (Boolean, Int32, Int64, Float32 and Float64 are
/// the supported pairings), or when updating an accumulator fails.
fn summarize_min_max(
    max_values: &mut [Option<MaxAccumulator>],
    min_values: &mut [Option<MinAccumulator>],
    fields: &Fields,
    i: usize,
    stat: &ParquetStatistics,
) {
    // Feeds a one-element array into the accumulator at slot `i`, discarding
    // the accumulator if the update reports an error. The array expression is
    // only evaluated when an accumulator is present.
    macro_rules! fold_into {
        ($accumulators:ident, $array:expr) => {
            if let Some(acc) = &mut $accumulators[i] {
                if acc.update_batch(&[Arc::new($array)]).is_err() {
                    $accumulators[i] = None;
                }
            }
        };
    }

    if !stat.has_min_max_set() {
        max_values[i] = None;
        min_values[i] = None;
        return;
    }

    match stat {
        ParquetStatistics::Boolean(s) if *fields[i].data_type() == DataType::Boolean => {
            fold_into!(max_values, BooleanArray::from(vec![*s.max()]));
            fold_into!(min_values, BooleanArray::from(vec![*s.min()]));
        }
        ParquetStatistics::Int32(s) if *fields[i].data_type() == DataType::Int32 => {
            fold_into!(max_values, Int32Array::from_value(*s.max(), 1));
            fold_into!(min_values, Int32Array::from_value(*s.min(), 1));
        }
        ParquetStatistics::Int64(s) if *fields[i].data_type() == DataType::Int64 => {
            fold_into!(max_values, Int64Array::from_value(*s.max(), 1));
            fold_into!(min_values, Int64Array::from_value(*s.min(), 1));
        }
        ParquetStatistics::Float(s) if *fields[i].data_type() == DataType::Float32 => {
            fold_into!(max_values, Float32Array::from(vec![*s.max()]));
            fold_into!(min_values, Float32Array::from(vec![*s.min()]));
        }
        ParquetStatistics::Double(s) if *fields[i].data_type() == DataType::Float64 => {
            fold_into!(max_values, Float64Array::from(vec![*s.max()]));
            fold_into!(min_values, Float64Array::from(vec![*s.min()]));
        }
        // Any other physical/logical type pairing is not summarized.
        _ => {
            max_values[i] = None;
            min_values[i] = None;
        }
    }
}
/// Fetches and decodes the Parquet footer metadata of the object described by
/// `meta`.
///
/// The last bytes of the file are requested first: `size_hint` bytes when a
/// hint is given, otherwise just the 8-byte footer. If the metadata length
/// recorded in the footer exceeds what that first request returned, a second
/// range request fetches the missing prefix.
///
/// A `size_hint` smaller than the 8-byte footer is treated as 8, so the first
/// request always covers the footer.
///
/// # Errors
/// Returns an execution error when the file is smaller than the footer, when
/// the store returns fewer bytes than the footer, or when the footer claims
/// more metadata than the file can hold. Object store and Parquet decoding
/// errors are propagated.
pub async fn fetch_parquet_metadata(
    store: &dyn ObjectStore,
    meta: &ObjectMeta,
    size_hint: Option<usize>,
) -> Result<ParquetMetaData> {
    // Length of the fixed-size footer at the end of the file.
    const FOOTER_SIZE: usize = 8;

    if meta.size < FOOTER_SIZE {
        return exec_err!("file size of {} is less than footer", meta.size);
    }

    let footer_start = match size_hint {
        // Clamp the hint so the fetched suffix always contains the footer;
        // a smaller hint would otherwise underflow the slicing below.
        Some(size_hint) => meta.size.saturating_sub(size_hint.max(FOOTER_SIZE)),
        None => meta.size - FOOTER_SIZE,
    };

    let suffix = store
        .get_range(&meta.location, footer_start..meta.size)
        .await?;
    let suffix_len = suffix.len();
    // Guard against a store that returns fewer bytes than were requested.
    if suffix_len < FOOTER_SIZE {
        return exec_err!(
            "fetched {} bytes from the end of the file, which is less than the {} byte footer",
            suffix_len,
            FOOTER_SIZE
        );
    }

    let mut footer = [0; FOOTER_SIZE];
    footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);
    let length = decode_footer(&footer)?;

    // `meta.size >= FOOTER_SIZE` was checked above, so this cannot underflow.
    if length > meta.size - FOOTER_SIZE {
        return exec_err!(
            "file size of {} is less than footer + metadata {}",
            meta.size,
            length.saturating_add(FOOTER_SIZE)
        );
    }

    let metadata_start = meta.size - length - FOOTER_SIZE;
    if length > suffix_len - FOOTER_SIZE {
        // The first request did not cover the whole metadata: fetch the part
        // that precedes it and stitch the two pieces together.
        let remaining_metadata = store
            .get_range(&meta.location, metadata_start..footer_start)
            .await?;
        let mut metadata = BytesMut::with_capacity(length);
        metadata.put(remaining_metadata.as_ref());
        metadata.put(&suffix[..suffix_len - FOOTER_SIZE]);
        Ok(decode_metadata(metadata.as_ref())?)
    } else {
        Ok(decode_metadata(
            &suffix[metadata_start - footer_start..suffix_len - FOOTER_SIZE],
        )?)
    }
}
/// Reads the Parquet footer of `file` and converts its schema descriptor and
/// key/value metadata into an Arrow [`Schema`].
///
/// # Errors
/// Propagates metadata fetch errors and Parquet-to-Arrow conversion errors.
async fn fetch_schema(
    store: &dyn ObjectStore,
    file: &ObjectMeta,
    metadata_size_hint: Option<usize>,
) -> Result<Schema> {
    let parquet_metadata =
        fetch_parquet_metadata(store, file, metadata_size_hint).await?;
    let footer = parquet_metadata.file_metadata();
    Ok(parquet_to_arrow_schema(
        footer.schema_descr(),
        footer.key_value_metadata(),
    )?)
}
/// Reads the Parquet footer of `file` and summarizes its row-group metadata
/// as [`Statistics`] expressed against `table_schema`.
///
/// Row counts and byte sizes are summed over all row groups. Column
/// statistics are only reported when at least one column chunk carries
/// statistics; otherwise they are unknown.
///
/// A table column with no counterpart in the file is counted as null for
/// every row of the file: each row group contributes its own row count, so
/// the total equals the file's row count however many row groups there are.
///
/// # Errors
/// Propagates metadata fetch errors and Parquet-to-Arrow schema conversion
/// errors.
async fn fetch_statistics(
    store: &dyn ObjectStore,
    table_schema: SchemaRef,
    file: &ObjectMeta,
    metadata_size_hint: Option<usize>,
) -> Result<Statistics> {
    let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
    let file_metadata = metadata.file_metadata();
    let file_schema = parquet_to_arrow_schema(
        file_metadata.schema_descr(),
        file_metadata.key_value_metadata(),
    )?;

    let fields = table_schema.fields();
    let num_fields = fields.len();

    let mut num_rows = 0;
    let mut total_byte_size = 0;
    let mut null_counts = vec![Precision::Exact(0); num_fields];
    // Sticky: stays true once any column chunk in any row group has statistics.
    let mut has_statistics = false;

    let schema_adapter =
        DefaultSchemaAdapterFactory::default().create(table_schema.clone());
    let (mut max_values, mut min_values) = create_max_min_accs(&table_schema);

    for row_group_meta in metadata.row_groups() {
        let row_group_rows = row_group_meta.num_rows();
        num_rows += row_group_rows;
        total_byte_size += row_group_meta.total_byte_size();

        // File column index -> (null count, statistics) for this row group.
        let mut column_stats: HashMap<usize, (u64, &ParquetStatistics)> = HashMap::new();
        for (i, column) in row_group_meta.columns().iter().enumerate() {
            if let Some(stat) = column.statistics() {
                has_statistics = true;
                column_stats.insert(i, (stat.null_count(), stat));
            }
        }

        for (table_idx, null_cnt) in null_counts.iter_mut().enumerate() {
            match schema_adapter.map_column_index(table_idx, &file_schema) {
                None => {
                    // Column absent from the file: all rows of THIS row group
                    // are null. Using the running file total here would count
                    // earlier row groups again.
                    *null_cnt = null_cnt.add(&Precision::Exact(row_group_rows as usize));
                }
                Some(file_idx) if has_statistics => {
                    if let Some((null_count, stats)) = column_stats.get(&file_idx) {
                        *null_cnt = null_cnt.add(&Precision::Exact(*null_count as usize));
                        summarize_min_max(
                            &mut max_values,
                            &mut min_values,
                            fields,
                            table_idx,
                            stats,
                        )
                    } else {
                        // No statistics for this column in this row group, so
                        // min/max can no longer be stated for the file.
                        max_values[table_idx] = None;
                        min_values[table_idx] = None;
                    }
                }
                // No statistics seen so far: leave the column untouched.
                Some(_) => {}
            }
        }
    }

    let column_stats = if has_statistics {
        get_col_stats(&table_schema, null_counts, &mut max_values, &mut min_values)
    } else {
        Statistics::unknown_column(&table_schema)
    };

    Ok(Statistics {
        num_rows: Precision::Exact(num_rows as usize),
        total_byte_size: Precision::Exact(total_byte_size as usize),
        column_statistics: column_stats,
    })
}
/// `DataSink` implementation that writes record batches as Parquet files.
pub struct ParquetSink {
// Sink configuration: object store URL, table paths, output schema and
// partition columns are read from here.
config: FileSinkConfig,
// Parquet options converted into writer properties in `write_all`.
parquet_options: TableParquetOptions,
// File metadata of every file written so far, keyed by output path;
// populated by `write_all` as each write task completes.
written: Arc<parking_lot::Mutex<HashMap<Path, FileMetaData>>>,
}
impl Debug for ParquetSink {
    /// Prints only the type name; no fields are included in the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("ParquetSink")
    }
}
impl DisplayAs for ParquetSink {
    /// Renders the sink as `ParquetSink(file_groups=...)`, delegating the
    /// file-group part to [`FileGroupDisplay`]. Both display modes produce
    /// the same wrapper text.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                f.write_str("ParquetSink(file_groups=")?;
                FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
                f.write_str(")")
            }
        }
    }
}
impl ParquetSink {
pub fn new(config: FileSinkConfig, parquet_options: TableParquetOptions) -> Self {
Self {
config,
parquet_options,
written: Default::default(),
}
}
pub fn config(&self) -> &FileSinkConfig {
&self.config
}
pub fn written(&self) -> HashMap<Path, FileMetaData> {
self.written.lock().clone()
}
fn get_writer_schema(&self) -> Arc<Schema> {
if !self.config.table_partition_cols.is_empty() {
let schema = self.config.output_schema();
let partition_names: Vec<_> = self
.config
.table_partition_cols
.iter()
.map(|(s, _)| s)
.collect();
Arc::new(Schema::new(
schema
.fields()
.iter()
.filter(|f| !partition_names.contains(&f.name()))
.map(|f| (**f).clone())
.collect::<Vec<_>>(),
))
} else {
self.config.output_schema().clone()
}
}
async fn create_async_arrow_writer(
&self,
location: &Path,
object_store: Arc<dyn ObjectStore>,
parquet_props: WriterProperties,
) -> Result<AsyncArrowWriter<BufWriter>> {
let buf_writer = BufWriter::new(object_store, location.clone());
let writer = AsyncArrowWriter::try_new(
buf_writer,
self.get_writer_schema(),
Some(parquet_props),
)?;
Ok(writer)
}
pub fn parquet_options(&self) -> &TableParquetOptions {
&self.parquet_options
}
}
/// `DataSink` implementation: demultiplexes the input stream into per-file
/// batch streams and writes each one as a Parquet file.
#[async_trait]
impl DataSink for ParquetSink {
/// Exposes this sink as `&dyn Any`.
fn as_any(&self) -> &dyn Any {
self
}
/// This sink does not report metrics.
fn metrics(&self) -> Option<MetricsSet> {
None
}
/// Writes every batch of `data` and returns the total number of rows
/// written, as reported by the metadata of the finished files.
///
/// Each file produced by the demuxer gets its own write task. Depending on
/// `allow_single_file_parallelism`, a file is written either through a
/// single `AsyncArrowWriter` or through the parallel row-group pipeline.
/// The metadata of each finished file is recorded in `self.written`.
///
/// # Errors
/// Returns an error if writer options cannot be built, the object store
/// cannot be resolved, any write task fails, or the same path is reported
/// twice.
///
/// # Panics
/// Re-raises a panic from any write task. Indexes `table_paths[0]`, so it
/// panics if the sink was configured without a table path.
async fn write_all(
&self,
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
let parquet_props = ParquetWriterOptions::try_from(&self.parquet_options)?;
let object_store = context
.runtime_env()
.object_store(&self.config.object_store_url)?;
let parquet_opts = &self.parquet_options;
let allow_single_file_parallelism =
parquet_opts.global.allow_single_file_parallelism;
// Partition columns are only passed to the demuxer when there are any.
let part_col = if !self.config.table_partition_cols.is_empty() {
Some(self.config.table_partition_cols.clone())
} else {
None
};
let parallel_options = ParallelParquetWriterOptions {
max_parallel_row_groups: parquet_opts
.global
.maximum_parallel_row_group_writers,
max_buffered_record_batches_per_stream: parquet_opts
.global
.maximum_buffered_record_batches_per_stream,
};
// The demuxer yields one (path, batch receiver) pair per output file.
let (demux_task, mut file_stream_rx) = start_demuxer_task(
data,
context,
part_col,
self.config.table_paths[0].clone(),
"parquet".into(),
);
let mut file_write_tasks: JoinSet<
std::result::Result<(Path, FileMetaData), DataFusionError>,
> = JoinSet::new();
while let Some((path, mut rx)) = file_stream_rx.recv().await {
if !allow_single_file_parallelism {
// Serial path: one AsyncArrowWriter consumes the file's batches in order.
let mut writer = self
.create_async_arrow_writer(
&path,
object_store.clone(),
parquet_props.writer_options().clone(),
)
.await?;
file_write_tasks.spawn(async move {
while let Some(batch) = rx.recv().await {
writer.write(&batch).await?;
}
let file_metadata = writer
.close()
.await
.map_err(DataFusionError::ParquetError)?;
Ok((path, file_metadata))
});
} else {
// Parallel path: row groups are serialized concurrently and then
// concatenated into the object store writer.
let writer = create_writer(
FileCompressionType::UNCOMPRESSED,
&path,
object_store.clone(),
)
.await?;
let schema = self.get_writer_schema();
let props = parquet_props.clone();
let parallel_options_clone = parallel_options.clone();
file_write_tasks.spawn(async move {
let file_metadata = output_single_parquet_file_parallelized(
writer,
rx,
schema,
props.writer_options(),
parallel_options_clone,
)
.await?;
Ok((path, file_metadata))
});
}
}
let mut row_count = 0;
while let Some(result) = file_write_tasks.join_next().await {
match result {
Ok(r) => {
let (path, file_metadata) = r?;
row_count += file_metadata.num_rows;
let mut written_files = self.written.lock();
written_files
.try_insert(path.clone(), file_metadata)
.map_err(|e| internal_datafusion_err!("duplicate entry detected for partitioned file {path}: {e}"))?;
drop(written_files);
}
Err(e) => {
// A join error is either a panic or something else; panics are
// re-raised here.
// NOTE(review): the non-panic case is treated as impossible —
// confirm these tasks can never be cancelled while being joined.
if e.is_panic() {
std::panic::resume_unwind(e.into_panic());
} else {
unreachable!();
}
}
}
}
// Surface any error or panic from the demuxer itself.
demux_task.join_unwind().await?;
Ok(row_count as u64)
}
}
/// Drains `rx`, writing every received leaf column into `writer`, and hands
/// the writer back once the channel is closed.
///
/// # Errors
/// Stops at and returns the first write error.
async fn column_serializer_task(
    mut rx: Receiver<ArrowLeafColumn>,
    mut writer: ArrowColumnWriter,
) -> Result<ArrowColumnWriter> {
    loop {
        match rx.recv().await {
            Some(leaf) => writer.write(&leaf)?,
            // All senders are gone: the column is complete.
            None => return Ok(writer),
        }
    }
}
/// Handle to a spawned task that resolves to a column writer (or an error)
/// once its input channel has been drained.
type ColumnWriterTask = SpawnedTask<Result<ArrowColumnWriter>>;
/// Sending half of the channel that feeds leaf columns to one column writer task.
type ColSender = Sender<ArrowLeafColumn>;
/// Spawns one serializer task per Parquet leaf column of `schema` and
/// returns the task handles together with the channel senders that feed
/// them, both in column-writer order.
///
/// Each channel buffers up to `max_buffer_size` leaf columns.
///
/// # Errors
/// Propagates errors from converting the schema or creating the column
/// writers.
fn spawn_column_parallel_row_group_writer(
    schema: Arc<Schema>,
    parquet_props: Arc<WriterProperties>,
    max_buffer_size: usize,
) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
    let parquet_schema = arrow_to_parquet_schema(&schema)?;
    let column_writers = get_column_writers(&parquet_schema, &parquet_props, &schema)?;

    let (tasks, senders): (Vec<ColumnWriterTask>, Vec<ColSender>) = column_writers
        .into_iter()
        .map(|writer| {
            let (sender, receiver) = mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
            let task = SpawnedTask::spawn(column_serializer_task(receiver, writer));
            (task, sender)
        })
        .unzip();

    Ok((tasks, senders))
}
/// Tuning knobs for the parallel single-file Parquet writer.
#[derive(Clone)]
struct ParallelParquetWriterOptions {
// Capacity of the channel that carries finished row-group tasks to the
// concatenation step.
max_parallel_row_groups: usize,
// Capacity of each per-column channel feeding a column writer task.
max_buffered_record_batches_per_stream: usize,
}
/// Result of serializing one row group: its finished column chunks and the
/// number of rows it contains.
type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, usize)>;
/// Splits every column of `rb` into its Parquet leaf columns and sends each
/// leaf to the next channel in `col_array_channels`, in order.
///
/// # Errors
/// Propagates errors from computing the leaves, and returns an internal
/// error if a channel's receiver has gone away.
///
/// # Panics
/// Panics if the batch produces more leaves than there are channels.
async fn send_arrays_to_col_writers(
    col_array_channels: &[ColSender],
    rb: &RecordBatch,
    schema: Arc<Schema>,
) -> Result<()> {
    // Leaves are numbered consecutively across all top-level columns.
    let mut channel_idx = 0;
    for (field, array) in schema.fields().iter().zip(rb.columns()) {
        for leaf in compute_leaves(field, array)? {
            let sent = col_array_channels[channel_idx].send(leaf).await;
            if sent.is_err() {
                return Err(DataFusionError::Internal(
                    "Unable to send array to writer!".into(),
                ));
            }
            channel_idx += 1;
        }
    }
    Ok(())
}
/// Spawns a task that waits for every column writer task of one row group,
/// closes each writer into a column chunk, and yields the chunks together
/// with `rg_rows`.
///
/// The first failing column writer or close call ends the task with that
/// error.
fn spawn_rg_join_and_finalize_task(
    column_writer_tasks: Vec<ColumnWriterTask>,
    rg_rows: usize,
) -> SpawnedTask<RBStreamSerializeResult> {
    SpawnedTask::spawn(async move {
        let mut column_chunks = Vec::with_capacity(column_writer_tasks.len());
        for handle in column_writer_tasks {
            let finished_writer = handle.join_unwind().await?;
            let chunk = finished_writer.close()?;
            column_chunks.push(chunk);
        }
        Ok((column_chunks, rg_rows))
    })
}
/// Spawns the task that turns the incoming batch stream into row groups.
///
/// Batches are fed to a set of per-column writer tasks. Whenever the current
/// row group would reach `writer_props.max_row_group_size()` rows, the batch
/// is split, the row group is handed to `serialize_tx` as a finalize task,
/// and a fresh set of column writers is spawned for the remainder. A final,
/// partially filled row group is sent when the input ends.
///
/// The task fails if a batch cannot be sent to the column writers, if new
/// column writers cannot be created, or if `serialize_tx` is closed.
fn spawn_parquet_parallel_serialization_task(
mut data: Receiver<RecordBatch>,
serialize_tx: Sender<SpawnedTask<RBStreamSerializeResult>>,
schema: Arc<Schema>,
writer_props: Arc<WriterProperties>,
parallel_options: ParallelParquetWriterOptions,
) -> SpawnedTask<Result<(), DataFusionError>> {
SpawnedTask::spawn(async move {
let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream;
let max_row_group_rows = writer_props.max_row_group_size();
let (mut column_writer_handles, mut col_array_channels) =
spawn_column_parallel_row_group_writer(
schema.clone(),
writer_props.clone(),
max_buffer_rb,
)?;
// Rows already sent to the column writers of the row group in progress.
let mut current_rg_rows = 0;
while let Some(mut rb) = data.recv().await {
// A single batch may span several row groups, hence the inner loop.
loop {
// NOTE(review): the comparison is strict, so a batch that exactly
// fills the row group takes the split path below and its zero-row
// remainder is then sent to the freshly spawned writers.
if current_rg_rows + rb.num_rows() < max_row_group_rows {
send_arrays_to_col_writers(&col_array_channels, &rb, schema.clone())
.await?;
current_rg_rows += rb.num_rows();
break;
} else {
// Top up the current row group with the head of the batch.
let rows_left = max_row_group_rows - current_rg_rows;
let a = rb.slice(0, rows_left);
send_arrays_to_col_writers(&col_array_channels, &a, schema.clone())
.await?;
// Dropping the senders closes the channels so the column
// serializer tasks can finish.
drop(col_array_channels);
let finalize_rg_task = spawn_rg_join_and_finalize_task(
column_writer_handles,
max_row_group_rows,
);
serialize_tx.send(finalize_rg_task).await.map_err(|_| {
DataFusionError::Internal(
"Unable to send closed RG to concat task!".into(),
)
})?;
current_rg_rows = 0;
// Continue with whatever is left of the batch in a new row group.
rb = rb.slice(rows_left, rb.num_rows() - rows_left);
(column_writer_handles, col_array_channels) =
spawn_column_parallel_row_group_writer(
schema.clone(),
writer_props.clone(),
max_buffer_rb,
)?;
}
}
}
// Input exhausted: close the channels of the row group in progress.
drop(col_array_channels);
// Only emit the trailing row group if it received any rows.
if current_rg_rows > 0 {
let finalize_rg_task =
spawn_rg_join_and_finalize_task(column_writer_handles, current_rg_rows);
serialize_tx.send(finalize_rg_task).await.map_err(|_| {
DataFusionError::Internal(
"Unable to send closed RG to concat task!".into(),
)
})?;
}
Ok(())
})
}
/// Receives finished row groups in order, appends them to a
/// `SerializedFileWriter` backed by a shared in-memory buffer, and streams
/// that buffer to `object_store_writer`.
///
/// The buffer is flushed to the object store writer whenever it exceeds
/// `BUFFER_FLUSH_BYTES` after a column chunk is appended; the remainder
/// (including the file footer) is written after the Parquet writer is
/// closed, and the object store writer is then shut down.
///
/// # Errors
/// Propagates row-group serialization errors, Parquet writer errors and I/O
/// errors from the object store writer.
///
/// # Panics
/// Panics if the shared buffer's lock cannot be acquired immediately
/// (`try_lock().unwrap()`).
async fn concatenate_parallel_row_groups(
mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
schema: Arc<Schema>,
writer_props: Arc<WriterProperties>,
mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
) -> Result<FileMetaData> {
// The Parquet writer and this function share the same underlying buffer.
let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
let schema_desc = arrow_to_parquet_schema(schema.as_ref())?;
let mut parquet_writer = SerializedFileWriter::new(
merged_buff.clone(),
schema_desc.root_schema_ptr(),
writer_props,
)?;
while let Some(task) = serialize_rx.recv().await {
let result = task.join_unwind().await;
// The row group is opened before the serialization result is checked.
let mut rg_out = parquet_writer.next_row_group()?;
let (serialized_columns, _cnt) = result?;
for chunk in serialized_columns {
chunk.append_to_row_group(&mut rg_out)?;
// The guard is held across the `write_all` await below and released
// at the end of each iteration.
let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
object_store_writer
.write_all(buff_to_flush.as_slice())
.await?;
buff_to_flush.clear();
}
}
rg_out.close()?;
}
let file_metadata = parquet_writer.close()?;
// Write whatever is still buffered after closing the Parquet writer.
let final_buff = merged_buff.buffer.try_lock().unwrap();
object_store_writer.write_all(final_buff.as_slice()).await?;
object_store_writer.shutdown().await?;
Ok(file_metadata)
}
/// Writes one Parquet file by serializing row groups in a spawned task and
/// concatenating them into `object_store_writer` on the current task.
///
/// The two halves are connected by a channel with capacity
/// `parallel_options.max_parallel_row_groups`.
///
/// # Errors
/// Returns the concatenation error first if there is one, otherwise any
/// error produced by the serialization task.
async fn output_single_parquet_file_parallelized(
    object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
    data: Receiver<RecordBatch>,
    output_schema: Arc<Schema>,
    parquet_props: &WriterProperties,
    parallel_options: ParallelParquetWriterOptions,
) -> Result<FileMetaData> {
    let (serialize_tx, serialize_rx) = mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(
        parallel_options.max_parallel_row_groups,
    );
    let shared_props = Arc::new(parquet_props.clone());

    // Producer: splits the batch stream into row groups.
    let serialization_task = spawn_parquet_parallel_serialization_task(
        data,
        serialize_tx,
        Arc::clone(&output_schema),
        Arc::clone(&shared_props),
        parallel_options,
    );

    // Consumer: stitches the row groups into the output file.
    let file_metadata = concatenate_parallel_row_groups(
        serialize_rx,
        output_schema,
        shared_props,
        object_store_writer,
    )
    .await?;

    serialization_task.join_unwind().await?;
    Ok(file_metadata)
}
/// Helpers for writing record batches to temporary Parquet files in tests.
#[cfg(test)]
pub(crate) mod test_util {
    use super::*;
    use crate::test::object_store::local_unpartitioned_file;
    use parquet::arrow::ArrowWriter;
    use tempfile::NamedTempFile;

    /// Data-page row limit and write chunk size used for multi-page files.
    const ROWS_PER_PAGE: usize = 2;

    /// Writes each batch to its own temporary Parquet file and returns the
    /// object metadata alongside the temp-file handles.
    ///
    /// Temp files are sorted by path before being paired with the batches.
    /// With `multi_page` set, the data-page row count limit is
    /// `ROWS_PER_PAGE` and the batch is written in slices of that many rows.
    pub async fn store_parquet(
        batches: Vec<RecordBatch>,
        multi_page: bool,
    ) -> Result<(Vec<ObjectMeta>, Vec<NamedTempFile>)> {
        let mut tmp_files = Vec::with_capacity(batches.len());
        for _ in 0..batches.len() {
            tmp_files.push(NamedTempFile::new().expect("creating temp file"));
        }
        tmp_files.sort_by(|a, b| a.path().cmp(b.path()));

        let mut files = Vec::with_capacity(batches.len());
        for (batch, mut output) in batches.into_iter().zip(tmp_files) {
            let mut builder = WriterProperties::builder();
            if multi_page {
                builder = builder.set_data_page_row_count_limit(ROWS_PER_PAGE);
            }
            let props = builder.build();

            let mut writer =
                ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
                    .expect("creating writer");
            if multi_page {
                write_in_chunks(&mut writer, &batch, ROWS_PER_PAGE);
            } else {
                writer.write(&batch).expect("Writing batch");
            }
            writer.close().unwrap();
            files.push(output);
        }

        let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
        Ok((meta, files))
    }

    /// Writes `batch` to `writer` in consecutive slices of at most
    /// `chunk_size` rows.
    fn write_in_chunks<W: std::io::Write + Send>(
        writer: &mut ArrowWriter<W>,
        batch: &RecordBatch,
        chunk_size: usize,
    ) {
        let total_rows = batch.num_rows();
        let mut offset = 0;
        while offset < total_rows {
            let len = chunk_size.min(total_rows - offset);
            writer.write(&batch.slice(offset, len)).unwrap();
            offset += len;
        }
    }
}
#[cfg(test)]
mod tests {
use super::super::test_util::scan_format;
use crate::datasource::listing::{ListingTableUrl, PartitionedFile};
use crate::physical_plan::collect;
use std::fmt::{Display, Formatter};
use std::sync::atomic::{AtomicUsize, Ordering};
use super::*;
use crate::datasource::file_format::parquet::test_util::store_parquet;
use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{Array, ArrayRef, StringArray};
use arrow_schema::Field;
use async_trait::async_trait;
use datafusion_common::cast::{
as_binary_array, as_boolean_array, as_float32_array, as_float64_array,
as_int32_array, as_timestamp_nanosecond_array,
};
use datafusion_common::config::ParquetOptions;
use datafusion_common::ScalarValue;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use futures::stream::BoxStream;
use log::error;
use object_store::local::LocalFileSystem;
use object_store::{
GetOptions, GetResult, ListResult, MultipartUpload, PutMultipartOpts, PutOptions,
PutPayload, PutResult,
};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::file::metadata::{KeyValue, ParquetColumnIndex, ParquetOffsetIndex};
use parquet::file::page_index::index::Index;
use tokio::fs::File;
/// Two files with disjoint columns (`c1` and `c2`): statistics computed
/// against the merged schema must report the column missing from a file as
/// entirely null for that file.
#[tokio::test]
async fn read_merged_batches() -> Result<()> {
let c1: ArrayRef =
Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();
let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
let store = Arc::new(LocalFileSystem::new()) as _;
let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
let session = SessionContext::new();
let ctx = session.state();
let format = ParquetFormat::default();
let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap();
// First file: has c1 (one null), lacks c2 (all three rows null).
let stats =
fetch_statistics(store.as_ref(), schema.clone(), &meta[0], None).await?;
assert_eq!(stats.num_rows, Precision::Exact(3));
let c1_stats = &stats.column_statistics[0];
let c2_stats = &stats.column_statistics[1];
assert_eq!(c1_stats.null_count, Precision::Exact(1));
assert_eq!(c2_stats.null_count, Precision::Exact(3));
// Second file: lacks c1 (all three rows null), has c2 with min 1 / max 2.
let stats = fetch_statistics(store.as_ref(), schema, &meta[1], None).await?;
assert_eq!(stats.num_rows, Precision::Exact(3));
let c1_stats = &stats.column_statistics[0];
let c2_stats = &stats.column_statistics[1];
assert_eq!(c1_stats.null_count, Precision::Exact(3));
assert_eq!(c2_stats.null_count, Precision::Exact(1));
assert_eq!(
c2_stats.max_value,
Precision::Exact(ScalarValue::Int64(Some(2)))
);
assert_eq!(
c2_stats.min_value,
Precision::Exact(ScalarValue::Int64(Some(1)))
);
Ok(())
}
/// The schema inferred from two files with columns (`a`, `b`) and
/// (`c`, `d`) must list its fields in the order `a`, `b`, `c`, `d`.
#[tokio::test]
async fn is_schema_stable() -> Result<()> {
    let strings: ArrayRef =
        Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
    let ints: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));

    let batch1 =
        RecordBatch::try_from_iter(vec![("a", strings.clone()), ("b", strings.clone())])
            .unwrap();
    let batch2 =
        RecordBatch::try_from_iter(vec![("c", ints.clone()), ("d", ints.clone())])
            .unwrap();

    let store = Arc::new(LocalFileSystem::new()) as _;
    let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;

    let session = SessionContext::new();
    let ctx = session.state();
    let format = ParquetFormat::default();
    let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap();

    let expected_names: Vec<String> = ["a", "b", "c", "d"]
        .iter()
        .map(|name| name.to_string())
        .collect();
    let actual_names: Vec<String> = schema
        .all_fields()
        .into_iter()
        .map(|field| field.name().to_string())
        .collect();
    assert_eq!(actual_names, expected_names);

    Ok(())
}
/// Test object store that wraps another store and counts the `get_opts`
/// requests made through it.
#[derive(Debug)]
struct RequestCountingObjectStore {
// Store that actually serves the reads.
inner: Arc<dyn ObjectStore>,
// Number of `get_opts` calls observed so far.
request_count: AtomicUsize,
}
impl Display for RequestCountingObjectStore {
    /// Renders as `RequestCounting(<inner store>)`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("RequestCounting({})", self.inner))
    }
}
impl RequestCountingObjectStore {
pub fn new(inner: Arc<dyn ObjectStore>) -> Self {
Self {
inner,
request_count: Default::default(),
}
}
pub fn request_count(&self) -> usize {
self.request_count.load(Ordering::SeqCst)
}
pub fn upcast(self: &Arc<Self>) -> Arc<dyn ObjectStore> {
self.clone()
}
}
/// Only `get_opts` is functional: it bumps the request counter and forwards
/// to the inner store. Every other operation reports `NotImplemented`.
#[async_trait]
impl ObjectStore for RequestCountingObjectStore {
/// Not supported by this test store.
async fn put_opts(
&self,
_location: &Path,
_payload: PutPayload,
_opts: PutOptions,
) -> object_store::Result<PutResult> {
Err(object_store::Error::NotImplemented)
}
/// Not supported by this test store.
async fn put_multipart_opts(
&self,
_location: &Path,
_opts: PutMultipartOpts,
) -> object_store::Result<Box<dyn MultipartUpload>> {
Err(object_store::Error::NotImplemented)
}
/// Counts the request, then delegates to the inner store.
async fn get_opts(
&self,
location: &Path,
options: GetOptions,
) -> object_store::Result<GetResult> {
self.request_count.fetch_add(1, Ordering::SeqCst);
self.inner.get_opts(location, options).await
}
/// Not supported by this test store.
async fn head(&self, _location: &Path) -> object_store::Result<ObjectMeta> {
Err(object_store::Error::NotImplemented)
}
/// Not supported by this test store.
async fn delete(&self, _location: &Path) -> object_store::Result<()> {
Err(object_store::Error::NotImplemented)
}
/// Not supported: yields a single `NotImplemented` error.
fn list(
&self,
_prefix: Option<&Path>,
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
Box::pin(futures::stream::once(async {
Err(object_store::Error::NotImplemented)
}))
}
/// Not supported by this test store.
async fn list_with_delimiter(
&self,
_prefix: Option<&Path>,
) -> object_store::Result<ListResult> {
Err(object_store::Error::NotImplemented)
}
/// Not supported by this test store.
async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
Err(object_store::Error::NotImplemented)
}
/// Not supported by this test store.
async fn copy_if_not_exists(
&self,
_from: &Path,
_to: &Path,
) -> object_store::Result<()> {
Err(object_store::Error::NotImplemented)
}
}
/// Checks how many object store requests `fetch_parquet_metadata` issues for
/// different size hints, and that statistics are unaffected by the hint.
#[tokio::test]
async fn fetch_metadata_with_size_hint() -> Result<()> {
let c1: ArrayRef =
Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();
let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
LocalFileSystem::new(),
)));
let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
// A 9-byte hint is expected to need two requests to read the metadata.
fetch_parquet_metadata(store.as_ref() as &dyn ObjectStore, &meta[0], Some(9))
.await
.expect("error reading metadata with hint");
assert_eq!(store.request_count(), 2);
let session = SessionContext::new();
let ctx = session.state();
let format = ParquetFormat::default().with_metadata_size_hint(Some(9));
let schema = format
.infer_schema(&ctx, &store.upcast(), &meta)
.await
.unwrap();
let stats =
fetch_statistics(store.upcast().as_ref(), schema.clone(), &meta[0], Some(9))
.await?;
assert_eq!(stats.num_rows, Precision::Exact(3));
let c1_stats = &stats.column_statistics[0];
let c2_stats = &stats.column_statistics[1];
assert_eq!(c1_stats.null_count, Precision::Exact(1));
assert_eq!(c2_stats.null_count, Precision::Exact(3));
// Fresh counter: a hint equal to the file size is expected to need one request.
let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
LocalFileSystem::new(),
)));
let size_hint = meta[0].size;
fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
.await
.expect("error reading metadata with hint");
assert_eq!(store.request_count(), 1);
let format = ParquetFormat::default().with_metadata_size_hint(Some(size_hint));
let schema = format
.infer_schema(&ctx, &store.upcast(), &meta)
.await
.unwrap();
let stats = fetch_statistics(
store.upcast().as_ref(),
schema.clone(),
&meta[0],
Some(size_hint),
)
.await?;
assert_eq!(stats.num_rows, Precision::Exact(3));
let c1_stats = &stats.column_statistics[0];
let c2_stats = &stats.column_statistics[1];
assert_eq!(c1_stats.null_count, Precision::Exact(1));
assert_eq!(c2_stats.null_count, Precision::Exact(3));
// Fresh counter: a hint larger than the file must still succeed with one request.
let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
LocalFileSystem::new(),
)));
let size_hint = meta[0].size + 100;
fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
.await
.expect("error reading metadata with hint");
assert_eq!(store.request_count(), 1);
Ok(())
}
/// Scans `alltypes_plain.parquet` with a session batch size of 2 and checks
/// that partition 0 yields four batches of 11 columns and 2 rows each, while
/// the plan still reports exact row-count and byte-size statistics.
#[tokio::test]
async fn read_small_batches() -> Result<()> {
    let session_ctx =
        SessionContext::new_with_config(SessionConfig::new().with_batch_size(2));
    let state = session_ctx.state();
    let exec = get_exec(&state, "alltypes_plain.parquet", None, None).await?;

    // Drain partition 0, validating the shape of every batch as it arrives.
    let mut stream = exec.execute(0, state.task_ctx())?;
    let mut tt_batches = 0i32;
    while let Some(batch) = stream.next().await {
        let batch = batch.unwrap();
        assert_eq!(11, batch.num_columns());
        assert_eq!(2, batch.num_rows());
        tt_batches += 1;
    }
    assert_eq!(tt_batches, 4);

    let stats = exec.statistics()?;
    assert_eq!(stats.num_rows, Precision::Exact(8));
    assert_eq!(stats.total_byte_size, Precision::Exact(671));
    Ok(())
}
#[tokio::test]
async fn capture_bytes_scanned_metric() -> Result<()> {
let config = SessionConfig::new().with_batch_size(2);
let session = SessionContext::new_with_config(config);
let ctx = session.state();
let projection = None;
let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?;
let projection = Some(vec![0]);
let exec_projected =
get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?;
let task_ctx = ctx.task_ctx();
let _ = collect(exec.clone(), task_ctx.clone()).await?;
let _ = collect(exec_projected.clone(), task_ctx).await?;
assert_bytes_scanned(exec, 671);
assert_bytes_scanned(exec_projected, 73);
Ok(())
}
/// Scans `alltypes_plain.parquet` with a limit of 1 and checks that the plan
/// statistics still describe the whole file (8 rows, 671 bytes) while the
/// collected output is a single batch of 11 columns holding one row.
#[tokio::test]
async fn read_limit() -> Result<()> {
    let session_ctx = SessionContext::new();
    let state = session_ctx.state();
    let exec = get_exec(&state, "alltypes_plain.parquet", None, Some(1)).await?;

    // Statistics are checked before execution, as in the collected result
    // below only the limited output is visible.
    let stats = exec.statistics()?;
    assert_eq!(stats.num_rows, Precision::Exact(8));
    assert_eq!(stats.total_byte_size, Precision::Exact(671));

    let batches = collect(exec, state.task_ctx()).await?;
    assert_eq!(1, batches.len());
    let only_batch = &batches[0];
    assert_eq!(11, only_batch.num_columns());
    assert_eq!(1, only_batch.num_rows());
    Ok(())
}
/// Scans every column of `alltypes_plain.parquet`, checks the name and data
/// type of each field in the plan schema, and checks that the file is read
/// back as one batch of 11 columns and 8 rows.
#[tokio::test]
async fn read_alltypes_plain_parquet() -> Result<()> {
    let session_ctx = SessionContext::new();
    let state = session_ctx.state();
    let exec = get_exec(&state, "alltypes_plain.parquet", None, None).await?;

    // One "name: type" line per field, in schema order.
    let expected_schema = "id: Int32\n\
        bool_col: Boolean\n\
        tinyint_col: Int32\n\
        smallint_col: Int32\n\
        int_col: Int32\n\
        bigint_col: Int64\n\
        float_col: Float32\n\
        double_col: Float64\n\
        date_string_col: Binary\n\
        string_col: Binary\n\
        timestamp_col: Timestamp(Nanosecond, None)";
    let actual_schema = exec
        .schema()
        .fields()
        .iter()
        .map(|field| format!("{}: {:?}", field.name(), field.data_type()))
        .collect::<Vec<String>>()
        .join("\n");
    assert_eq!(expected_schema, actual_schema);

    let batches = collect(exec, state.task_ctx()).await?;
    assert_eq!(1, batches.len());
    let only_batch = &batches[0];
    assert_eq!(11, only_batch.num_columns());
    assert_eq!(8, only_batch.num_rows());
    Ok(())
}
/// Projects column 1 of `alltypes_plain.parquet` and checks that it is read
/// back as a boolean array with the expected eight values.
#[tokio::test]
async fn read_bool_alltypes_plain_parquet() -> Result<()> {
    let session_ctx = SessionContext::new();
    let state = session_ctx.state();
    let exec = get_exec(&state, "alltypes_plain.parquet", Some(vec![1]), None).await?;

    let batches = collect(exec, state.task_ctx()).await?;
    assert_eq!(1, batches.len());
    let only_batch = &batches[0];
    assert_eq!(1, only_batch.num_columns());
    assert_eq!(8, only_batch.num_rows());

    // Read the raw value of every row, in row order.
    let array = as_boolean_array(only_batch.column(0))?;
    let values: Vec<bool> = (0..only_batch.num_rows())
        .map(|row| array.value(row))
        .collect();
    assert_eq!(
        "[true, false, true, false, true, false, true, false]",
        format!("{values:?}")
    );
    Ok(())
}
/// Projects column 0 of `alltypes_plain.parquet` and checks that it is read
/// back as an `Int32` array with the expected eight values.
#[tokio::test]
async fn read_i32_alltypes_plain_parquet() -> Result<()> {
    let session_ctx = SessionContext::new();
    let state = session_ctx.state();
    let exec = get_exec(&state, "alltypes_plain.parquet", Some(vec![0]), None).await?;

    let batches = collect(exec, state.task_ctx()).await?;
    assert_eq!(1, batches.len());
    let only_batch = &batches[0];
    assert_eq!(1, only_batch.num_columns());
    assert_eq!(8, only_batch.num_rows());

    // Read the raw value of every row, in row order.
    let array = as_int32_array(only_batch.column(0))?;
    let values: Vec<i32> = (0..only_batch.num_rows())
        .map(|row| array.value(row))
        .collect();
    assert_eq!("[4, 5, 6, 7, 2, 3, 0, 1]", format!("{values:?}"));
    Ok(())
}
/// Projects column 10 of `alltypes_plain.parquet` and checks that it is read
/// back as a nanosecond timestamp array with the expected eight values.
#[tokio::test]
async fn read_i96_alltypes_plain_parquet() -> Result<()> {
    let session_ctx = SessionContext::new();
    let state = session_ctx.state();
    let exec = get_exec(&state, "alltypes_plain.parquet", Some(vec![10]), None).await?;

    let batches = collect(exec, state.task_ctx()).await?;
    assert_eq!(1, batches.len());
    let only_batch = &batches[0];
    assert_eq!(1, only_batch.num_columns());
    assert_eq!(8, only_batch.num_rows());

    // Read the raw i64 value of every row, in row order.
    let array = as_timestamp_nanosecond_array(only_batch.column(0))?;
    let values: Vec<i64> = (0..only_batch.num_rows())
        .map(|row| array.value(row))
        .collect();
    assert_eq!("[1235865600000000000, 1235865660000000000, 1238544000000000000, 1238544060000000000, 1233446400000000000, 1233446460000000000, 1230768000000000000, 1230768060000000000]", format!("{values:?}"));
    Ok(())
}
/// Projects column 6 of `alltypes_plain.parquet` and checks that it is read
/// back as a `Float32` array with the expected eight values.
#[tokio::test]
async fn read_f32_alltypes_plain_parquet() -> Result<()> {
    let session_ctx = SessionContext::new();
    let state = session_ctx.state();
    let exec = get_exec(&state, "alltypes_plain.parquet", Some(vec![6]), None).await?;

    let batches = collect(exec, state.task_ctx()).await?;
    assert_eq!(1, batches.len());
    let only_batch = &batches[0];
    assert_eq!(1, only_batch.num_columns());
    assert_eq!(8, only_batch.num_rows());

    // Read the raw value of every row, in row order.
    let array = as_float32_array(only_batch.column(0))?;
    let values: Vec<f32> = (0..only_batch.num_rows())
        .map(|row| array.value(row))
        .collect();
    assert_eq!(
        "[0.0, 1.1, 0.0, 1.1, 0.0, 1.1, 0.0, 1.1]",
        format!("{values:?}")
    );
    Ok(())
}
/// Projects column 7 of `alltypes_plain.parquet` and checks that it is read
/// back as a `Float64` array with the expected eight values.
#[tokio::test]
async fn read_f64_alltypes_plain_parquet() -> Result<()> {
    let session_ctx = SessionContext::new();
    let state = session_ctx.state();
    let exec = get_exec(&state, "alltypes_plain.parquet", Some(vec![7]), None).await?;

    let batches = collect(exec, state.task_ctx()).await?;
    assert_eq!(1, batches.len());
    let only_batch = &batches[0];
    assert_eq!(1, only_batch.num_columns());
    assert_eq!(8, only_batch.num_rows());

    // Read the raw value of every row, in row order.
    let array = as_float64_array(only_batch.column(0))?;
    let values: Vec<f64> = (0..only_batch.num_rows())
        .map(|row| array.value(row))
        .collect();
    assert_eq!(
        "[0.0, 10.1, 0.0, 10.1, 0.0, 10.1, 0.0, 10.1]",
        format!("{values:?}")
    );
    Ok(())
}
/// Projects column 9 of `alltypes_plain.parquet` and checks that it is read
/// back as a binary array whose eight values decode as the expected UTF-8
/// strings.
#[tokio::test]
async fn read_binary_alltypes_plain_parquet() -> Result<()> {
    let session_ctx = SessionContext::new();
    let state = session_ctx.state();
    let exec = get_exec(&state, "alltypes_plain.parquet", Some(vec![9]), None).await?;

    let batches = collect(exec, state.task_ctx()).await?;
    assert_eq!(1, batches.len());
    let only_batch = &batches[0];
    assert_eq!(1, only_batch.num_columns());
    assert_eq!(8, only_batch.num_rows());

    // Decode the bytes of every row as UTF-8, in row order.
    let array = as_binary_array(only_batch.column(0))?;
    let values: Vec<&str> = (0..only_batch.num_rows())
        .map(|row| std::str::from_utf8(array.value(row)).unwrap())
        .collect();
    assert_eq!(
        "[\"0\", \"1\", \"0\", \"1\", \"0\", \"1\", \"0\", \"1\"]",
        format!("{values:?}")
    );
    Ok(())
}
#[tokio::test]
async fn read_decimal_parquet() -> Result<()> {
let session_ctx = SessionContext::new();
let state = session_ctx.state();
let task_ctx = state.task_ctx();
let exec = get_exec(&state, "int32_decimal.parquet", None, None).await?;
let batches = collect(exec, task_ctx.clone()).await?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
let column = batches[0].column(0);
assert_eq!(&DataType::Decimal128(4, 2), column.data_type());
let exec = get_exec(&state, "int64_decimal.parquet", None, None).await?;
let batches = collect(exec, task_ctx.clone()).await?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
let column = batches[0].column(0);
assert_eq!(&DataType::Decimal128(10, 2), column.data_type());
let exec = get_exec(&state, "fixed_length_decimal.parquet", None, None).await?;
let batches = collect(exec, task_ctx.clone()).await?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
let column = batches[0].column(0);
assert_eq!(&DataType::Decimal128(25, 2), column.data_type());
let exec =
get_exec(&state, "fixed_length_decimal_legacy.parquet", None, None).await?;
let batches = collect(exec, task_ctx.clone()).await?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
let column = batches[0].column(0);
assert_eq!(&DataType::Decimal128(13, 2), column.data_type());
let exec = get_exec(&state, "byte_array_decimal.parquet", None, None).await?;
let batches = collect(exec, task_ctx.clone()).await?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
let column = batches[0].column(0);
assert_eq!(&DataType::Decimal128(4, 2), column.data_type());
Ok(())
}
/// Opens two `alltypes_tiny_pages*` test files with page-index loading
/// enabled and runs `check_page_index_validation` on the column and offset
/// indexes found in each file's metadata.
#[tokio::test]
async fn test_read_parquet_page_index() -> Result<()> {
    let testdata = crate::test_util::parquet_test_data();
    let options = ArrowReaderOptions::new().with_page_index(true);

    for file_name in [
        "alltypes_tiny_pages.parquet",
        "alltypes_tiny_pages_plain.parquet",
    ] {
        let file = File::open(format!("{testdata}/{file_name}"))
            .await
            .unwrap();
        let reader =
            ParquetRecordBatchStreamBuilder::new_with_options(file, options.clone())
                .await
                .unwrap();
        // Only the parsed metadata is needed; no record batches are read.
        let metadata = reader.metadata().clone();
        check_page_index_validation(metadata.column_index(), metadata.offset_index());
    }
    Ok(())
}
/// Asserts the shape of a file's page index.
///
/// Both indexes must be present, cover exactly one row group with 13 columns,
/// and the column at position 4 must be an `INT32` index with 325 pages, each
/// carrying a min, a max and a null count.
///
/// # Panics
///
/// Panics if any of those expectations does not hold, including when the
/// column at position 4 is not an `INT32` index.
fn check_page_index_validation(
    page_index: Option<&ParquetColumnIndex>,
    offset_index: Option<&ParquetOffsetIndex>,
) {
    let page_index = page_index.expect("column index should be present");
    let offset_index = offset_index.expect("offset index should be present");

    // Outer level: one entry per row group.
    assert_eq!(page_index.len(), 1);
    assert_eq!(offset_index.len(), 1);
    let page_index = page_index.first().unwrap();
    let offset_index = offset_index.first().unwrap();

    // Inner level: one entry per column.
    assert_eq!(page_index.len(), 13);
    assert_eq!(offset_index.len(), 13);

    let int_col_index = page_index.get(4).unwrap();
    let int_col_offset = offset_index.get(4).unwrap();
    assert_eq!(int_col_offset.len(), 325);

    match int_col_index {
        Index::INT32(index) => {
            assert_eq!(index.indexes.len(), 325);
            // Borrow the per-page entries instead of cloning the whole index.
            for min_max in &index.indexes {
                assert!(min_max.min.is_some());
                assert!(min_max.max.is_some());
                assert!(min_max.null_count.is_some());
            }
        }
        // Any other variant means the index was not decoded as expected; fail
        // the test rather than merely logging, which would let it pass.
        _ => panic!("fail to read page index."),
    }
}
/// Asserts that the `bytes_scanned` count metrics recorded on `exec` add up
/// to `expected`.
///
/// # Panics
///
/// Panics if the plan exposes no metrics, if no `bytes_scanned` count metric
/// was recorded, or if the summed value differs from `expected`.
fn assert_bytes_scanned(exec: Arc<dyn ExecutionPlan>, expected: usize) {
    let metrics = exec.metrics().expect("Metrics not recorded");
    let bytes_scanned = metrics
        .sum(|metric| {
            matches!(
                metric.value(),
                MetricValue::Count { name, .. } if name == "bytes_scanned"
            )
        })
        .expect("bytes_scanned metric not recorded")
        .as_usize();
    assert_eq!(bytes_scanned, expected);
}
/// Builds an execution plan that scans `file_name` from the parquet test-data
/// directory with a default `ParquetFormat`, forwarding `projection` and
/// `limit` unchanged to `scan_format`.
async fn get_exec(
    state: &SessionState,
    file_name: &str,
    projection: Option<Vec<usize>>,
    limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let store_root = crate::test_util::parquet_test_data();
    let parquet_format = ParquetFormat::default();
    scan_format(
        state,
        &parquet_format,
        &store_root,
        file_name,
        projection,
        limit,
    )
    .await
}
/// Builds a `TaskContext` for the sink tests.
///
/// The context's runtime has a `LocalFileSystem` rooted at a fresh temporary
/// directory registered under `store_url`, and its session config enables
/// `allow_single_file_parallelism` for parquet.
///
/// # Panics
///
/// Panics if the temporary directory or the local object store cannot be
/// created.
fn build_ctx(store_url: &url::Url) -> Arc<TaskContext> {
    // NOTE(review): `tmp_dir` is dropped when this function returns, which
    // presumably removes the directory while the store still points at it;
    // confirm the store recreates missing parent directories on write.
    let tmp_dir = tempfile::TempDir::new().unwrap();
    let local = Arc::new(
        LocalFileSystem::new_with_prefix(&tmp_dir)
            .expect("should create object store"),
    );

    let mut session = SessionConfig::default();
    session.options_mut().execution.parquet = ParquetOptions {
        allow_single_file_parallelism: true,
        ..Default::default()
    };

    let runtime = RuntimeEnv::default();
    runtime
        .object_store_registry
        .register_store(store_url, local);

    Arc::new(
        TaskContext::default()
            .with_session_config(session)
            .with_runtime(Arc::new(runtime)),
    )
}
/// Writes one two-row batch through an unpartitioned `ParquetSink` and checks
/// the metadata recorded for the written file: a single file with no path
/// prefix, two rows, columns `a` and `b`, and the custom key/value metadata
/// supplied through `TableParquetOptions`.
#[tokio::test]
async fn parquet_sink_write() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Utf8, false),
    ]));
    let object_store_url = ObjectStoreUrl::local_filesystem();

    let parquet_sink = Arc::new(ParquetSink::new(
        FileSinkConfig {
            object_store_url: object_store_url.clone(),
            file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
            table_paths: vec![ListingTableUrl::parse("file:///")?],
            output_schema: Arc::clone(&schema),
            table_partition_cols: vec![],
            overwrite: true,
        },
        TableParquetOptions {
            // One entry with a value and one key-only entry.
            key_value_metadata: std::collections::HashMap::from([
                ("my-data".to_string(), Some("stuff".to_string())),
                ("my-data-bool-key".to_string(), None),
            ]),
            ..Default::default()
        },
    ));

    let batch = RecordBatch::try_from_iter(vec![
        (
            "a",
            Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef,
        ),
        (
            "b",
            Arc::new(StringArray::from(vec!["baz", "baz"])) as ArrayRef,
        ),
    ])
    .unwrap();

    let task_ctx = build_ctx(object_store_url.as_ref());
    parquet_sink
        .write_all(
            Box::pin(RecordBatchStreamAdapter::new(
                schema,
                futures::stream::iter(vec![Ok(batch)]),
            )),
            &task_ctx,
        )
        .await
        .unwrap();

    // Inspect what the sink reports as written.
    let mut written = parquet_sink.written();
    let mut files = written.drain();
    assert_eq!(
        files.len(),
        1,
        "expected a single parquet files to be written, instead found {}",
        files.len()
    );
    let (path, file_metadata) = files.next().unwrap();

    let path_parts = path.parts().collect::<Vec<_>>();
    assert_eq!(path_parts.len(), 1, "should not have path prefix");

    assert_eq!(file_metadata.num_rows, 2, "file metdata to have 2 rows");
    assert!(
        file_metadata.schema.iter().any(|col| col.name == "a"),
        "output file metadata should contain col a"
    );
    assert!(
        file_metadata.schema.iter().any(|col| col.name == "b"),
        "output file metadata should contain col b"
    );

    // The comparison below is order-sensitive, so sort the entries by key.
    let mut key_value_metadata = file_metadata.key_value_metadata.unwrap();
    key_value_metadata.sort_by(|lhs, rhs| lhs.key.cmp(&rhs.key));
    let expected_metadata = vec![
        KeyValue {
            key: "my-data".to_string(),
            value: Some("stuff".to_string()),
        },
        KeyValue {
            key: "my-data-bool-key".to_string(),
            value: None,
        },
    ];
    assert_eq!(key_value_metadata, expected_metadata);
    Ok(())
}
/// Writes one two-row batch through a `ParquetSink` partitioned on column `a`
/// and checks that two files are reported as written, one under each of the
/// `a=foo` and `a=bar` prefixes, each holding a single row and containing
/// column `b` but not the partition column `a`.
#[tokio::test]
async fn parquet_sink_write_partitions() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Utf8, false),
    ]));
    let object_store_url = ObjectStoreUrl::local_filesystem();

    let parquet_sink = Arc::new(ParquetSink::new(
        FileSinkConfig {
            object_store_url: object_store_url.clone(),
            file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
            table_paths: vec![ListingTableUrl::parse("file:///")?],
            output_schema: Arc::clone(&schema),
            table_partition_cols: vec![("a".to_string(), DataType::Utf8)],
            overwrite: true,
        },
        TableParquetOptions::default(),
    ));

    let batch = RecordBatch::try_from_iter(vec![
        (
            "a",
            Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef,
        ),
        (
            "b",
            Arc::new(StringArray::from(vec!["baz", "baz"])) as ArrayRef,
        ),
    ])
    .unwrap();

    let task_ctx = build_ctx(object_store_url.as_ref());
    parquet_sink
        .write_all(
            Box::pin(RecordBatchStreamAdapter::new(
                schema,
                futures::stream::iter(vec![Ok(batch)]),
            )),
            &task_ctx,
        )
        .await
        .unwrap();

    // Inspect what the sink reports as written.
    let mut written = parquet_sink.written();
    let files = written.drain();
    assert_eq!(
        files.len(),
        2,
        "expected two parquet files to be written, instead found {}",
        files.len()
    );

    // Each prefix is removed once matched, so a repeated prefix fails the
    // assertion on its second occurrence.
    let mut expected_partitions = std::collections::HashSet::from(["a=foo", "a=bar"]);
    for (path, file_metadata) in files.take(2) {
        let path_parts = path.parts().collect::<Vec<_>>();
        assert_eq!(path_parts.len(), 2, "should have path prefix");

        let prefix: &str = path_parts[0].as_ref();
        assert!(
            expected_partitions.remove(prefix),
            "expected path prefix to match partition, instead found {:?}",
            prefix
        );

        assert_eq!(file_metadata.num_rows, 1, "file metdata to have 1 row");
        assert!(
            !file_metadata.schema.iter().any(|col| col.name == "a"),
            "output file metadata will not contain partitioned col a"
        );
        assert!(
            file_metadata.schema.iter().any(|col| col.name == "b"),
            "output file metadata should contain col b"
        );
    }
    Ok(())
}
}