use std::sync::Arc;
use bytes::Bytes;
use futures::TryStreamExt;
use futures::stream::BoxStream;
use object_store::ObjectStore;
use object_store::path::Path as ObjectStorePath;
use opendal::raw::*;
use opendal::*;
use super::core::parse_op_read;
use super::error::parse_error;
pub struct ObjectStoreReader {
bytes_stream: BoxStream<'static, object_store::Result<Bytes>>,
meta: object_store::ObjectMeta,
args: OpRead,
}
impl ObjectStoreReader {
pub(crate) async fn new(
store: Arc<dyn ObjectStore + 'static>,
path: &str,
args: OpRead,
) -> Result<Self> {
let path = ObjectStorePath::from(path);
let opts = parse_op_read(&args)?;
let result = store.get_opts(&path, opts).await.map_err(parse_error)?;
let meta = result.meta.clone();
let bytes_stream = result.into_stream();
Ok(Self {
bytes_stream,
meta,
args,
})
}
pub(crate) fn rp(&self) -> RpRead {
let mut rp = RpRead::new().with_size(Some(self.meta.size));
if !self.args.range().is_full() {
let range = self.args.range();
let size = match range.size() {
Some(size) => size,
None => self.meta.size,
};
rp = rp.with_range(Some(
BytesContentRange::default().with_range(range.offset(), range.offset() + size - 1),
));
}
rp
}
}
unsafe impl Sync for ObjectStoreReader {}
impl oio::Read for ObjectStoreReader {
async fn read(&mut self) -> Result<Buffer> {
let bs = self.bytes_stream.try_next().await.map_err(parse_error)?;
Ok(bs.map(Buffer::from).unwrap_or_default())
}
}