Skip to main content

lance_io/
object_reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::fs::File;
5use std::ops::Range;
6use std::sync::Arc;
7
8#[cfg(windows)]
9use crate::local::read_exact_at;
10#[cfg(unix)]
11use std::os::unix::fs::FileExt;
12
13use bytes::Bytes;
14use deepsize::DeepSizeOf;
15use futures::{
16    FutureExt,
17    future::{BoxFuture, Shared},
18    stream::{self, StreamExt},
19};
20use lance_core::{Error, Result, error::CloneableError};
21use object_store::{GetOptions, GetResult, ObjectStore, Result as OSResult, path::Path};
22use tokio::sync::OnceCell;
23use tracing::instrument;
24
25use crate::{
26    object_store::DEFAULT_CLOUD_IO_PARALLELISM,
27    traits::{ByteStream, Reader},
28};
29
30trait StaticGetRange {
31    fn path(&self) -> &Path;
32    fn get_range(&self) -> BoxFuture<'static, OSResult<GetResult>>;
33}
34
35/// A wrapper around an object store and a path that implements a static
36/// get_range method by assuming self is stored in an Arc.
37struct GetRequest {
38    object_store: Arc<dyn ObjectStore>,
39    path: Path,
40    options: GetOptions,
41}
42
43impl StaticGetRange for Arc<GetRequest> {
44    fn path(&self) -> &Path {
45        &self.path
46    }
47
48    fn get_range(&self) -> BoxFuture<'static, OSResult<GetResult>> {
49        let store_and_path = self.clone();
50        Box::pin(async move {
51            store_and_path
52                .object_store
53                .get_opts(&store_and_path.path, store_and_path.options.clone())
54                .await
55        })
56    }
57}
58
59/// Object Reader
60///
61/// Object Store + Base Path
62#[derive(Debug)]
63pub struct CloudObjectReader {
64    // Object Store.
65    pub object_store: Arc<dyn ObjectStore>,
66    // File path
67    pub path: Path,
68    // File size, if known.
69    size: OnceCell<usize>,
70
71    block_size: usize,
72    download_retry_count: usize,
73}
74
75impl DeepSizeOf for CloudObjectReader {
76    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
77        // Skipping object_store because there is no easy way to do that and it shouldn't be too big
78        self.path.as_ref().deep_size_of_children(context)
79    }
80}
81
82impl CloudObjectReader {
83    /// Create an ObjectReader from URI
84    pub fn new(
85        object_store: Arc<dyn ObjectStore>,
86        path: Path,
87        block_size: usize,
88        known_size: Option<usize>,
89        download_retry_count: usize,
90    ) -> Result<Self> {
91        Ok(Self {
92            object_store,
93            path,
94            size: OnceCell::new_with(known_size),
95            block_size,
96            download_retry_count,
97        })
98    }
99}
100
101// Retries for the initial request are handled by object store, but
102// there are no retries for failures that occur during the streaming
103// of the response body. Thus we add an outer retry loop here.
104async fn do_with_retry<'a, O>(f: impl Fn() -> BoxFuture<'a, OSResult<O>> + Clone) -> OSResult<O> {
105    let mut retries = 3;
106    loop {
107        let f = f.clone();
108        match f().await {
109            Ok(val) => return Ok(val),
110            Err(err) => {
111                if retries == 0 {
112                    return Err(err);
113                }
114                retries -= 1;
115            }
116        }
117    }
118}
119
120// We have a separate retry loop here.  This is because object_store does not
121// attempt retries on downloads that fail during streaming of the response body.
122//
123// However, this failure is pretty common (e.g. timeout) and we want to retry in these
124// situations.  In addition, we provide additional logging information in these
125// failures cases.
126async fn do_get_with_outer_retry(
127    download_retry_count: usize,
128    get_request: Arc<GetRequest>,
129    desc: impl Fn() -> String,
130) -> OSResult<Bytes> {
131    let mut retries = download_retry_count;
132    loop {
133        let get_request_clone = get_request.clone();
134        let get_result = do_with_retry(move || get_request_clone.get_range()).await?;
135        match get_result.bytes().await {
136            Ok(bytes) => return Ok(bytes),
137            Err(err) => {
138                if retries == 0 {
139                    log::warn!(
140                        "Failed to download {} from {} after {} attempts.  This may indicate that cloud storage is overloaded or your timeout settings are too restrictive.  Error details: {:?}",
141                        desc(),
142                        get_request.path(),
143                        download_retry_count,
144                        err
145                    );
146                    return Err(err);
147                }
148                log::debug!(
149                    "Retrying {} from {} (remaining retries: {}).  Error details: {:?}",
150                    desc(),
151                    get_request.path(),
152                    retries,
153                    err
154                );
155                retries -= 1;
156            }
157        }
158    }
159}
160
161impl Reader for CloudObjectReader {
162    fn path(&self) -> &Path {
163        &self.path
164    }
165
166    fn block_size(&self) -> usize {
167        self.block_size
168    }
169
170    fn io_parallelism(&self) -> usize {
171        DEFAULT_CLOUD_IO_PARALLELISM
172    }
173
174    /// Object/File Size.
175    fn size(&self) -> BoxFuture<'_, object_store::Result<usize>> {
176        Box::pin(async move {
177            self.size
178                .get_or_try_init(|| async move {
179                    let meta = do_with_retry(|| self.object_store.head(&self.path)).await?;
180                    Ok(meta.size as usize)
181                })
182                .await
183                .cloned()
184        })
185    }
186
187    #[instrument(level = "debug", skip(self))]
188    fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, OSResult<Bytes>> {
189        let get_request = Arc::new(GetRequest {
190            object_store: self.object_store.clone(),
191            path: self.path.clone(),
192            options: GetOptions {
193                range: Some(
194                    Range {
195                        start: range.start as u64,
196                        end: range.end as u64,
197                    }
198                    .into(),
199                ),
200                ..Default::default()
201            },
202        });
203        Box::pin(do_get_with_outer_retry(
204            self.download_retry_count,
205            get_request,
206            move || format!("range {:?}", range),
207        ))
208    }
209
210    #[instrument(level = "debug", skip_all)]
211    fn get_all(&self) -> BoxFuture<'_, OSResult<Bytes>> {
212        let get_request = Arc::new(GetRequest {
213            object_store: self.object_store.clone(),
214            path: self.path.clone(),
215            options: GetOptions::default(),
216        });
217        Box::pin(async move {
218            do_get_with_outer_retry(self.download_retry_count, get_request, || {
219                "read_all".to_string()
220            })
221            .await
222        })
223    }
224
225    fn get_stream(&self) -> BoxFuture<'_, OSResult<ByteStream>> {
226        let get_request = Arc::new(GetRequest {
227            object_store: self.object_store.clone(),
228            path: self.path.clone(),
229            options: GetOptions::default(),
230        });
231        Box::pin(async move {
232            let get_request_clone = get_request.clone();
233            let get_result = do_with_retry(move || get_request_clone.get_range()).await?;
234            Ok(get_result.into_stream())
235        })
236    }
237
238    fn get_range_stream(&self, range: Range<usize>) -> BoxFuture<'_, OSResult<ByteStream>> {
239        let get_request = Arc::new(GetRequest {
240            object_store: self.object_store.clone(),
241            path: self.path.clone(),
242            options: GetOptions {
243                range: Some(
244                    Range {
245                        start: range.start as u64,
246                        end: range.end as u64,
247                    }
248                    .into(),
249                ),
250                ..Default::default()
251            },
252        });
253        Box::pin(async move {
254            let get_request_clone = get_request.clone();
255            let get_result = do_with_retry(move || get_request_clone.get_range()).await?;
256            Ok(get_result.into_stream())
257        })
258    }
259}
260
261#[derive(Debug)]
262pub struct SmallReaderInner {
263    path: Path,
264    size: usize,
265    state: std::sync::Mutex<SmallReaderState>,
266}
267
268/// A reader for a file so small, we just eagerly read it all into memory.
269///
270/// When created, it represents a future that will read the whole file into memory.
271///
272/// On the first read call, it will start the read. Multiple threads can call read at the same time.
273///
274/// Once the read is complete, any thread can call read again to get the result.
275#[derive(Clone, Debug)]
276pub struct SmallReader {
277    inner: Arc<SmallReaderInner>,
278}
279
280enum SmallReaderState {
281    Loading(Shared<BoxFuture<'static, std::result::Result<Bytes, CloneableError>>>),
282    Finished(std::result::Result<Bytes, CloneableError>),
283}
284
285impl std::fmt::Debug for SmallReaderState {
286    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
287        match self {
288            Self::Loading(_) => write!(f, "Loading"),
289            Self::Finished(Ok(data)) => {
290                write!(f, "Finished({} bytes)", data.len())
291            }
292            Self::Finished(Err(err)) => {
293                write!(f, "Finished({})", err.0)
294            }
295        }
296    }
297}
298
299impl SmallReader {
300    pub fn new(
301        store: Arc<dyn ObjectStore>,
302        path: Path,
303        download_retry_count: usize,
304        size: usize,
305    ) -> Self {
306        let path_ref = path.clone();
307        let state = SmallReaderState::Loading(
308            Box::pin(async move {
309                let object_reader =
310                    CloudObjectReader::new(store, path_ref, 0, None, download_retry_count)
311                        .map_err(CloneableError)?;
312                object_reader
313                    .get_all()
314                    .await
315                    .map_err(|err| CloneableError(Error::from(err)))
316            })
317            .boxed()
318            .shared(),
319        );
320        Self {
321            inner: Arc::new(SmallReaderInner {
322                path,
323                size,
324                state: std::sync::Mutex::new(state),
325            }),
326        }
327    }
328}
329
330impl SmallReaderInner {
331    async fn wait(&self) -> OSResult<Bytes> {
332        let future = {
333            let state = self.state.lock().unwrap();
334            match &*state {
335                SmallReaderState::Loading(future) => future.clone(),
336                SmallReaderState::Finished(result) => {
337                    return result.clone().map_err(|err| err.0.into());
338                }
339            }
340        };
341
342        let result = future.await;
343        let result_to_return = result.clone().map_err(|err| err.0.into());
344        let mut state = self.state.lock().unwrap();
345        if matches!(*state, SmallReaderState::Loading(_)) {
346            *state = SmallReaderState::Finished(result);
347        }
348        result_to_return
349    }
350}
351
352impl Reader for SmallReader {
353    fn path(&self) -> &Path {
354        &self.inner.path
355    }
356
357    fn block_size(&self) -> usize {
358        64 * 1024
359    }
360
361    fn io_parallelism(&self) -> usize {
362        1024
363    }
364
365    /// Object/File Size.
366    fn size(&self) -> BoxFuture<'_, OSResult<usize>> {
367        let size = self.inner.size;
368        Box::pin(async move { Ok(size) })
369    }
370
371    fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, OSResult<Bytes>> {
372        let inner = self.inner.clone();
373        Box::pin(async move {
374            let bytes = inner.wait().await?;
375            let start = range.start;
376            let end = range.end;
377            if start >= bytes.len() || end > bytes.len() {
378                return Err(object_store::Error::Generic {
379                    store: "memory",
380                    source: format!(
381                        "Invalid range {}..{} for object of size {} bytes",
382                        start,
383                        end,
384                        bytes.len()
385                    )
386                    .into(),
387                });
388            }
389            Ok(bytes.slice(range))
390        })
391    }
392
393    fn get_all(&self) -> BoxFuture<'_, OSResult<Bytes>> {
394        Box::pin(async move { self.inner.wait().await })
395    }
396}
397
398pub(crate) fn stream_local_range(
399    file: Arc<File>,
400    path: Path,
401    io_tracker: Arc<crate::utils::tracking_store::IOTracker>,
402    range: Range<usize>,
403    chunk_size: usize,
404) -> ByteStream {
405    stream::try_unfold(
406        (file, path, io_tracker, range.start, range.end),
407        move |state| async move {
408            let (file, path, io_tracker, start, end) = state;
409            if start >= end {
410                return Ok(None);
411            }
412
413            let next = (start + chunk_size).min(end);
414            let file_clone = file.clone();
415            let path_clone = path.clone();
416            let bytes = tokio::task::spawn_blocking(move || {
417                let mut buf = bytes::BytesMut::with_capacity(next - start);
418                // Safety: buffer capacity matches the exact number of bytes we read below.
419                unsafe { buf.set_len(next - start) };
420                #[cfg(unix)]
421                file_clone.read_exact_at(buf.as_mut(), start as u64)?;
422                #[cfg(windows)]
423                read_exact_at(file_clone, buf.as_mut(), start as u64)?;
424                Ok::<_, std::io::Error>(buf.freeze())
425            })
426            .await?
427            .map_err(|err: std::io::Error| object_store::Error::Generic {
428                store: "LocalFileSystem",
429                source: err.into(),
430            })?;
431
432            io_tracker.record_read(
433                "get_range_stream",
434                path_clone,
435                (next - start) as u64,
436                Some(start as u64..next as u64),
437            );
438
439            Ok(Some((bytes, (file, path, io_tracker, next, end))))
440        },
441    )
442    .boxed()
443}
444
445impl DeepSizeOf for SmallReader {
446    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
447        let mut size = self.inner.path.as_ref().deep_size_of_children(context);
448
449        if let Ok(guard) = self.inner.state.try_lock()
450            && let SmallReaderState::Finished(Ok(data)) = &*guard
451        {
452            size += data.len();
453        }
454
455        size
456    }
457}