1use std::fs::File;
5use std::ops::Range;
6use std::sync::Arc;
7
8#[cfg(windows)]
9use crate::local::read_exact_at;
10#[cfg(unix)]
11use std::os::unix::fs::FileExt;
12
13use bytes::Bytes;
14use futures::{
15 FutureExt,
16 future::{BoxFuture, Shared},
17 stream::{self, StreamExt},
18};
19use lance_core::deepsize::DeepSizeOf;
20use lance_core::{Error, Result, error::CloneableError};
21use object_store::ObjectStoreExt;
22use object_store::{GetOptions, GetResult, ObjectStore, Result as OSResult, path::Path};
23use tokio::sync::OnceCell;
24use tracing::instrument;
25
26use crate::{
27 object_store::DEFAULT_CLOUD_IO_PARALLELISM,
28 traits::{ByteStream, Reader},
29};
30
31trait StaticGetRange {
32 fn path(&self) -> &Path;
33 fn get_range(&self) -> BoxFuture<'static, OSResult<GetResult>>;
34}
35
36struct GetRequest {
39 object_store: Arc<dyn ObjectStore>,
40 path: Path,
41 options: GetOptions,
42}
43
44impl StaticGetRange for Arc<GetRequest> {
45 fn path(&self) -> &Path {
46 &self.path
47 }
48
49 fn get_range(&self) -> BoxFuture<'static, OSResult<GetResult>> {
50 let store_and_path = self.clone();
51 Box::pin(async move {
52 store_and_path
53 .object_store
54 .get_opts(&store_and_path.path, store_and_path.options.clone())
55 .await
56 })
57 }
58}
59
60#[derive(Debug)]
64pub struct CloudObjectReader {
65 pub object_store: Arc<dyn ObjectStore>,
67 pub path: Path,
69 size: OnceCell<usize>,
71
72 block_size: usize,
73 download_retry_count: usize,
74}
75
76impl DeepSizeOf for CloudObjectReader {
77 fn deep_size_of_children(&self, context: &mut lance_core::deepsize::Context) -> usize {
78 self.path.as_ref().deep_size_of_children(context)
80 }
81}
82
83impl CloudObjectReader {
84 pub fn new(
86 object_store: Arc<dyn ObjectStore>,
87 path: Path,
88 block_size: usize,
89 known_size: Option<usize>,
90 download_retry_count: usize,
91 ) -> Result<Self> {
92 Ok(Self {
93 object_store,
94 path,
95 size: OnceCell::new_with(known_size),
96 block_size,
97 download_retry_count,
98 })
99 }
100}
101
102async fn do_with_retry<'a, O>(f: impl Fn() -> BoxFuture<'a, OSResult<O>> + Clone) -> OSResult<O> {
106 let mut retries = 3;
107 loop {
108 let f = f.clone();
109 match f().await {
110 Ok(val) => return Ok(val),
111 Err(err) => {
112 if retries == 0 {
113 return Err(err);
114 }
115 retries -= 1;
116 }
117 }
118 }
119}
120
121async fn do_get_with_outer_retry(
128 download_retry_count: usize,
129 get_request: Arc<GetRequest>,
130 desc: impl Fn() -> String,
131) -> OSResult<Bytes> {
132 let mut retries = download_retry_count;
133 loop {
134 let get_request_clone = get_request.clone();
135 let get_result = do_with_retry(move || get_request_clone.get_range()).await?;
136 match get_result.bytes().await {
137 Ok(bytes) => return Ok(bytes),
138 Err(err) => {
139 if retries == 0 {
140 log::warn!(
141 "Failed to download {} from {} after {} attempts. This may indicate that cloud storage is overloaded or your timeout settings are too restrictive. Error details: {:?}",
142 desc(),
143 get_request.path(),
144 download_retry_count,
145 err
146 );
147 return Err(err);
148 }
149 log::debug!(
150 "Retrying {} from {} (remaining retries: {}). Error details: {:?}",
151 desc(),
152 get_request.path(),
153 retries,
154 err
155 );
156 retries -= 1;
157 }
158 }
159 }
160}
161
162impl Reader for CloudObjectReader {
163 fn path(&self) -> &Path {
164 &self.path
165 }
166
167 fn block_size(&self) -> usize {
168 self.block_size
169 }
170
171 fn io_parallelism(&self) -> usize {
172 DEFAULT_CLOUD_IO_PARALLELISM
173 }
174
175 fn size(&self) -> BoxFuture<'_, object_store::Result<usize>> {
177 Box::pin(async move {
178 self.size
179 .get_or_try_init(|| async move {
180 let meta =
181 do_with_retry(|| Box::pin(self.object_store.head(&self.path))).await?;
182 Ok(meta.size as usize)
183 })
184 .await
185 .cloned()
186 })
187 }
188
189 #[instrument(level = "debug", skip(self))]
190 fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, OSResult<Bytes>> {
191 let object_store = self.object_store.clone();
192 let path = self.path.clone();
193 let get_range = Range {
194 start: range.start as u64,
195 end: range.end as u64,
196 };
197 Box::pin(async move {
198 let bytes = do_with_retry(move || {
199 let object_store = object_store.clone();
200 let path = path.clone();
201 let get_range = get_range.clone();
202 Box::pin(async move { object_store.get_ranges(&path, &[get_range]).await })
203 })
204 .await?;
205
206 bytes
207 .into_iter()
208 .next()
209 .ok_or_else(|| object_store::Error::Generic {
210 store: "CloudObjectReader",
211 source: "get_ranges returned no bytes".into(),
212 })
213 })
214 }
215
216 #[instrument(level = "debug", skip_all)]
217 fn get_all(&self) -> BoxFuture<'_, OSResult<Bytes>> {
218 let get_request = Arc::new(GetRequest {
219 object_store: self.object_store.clone(),
220 path: self.path.clone(),
221 options: GetOptions::default(),
222 });
223 Box::pin(async move {
224 do_get_with_outer_retry(self.download_retry_count, get_request, || {
225 "read_all".to_string()
226 })
227 .await
228 })
229 }
230
231 fn get_stream(&self) -> BoxFuture<'_, OSResult<ByteStream>> {
232 let get_request = Arc::new(GetRequest {
233 object_store: self.object_store.clone(),
234 path: self.path.clone(),
235 options: GetOptions::default(),
236 });
237 Box::pin(async move {
238 let get_request_clone = get_request.clone();
239 let get_result = do_with_retry(move || get_request_clone.get_range()).await?;
240 Ok(get_result.into_stream())
241 })
242 }
243
244 fn get_range_stream(&self, range: Range<usize>) -> BoxFuture<'_, OSResult<ByteStream>> {
245 let get_request = Arc::new(GetRequest {
246 object_store: self.object_store.clone(),
247 path: self.path.clone(),
248 options: GetOptions {
249 range: Some(
250 Range {
251 start: range.start as u64,
252 end: range.end as u64,
253 }
254 .into(),
255 ),
256 ..Default::default()
257 },
258 });
259 Box::pin(async move {
260 let get_request_clone = get_request.clone();
261 let get_result = do_with_retry(move || get_request_clone.get_range()).await?;
262 Ok(get_result.into_stream())
263 })
264 }
265}
266
267#[derive(Debug)]
268pub struct SmallReaderInner {
269 path: Path,
270 size: usize,
271 state: std::sync::Mutex<SmallReaderState>,
272}
273
274#[derive(Clone, Debug)]
282pub struct SmallReader {
283 inner: Arc<SmallReaderInner>,
284}
285
286enum SmallReaderState {
287 Loading(Shared<BoxFuture<'static, std::result::Result<Bytes, CloneableError>>>),
288 Finished(std::result::Result<Bytes, CloneableError>),
289}
290
291impl std::fmt::Debug for SmallReaderState {
292 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
293 match self {
294 Self::Loading(_) => write!(f, "Loading"),
295 Self::Finished(Ok(data)) => {
296 write!(f, "Finished({} bytes)", data.len())
297 }
298 Self::Finished(Err(err)) => {
299 write!(f, "Finished({})", err.0)
300 }
301 }
302 }
303}
304
305impl SmallReader {
306 pub fn new(
307 store: Arc<dyn ObjectStore>,
308 path: Path,
309 download_retry_count: usize,
310 size: usize,
311 ) -> Self {
312 let path_ref = path.clone();
313 let state = SmallReaderState::Loading(
314 Box::pin(async move {
315 let object_reader =
316 CloudObjectReader::new(store, path_ref, 0, None, download_retry_count)
317 .map_err(CloneableError)?;
318 object_reader
319 .get_all()
320 .await
321 .map_err(|err| CloneableError(Error::from(err)))
322 })
323 .boxed()
324 .shared(),
325 );
326 Self {
327 inner: Arc::new(SmallReaderInner {
328 path,
329 size,
330 state: std::sync::Mutex::new(state),
331 }),
332 }
333 }
334}
335
336impl SmallReaderInner {
337 async fn wait(&self) -> OSResult<Bytes> {
338 let future = {
339 let state = self.state.lock().unwrap();
340 match &*state {
341 SmallReaderState::Loading(future) => future.clone(),
342 SmallReaderState::Finished(result) => {
343 return result.clone().map_err(|err| err.0.into());
344 }
345 }
346 };
347
348 let result = future.await;
349 let result_to_return = result.clone().map_err(|err| err.0.into());
350 let mut state = self.state.lock().unwrap();
351 if matches!(*state, SmallReaderState::Loading(_)) {
352 *state = SmallReaderState::Finished(result);
353 }
354 result_to_return
355 }
356}
357
358impl Reader for SmallReader {
359 fn path(&self) -> &Path {
360 &self.inner.path
361 }
362
363 fn block_size(&self) -> usize {
364 64 * 1024
365 }
366
367 fn io_parallelism(&self) -> usize {
368 1024
369 }
370
371 fn size(&self) -> BoxFuture<'_, OSResult<usize>> {
373 let size = self.inner.size;
374 Box::pin(async move { Ok(size) })
375 }
376
377 fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, OSResult<Bytes>> {
378 let inner = self.inner.clone();
379 Box::pin(async move {
380 let bytes = inner.wait().await?;
381 let start = range.start;
382 let end = range.end;
383 if start >= bytes.len() || end > bytes.len() {
384 return Err(object_store::Error::Generic {
385 store: "memory",
386 source: format!(
387 "Invalid range {}..{} for object of size {} bytes",
388 start,
389 end,
390 bytes.len()
391 )
392 .into(),
393 });
394 }
395 Ok(bytes.slice(range))
396 })
397 }
398
399 fn get_all(&self) -> BoxFuture<'_, OSResult<Bytes>> {
400 Box::pin(async move { self.inner.wait().await })
401 }
402}
403
404pub(crate) fn stream_local_range(
405 file: Arc<File>,
406 path: Path,
407 io_tracker: Arc<crate::utils::tracking_store::IOTracker>,
408 range: Range<usize>,
409 chunk_size: usize,
410) -> ByteStream {
411 stream::try_unfold(
412 (file, path, io_tracker, range.start, range.end),
413 move |state| async move {
414 let (file, path, io_tracker, start, end) = state;
415 if start >= end {
416 return Ok(None);
417 }
418
419 let next = (start + chunk_size).min(end);
420 let file_clone = file.clone();
421 let path_clone = path.clone();
422 let bytes = tokio::task::spawn_blocking(move || {
423 let mut buf = bytes::BytesMut::with_capacity(next - start);
424 unsafe { buf.set_len(next - start) };
426 #[cfg(unix)]
427 file_clone.read_exact_at(buf.as_mut(), start as u64)?;
428 #[cfg(windows)]
429 read_exact_at(file_clone, buf.as_mut(), start as u64)?;
430 Ok::<_, std::io::Error>(buf.freeze())
431 })
432 .await?
433 .map_err(|err: std::io::Error| object_store::Error::Generic {
434 store: "LocalFileSystem",
435 source: err.into(),
436 })?;
437
438 io_tracker.record_read(
439 "get_range_stream",
440 path_clone,
441 (next - start) as u64,
442 Some(start as u64..next as u64),
443 );
444
445 Ok(Some((bytes, (file, path, io_tracker, next, end))))
446 },
447 )
448 .boxed()
449}
450
451impl DeepSizeOf for SmallReader {
452 fn deep_size_of_children(&self, context: &mut lance_core::deepsize::Context) -> usize {
453 let mut size = self.inner.path.as_ref().deep_size_of_children(context);
454
455 if let Ok(guard) = self.inner.state.try_lock()
456 && let SmallReaderState::Finished(Ok(data)) = &*guard
457 {
458 size += data.len();
459 }
460
461 size
462 }
463}