Skip to main content

lance_io/
object_reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::fs::File;
5use std::ops::Range;
6use std::sync::Arc;
7
8#[cfg(windows)]
9use crate::local::read_exact_at;
10#[cfg(unix)]
11use std::os::unix::fs::FileExt;
12
13use bytes::Bytes;
14use deepsize::DeepSizeOf;
15use futures::{
16    FutureExt,
17    future::{BoxFuture, Shared},
18    stream::{self, StreamExt},
19};
20use lance_core::{Error, Result, error::CloneableError};
21use object_store::ObjectStoreExt;
22use object_store::{GetOptions, GetResult, ObjectStore, Result as OSResult, path::Path};
23use tokio::sync::OnceCell;
24use tracing::instrument;
25
26use crate::{
27    object_store::DEFAULT_CLOUD_IO_PARALLELISM,
28    traits::{ByteStream, Reader},
29};
30
31trait StaticGetRange {
32    fn path(&self) -> &Path;
33    fn get_range(&self) -> BoxFuture<'static, OSResult<GetResult>>;
34}
35
36/// A wrapper around an object store and a path that implements a static
37/// get_range method by assuming self is stored in an Arc.
38struct GetRequest {
39    object_store: Arc<dyn ObjectStore>,
40    path: Path,
41    options: GetOptions,
42}
43
44impl StaticGetRange for Arc<GetRequest> {
45    fn path(&self) -> &Path {
46        &self.path
47    }
48
49    fn get_range(&self) -> BoxFuture<'static, OSResult<GetResult>> {
50        let store_and_path = self.clone();
51        Box::pin(async move {
52            store_and_path
53                .object_store
54                .get_opts(&store_and_path.path, store_and_path.options.clone())
55                .await
56        })
57    }
58}
59
60/// Object Reader
61///
62/// Object Store + Base Path
63#[derive(Debug)]
64pub struct CloudObjectReader {
65    // Object Store.
66    pub object_store: Arc<dyn ObjectStore>,
67    // File path
68    pub path: Path,
69    // File size, if known.
70    size: OnceCell<usize>,
71
72    block_size: usize,
73    download_retry_count: usize,
74}
75
76impl DeepSizeOf for CloudObjectReader {
77    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
78        // Skipping object_store because there is no easy way to do that and it shouldn't be too big
79        self.path.as_ref().deep_size_of_children(context)
80    }
81}
82
83impl CloudObjectReader {
84    /// Create an ObjectReader from URI
85    pub fn new(
86        object_store: Arc<dyn ObjectStore>,
87        path: Path,
88        block_size: usize,
89        known_size: Option<usize>,
90        download_retry_count: usize,
91    ) -> Result<Self> {
92        Ok(Self {
93            object_store,
94            path,
95            size: OnceCell::new_with(known_size),
96            block_size,
97            download_retry_count,
98        })
99    }
100}
101
102// Retries for the initial request are handled by object store, but
103// there are no retries for failures that occur during the streaming
104// of the response body. Thus we add an outer retry loop here.
105async fn do_with_retry<'a, O>(f: impl Fn() -> BoxFuture<'a, OSResult<O>> + Clone) -> OSResult<O> {
106    let mut retries = 3;
107    loop {
108        let f = f.clone();
109        match f().await {
110            Ok(val) => return Ok(val),
111            Err(err) => {
112                if retries == 0 {
113                    return Err(err);
114                }
115                retries -= 1;
116            }
117        }
118    }
119}
120
121// We have a separate retry loop here.  This is because object_store does not
122// attempt retries on downloads that fail during streaming of the response body.
123//
124// However, this failure is pretty common (e.g. timeout) and we want to retry in these
125// situations.  In addition, we provide additional logging information in these
126// failures cases.
127async fn do_get_with_outer_retry(
128    download_retry_count: usize,
129    get_request: Arc<GetRequest>,
130    desc: impl Fn() -> String,
131) -> OSResult<Bytes> {
132    let mut retries = download_retry_count;
133    loop {
134        let get_request_clone = get_request.clone();
135        let get_result = do_with_retry(move || get_request_clone.get_range()).await?;
136        match get_result.bytes().await {
137            Ok(bytes) => return Ok(bytes),
138            Err(err) => {
139                if retries == 0 {
140                    log::warn!(
141                        "Failed to download {} from {} after {} attempts.  This may indicate that cloud storage is overloaded or your timeout settings are too restrictive.  Error details: {:?}",
142                        desc(),
143                        get_request.path(),
144                        download_retry_count,
145                        err
146                    );
147                    return Err(err);
148                }
149                log::debug!(
150                    "Retrying {} from {} (remaining retries: {}).  Error details: {:?}",
151                    desc(),
152                    get_request.path(),
153                    retries,
154                    err
155                );
156                retries -= 1;
157            }
158        }
159    }
160}
161
162impl Reader for CloudObjectReader {
163    fn path(&self) -> &Path {
164        &self.path
165    }
166
167    fn block_size(&self) -> usize {
168        self.block_size
169    }
170
171    fn io_parallelism(&self) -> usize {
172        DEFAULT_CLOUD_IO_PARALLELISM
173    }
174
175    /// Object/File Size.
176    fn size(&self) -> BoxFuture<'_, object_store::Result<usize>> {
177        Box::pin(async move {
178            self.size
179                .get_or_try_init(|| async move {
180                    let meta =
181                        do_with_retry(|| Box::pin(self.object_store.head(&self.path))).await?;
182                    Ok(meta.size as usize)
183                })
184                .await
185                .cloned()
186        })
187    }
188
189    #[instrument(level = "debug", skip(self))]
190    fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, OSResult<Bytes>> {
191        let get_request = Arc::new(GetRequest {
192            object_store: self.object_store.clone(),
193            path: self.path.clone(),
194            options: GetOptions {
195                range: Some(
196                    Range {
197                        start: range.start as u64,
198                        end: range.end as u64,
199                    }
200                    .into(),
201                ),
202                ..Default::default()
203            },
204        });
205        Box::pin(do_get_with_outer_retry(
206            self.download_retry_count,
207            get_request,
208            move || format!("range {:?}", range),
209        ))
210    }
211
212    #[instrument(level = "debug", skip_all)]
213    fn get_all(&self) -> BoxFuture<'_, OSResult<Bytes>> {
214        let get_request = Arc::new(GetRequest {
215            object_store: self.object_store.clone(),
216            path: self.path.clone(),
217            options: GetOptions::default(),
218        });
219        Box::pin(async move {
220            do_get_with_outer_retry(self.download_retry_count, get_request, || {
221                "read_all".to_string()
222            })
223            .await
224        })
225    }
226
227    fn get_stream(&self) -> BoxFuture<'_, OSResult<ByteStream>> {
228        let get_request = Arc::new(GetRequest {
229            object_store: self.object_store.clone(),
230            path: self.path.clone(),
231            options: GetOptions::default(),
232        });
233        Box::pin(async move {
234            let get_request_clone = get_request.clone();
235            let get_result = do_with_retry(move || get_request_clone.get_range()).await?;
236            Ok(get_result.into_stream())
237        })
238    }
239
240    fn get_range_stream(&self, range: Range<usize>) -> BoxFuture<'_, OSResult<ByteStream>> {
241        let get_request = Arc::new(GetRequest {
242            object_store: self.object_store.clone(),
243            path: self.path.clone(),
244            options: GetOptions {
245                range: Some(
246                    Range {
247                        start: range.start as u64,
248                        end: range.end as u64,
249                    }
250                    .into(),
251                ),
252                ..Default::default()
253            },
254        });
255        Box::pin(async move {
256            let get_request_clone = get_request.clone();
257            let get_result = do_with_retry(move || get_request_clone.get_range()).await?;
258            Ok(get_result.into_stream())
259        })
260    }
261}
262
263#[derive(Debug)]
264pub struct SmallReaderInner {
265    path: Path,
266    size: usize,
267    state: std::sync::Mutex<SmallReaderState>,
268}
269
270/// A reader for a file so small, we just eagerly read it all into memory.
271///
272/// When created, it represents a future that will read the whole file into memory.
273///
274/// On the first read call, it will start the read. Multiple threads can call read at the same time.
275///
276/// Once the read is complete, any thread can call read again to get the result.
277#[derive(Clone, Debug)]
278pub struct SmallReader {
279    inner: Arc<SmallReaderInner>,
280}
281
282enum SmallReaderState {
283    Loading(Shared<BoxFuture<'static, std::result::Result<Bytes, CloneableError>>>),
284    Finished(std::result::Result<Bytes, CloneableError>),
285}
286
287impl std::fmt::Debug for SmallReaderState {
288    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
289        match self {
290            Self::Loading(_) => write!(f, "Loading"),
291            Self::Finished(Ok(data)) => {
292                write!(f, "Finished({} bytes)", data.len())
293            }
294            Self::Finished(Err(err)) => {
295                write!(f, "Finished({})", err.0)
296            }
297        }
298    }
299}
300
301impl SmallReader {
302    pub fn new(
303        store: Arc<dyn ObjectStore>,
304        path: Path,
305        download_retry_count: usize,
306        size: usize,
307    ) -> Self {
308        let path_ref = path.clone();
309        let state = SmallReaderState::Loading(
310            Box::pin(async move {
311                let object_reader =
312                    CloudObjectReader::new(store, path_ref, 0, None, download_retry_count)
313                        .map_err(CloneableError)?;
314                object_reader
315                    .get_all()
316                    .await
317                    .map_err(|err| CloneableError(Error::from(err)))
318            })
319            .boxed()
320            .shared(),
321        );
322        Self {
323            inner: Arc::new(SmallReaderInner {
324                path,
325                size,
326                state: std::sync::Mutex::new(state),
327            }),
328        }
329    }
330}
331
332impl SmallReaderInner {
333    async fn wait(&self) -> OSResult<Bytes> {
334        let future = {
335            let state = self.state.lock().unwrap();
336            match &*state {
337                SmallReaderState::Loading(future) => future.clone(),
338                SmallReaderState::Finished(result) => {
339                    return result.clone().map_err(|err| err.0.into());
340                }
341            }
342        };
343
344        let result = future.await;
345        let result_to_return = result.clone().map_err(|err| err.0.into());
346        let mut state = self.state.lock().unwrap();
347        if matches!(*state, SmallReaderState::Loading(_)) {
348            *state = SmallReaderState::Finished(result);
349        }
350        result_to_return
351    }
352}
353
354impl Reader for SmallReader {
355    fn path(&self) -> &Path {
356        &self.inner.path
357    }
358
359    fn block_size(&self) -> usize {
360        64 * 1024
361    }
362
363    fn io_parallelism(&self) -> usize {
364        1024
365    }
366
367    /// Object/File Size.
368    fn size(&self) -> BoxFuture<'_, OSResult<usize>> {
369        let size = self.inner.size;
370        Box::pin(async move { Ok(size) })
371    }
372
373    fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, OSResult<Bytes>> {
374        let inner = self.inner.clone();
375        Box::pin(async move {
376            let bytes = inner.wait().await?;
377            let start = range.start;
378            let end = range.end;
379            if start >= bytes.len() || end > bytes.len() {
380                return Err(object_store::Error::Generic {
381                    store: "memory",
382                    source: format!(
383                        "Invalid range {}..{} for object of size {} bytes",
384                        start,
385                        end,
386                        bytes.len()
387                    )
388                    .into(),
389                });
390            }
391            Ok(bytes.slice(range))
392        })
393    }
394
395    fn get_all(&self) -> BoxFuture<'_, OSResult<Bytes>> {
396        Box::pin(async move { self.inner.wait().await })
397    }
398}
399
400pub(crate) fn stream_local_range(
401    file: Arc<File>,
402    path: Path,
403    io_tracker: Arc<crate::utils::tracking_store::IOTracker>,
404    range: Range<usize>,
405    chunk_size: usize,
406) -> ByteStream {
407    stream::try_unfold(
408        (file, path, io_tracker, range.start, range.end),
409        move |state| async move {
410            let (file, path, io_tracker, start, end) = state;
411            if start >= end {
412                return Ok(None);
413            }
414
415            let next = (start + chunk_size).min(end);
416            let file_clone = file.clone();
417            let path_clone = path.clone();
418            let bytes = tokio::task::spawn_blocking(move || {
419                let mut buf = bytes::BytesMut::with_capacity(next - start);
420                // Safety: buffer capacity matches the exact number of bytes we read below.
421                unsafe { buf.set_len(next - start) };
422                #[cfg(unix)]
423                file_clone.read_exact_at(buf.as_mut(), start as u64)?;
424                #[cfg(windows)]
425                read_exact_at(file_clone, buf.as_mut(), start as u64)?;
426                Ok::<_, std::io::Error>(buf.freeze())
427            })
428            .await?
429            .map_err(|err: std::io::Error| object_store::Error::Generic {
430                store: "LocalFileSystem",
431                source: err.into(),
432            })?;
433
434            io_tracker.record_read(
435                "get_range_stream",
436                path_clone,
437                (next - start) as u64,
438                Some(start as u64..next as u64),
439            );
440
441            Ok(Some((bytes, (file, path, io_tracker, next, end))))
442        },
443    )
444    .boxed()
445}
446
447impl DeepSizeOf for SmallReader {
448    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
449        let mut size = self.inner.path.as_ref().deep_size_of_children(context);
450
451        if let Ok(guard) = self.inner.state.try_lock()
452            && let SmallReaderState::Finished(Ok(data)) = &*guard
453        {
454            size += data.len();
455        }
456
457        size
458    }
459}