use crate::errors::AvroError;
use crate::reader::async_reader::AsyncFileReader;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{FutureExt, TryFutureExt};
use object_store::ObjectStore;
use object_store::ObjectStoreExt;
use object_store::path::Path;
use std::error::Error;
use std::ops::Range;
use std::sync::Arc;
use tokio::runtime::Handle;
pub struct AvroObjectReader {
store: Arc<dyn ObjectStore>,
path: Path,
runtime: Option<Handle>,
}
impl AvroObjectReader {
pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
Self {
store,
path,
runtime: None,
}
}
pub fn with_runtime(self, handle: Handle) -> Self {
Self {
runtime: Some(handle),
..self
}
}
fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O, AvroError>>
where
F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
+ Send
+ 'static,
O: Send + 'static,
E: Error + Send + 'static,
{
match &self.runtime {
Some(handle) => {
let path = self.path.clone();
let store = Arc::clone(&self.store);
handle
.spawn(async move { f(&store, &path).await })
.map_ok_or_else(
|e| match e.try_into_panic() {
Err(e) => Err(AvroError::External(Box::new(e))),
Ok(p) => std::panic::resume_unwind(p),
},
|res| res.map_err(|e| AvroError::General(e.to_string())),
)
.boxed()
}
None => f(&self.store, &self.path)
.map_err(|e| AvroError::General(e.to_string()))
.boxed(),
}
}
}
impl AsyncFileReader for AvroObjectReader {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, AvroError>> {
self.spawn(|store, path| async move { store.get_range(path, range).await }.boxed())
}
fn get_byte_ranges(
&mut self,
ranges: Vec<Range<u64>>,
) -> BoxFuture<'_, Result<Vec<Bytes>, AvroError>>
where
Self: Send,
{
self.spawn(|store, path| async move { store.get_ranges(path, &ranges).await }.boxed())
}
}