use std::{ops::Range, sync::Arc};
use crate::arrow::arrow_reader::ArrowReaderOptions;
use crate::arrow::async_reader::{AsyncFileReader, MetadataSuffixFetch};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
use bytes::Bytes;
use futures::{FutureExt, TryFutureExt, future::BoxFuture};
use object_store::ObjectStoreExt;
use object_store::{GetOptions, GetRange};
use object_store::{ObjectStore, path::Path};
use tokio::runtime::Handle;
/// Reads Parquet files stored in an [`ObjectStore`].
///
/// Construct with [`ParquetObjectReader::new`] and configure optional
/// behavior via the `with_*` builder methods; implements
/// [`AsyncFileReader`] so it can drive the async Arrow reader.
#[derive(Clone, Debug)]
pub struct ParquetObjectReader {
    // Store used for all byte-range / suffix requests.
    store: Arc<dyn ObjectStore>,
    // Location of the Parquet object within `store`.
    path: Path,
    // Known total object size; when `None`, metadata is loaded via a
    // suffix request instead of absolute ranges (see `get_metadata`).
    file_size: Option<u64>,
    // Prefetch hint (bytes) for the footer/metadata read.
    metadata_size_hint: Option<usize>,
    // Default page-index preload flags, used when no per-read
    // `ArrowReaderOptions` policy overrides them.
    preload_column_index: bool,
    preload_offset_index: bool,
    // Optional dedicated tokio runtime on which all I/O is spawned.
    runtime: Option<Handle>,
}
impl ParquetObjectReader {
    /// Creates a reader for the object at `path` within `store`.
    ///
    /// All optional behavior (known file size, footer prefetch hint,
    /// page-index preloading, dedicated I/O runtime) is configured via
    /// the `with_*` builder methods.
    pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
        Self {
            store,
            path,
            file_size: None,
            metadata_size_hint: None,
            preload_column_index: false,
            preload_offset_index: false,
            runtime: None,
        }
    }

    /// Hint at the number of bytes to prefetch when reading the footer.
    pub fn with_footer_size_hint(mut self, hint: usize) -> Self {
        self.metadata_size_hint = Some(hint);
        self
    }

    /// Provide the total object size so metadata can be fetched with
    /// absolute ranges instead of a suffix request.
    pub fn with_file_size(mut self, file_size: u64) -> Self {
        self.file_size = Some(file_size);
        self
    }

    /// Eagerly load the column index when reading metadata.
    pub fn with_preload_column_index(mut self, preload_column_index: bool) -> Self {
        self.preload_column_index = preload_column_index;
        self
    }

    /// Eagerly load the offset index when reading metadata.
    pub fn with_preload_offset_index(mut self, preload_offset_index: bool) -> Self {
        self.preload_offset_index = preload_offset_index;
        self
    }

    /// Perform all object-store I/O on the given tokio runtime.
    pub fn with_runtime(mut self, handle: Handle) -> Self {
        self.runtime = Some(handle);
        self
    }

    /// Runs `f` against this reader's store and path.
    ///
    /// With no configured runtime the future runs inline on the caller's
    /// executor. With a runtime, the work is spawned there: a panic inside
    /// the task is resumed on the awaiting thread, while cancellation
    /// (e.g. the runtime shutting down) surfaces as
    /// [`ParquetError::External`].
    fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O>>
    where
        F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
            + Send
            + 'static,
        O: Send + 'static,
        E: Into<ParquetError> + Send + 'static,
    {
        let Some(handle) = &self.runtime else {
            // No dedicated runtime configured: execute inline.
            return f(&self.store, &self.path).map_err(Into::into).boxed();
        };

        // The spawned task needs owned copies to satisfy 'static.
        let store = Arc::clone(&self.store);
        let path = self.path.clone();
        handle
            .spawn(async move { f(&store, &path).await })
            .map_ok_or_else(
                |join_err| match join_err.try_into_panic() {
                    // Propagate panics from the I/O task to the caller.
                    Ok(payload) => std::panic::resume_unwind(payload),
                    // Otherwise the task was cancelled (runtime shutdown).
                    Err(join_err) => Err(ParquetError::External(Box::new(join_err))),
                },
                |result| result.map_err(Into::into),
            )
            .boxed()
    }
}
impl MetadataSuffixFetch for &mut ParquetObjectReader {
fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
let options = GetOptions {
range: Some(GetRange::Suffix(suffix as u64)),
..Default::default()
};
self.spawn(|store, path| {
async move {
let resp = store.get_opts(path, options).await?;
Ok::<_, ParquetError>(resp.bytes().await?)
}
.boxed()
})
}
}
impl AsyncFileReader for ParquetObjectReader {
    /// Fetches a single byte range from the object, via the configured
    /// runtime when one is set.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        self.spawn(|store, path| store.get_range(path, range).boxed())
    }

    /// Fetches multiple byte ranges in one call, letting the store
    /// coalesce or parallelize the requests.
    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, Result<Vec<Bytes>>>
    where
        Self: Send,
    {
        self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
    }

    /// Loads the Parquet footer metadata.
    ///
    /// Page-index policy precedence: the reader's `preload_*` flags
    /// provide the defaults; if `options` carries a non-`Skip` column
    /// or offset index policy, the options' policies replace BOTH
    /// defaults. When both option policies are `Skip`, the preload
    /// flags win (see `test_page_index_policy_skip_uses_preload_true`).
    ///
    /// If `file_size` is known the footer is read with absolute ranges;
    /// otherwise it is read via a suffix request.
    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
        Box::pin(async move {
            let metadata_opts = options.map(|o| o.metadata_options().clone());
            // Defaults derived from the reader-level preload flags.
            let mut metadata = ParquetMetaDataReader::new()
                .with_metadata_options(metadata_opts)
                .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
                .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index))
                .with_prefetch_hint(self.metadata_size_hint);
            // With the "encryption" feature, pass through any decryption
            // properties supplied by the caller.
            #[cfg(feature = "encryption")]
            if let Some(options) = options {
                metadata = metadata.with_decryption_properties(
                    options.file_decryption_properties.as_ref().map(Arc::clone),
                );
            }
            if let Some(options) = options {
                // Only override the preload defaults when the caller
                // explicitly requested at least one index.
                if options.column_index_policy() != PageIndexPolicy::Skip
                    || options.offset_index_policy() != PageIndexPolicy::Skip
                {
                    metadata = metadata
                        .with_column_index_policy(options.column_index_policy())
                        .with_offset_index_policy(options.offset_index_policy());
                }
            }
            let metadata = if let Some(file_size) = self.file_size {
                metadata.load_and_finish(self, file_size).await?
            } else {
                // Unknown size: locate the footer with a suffix request.
                metadata.load_via_suffix_and_finish(self).await?
            };
            Ok(Arc::new(metadata))
        })
    }
}
#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    use arrow::util::test_util::parquet_test_data;
    use futures::FutureExt;
    use futures::TryStreamExt;
    use object_store::local::LocalFileSystem;
    use object_store::path::Path;
    use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt};

    use crate::arrow::ParquetRecordBatchStreamBuilder;
    use crate::arrow::async_reader::ArrowReaderOptions;
    use crate::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
    use crate::errors::ParquetError;
    use crate::file::metadata::PageIndexPolicy;

    /// Returns a local store rooted at the parquet test data, plus the
    /// metadata of a small file without a page index.
    async fn get_meta_store() -> (ObjectMeta, Arc<dyn ObjectStore>) {
        let res = parquet_test_data();
        let store = LocalFileSystem::new_with_prefix(res).unwrap();
        let meta = store
            .head(&Path::from("alltypes_plain.parquet"))
            .await
            .unwrap();
        (meta, Arc::new(store) as Arc<dyn ObjectStore>)
    }

    /// Same as [`get_meta_store`] but for a file that contains a page
    /// index, so index-policy behavior can be observed.
    async fn get_meta_store_with_page_index() -> (ObjectMeta, Arc<dyn ObjectStore>) {
        let res = parquet_test_data();
        let store = LocalFileSystem::new_with_prefix(res).unwrap();
        let meta = store
            .head(&Path::from("alltypes_tiny_pages_plain.parquet"))
            .await
            .unwrap();
        (meta, Arc::new(store) as Arc<dyn ObjectStore>)
    }

    #[tokio::test]
    async fn test_simple() {
        let (meta, store) = get_meta_store().await;
        let object_reader =
            ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
            .await
            .unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);
    }

    /// Without a known file size the reader must fall back to suffix
    /// requests for the footer and still produce the same data.
    #[tokio::test]
    async fn test_simple_without_file_length() {
        let (meta, store) = get_meta_store().await;
        let object_reader = ParquetObjectReader::new(store, meta.location);
        let builder = ParquetRecordBatchStreamBuilder::new(object_reader)
            .await
            .unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);
    }

    #[tokio::test]
    async fn test_not_found() {
        let (mut meta, store) = get_meta_store().await;
        meta.location = Path::from("I don't exist.parquet");
        let object_reader =
            ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
        match ParquetRecordBatchStreamBuilder::new(object_reader).await {
            Ok(_) => panic!("expected failure"),
            Err(e) => {
                let err = e.to_string();
                assert!(err.contains("I don't exist.parquet not found:"), "{err}");
            }
        }
    }

    /// Verifies I/O actually runs on a user-supplied runtime by
    /// counting park/unpark events on that runtime's workers.
    #[tokio::test]
    async fn test_runtime_is_used() {
        let num_actions = Arc::new(AtomicUsize::new(0));
        let (a1, a2) = (num_actions.clone(), num_actions.clone());
        let rt = tokio::runtime::Builder::new_multi_thread()
            .on_thread_park(move || {
                a1.fetch_add(1, Ordering::Relaxed);
            })
            .on_thread_unpark(move || {
                a2.fetch_add(1, Ordering::Relaxed);
            })
            .build()
            .unwrap();
        let (meta, store) = get_meta_store().await;
        let initial_actions = num_actions.load(Ordering::Relaxed);
        let reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());
        let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
        let batches: Vec<_> = builder.build().unwrap().try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 8);
        assert!(num_actions.load(Ordering::Relaxed) - initial_actions > 0);
        // Dropping a runtime inside an async context panics; hand it to
        // a blocking thread instead.
        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    #[tokio::test]
    async fn test_runtime_thread_id_different() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();
        let (meta, store) = get_meta_store().await;
        let reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());
        let current_id = std::thread::current().id();
        let other_id = reader
            .spawn(|_, _| async move { Ok::<_, ParquetError>(std::thread::current().id()) }.boxed())
            .await
            .unwrap();
        assert_ne!(current_id, other_id);
        tokio::runtime::Handle::current().spawn_blocking(move || drop(rt));
    }

    /// A shut-down I/O runtime must surface as a cancellation error,
    /// not a panic.
    #[tokio::test]
    async fn io_fails_on_shutdown_runtime() {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(1)
            .build()
            .unwrap();
        let (meta, store) = get_meta_store().await;
        let mut reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_runtime(rt.handle().clone());
        rt.shutdown_background();
        let err = reader.get_bytes(0..1).await.unwrap_err().to_string();
        // `err` is already a String; no need to re-stringify it.
        assert!(err.contains("was cancelled"), "{err}");
    }

    /// An explicit `Skip` policy in the options must NOT override the
    /// reader's preload flags.
    #[tokio::test]
    async fn test_page_index_policy_skip_uses_preload_true() {
        let (meta, store) = get_meta_store_with_page_index().await;
        let mut reader = ParquetObjectReader::new(store.clone(), meta.location.clone())
            .with_file_size(meta.size)
            .with_preload_column_index(true)
            .with_preload_offset_index(true);
        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip);
        let metadata = reader.get_metadata(Some(&options)).await.unwrap();
        assert!(metadata.column_index().is_some());
    }

    /// A non-`Skip` policy in the options overrides preload=false.
    #[tokio::test]
    async fn test_page_index_policy_optional_overrides_preload_false() {
        let (meta, store) = get_meta_store_with_page_index().await;
        let mut reader = ParquetObjectReader::new(store.clone(), meta.location.clone())
            .with_file_size(meta.size)
            .with_preload_column_index(false)
            .with_preload_offset_index(false);
        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Optional);
        let metadata = reader.get_metadata(Some(&options)).await.unwrap();
        assert!(metadata.column_index().is_some());
    }

    #[tokio::test]
    async fn test_page_index_policy_optional_vs_skip() {
        let (meta, store) = get_meta_store_with_page_index().await;
        let mut reader1 = ParquetObjectReader::new(store.clone(), meta.location.clone())
            .with_file_size(meta.size)
            .with_preload_column_index(false)
            .with_preload_offset_index(false);
        let options1 = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip);
        let metadata1 = reader1.get_metadata(Some(&options1)).await.unwrap();
        let mut reader2 = ParquetObjectReader::new(store.clone(), meta.location.clone())
            .with_file_size(meta.size)
            .with_preload_column_index(false)
            .with_preload_offset_index(false);
        let options2 = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Optional);
        let metadata2 = reader2.get_metadata(Some(&options2)).await.unwrap();
        assert!(metadata1.column_index().is_none());
        assert!(metadata2.column_index().is_some());
    }

    /// With no options at all, the preload flags alone control index
    /// loading — and both indexes should be present.
    #[tokio::test]
    async fn test_page_index_policy_no_options_uses_preload() {
        let (meta, store) = get_meta_store_with_page_index().await;
        let mut reader = ParquetObjectReader::new(store, meta.location)
            .with_file_size(meta.size)
            .with_preload_column_index(true)
            .with_preload_offset_index(true);
        let metadata = reader.get_metadata(None).await.unwrap();
        // Was `column_index()` checked twice; the second assertion must
        // cover the offset index, which this test also preloads.
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
    }
}