use crate::datasource::physical_plan::{FileMeta, ParquetFileMetrics};
use bytes::Bytes;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::future::BoxFuture;
use object_store::ObjectStore;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::file::metadata::ParquetMetaData;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
/// Factory for the [`AsyncFileReader`] used to read one Parquet file.
///
/// Implementors decide how the bytes of a file are fetched (for example,
/// from an [`ObjectStore`]) and may wrap the reader to record metrics.
pub trait ParquetFileReaderFactory: Send + Sync + Debug + 'static {
    /// Builds a reader for a single Parquet file.
    ///
    /// * `partition_index` - index of the partition the file is read for.
    /// * `file_meta` - the file to open, including its object metadata.
    /// * `metadata_size_hint` - optional hint for how many trailing bytes to
    ///   fetch when reading the footer (the default factory forwards it to
    ///   `ParquetObjectReader::with_footer_size_hint`).
    /// * `metrics` - metrics set in which per-file metrics can be registered.
    ///
    /// # Errors
    ///
    /// Returns an error if the reader cannot be created.
    fn create_reader(
        &self,
        partition_index: usize,
        file_meta: FileMeta,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>>;
}
/// Default [`ParquetFileReaderFactory`]: reads files from a shared
/// [`ObjectStore`] via `ParquetObjectReader`.
#[derive(Debug)]
pub struct DefaultParquetFileReaderFactory {
    /// Object store every reader created by this factory fetches bytes from.
    store: Arc<dyn ObjectStore>,
}

impl DefaultParquetFileReaderFactory {
    /// Creates a factory whose readers all use `store`.
    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
        DefaultParquetFileReaderFactory { store: store }
    }
}
/// [`AsyncFileReader`] that forwards every request to a `ParquetObjectReader`
/// while adding the number of requested bytes to the per-file
/// `bytes_scanned` metric.
///
/// Bytes are counted synchronously when a request is issued, before the
/// returned future is polled. The metric therefore reflects requested ranges
/// whether or not the underlying read later succeeds.
pub(crate) struct ParquetFileReader {
    /// Metrics for the file being read; this type only updates `bytes_scanned`.
    pub file_metrics: ParquetFileMetrics,
    /// Reader that performs the actual object-store I/O.
    pub inner: ParquetObjectReader,
}

impl AsyncFileReader for ParquetFileReader {
    /// Counts `range`'s length as scanned, then delegates to `inner`.
    ///
    /// NOTE(review): assumes `range.start <= range.end` (ranges come from the
    /// parquet reader); an inverted range would overflow the subtraction.
    fn get_bytes(
        &mut self,
        range: Range<usize>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        let requested_len = range.end - range.start;
        self.file_metrics.bytes_scanned.add(requested_len);
        self.inner.get_bytes(range)
    }

    /// Counts the combined length of all `ranges` as scanned, then delegates
    /// the whole batch to `inner`.
    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<usize>>,
    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
    where
        Self: Send,
    {
        let requested_len: usize = ranges
            .iter()
            .fold(0, |acc, span| acc + (span.end - span.start));
        self.file_metrics.bytes_scanned.add(requested_len);
        self.inner.get_byte_ranges(ranges)
    }

    /// Delegates straight to `inner`; this wrapper does not add footer or
    /// metadata reads to `bytes_scanned`.
    fn get_metadata(
        &mut self,
    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        let reader = &mut self.inner;
        reader.get_metadata()
    }
}
impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
    /// Opens `file_meta` through the factory's object store.
    ///
    /// Registers per-file `ParquetFileMetrics` (keyed by partition and file
    /// location) in `metrics`. It applies `metadata_size_hint`, when present,
    /// as the footer size hint. It wraps the object reader so that requested
    /// bytes are counted in `bytes_scanned`.
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok`.
    fn create_reader(
        &self,
        partition_index: usize,
        file_meta: FileMeta,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
        // Borrow the location before `file_meta.object_meta` is moved below.
        let file_metrics =
            ParquetFileMetrics::new(partition_index, file_meta.location().as_ref(), metrics);

        let object_reader =
            ParquetObjectReader::new(Arc::clone(&self.store), file_meta.object_meta);
        let inner = match metadata_size_hint {
            Some(hint) => object_reader.with_footer_size_hint(hint),
            None => object_reader,
        };

        let reader = ParquetFileReader {
            file_metrics,
            inner,
        };
        Ok(Box::new(reader))
    }
}