1use std::ops::Range;
5use std::sync::Arc;
6
7use bytes::Bytes;
8use deepsize::DeepSizeOf;
9use futures::{
10 FutureExt,
11 future::{BoxFuture, Shared},
12};
13use lance_core::{Error, Result, error::CloneableError};
14use object_store::{GetOptions, GetResult, ObjectStore, Result as OSResult, path::Path};
15use tokio::sync::OnceCell;
16use tracing::instrument;
17
18use crate::{object_store::DEFAULT_CLOUD_IO_PARALLELISM, traits::Reader};
19
/// A get request whose future does not borrow from `self`.
///
/// `get_range` returns a `'static` future, so implementors must own (for
/// example through an `Arc`) everything the request needs.
trait StaticGetRange {
    /// Path of the object being requested (used by the download helpers in
    /// this file when logging).
    fn path(&self) -> &Path;
    /// Issues the request. The retry helpers in this file call it again for
    /// every retry attempt.
    fn get_range(&self) -> BoxFuture<'static, OSResult<GetResult>>;
}
24
/// Everything needed to (re)issue an `ObjectStore::get_opts` call.
///
/// It is shared through an `Arc` (see the `StaticGetRange` impl), so each
/// attempt can build a `'static` future without copying the store handle,
/// path or options up front.
struct GetRequest {
    // Store the request is sent to.
    object_store: Arc<dyn ObjectStore>,
    // Location of the object inside `object_store`.
    path: Path,
    // Options passed (cloned) to `get_opts` on every attempt; range reads
    // set `range` here.
    options: GetOptions,
}
32
33impl StaticGetRange for Arc<GetRequest> {
34 fn path(&self) -> &Path {
35 &self.path
36 }
37
38 fn get_range(&self) -> BoxFuture<'static, OSResult<GetResult>> {
39 let store_and_path = self.clone();
40 Box::pin(async move {
41 store_and_path
42 .object_store
43 .get_opts(&store_and_path.path, store_and_path.options.clone())
44 .await
45 })
46 }
47}
48
/// [`Reader`] implementation that reads an object through an
/// [`ObjectStore`], retrying failed downloads.
#[derive(Debug)]
pub struct CloudObjectReader {
    // Store the object is read from.
    pub object_store: Arc<dyn ObjectStore>,
    // Location of the object within `object_store`.
    pub path: Path,
    // Object size: either supplied to `new` or fetched once (via `head`) on
    // the first call to `size()`.
    size: OnceCell<usize>,

