use std::ops::Range;
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use deepsize::DeepSizeOf;
use futures::future::BoxFuture;
use lance_core::Result;
use object_store::{path::Path, ObjectStore};
use tokio::sync::OnceCell;
use tracing::instrument;
use crate::traits::Reader;
/// Reader for a single object addressed by `path` inside an [`ObjectStore`].
///
/// The object's size is cached after it is first determined, so repeated
/// `size()` calls do not each issue a metadata request.
#[derive(Debug)]
pub struct CloudObjectReader {
/// Store that `head` and `get_range` requests are issued against.
pub object_store: Arc<dyn ObjectStore>,
/// Location of the object within `object_store`.
pub path: Path,
/// Cached object size: pre-filled from `known_size` in `new`, otherwise
/// set by the first successful `head` request made in `size()`.
size: OnceCell<usize>,
/// Value returned by `Reader::block_size`.
// NOTE(review): presumably the preferred I/O granularity in bytes — confirm with callers.
block_size: usize,
}
impl DeepSizeOf for CloudObjectReader {
    /// Reports the size of this reader's children by delegating to the
    /// string view of `path`.
    ///
    /// Only `path` is consulted here; `object_store`, `size` and `block_size`
    /// do not contribute to the returned value.
    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
        let location: &str = self.path.as_ref();
        location.deep_size_of_children(context)
    }
}
impl CloudObjectReader {
    /// Creates a reader for the object at `path` in `object_store`.
    ///
    /// # Arguments
    ///
    /// * `object_store` - store the object lives in.
    /// * `path` - location of the object within the store.
    /// * `block_size` - value later reported by `Reader::block_size`.
    /// * `known_size` - when `Some`, the size cell starts out populated with
    ///   this value; when `None`, the cell starts empty.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`.
    pub fn new(
        object_store: Arc<dyn ObjectStore>,
        path: Path,
        block_size: usize,
        known_size: Option<usize>,
    ) -> Result<Self> {
        let size = OnceCell::new_with(known_size);
        Ok(Self {
            object_store,
            path,
            size,
            block_size,
        })
    }

    /// Awaits the future produced by `f`, invoking `f` again after a failure.
    ///
    /// The errors of the first `MAX_RETRIES` failed attempts are discarded.
    /// The outcome of the attempt after those (success or error) is returned
    /// unchanged. Every error kind triggers a retry, and attempts are made
    /// back to back with no delay in between.
    async fn do_with_retry<'a, O>(
        &self,
        f: impl Fn() -> BoxFuture<'a, std::result::Result<O, object_store::Error>>,
    ) -> object_store::Result<O> {
        const MAX_RETRIES: usize = 3;

        for _ in 0..MAX_RETRIES {
            if let Ok(val) = f().await {
                return Ok(val);
            }
        }
        // Retry budget exhausted: whatever this attempt yields is final.
        f().await
    }
}
#[async_trait]
impl Reader for CloudObjectReader {
    /// Location of the object this reader targets.
    fn path(&self) -> &Path {
        &self.path
    }

    /// Block size supplied when the reader was constructed.
    fn block_size(&self) -> usize {
        self.block_size
    }

    /// Returns the object's size, filling the cache on first use.
    ///
    /// If the size cell is empty, a `head` request (with retries) is issued
    /// and `meta.size` is stored. A failed request is returned to the caller
    /// as the error.
    async fn size(&self) -> object_store::Result<usize> {
        let fetch_size = || async move {
            self.do_with_retry(|| self.object_store.head(&self.path))
                .await
                .map(|meta| meta.size)
        };
        self.size.get_or_try_init(fetch_size).await.copied()
    }

    /// Fetches the bytes in `range` from the object, retrying failed requests.
    #[instrument(level = "debug", skip(self))]
    async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
        // The range is cloned per attempt because each request takes it by value.
        let request = || self.object_store.get_range(&self.path, range.clone());
        self.do_with_retry(request).await
    }
}