use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;
use futures::stream::{self, BoxStream};
use futures::{StreamExt, TryStreamExt};
use uuid::Uuid;
use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
use super::stats::collect_stats;
use super::UrlExt;
use crate::arrow::array::builder::{MapBuilder, MapFieldNames, StringBuilder};
use crate::arrow::array::{Array, Int64Array, RecordBatch, StringArray, StructArray};
use crate::arrow::datatypes::{DataType, Field, Schema};
use crate::engine::arrow_conversion::{TryFromArrow as _, TryIntoArrow as _};
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::{
fixup_parquet_read, generate_mask, get_requested_indices, ordering_needs_row_indexes,
RowIndexBuilder,
};
use crate::engine::default::executor::TaskExecutor;
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::engine::{reader_options, writer_options};
use crate::expressions::ColumnName;
use crate::metrics::{MetricEvent, MetricsReporter};
use crate::object_store::path::Path;
use crate::object_store::{DynObjectStore, ObjectStoreExt as _};
use crate::parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReaderBuilder};
use crate::parquet::arrow::arrow_writer::ArrowWriter;
use crate::parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
use crate::parquet::arrow::async_writer::{AsyncArrowWriter, ParquetObjectWriter};
use crate::schema::{SchemaRef, StructType};
use crate::transaction::WriteContext;
use crate::{
DeltaResult, EngineData, Error, FileDataReadResultIterator, FileMeta, ParquetFooter,
ParquetHandler, PredicateRef,
};
#[derive(Debug)]
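/// A [`ParquetHandler`] for the default engine. It reads and writes Parquet files through the
/// configured object store and runs all async work on the provided [`TaskExecutor`].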
pub struct DefaultParquetHandler<E: TaskExecutor> {
store: Arc<DynObjectStore>,
task_executor: Arc<E>,
readahead: usize,
reporter: Option<Arc<dyn MetricsReporter>>,
}
#[derive(Debug)]
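/// Metadata about a data file written by [`DefaultParquetHandler::write_parquet`]: the file's
/// [`FileMeta`] plus the per-file statistics collected at write time.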
pub struct DataFileMetadata {
file_meta: FileMeta,
stats: StructArray,
}
impl DataFileMetadata {
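/// Create a new [`DataFileMetadata`] from a file's [`FileMeta`] and its collected stats.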
pub fn new(file_meta: FileMeta, stats: StructArray) -> Self {
Self { file_meta, stats }
}
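/// The URL the data file was written to.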
pub fn location(&self) -> &url::Url {
&self.file_meta.location
}
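/// Convert this metadata (plus the given partition values and log-relative path) into a
/// single-row [`EngineData`] batch shaped like an add-file action: `path`, `partitionValues`,
/// `size`, `modificationTime`, and `stats`.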
pub(crate) fn as_record_batch(
&self,
partition_values: &HashMap<String, Option<String>>,
log_path: &str,
) -> DeltaResult<Box<dyn EngineData>> {
let path = Arc::new(StringArray::from(vec![log_path]));
let key_builder = StringBuilder::new();
let val_builder = StringBuilder::new();
let names = MapFieldNames {
entry: "key_value".to_string(),
key: "key".to_string(),
value: "value".to_string(),
};
let mut builder = MapBuilder::new(Some(names), key_builder, val_builder);
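// Empty-string partition values are treated the same as missing ones: both are appended as
// null map values.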
for (k, v) in partition_values {
builder.keys().append_value(k);
match v.as_deref() {
Some(val) if !val.is_empty() => builder.values().append_value(val),
_ => builder.values().append_null(),
}
}
builder.append(true)?;
let partitions = Arc::new(builder.finish());
let size: i64 = self
.file_meta
.size
.try_into()
.map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?;
let size = Arc::new(Int64Array::from(vec![size]));
let modification_time = Arc::new(Int64Array::from(vec![self.file_meta.last_modified]));
let stats_array = Arc::new(self.stats.clone());
let key_value_struct = DataType::Struct(
vec![
Field::new("key", DataType::Utf8, false),
Field::new("value", DataType::Utf8, true),
]
.into(),
);
let schema = Schema::new(vec![
Field::new("path", DataType::Utf8, false),
Field::new(
"partitionValues",
DataType::Map(
Arc::new(Field::new("key_value", key_value_struct, false)),
false,
),
false,
),
Field::new("size", DataType::Int64, false),
Field::new("modificationTime", DataType::Int64, false),
Field::new("stats", stats_array.data_type().clone(), true),
]);
Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
Arc::new(schema),
vec![path, partitions, size, modification_time, stats_array],
)?)))
}
}
impl<E: TaskExecutor> DefaultParquetHandler<E> {
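/// Create a new handler backed by `store`, running async work on `task_executor`.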
pub fn new(store: Arc<DynObjectStore>, task_executor: Arc<E>) -> Self {
Self {
store,
task_executor,
readahead: 10,
reporter: None,
}
}
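/// Set the readahead (number of batches to buffer ahead while reading); defaults to 10.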
pub fn with_readahead(mut self, readahead: usize) -> Self {
self.readahead = readahead;
self
}
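/// Attach an optional [`MetricsReporter`] that will receive read metrics.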
pub fn with_reporter(mut self, reporter: Option<Arc<dyn MetricsReporter>>) -> Self {
self.reporter = reporter;
self
}
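/// Write `data` as a new Parquet file under `path` (which must end with a trailing slash),
/// using a random UUID as the file name. Statistics are collected for `stats_columns` and
/// returned along with the file's metadata.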
async fn write_parquet(
&self,
path: &url::Url,
data: Box<dyn EngineData>,
stats_columns: &[ColumnName],
) -> DeltaResult<DataFileMetadata> {
let batch: Box<_> = ArrowEngineData::try_from_engine_data(data)?;
let record_batch = batch.record_batch();
let stats = collect_stats(record_batch, stats_columns)?;
let mut buffer = vec![];
let mut writer = ArrowWriter::try_new_with_options(
&mut buffer,
record_batch.schema(),
writer_options(),
)?;
writer.write(record_batch)?;
writer.close()?;
let size: u64 = buffer
.len()
.try_into()
.map_err(|_| Error::generic("unable to convert usize to u64"))?;
let name: String = format!("{}.parquet", Uuid::new_v4());
if !path.path().ends_with('/') {
return Err(Error::generic(format!(
"Path must end with a trailing slash: {path}"
)));
}
let path = path.join(&name)?;
let object_path = Path::from_url_path(path.path())?;
self.store.put(&object_path, buffer.into()).await?;
let metadata = self.store.head(&object_path).await?;
let modification_time = metadata.last_modified.timestamp_millis();
if size != metadata.size {
return Err(Error::generic(format!(
"Size mismatch after writing parquet file: expected {}, got {}",
size, metadata.size
)));
}
let file_meta = FileMeta::new(path, modification_time, size);
Ok(DataFileMetadata::new(file_meta, stats))
}
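/// Write `data` into the write context's target directory and return the resulting add-file
/// metadata as [`EngineData`], ready to be committed to the log.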
pub async fn write_parquet_file(
&self,
data: Box<dyn EngineData>,
write_context: &WriteContext,
) -> DeltaResult<Box<dyn EngineData>> {
let file_metadata = self
.write_parquet(
&write_context.write_dir(),
data,
write_context.stats_columns(),
)
.await?;
super::build_add_file_metadata(file_metadata, write_context)
}
}
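/// Shared async implementation for reading a set of Parquet files as a stream of [`EngineData`]
/// batches. If the first file is addressed by a presigned URL, all files are fetched over HTTP
/// via [`PresignedUrlOpener`]; otherwise each file is opened through the object store with
/// [`open_parquet_file`] and the per-file streams are flattened in order.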
async fn read_parquet_files_impl(
store: Arc<DynObjectStore>,
files: Vec<FileMeta>,
physical_schema: SchemaRef,
predicate: Option<PredicateRef>,
) -> DeltaResult<BoxStream<'static, DeltaResult<Box<dyn EngineData>>>> {
if files.is_empty() {
return Ok(Box::pin(stream::empty()));
}
let arrow_schema = Arc::new(physical_schema.as_ref().try_into_arrow()?);
if files[0].location.is_presigned() {
let file_opener = Box::new(PresignedUrlOpener::new(
1024,
physical_schema.clone(),
predicate,
));
let stream = FileStream::new(files, arrow_schema, file_opener)?.map_ok(
|record_batch| -> Box<dyn EngineData> { Box::new(ArrowEngineData::new(record_batch)) },
);
return Ok(Box::pin(stream));
}
let file_futures = files.into_iter().map(move |file| {
let store = store.clone();
let schema = physical_schema.clone();
let predicate = predicate.clone();
async move {
open_parquet_file(
store,
schema,
predicate,
None,
super::DEFAULT_BATCH_SIZE,
file,
)
.await
}
});
let result_stream = stream::iter(file_futures)
.buffered(super::DEFAULT_BUFFER_SIZE)
.try_flatten()
.map_ok(|record_batch| -> Box<dyn EngineData> {
Box::new(ArrowEngineData::new(record_batch))
});
Ok(Box::pin(result_stream))
}
impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
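// The async read is wrapped into a lazy iterator driven by the task executor; when a metrics
// reporter is configured, the iterator is additionally wrapped so a ParquetReadCompleted event
// is reported with the file count and total bytes read.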
fn read_parquet_files(
&self,
files: &[FileMeta],
physical_schema: SchemaRef,
predicate: Option<PredicateRef>,
) -> DeltaResult<FileDataReadResultIterator> {
let future = read_parquet_files_impl(
self.store.clone(),
files.to_vec(),
physical_schema,
predicate,
);
let inner = super::stream_future_to_iter(self.task_executor.clone(), future)?;
if let Some(reporter) = &self.reporter {
let num_files = files.len() as u64;
let bytes_read = files.iter().map(|f| f.size).sum();
Ok(Box::new(super::ReadMetricsIterator::new(
inner,
reporter.clone(),
num_files,
bytes_read,
|num_files, bytes_read| MetricEvent::ParquetReadCompleted {
num_files,
bytes_read,
},
)))
} else {
Ok(inner)
}
}
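// Streams the incoming batches into a single Parquet file at `location` using an
// AsyncArrowWriter, blocking on the task executor for each write. The schema is taken from the
// first batch, and the iterator must yield at least one batch.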
fn write_parquet_file(
&self,
location: url::Url,
mut data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>,
) -> DeltaResult<()> {
use tokio::sync::Mutex;
let store = self.store.clone();
let path = Path::from_url_path(location.path())?;
let first_batch = data.next().ok_or_else(|| {
Error::generic("Cannot write parquet file with empty data iterator")
})??;
let first_arrow = ArrowEngineData::try_from_engine_data(first_batch)?;
let first_record_batch: RecordBatch = (*first_arrow).into();
let object_writer = ParquetObjectWriter::new(store, path);
let schema = first_record_batch.schema();
let writer = Arc::new(Mutex::new(AsyncArrowWriter::try_new_with_options(
object_writer,
schema,
writer_options(),
)?));
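// The async writer is shared behind a tokio Mutex so that each block_on call below can lock
// it, write one batch, and release it before the next call.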
let w = writer.clone();
self.task_executor.block_on(async move {
let mut writer = w.lock().await;
writer.write(&first_record_batch).await
})?;
for result in data {
let engine_data = result?;
let arrow_data = ArrowEngineData::try_from_engine_data(engine_data)?;
let batch: RecordBatch = (*arrow_data).into();
let w = writer.clone();
self.task_executor.block_on(async move {
let mut writer = w.lock().await;
writer.write(&batch).await
})?;
}
let w = writer.clone();
self.task_executor.block_on(async move {
let mut writer = w.lock().await;
writer.finish().await
})?;
Ok(())
}
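// Loads the Parquet footer and converts its Arrow schema into a kernel schema. Presigned URLs
// are downloaded in full over HTTP; object-store paths use an async footer read with the known
// file size.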
fn read_parquet_footer(&self, file: &FileMeta) -> DeltaResult<ParquetFooter> {
let store = self.store.clone();
let location = file.location.clone();
let file_size = file.size;
self.task_executor.block_on(async move {
let metadata = if location.is_presigned() {
let client = reqwest::Client::new();
let response =
client.get(location.as_str()).send().await.map_err(|e| {
Error::generic(format!("Failed to fetch presigned URL: {e}"))
})?;
let bytes = response
.bytes()
.await
.map_err(|e| Error::generic(format!("Failed to read response bytes: {e}")))?;
ArrowReaderMetadata::load(&bytes, reader_options())?
} else {
let path = Path::from_url_path(location.path())?;
let mut reader = ParquetObjectReader::new(store, path).with_file_size(file_size);
ArrowReaderMetadata::load_async(&mut reader, reader_options()).await?
};
let schema = StructType::try_from_arrow(metadata.schema().as_ref())
.map(Arc::new)
.map_err(Error::Arrow)?;
Ok(ParquetFooter { schema })
})
}
}
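/// Open a single Parquet file from the object store and return a stream of [`RecordBatch`]es
/// projected onto (and reordered to match) `table_schema`, with optional row-group skipping via
/// `predicate` and an optional row `limit`.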
async fn open_parquet_file(
store: Arc<DynObjectStore>,
table_schema: SchemaRef,
predicate: Option<PredicateRef>,
limit: Option<usize>,
batch_size: usize,
file_meta: FileMeta,
) -> DeltaResult<BoxStream<'static, DeltaResult<RecordBatch>>> {
let file_location = file_meta.location.to_string();
let path = Path::from_url_path(file_meta.location.path())?;
let mut reader = {
use crate::object_store::ObjectStoreScheme;
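// When the caller provides a nonzero file size, use it directly and avoid an extra HEAD
// request. Azure is special-cased because it does not support suffix range requests, so the
// reader needs an explicit file size obtained via HEAD; other stores can discover the size
// themselves.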
if file_meta.size != 0 {
ParquetObjectReader::new(store, path).with_file_size(file_meta.size)
} else if let Ok((ObjectStoreScheme::MicrosoftAzure, _)) =
ObjectStoreScheme::parse(&file_meta.location)
{
let meta = store.head(&path).await?;
ParquetObjectReader::new(store, path).with_file_size(meta.size)
} else {
ParquetObjectReader::new(store, path)
}
};
let reader_options = reader_options();
let metadata = ArrowReaderMetadata::load_async(&mut reader, reader_options.clone()).await?;
let parquet_schema = metadata.schema();
let (indices, requested_ordering) = get_requested_indices(&table_schema, parquet_schema)?;
let mut builder =
ParquetRecordBatchStreamBuilder::new_with_options(reader, reader_options).await?;
if let Some(mask) = generate_mask(
&table_schema,
parquet_schema,
builder.parquet_schema(),
&indices,
) {
builder = builder.with_projection(mask)
}
let mut row_indexes = ordering_needs_row_indexes(&requested_ordering)
.then(|| RowIndexBuilder::new(builder.metadata().row_groups()));
if let Some(ref predicate) = predicate {
builder = builder.with_row_group_filter(predicate, row_indexes.as_mut());
}
if let Some(limit) = limit {
builder = builder.with_limit(limit)
}
let mut row_indexes = row_indexes.map(|rb| rb.build()).transpose()?;
let stream = builder.with_batch_size(batch_size).build()?;
let arrow_schema: Arc<Schema> = Arc::new(table_schema.as_ref().try_into_arrow()?);
let stream = stream.map(move |rbr| {
fixup_parquet_read(
rbr?,
&requested_ordering,
row_indexes.as_mut(),
Some(&file_location),
Some(&arrow_schema),
)
.map(Into::into)
});
Ok(stream.boxed())
}
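/// A [`FileOpener`] that reads Parquet files addressed by presigned HTTP(S) URLs. The whole
/// file is downloaded with a reqwest client before it is decoded.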
struct PresignedUrlOpener {
batch_size: usize,
predicate: Option<PredicateRef>,
limit: Option<usize>,
table_schema: SchemaRef,
client: reqwest::Client,
}
impl PresignedUrlOpener {
pub(crate) fn new(
batch_size: usize,
schema: SchemaRef,
predicate: Option<PredicateRef>,
) -> Self {
Self {
batch_size,
table_schema: schema,
predicate,
limit: None,
client: reqwest::Client::new(),
}
}
}
impl FileOpener for PresignedUrlOpener {
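// Downloads the file at the presigned URL, then decodes it synchronously with the same
// projection and reordering logic as the object-store path, wrapping the resulting batches in
// a stream.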
fn open(&self, file_meta: FileMeta, _range: Option<Range<i64>>) -> DeltaResult<FileOpenFuture> {
let batch_size = self.batch_size;
let table_schema = self.table_schema.clone();
let predicate = self.predicate.clone();
let limit = self.limit;
let client = self.client.clone();
let file_location = file_meta.location.to_string();
Ok(Box::pin(async move {
let reader = client.get(&file_location).send().await?.bytes().await?;
let reader_options = reader_options();
let metadata = ArrowReaderMetadata::load(&reader, reader_options.clone())?;
let parquet_schema = metadata.schema();
let (indices, requested_ordering) =
get_requested_indices(&table_schema, parquet_schema)?;
let mut builder =
ParquetRecordBatchReaderBuilder::try_new_with_options(reader, reader_options)?;
if let Some(mask) = generate_mask(
&table_schema,
parquet_schema,
builder.parquet_schema(),
&indices,
) {
builder = builder.with_projection(mask)
}
let mut row_indexes = ordering_needs_row_indexes(&requested_ordering)
.then(|| RowIndexBuilder::new(builder.metadata().row_groups()));
if let Some(ref predicate) = predicate {
builder = builder.with_row_group_filter(predicate, row_indexes.as_mut());
}
if let Some(limit) = limit {
builder = builder.with_limit(limit)
}
let reader = builder.with_batch_size(batch_size).build()?;
let mut row_indexes = row_indexes.map(|rb| rb.build()).transpose()?;
let arrow_schema: Arc<Schema> = Arc::new(table_schema.as_ref().try_into_arrow()?);
let stream = futures::stream::iter(reader);
let stream = stream.map(move |rbr| {
fixup_parquet_read(
rbr?,
&requested_ordering,
row_indexes.as_mut(),
Some(&file_location),
Some(&arrow_schema),
)
.map(Into::into)
});
Ok(stream.boxed())
}))
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::path::PathBuf;
use std::slice;
use itertools::Itertools;
use url::Url;
use super::*;
use crate::arrow::array::{
Array, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array,
Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch, StringArray,
TimestampMicrosecondArray,
};
use crate::arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
use crate::engine::arrow_conversion::TryIntoKernel as _;
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::default::executor::tokio::TokioBackgroundExecutor;
use crate::engine::default::DEFAULT_BATCH_SIZE;
use crate::object_store::local::LocalFileSystem;
use crate::object_store::memory::InMemory;
use crate::parquet::arrow::{ARROW_SCHEMA_META_KEY, PARQUET_FIELD_ID_META_KEY};
use crate::schema::ColumnMetadataKey;
use crate::utils::current_time_ms;
use crate::utils::test_utils::assert_result_error_with_message;
use crate::EngineData;
fn into_record_batch(
engine_data: DeltaResult<Box<dyn EngineData>>,
) -> DeltaResult<RecordBatch> {
engine_data
.and_then(ArrowEngineData::try_from_engine_data)
.map(Into::into)
}
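// Opens `file_meta` via open_parquet_file using the file's own physical schema and collects
// every batch.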
async fn read_all_rows_helper(file_meta: FileMeta) -> DeltaResult<Vec<RecordBatch>> {
let store = Arc::new(LocalFileSystem::new());
let path = Path::from_url_path(file_meta.location.path()).unwrap();
let reader = ParquetObjectReader::new(store.clone(), path);
let physical_schema = ParquetRecordBatchStreamBuilder::new(reader)
.await
.unwrap()
.schema()
.clone();
let stream = open_parquet_file(
store,
Arc::new(physical_schema.try_into_kernel().unwrap()),
None,
None,
DEFAULT_BATCH_SIZE,
file_meta,
)
.await
.unwrap();
let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
Ok(batches)
}
#[tokio::test]
async fn test_open_parquet_file_with_size() {
let path = std::fs::canonicalize(PathBuf::from(
"./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"
)).unwrap();
let file_size = std::fs::metadata(&path).unwrap().len();
let url = Url::from_file_path(path).unwrap();
let file_meta = FileMeta {
location: url,
last_modified: 0,
size: file_size,
};
let data = read_all_rows_helper(file_meta).await.unwrap();
assert_eq!(data.len(), 1);
assert_eq!(data[0].num_rows(), 10);
}
#[tokio::test]
async fn test_open_parquet_file_without_size() {
let path = std::fs::canonicalize(PathBuf::from(
"./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"
)).unwrap();
let url = Url::from_file_path(path).unwrap();
let file_meta = FileMeta {
location: url,
last_modified: 0,
size: 0,
};
let data = read_all_rows_helper(file_meta).await.unwrap();
assert_eq!(data.len(), 1);
assert_eq!(data[0].num_rows(), 10);
}
#[tokio::test]
async fn test_read_parquet_files() {
let store = Arc::new(LocalFileSystem::new());
let path = std::fs::canonicalize(PathBuf::from(
"./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet"
)).unwrap();
let url = url::Url::from_file_path(path).unwrap();
let location = Path::from_url_path(url.path()).unwrap();
let meta = store.head(&location).await.unwrap();
let reader = ParquetObjectReader::new(store.clone(), location);
let physical_schema = ParquetRecordBatchStreamBuilder::new(reader)
.await
.unwrap()
.schema()
.clone();
let files = &[FileMeta {
location: url.clone(),
last_modified: meta.last_modified.timestamp(),
size: meta.size,
}];
let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
let data: Vec<RecordBatch> = handler
.read_parquet_files(
files,
Arc::new(physical_schema.try_into_kernel().unwrap()),
None,
)
.unwrap()
.map(into_record_batch)
.try_collect()
.unwrap();
assert_eq!(data.len(), 1);
assert_eq!(data[0].num_rows(), 10);
}
#[rstest::rstest]
fn test_as_record_batch(
#[values(None, Some("a".to_string()))] partition_value: Option<String>,
) {
let location = Url::parse("file:///test_url").unwrap();
let size = 1_000_000;
let last_modified = 10000000000;
let num_records = 10;
let file_metadata = FileMeta::new(location.clone(), last_modified, size);
let stats = StructArray::try_new(
vec![
Field::new("numRecords", ArrowDataType::Int64, true),
Field::new("tightBounds", ArrowDataType::Boolean, true),
]
.into(),
vec![
Arc::new(Int64Array::from(vec![num_records as i64])),
Arc::new(BooleanArray::from(vec![true])),
],
None,
)
.unwrap();
let data_file_metadata = DataFileMetadata::new(file_metadata, stats.clone());
let partition_values = HashMap::from([("partition1".to_string(), partition_value.clone())]);
let actual = data_file_metadata
.as_record_batch(&partition_values, "test_url")
.unwrap();
let actual = ArrowEngineData::try_from_engine_data(actual).unwrap();
let mut partition_values_builder = MapBuilder::new(
Some(MapFieldNames {
entry: "key_value".to_string(),
key: "key".to_string(),
value: "value".to_string(),
}),
StringBuilder::new(),
StringBuilder::new(),
);
partition_values_builder.keys().append_value("partition1");
match &partition_value {
None => partition_values_builder.values().append_null(),
Some(v) => partition_values_builder.values().append_value(v),
}
partition_values_builder.append(true).unwrap();
let partition_values = partition_values_builder.finish();
let stats_field = Field::new("stats", stats.data_type().clone(), true);
let schema = Arc::new(crate::arrow::datatypes::Schema::new(vec![
Field::new("path", ArrowDataType::Utf8, false),
Field::new(
"partitionValues",
ArrowDataType::Map(
Arc::new(Field::new(
"key_value",
ArrowDataType::Struct(
vec![
Field::new("key", ArrowDataType::Utf8, false),
Field::new("value", ArrowDataType::Utf8, true),
]
.into(),
),
false,
)),
false,
),
false,
),
Field::new("size", ArrowDataType::Int64, false),
Field::new("modificationTime", ArrowDataType::Int64, false),
stats_field,
]));
let expected = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec!["test_url"])),
Arc::new(partition_values),
Arc::new(Int64Array::from(vec![size as i64])),
Arc::new(Int64Array::from(vec![last_modified])),
Arc::new(stats),
],
)
.unwrap();
assert_eq!(actual.record_batch(), &expected);
}
#[tokio::test]
async fn test_write_parquet() {
let store = Arc::new(InMemory::new());
let parquet_handler =
DefaultParquetHandler::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));
let data = Box::new(ArrowEngineData::new(
RecordBatch::try_from_iter(vec![(
"a",
Arc::new(Int64Array::from(vec![1, 2, 3])) as Arc<dyn Array>,
)])
.unwrap(),
));
let write_metadata = parquet_handler
.write_parquet(&Url::parse("memory:///data/").unwrap(), data, &[])
.await
.unwrap();
let DataFileMetadata {
file_meta:
ref parquet_file @ FileMeta {
ref location,
last_modified,
size,
},
ref stats,
} = write_metadata;
let expected_location = Url::parse("memory:///data/").unwrap();
let meta = store
.head(&Path::from_url_path(location.path()).unwrap())
.await
.unwrap();
let expected_size = meta.size;
let now: i64 = current_time_ms().unwrap();
let filename = location.path().split('/').next_back().unwrap();
assert_eq!(&expected_location.join(filename).unwrap(), location);
assert_eq!(expected_size, size);
assert!(now - last_modified < 10_000);
let num_records = stats
.column_by_name("numRecords")
.unwrap()
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.value(0);
assert_eq!(num_records, 3);
let path = Path::from_url_path(location.path()).unwrap();
let reader = ParquetObjectReader::new(store.clone(), path);
let physical_schema = ParquetRecordBatchStreamBuilder::new(reader)
.await
.unwrap()
.schema()
.clone();
let data: Vec<RecordBatch> = parquet_handler
.read_parquet_files(
slice::from_ref(parquet_file),
Arc::new(physical_schema.try_into_kernel().unwrap()),
None,
)
.unwrap()
.map(into_record_batch)
.try_collect()
.unwrap();
assert_eq!(data.len(), 1);
assert_eq!(data[0].num_rows(), 3);
}
#[tokio::test]
async fn test_disallow_non_trailing_slash() {
let store = Arc::new(InMemory::new());
let parquet_handler =
DefaultParquetHandler::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));
let data = Box::new(ArrowEngineData::new(
RecordBatch::try_from_iter(vec![(
"a",
Arc::new(Int64Array::from(vec![1, 2, 3])) as Arc<dyn Array>,
)])
.unwrap(),
));
assert_result_error_with_message(
parquet_handler
.write_parquet(&Url::parse("memory:///data").unwrap(), data, &[])
.await,
"Generic delta kernel error: Path must end with a trailing slash: memory:///data",
);
}
#[tokio::test]
async fn test_parquet_handler_trait_write() {
let store = Arc::new(InMemory::new());
let parquet_handler: Arc<dyn ParquetHandler> = Arc::new(DefaultParquetHandler::new(
store.clone(),
Arc::new(TokioBackgroundExecutor::new()),
));
let engine_data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(
RecordBatch::try_from_iter(vec![
(
"x",
Arc::new(Int64Array::from(vec![10, 20, 30])) as Arc<dyn Array>,
),
(
"y",
Arc::new(Int64Array::from(vec![100, 200, 300])) as Arc<dyn Array>,
),
])
.unwrap(),
));
let data_iter: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send> =
Box::new(std::iter::once(Ok(engine_data)));
let file_url = Url::parse("memory:///test/data.parquet").unwrap();
parquet_handler
.write_parquet_file(file_url.clone(), data_iter)
.unwrap();
let path = Path::from_url_path(file_url.path()).unwrap();
let metadata = store.head(&path).await.unwrap();
let reader = ParquetObjectReader::new(store.clone(), path);
let physical_schema = ParquetRecordBatchStreamBuilder::new(reader)
.await
.unwrap()
.schema()
.clone();
let file_meta = FileMeta {
location: file_url,
last_modified: 0,
size: metadata.size,
};
let data: Vec<RecordBatch> = parquet_handler
.read_parquet_files(
slice::from_ref(&file_meta),
Arc::new(physical_schema.try_into_kernel().unwrap()),
None,
)
.unwrap()
.map(into_record_batch)
.try_collect()
.unwrap();
assert_eq!(data.len(), 1);
assert_eq!(data[0].num_rows(), 3);
assert_eq!(data[0].num_columns(), 2);
}
#[tokio::test]
async fn test_parquet_handler_trait_write_and_read_roundtrip() {
let store = Arc::new(InMemory::new());
let parquet_handler: Arc<dyn ParquetHandler> = Arc::new(DefaultParquetHandler::new(
store.clone(),
Arc::new(TokioBackgroundExecutor::new()),
));
let engine_data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(
RecordBatch::try_from_iter(vec![
(
"byte_col",
Arc::new(Int8Array::from(vec![1i8, 2, 3, 4, 5])) as Arc<dyn Array>,
),
(
"short_col",
Arc::new(Int16Array::from(vec![100i16, 200, 300, 400, 500])) as Arc<dyn Array>,
),
(
"int_col",
Arc::new(Int32Array::from(vec![1000i32, 2000, 3000, 4000, 5000]))
as Arc<dyn Array>,
),
(
"long_col",
Arc::new(Int64Array::from(vec![10000i64, 20000, 30000, 40000, 50000]))
as Arc<dyn Array>,
),
(
"float_col",
Arc::new(Float32Array::from(vec![1.1f32, 2.2, 3.3, 4.4, 5.5]))
as Arc<dyn Array>,
),
(
"double_col",
Arc::new(Float64Array::from(vec![1.11f64, 2.22, 3.33, 4.44, 5.55]))
as Arc<dyn Array>,
),
(
"bool_col",
Arc::new(BooleanArray::from(vec![true, false, true, false, true]))
as Arc<dyn Array>,
),
(
"string_col",
Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])) as Arc<dyn Array>,
),
(
"binary_col",
Arc::new(BinaryArray::from_vec(vec![
b"bin1", b"bin2", b"bin3", b"bin4", b"bin5",
])) as Arc<dyn Array>,
),
(
"date_col",
Arc::new(Date32Array::from(vec![18262, 18263, 18264, 18265, 18266]))
as Arc<dyn Array>,
),
(
"timestamp_col",
Arc::new(
TimestampMicrosecondArray::from(vec![
1609459200000000i64,
1609545600000000i64,
1609632000000000i64,
1609718400000000i64,
1609804800000000i64,
])
.with_timezone("UTC"),
) as Arc<dyn Array>,
),
(
"timestamp_ntz_col",
Arc::new(TimestampMicrosecondArray::from(vec![
1609459200000000i64,
1609545600000000i64,
1609632000000000i64,
1609718400000000i64,
1609804800000000i64,
])) as Arc<dyn Array>,
),
(
"decimal_col",
Arc::new(
Decimal128Array::from(vec![12345i128, 23456, 34567, 45678, 56789])
.with_precision_and_scale(10, 2)
.unwrap(),
) as Arc<dyn Array>,
),
])
.unwrap(),
));
let data_iter: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send> =
Box::new(std::iter::once(Ok(engine_data)));
let file_url = Url::parse("memory:///roundtrip/test.parquet").unwrap();
parquet_handler
.write_parquet_file(file_url.clone(), data_iter)
.unwrap();
let path = Path::from_url_path(file_url.path()).unwrap();
let metadata = store.head(&path).await.unwrap();
let reader = ParquetObjectReader::new(store.clone(), path);
let physical_schema = ParquetRecordBatchStreamBuilder::new(reader)
.await
.unwrap()
.schema()
.clone();
let file_meta = FileMeta {
location: file_url.clone(),
last_modified: 0,
size: metadata.size,
};
let data: Vec<RecordBatch> = parquet_handler
.read_parquet_files(
slice::from_ref(&file_meta),
Arc::new(physical_schema.try_into_kernel().unwrap()),
None,
)
.unwrap()
.map(into_record_batch)
.try_collect()
.unwrap();
assert_eq!(data.len(), 1);
assert_eq!(data[0].num_rows(), 5);
assert_eq!(data[0].num_columns(), 13);
let mut col_idx = 0;
let byte_col = data[0]
.column(col_idx)
.as_any()
.downcast_ref::<Int8Array>()
.unwrap();
assert_eq!(byte_col.values(), &[1i8, 2, 3, 4, 5]);
col_idx += 1;
let short_col = data[0]
.column(col_idx)
.as_any()
.downcast_ref::<Int16Array>()
.unwrap();
assert_eq!(short_col.values(), &[100i16, 200, 300, 400, 500]);
col_idx += 1;
let int_col = data[0]
.column(col_idx)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_eq!(int_col.values(), &[1000i32, 2000, 3000, 4000, 5000]);
col_idx += 1;
let long_col = data[0]
.column(col_idx)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
assert_eq!(long_col.values(), &[10000i64, 20000, 30000, 40000, 50000]);
col_idx += 1;
let float_col = data[0]
.column(col_idx)
.as_any()
.downcast_ref::<Float32Array>()
.unwrap();
assert_eq!(float_col.values(), &[1.1f32, 2.2, 3.3, 4.4, 5.5]);
col_idx += 1;
let double_col = data[0]
.column(col_idx)
.as_any()
.downcast_ref::<Float64Array>()
.unwrap();
assert_eq!(double_col.values(), &[1.11f64, 2.22, 3.33, 4.44, 5.55]);
col_idx += 1;
let bool_col = data[0]
.column(col_idx)
.as_any()
.downcast_ref::<BooleanArray>()
.unwrap();
assert!(bool_col.value(0));
assert!(!bool_col.value(1));
col_idx += 1;
let string_col = data[0]
.column(col_idx)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(string_col.value(0), "a");
assert_eq!(string_col.value(4), "e");
col_idx += 1;
let binary_col = data[0]
.column(col_idx)
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap();
assert_eq!(binary_col.value(0), b"bin1");
assert_eq!(binary_col.value(4), b"bin5");
col_idx += 1;
let date_col = data[0]
.column(col_idx)
.as_any()
.downcast_ref::<Date32Array>()
.unwrap();
assert_eq!(date_col.values(), &[18262, 18263, 18264, 18265, 18266]);
col_idx += 1;
let timestamp_col = data[0]
.column(col_idx)
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap();
assert_eq!(timestamp_col.value(0), 1609459200000000i64);
assert_eq!(timestamp_col.value(4), 1609804800000000i64);
assert!(timestamp_col
.timezone()
.is_some_and(|tz| tz.eq_ignore_ascii_case("utc")));
col_idx += 1;
let timestamp_ntz_col = data[0]
.column(col_idx)
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap();
assert_eq!(timestamp_ntz_col.value(0), 1609459200000000i64);
assert_eq!(timestamp_ntz_col.value(4), 1609804800000000i64);
assert!(timestamp_ntz_col.timezone().is_none());
col_idx += 1;
let decimal_col = data[0]
.column(col_idx)
.as_any()
.downcast_ref::<Decimal128Array>()
.unwrap();
assert_eq!(decimal_col.value(0), 12345i128);
assert_eq!(decimal_col.value(4), 56789i128);
assert_eq!(decimal_col.precision(), 10);
assert_eq!(decimal_col.scale(), 2);
}
#[test]
fn test_parquet_footer_read_with_field_id() {
let field = Field::new("value", ArrowDataType::Int64, false).with_metadata(HashMap::from(
[(PARQUET_FIELD_ID_META_KEY.to_string(), "42".to_string())],
));
let arrow_schema = Arc::new(ArrowSchema::new(vec![field]));
let temp_dir = tempfile::tempdir().unwrap();
let file_path = temp_dir.path().join("field_id_test.parquet");
let batch = RecordBatch::try_new(
arrow_schema.clone(),
vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
)
.unwrap();
let file = std::fs::File::create(&file_path).unwrap();
let mut writer = ArrowWriter::try_new(file, arrow_schema, None).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
let store = Arc::new(LocalFileSystem::new());
let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
let file_size = std::fs::metadata(&file_path).unwrap().len();
let file_meta = FileMeta {
location: Url::from_file_path(&file_path).unwrap(),
last_modified: 0,
size: file_size,
};
let footer = handler.read_parquet_footer(&file_meta).unwrap();
let field = footer
.schema
.fields()
.find(|f| f.name() == "value")
.unwrap();
assert_eq!(
field
.metadata()
.get(ColumnMetadataKey::ParquetFieldId.as_ref()),
Some(&"42".into())
);
let field_id = field
.get_config_value(&ColumnMetadataKey::ParquetFieldId)
.expect("Field ID should be accessible via ColumnMetadataKey::ParquetFieldId");
match field_id {
crate::schema::MetadataValue::String(id) => assert_eq!(id, "42"),
crate::schema::MetadataValue::Number(id) => assert_eq!(*id, 42),
other => panic!("Expected String or Number, got {other:?}"),
}
}
#[test]
fn test_read_parquet_with_field_id_matching() {
use crate::schema::{ColumnMetadataKey, MetadataValue, StructField, StructType};
let fields = vec![
Field::new("id", ArrowDataType::Int64, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
Field::new("name", ArrowDataType::Utf8, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string(),
)])),
];
let arrow_schema = Arc::new(ArrowSchema::new(fields));
let temp_dir = tempfile::tempdir().unwrap();
let file_path = temp_dir.path().join("field_id_matching.parquet");
let batch = RecordBatch::try_new(
arrow_schema.clone(),
vec![
Arc::new(Int64Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec!["alice", "bob", "charlie"])),
],
)
.unwrap();
let file = std::fs::File::create(&file_path).unwrap();
let mut writer = ArrowWriter::try_new(file, arrow_schema, None).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
let kernel_schema = Arc::new(
StructType::try_new(vec![
StructField::new("user_id", crate::schema::DataType::LONG, false).with_metadata([
(
ColumnMetadataKey::ParquetFieldId.as_ref(),
MetadataValue::Number(1),
),
]),
StructField::new("user_name", crate::schema::DataType::STRING, false)
.with_metadata([(
ColumnMetadataKey::ParquetFieldId.as_ref(),
MetadataValue::Number(2),
)]),
])
.unwrap(),
);
let store = Arc::new(LocalFileSystem::new());
let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
let file_meta = FileMeta {
location: Url::from_file_path(&file_path).unwrap(),
last_modified: 0,
size: std::fs::metadata(&file_path).unwrap().len(),
};
let data: Vec<RecordBatch> = handler
.read_parquet_files(slice::from_ref(&file_meta), kernel_schema, None)
.unwrap()
.map(into_record_batch)
.try_collect()
.unwrap();
assert_eq!(data.len(), 1);
let batch = &data[0];
let id_col = batch
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
assert_eq!(id_col.values(), &[1, 2, 3], "Should match by field ID 1");
let name_col = batch
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(name_col.value(0), "alice", "Should match by field ID 2");
assert_eq!(name_col.value(1), "bob");
assert_eq!(name_col.value(2), "charlie");
}
#[tokio::test]
async fn write_parquet_omits_arrow_schema_metadata() {
let store = Arc::new(InMemory::new());
let parquet_handler =
DefaultParquetHandler::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));
let data = Box::new(ArrowEngineData::new(
RecordBatch::try_from_iter(vec![(
"a",
Arc::new(Int64Array::from(vec![1, 2, 3])) as Arc<dyn Array>,
)])
.unwrap(),
));
let metadata = parquet_handler
.write_parquet(&Url::parse("memory:///data/").unwrap(), data, &[])
.await
.unwrap();
let path = Path::from_url_path(metadata.file_meta.location.path()).unwrap();
let reader = ParquetObjectReader::new(store, path);
let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
let kv = builder.metadata().file_metadata().key_value_metadata();
let has = kv
.map(|kv| kv.iter().any(|e| e.key == ARROW_SCHEMA_META_KEY))
.unwrap_or(false);
assert!(
!has,
"Parquet file should not contain embedded Arrow schema metadata"
);
}
#[tokio::test]
async fn write_parquet_file_creates_parent_directories() {
let temp_dir = tempfile::tempdir().unwrap();
let nested_path = temp_dir.path().join("a/b/c/output.parquet");
assert!(!nested_path.parent().unwrap().exists());
let store = Arc::new(LocalFileSystem::new());
let parquet_handler: Arc<dyn ParquetHandler> = Arc::new(DefaultParquetHandler::new(
store.clone(),
Arc::new(TokioBackgroundExecutor::new()),
));
let engine_data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(
RecordBatch::try_from_iter(vec![(
"x",
Arc::new(Int64Array::from(vec![1, 2, 3])) as Arc<dyn Array>,
)])
.unwrap(),
));
let data_iter: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send> =
Box::new(std::iter::once(Ok(engine_data)));
let file_url = Url::from_file_path(&nested_path).unwrap();
parquet_handler
.write_parquet_file(file_url.clone(), data_iter)
.unwrap();
assert!(nested_path.exists());
let path = Path::from_url_path(file_url.path()).unwrap();
let reader = ParquetObjectReader::new(store.clone(), path);
let batches: Vec<RecordBatch> = ParquetRecordBatchStreamBuilder::new(reader)
.await
.unwrap()
.build()
.unwrap()
.try_collect()
.await
.unwrap();
assert_eq!(batches.len(), 1);
let col = batches[0]
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
assert_eq!(col.values(), &[1, 2, 3]);
}
}