// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use deepsize::DeepSizeOf;
use futures::future::BoxFuture;
use lance_core::Result;
use object_store::{path::Path, GetOptions, ObjectStore};
use tokio::sync::OnceCell;
use tracing::instrument;

use crate::{object_store::DEFAULT_CLOUD_IO_PARALLELISM, traits::Reader};

/// Object Reader
///
/// Reads a single object from an [`ObjectStore`] at a given [`Path`].
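///
/// # Example
///
/// A minimal sketch, assuming the in-memory `object_store` backend (any
/// `ObjectStore` implementation works the same way) and that this crate is
/// consumed as `lance_io`:
///
/// ```ignore
/// use std::sync::Arc;
/// use object_store::{memory::InMemory, path::Path};
/// use lance_io::traits::Reader; // brings `size`/`get_range` into scope
///
/// let store = Arc::new(InMemory::new());
/// let reader = CloudObjectReader::new(
///     store,
///     Path::from("data/file.lance"),
///     64 * 1024, // block size in bytes
///     None,      // size unknown up front; fetched lazily via HEAD
///     3,         // retry a failed download body up to 3 times
/// )?;
/// let bytes = reader.get_range(0..128).await?;
/// ```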
#[derive(Debug)]
pub struct CloudObjectReader {
    /// The underlying object store.
    pub object_store: Arc<dyn ObjectStore>,
    /// Path to the file within the store.
    pub path: Path,
    /// File size in bytes, if known; otherwise fetched lazily and cached.
    size: OnceCell<usize>,
    /// I/O block size in bytes.
    block_size: usize,
    /// Number of times to retry a download whose response body fails mid-stream.
    download_retry_count: usize,
}

impl DeepSizeOf for CloudObjectReader {
    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
        // Skipping `object_store` because there is no easy way to measure it,
        // and it shouldn't be too big anyway.
        self.path.as_ref().deep_size_of_children(context)
    }
}

impl CloudObjectReader {
    /// Create a [`CloudObjectReader`] from an object store and a file [`Path`].
pub fn new(
object_store: Arc<dyn ObjectStore>,
path: Path,
block_size: usize,
known_size: Option<usize>,
download_retry_count: usize,
) -> Result<Self> {
Ok(Self {
object_store,
path,
size: OnceCell::new_with(known_size),
block_size,
download_retry_count,
})
    }

// Retries for the initial request are handled by object store, but
// there are no retries for failures that occur during the streaming
// of the response body. Thus we add an outer retry loop here.
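    // With `retries = 3` below, each operation is attempted at most 4 times.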
async fn do_with_retry<'a, O>(
&self,
f: impl Fn() -> BoxFuture<'a, std::result::Result<O, object_store::Error>>,
) -> object_store::Result<O> {
let mut retries = 3;
loop {
match f().await {
Ok(val) => return Ok(val),
Err(err) => {
if retries == 0 {
return Err(err);
}
retries -= 1;
}
}
}
}
}

#[async_trait]
impl Reader for CloudObjectReader {
    fn path(&self) -> &Path {
        &self.path
    }

    fn block_size(&self) -> usize {
        self.block_size
    }

    fn io_parallelism(&self) -> usize {
        DEFAULT_CLOUD_IO_PARALLELISM
    }

/// Object/File Size.
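    ///
    /// If not supplied at construction, the size is fetched once with a HEAD
    /// request and then cached for subsequent calls.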
async fn size(&self) -> object_store::Result<usize> {
self.size
.get_or_try_init(|| async move {
let meta = self
.do_with_retry(|| self.object_store.head(&self.path))
.await?;
Ok(meta.size)
})
.await
.cloned()
    }

#[instrument(level = "debug", skip(self))]
async fn get_range(&self, range: Range<usize>) -> object_store::Result<Bytes> {
// We have a separate retry loop here. This is because object_store does not
// attempt retries on downloads that fail during streaming of the response body.
//
// However, this failure is pretty common (e.g. timeout) and we want to retry in these
// situations. In addition, we provide additional logging information in these
        // failure cases.
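        //
        // Note that each retry re-issues the full ranged GET from scratch; a
        // body stream that fails partway through is not resumed.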
let mut retries = self.download_retry_count;
loop {
let get_result = self
.do_with_retry(|| {
let options = GetOptions {
range: Some(range.clone().into()),
..Default::default()
};
self.object_store.get_opts(&self.path, options)
})
.await?;
match get_result.bytes().await {
Ok(bytes) => return Ok(bytes),
Err(err) => {
if retries == 0 {
                        log::warn!(
                            "Failed to download range {:?} from {} after {} attempts. \
                             This may indicate that cloud storage is overloaded or your \
                             timeout settings are too restrictive. Error details: {:?}",
                            range,
                            self.path,
                            self.download_retry_count,
                            err
                        );
return Err(err);
}
log::debug!(
"Retrying range {:?} from {} (remaining retries: {}). Error details: {:?}",
range,
self.path,
retries,
err
);
retries -= 1;
}
}
}
}
}
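
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;

    // A minimal smoke-test sketch: it assumes `object_store`'s `InMemory`
    // backend and tokio's `#[tokio::test]` macro are available, and uses
    // arbitrary illustrative values for block size and retry count.
    #[tokio::test]
    async fn test_size_and_get_range() {
        let store = Arc::new(InMemory::new());
        let path = Path::from("test/data.bin");
        store
            .put(&path, Bytes::from_static(b"hello world").into())
            .await
            .unwrap();

        let reader = CloudObjectReader::new(store, path, 4096, None, 3).unwrap();

        // No size was supplied at construction, so the first call issues a
        // HEAD request and caches the result.
        assert_eq!(reader.size().await.unwrap(), 11);

        // A ranged read that goes through the outer retry loop.
        let bytes = reader.get_range(0..5).await.unwrap();
        assert_eq!(bytes, Bytes::from_static(b"hello"));
    }
}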