    // Value reported by `Reader::block_size`.
    block_size: usize,
    // Number of extra attempts made when reading a response body fails.
    download_retry_count: usize,
}
64
65impl DeepSizeOf for CloudObjectReader {
66 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
67 self.path.as_ref().deep_size_of_children(context)
69 }
70}
71
72impl CloudObjectReader {
73 pub fn new(
75 object_store: Arc<dyn ObjectStore>,
76 path: Path,
77 block_size: usize,
78 known_size: Option<usize>,
79 download_retry_count: usize,
80 ) -> Result<Self> {
81 Ok(Self {
82 object_store,
83 path,
84 size: OnceCell::new_with(known_size),
85 block_size,
86 download_retry_count,
87 })
88 }
89}
90
91async fn do_with_retry<'a, O>(f: impl Fn() -> BoxFuture<'a, OSResult<O>> + Clone) -> OSResult<O> {
95 let mut retries = 3;
96 loop {
97 let f = f.clone();
98 match f().await {
99 Ok(val) => return Ok(val),
100 Err(err) => {
101 if retries == 0 {
102 return Err(err);
103 }
104 retries -= 1;
105 }
106 }
107 }
108}
109
110async fn do_get_with_outer_retry(
117 download_retry_count: usize,
118 get_request: Arc<GetRequest>,
119 desc: impl Fn() -> String,
120) -> OSResult<Bytes> {
121 let mut retries = download_retry_count;
122 loop {
123 let get_request_clone = get_request.clone();
124 let get_result = do_with_retry(move || get_request_clone.get_range()).await?;
125 match get_result.bytes().await {
126 Ok(bytes) => return Ok(bytes),
127 Err(err) => {
128 if retries == 0 {
129 log::warn!(
130 "Failed to download {} from {} after {} attempts. This may indicate that cloud storage is overloaded or your timeout settings are too restrictive. Error details: {:?}",
131 desc(),
132 get_request.path(),
133 download_retry_count,
134 err
135 );
136 return Err(err);
137 }
138 log::debug!(
139 "Retrying {} from {} (remaining retries: {}). Error details: {:?}",
140 desc(),
141 get_request.path(),
142 retries,
143 err
144 );
145 retries -= 1;
146 }
147 }
148 }
149}
150
151impl Reader for CloudObjectReader {
152 fn path(&self) -> &Path {
153 &self.path
154 }
155
156 fn block_size(&self) -> usize {
157 self.block_size
158 }
159
160 fn io_parallelism(&self) -> usize {
161 DEFAULT_CLOUD_IO_PARALLELISM
162 }
163
164 fn size(&self) -> BoxFuture<'_, object_store::Result<usize>> {
166 Box::pin(async move {
167 self.size
168 .get_or_try_init(|| async move {
169 let meta = do_with_retry(|| self.object_store.head(&self.path)).await?;
170 Ok(meta.size as usize)
171 })
172 .await
173 .cloned()
174 })
175 }
176
177 #[instrument(level = "debug", skip(self))]
178 fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, OSResult<Bytes>> {
179 let get_request = Arc::new(GetRequest {
180 object_store: self.object_store.clone(),
181 path: self.path.clone(),
182 options: GetOptions {
183 range: Some(
184 Range {
185 start: range.start as u64,
186 end: range.end as u64,
187 }
188 .into(),
189 ),
190 ..Default::default()
191 },
192 });
193 Box::pin(do_get_with_outer_retry(
194 self.download_retry_count,
195 get_request,
196 move || format!("range {:?}", range),
197 ))
198 }
199
200 #[instrument(level = "debug", skip_all)]
201 fn get_all(&self) -> BoxFuture<'_, OSResult<Bytes>> {
202 let get_request = Arc::new(GetRequest {
203 object_store: self.object_store.clone(),
204 path: self.path.clone(),
205 options: GetOptions::default(),
206 });
207 Box::pin(async move {
208 do_get_with_outer_retry(self.download_retry_count, get_request, || {
209 "read_all".to_string()
210 })
211 .await
212 })
213 }
214}
215
/// State shared by all clones of a [`SmallReader`].
#[derive(Debug)]
pub struct SmallReaderInner {
    // Location of the object.
    path: Path,
    // Object size supplied to `SmallReader::new`; reported by `size()` as-is.
    size: usize,
    // Either the in-flight whole-object download or its cached outcome.
    state: std::sync::Mutex<SmallReaderState>,
}
222
/// A [`Reader`] that fetches the entire object with a single `get_all` call
/// and serves every read (including range reads) from that in-memory copy.
///
/// Cloning only bumps a reference count; all clones share one download and
/// one cached result.
#[derive(Clone, Debug)]
pub struct SmallReader {
    inner: Arc<SmallReaderInner>,
}
234
/// Progress of a [`SmallReader`]'s whole-object download.
enum SmallReaderState {
    // The download has not yet been observed to complete. The `Shared` future
    // lets concurrent readers await the same download.
    Loading(Shared<BoxFuture<'static, std::result::Result<Bytes, CloneableError>>>),
    // Outcome of the download, stored by the first `wait` that sees it finish.
    // Errors are cached as well; later reads return them without retrying.
    Finished(std::result::Result<Bytes, CloneableError>),
}
239
240impl std::fmt::Debug for SmallReaderState {
241 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242 match self {
243 Self::Loading(_) => write!(f, "Loading"),
244 Self::Finished(Ok(data)) => {
245 write!(f, "Finished({} bytes)", data.len())
246 }
247 Self::Finished(Err(err)) => {
248 write!(f, "Finished({})", err.0)
249 }
250 }
251 }
252}
253
254impl SmallReader {
255 pub fn new(
256 store: Arc<dyn ObjectStore>,
257 path: Path,
258 download_retry_count: usize,
259 size: usize,
260 ) -> Self {
261 let path_ref = path.clone();
262 let state = SmallReaderState::Loading(
263 Box::pin(async move {
264 let object_reader =
265 CloudObjectReader::new(store, path_ref, 0, None, download_retry_count)
266 .map_err(CloneableError)?;
267 object_reader
268 .get_all()
269 .await
270 .map_err(|err| CloneableError(Error::from(err)))
271 })
272 .boxed()
273 .shared(),
274 );
275 Self {
276 inner: Arc::new(SmallReaderInner {
277 path,
278 size,
279 state: std::sync::Mutex::new(state),
280 }),
281 }
282 }
283}
284
285impl SmallReaderInner {
286 async fn wait(&self) -> OSResult<Bytes> {
287 let future = {
288 let state = self.state.lock().unwrap();
289 match &*state {
290 SmallReaderState::Loading(future) => future.clone(),
291 SmallReaderState::Finished(result) => {
292 return result.clone().map_err(|err| err.0.into());
293 }
294 }
295 };
296
297 let result = future.await;
298 let result_to_return = result.clone().map_err(|err| err.0.into());
299 let mut state = self.state.lock().unwrap();
300 if matches!(*state, SmallReaderState::Loading(_)) {
301 *state = SmallReaderState::Finished(result);
302 }
303 result_to_return
304 }
305}
306
307impl Reader for SmallReader {
308 fn path(&self) -> &Path {
309 &self.inner.path
310 }
311
312 fn block_size(&self) -> usize {
313 64 * 1024
314 }
315
316 fn io_parallelism(&self) -> usize {
317 1024
318 }
319
320 fn size(&self) -> BoxFuture<'_, OSResult<usize>> {
322 let size = self.inner.size;
323 Box::pin(async move { Ok(size) })
324 }
325
326 fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, OSResult<Bytes>> {
327 let inner = self.inner.clone();
328 Box::pin(async move {
329 let bytes = inner.wait().await?;
330 let start = range.start;
331 let end = range.end;
332 if start >= bytes.len() || end > bytes.len() {
333 return Err(object_store::Error::Generic {
334 store: "memory",
335 source: format!(
336 "Invalid range {}..{} for object of size {} bytes",
337 start,
338 end,
339 bytes.len()
340 )
341 .into(),
342 });
343 }
344 Ok(bytes.slice(range))
345 })
346 }
347
348 fn get_all(&self) -> BoxFuture<'_, OSResult<Bytes>> {
349 Box::pin(async move { self.inner.wait().await })
350 }
351}
352
353impl DeepSizeOf for SmallReader {
354 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
355 let mut size = self.inner.path.as_ref().deep_size_of_children(context);
356
357 if let Ok(guard) = self.inner.state.try_lock()
358 && let SmallReaderState::Finished(Ok(data)) = &*guard
359 {
360 size += data.len();
361 }
362
363 size
364 }
365}