use std::fs::File;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReaderBuilder};
use super::read_files;
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::schema::SchemaRef;
use crate::{DeltaResult, ExpressionRef, FileDataReadResultIterator, FileMeta, ParquetHandler};
/// Stateless [`ParquetHandler`] implementation for this module.
///
/// Its `read_parquet_files` method hands each file to `try_create_from_parquet`, which reads
/// from a `std::fs::File` using the non-async `ParquetRecordBatchReaderBuilder`.
pub(crate) struct SyncParquetHandler;
fn try_create_from_parquet(
file: File,
schema: SchemaRef,
_arrow_schema: ArrowSchemaRef,
predicate: Option<ExpressionRef>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ArrowEngineData>>> {
let metadata = ArrowReaderMetadata::load(&file, Default::default())?;
let parquet_schema = metadata.schema();
let mut builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
let (indicies, requested_ordering) = get_requested_indices(&schema, parquet_schema)?;
if let Some(mask) = generate_mask(&schema, parquet_schema, builder.parquet_schema(), &indicies)
{
builder = builder.with_projection(mask);
}
if let Some(predicate) = predicate {
builder = builder.with_row_group_filter(predicate.as_ref());
}
Ok(builder.build()?.map(move |data| {
let reordered = reorder_struct_array(data?.into(), &requested_ordering)?;
Ok(ArrowEngineData::new(reordered.into()))
}))
}
impl ParquetHandler for SyncParquetHandler {
    /// Reads the given parquet `files`, requesting `schema` and optionally applying
    /// `predicate`.
    ///
    /// All of the work is delegated to `read_files`, which is given
    /// `try_create_from_parquet` as the routine that turns one opened file into an
    /// iterator of record batches.
    fn read_parquet_files(
        &self,
        files: &[FileMeta],
        schema: SchemaRef,
        predicate: Option<ExpressionRef>,
    ) -> DeltaResult<FileDataReadResultIterator> {
        // Per-file opener handed to the shared `read_files` driver.
        let open_parquet = try_create_from_parquet;
        read_files(files, schema, predicate, open_parquet)
    }
}