Skip to main content

lance_io/
object_reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::fs::File;
5use std::ops::Range;
6use std::sync::Arc;
7
8#[cfg(windows)]
9use crate::local::read_exact_at;
10#[cfg(unix)]
11use std::os::unix::fs::FileExt;
12
13use bytes::Bytes;
14use futures::{
15    FutureExt,
16    future::{BoxFuture, Shared},
17    stream::{self, StreamExt},
18};
19use lance_core::deepsize::DeepSizeOf;
20use lance_core::{Error, Result, error::CloneableError};
21use object_store::ObjectStoreExt;
22use object_store::{GetOptions, GetResult, ObjectStore, Result as OSResult, path::Path};
23use tokio::sync::OnceCell;
24use tracing::instrument;
25
26use crate::{
27    object_store::DEFAULT_CLOUD_IO_PARALLELISM,
28    traits::{ByteStream, Reader},
29};
30
31trait StaticGetRange {
32    fn path(&self) -> &Path;
33    fn get_range(&self) -> BoxFuture<'static, OSResult<GetResult>>;
34}
35
36/// A wrapper around an object store and a path that implements a static
37/// get_range method by assuming self is stored in an Arc.
38struct GetRequest {
39    object_store: Arc<dyn ObjectStore>,
40    path: Path,
41    options: GetOptions,
42}
43
44impl StaticGetRange for Arc<GetRequest> {
45    fn path(&self) -> &Path {
46        &self.path
47    }
48
49    fn get_range(&self) -> BoxFuture<'static, OSResult<GetResult>> {
50        let store_and_path = self.clone();
51        Box::pin(async move {
52            store_and_path
53                .object_store
54                .get_opts(&store_and_path.path, store_and_path.options.clone())
55                .await
56        })
57    }
58}
59
60/// Object Reader
61///
62/// Object Store + Base Path
63#[derive(Debug)]
64pub struct CloudObjectReader {
65    // Object Store.
66    pub object_store: Arc<dyn ObjectStore>,
67    // File path
68    pub path: Path,
69    // File size, if known.
70    size: OnceCell<usize>,
71
72    block_size: usize,
73    download_retry_count: usize,
74}
75
76impl DeepSizeOf for CloudObjectReader {
77    fn deep_size_of_children(&self, context: &mut lance_core::deepsize::Context) -> usize {
78        // Skipping object_store because there is no easy way to do that and it shouldn't be too big
79        self.path.as_ref().deep_size_of_children(context)
80    }
81}
82
83impl CloudObjectReader {
84    /// Create an ObjectReader from URI
85    pub fn new(
86        object_store: Arc<dyn ObjectStore>,
87        path: Path,
88        block_size: usize,
89        known_size: Option<usize>,
90        download_retry_count: usize,
91    ) -> Result<Self> {
92        Ok(Self {
93            object_store,
94            path,
95            size: OnceCell::new_with(known_size),
96            block_size,
97            download_retry_count,
98        })
99    }
100}
101
102// Retries for the initial request are handled by object store, but
103// there are no retries for failures that occur during the streaming
104// of the response body. Thus we add an outer retry loop here.
105async fn do_with_retry<'a, O>(f: impl Fn() -> BoxFuture<'a, OSResult<O>> + Clone) -> OSResult<O> {
106    let mut retries = 3;
107    loop {
108        let f = f.clone();
109        match f().await {
110            Ok(val) => return Ok(val),
111            Err(err) => {
112                if retries == 0 {
113                    return Err(err);
114                }
115                retries -= 1;
116            }
117        }
118    }
119}
120
121// We have a separate retry loop here.  This is because object_store does not
122// attempt retries on downloads that fail during streaming of the response body.
123//
124// However, this failure is pretty common (e.g. timeout) and we want to retry in these
125// situations.  In addition, we provide additional logging information in these
126// failures cases.
127async fn do_get_with_outer_retry(
128    download_retry_count: usize,
129    get_request: Arc<GetRequest>,
130    desc: impl Fn() -> String,
131) -> OSResult<Bytes> {
132    let mut retries = download_retry_count;
133    loop {
134        let get_request_clone = get_request.clone();
135        let get_result = do_with_retry(move || get_request_clone.get_range()).await?;
136        match get_result.bytes().await {
137            Ok(bytes) => return Ok(bytes),
138            Err(err) => {
139                if retries == 0 {
140                    log::warn!(
141                        "Failed to download {} from {} after {} attempts.  This may indicate that cloud storage is overloaded or your timeout settings are too restrictive.  Error details: {:?}",
142                        desc(),
143                        get_request.path(),
144                        download_retry_count,
145                        err
146                    );
147                    return Err(err);
148                }
149                log::debug!(
150                    "Retrying {} from {} (remaining retries: {}).  Error details: {:?}",
151                    desc(),
152                    get_request.path(),
153                    retries,
154                    err
155                );
156                retries -= 1;
157            }
158        }
159    }
160}
161
162impl Reader for CloudObjectReader {
163    fn path(&self) -> &Path {
164        &self.path
165    }
166
167    fn block_size(&self) -> usize {
168        self.block_size
169    }
170
171    fn io_parallelism(&self) -> usize {
172        DEFAULT_CLOUD_IO_PARALLELISM
173    }
174
175    /// Object/File Size.
176    fn size(&self) -> BoxFuture<'_, object_store::Result<usize>> {
177        Box::pin(async move {
178            self.size
179                .get_or_try_init(|| async move {
180                    let meta =
181                        do_with_retry(|| Box::pin(self.object_store.head(&self.path))).await?;
182                    Ok(meta.size as usize)
183                })
184                .await
185                .cloned()
186        })
187    }
188
189    #[instrument(level = "debug", skip(self))]
190    fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, OSResult<Bytes>> {
191        let object_store = self.object_store.clone();
192        let path = self.path.clone();
193        let get_range = Range {
194            start: range.start as u64,
195            end: range.end as u64,
196        };
197        Box::pin(async move {
198            let bytes = do_with_retry(move || {
199                let object_store = object_store.clone();
200                let path = path.clone();
201                let get_range = get_range.clone();
202                Box::pin(async move { object_store.get_ranges(&path, &[get_range]).await })
203            })
204            .await?;
205
206            bytes
207                .into_iter()
208                .next()
209                .ok_or_else(|| object_store::Error::Generic {
210                    store: "CloudObjectReader",
211                    source: "get_ranges returned no bytes".into(),
212                })
213        })
214    }
215
216    #[instrument(level = "debug", skip_all)]
217    fn get_all(&self) -> BoxFuture<'_, OSResult<Bytes>> {
218        let get_request = Arc::new(GetRequest {
219            object_store: self.object_store.clone(),
220            path: self.path.clone(),
221            options: GetOptions::default(),
222        });
223        Box::pin(async move {
224            do_get_with_outer_retry(self.download_retry_count, get_request, || {
225                "read_all".to_string()
226            })
227            .await
228        })
229    }
230
231    fn get_stream(&self) -> BoxFuture<'_, OSResult<ByteStream>> {
232        let get_request = Arc::new(GetRequest {
233            object_store: self.object_store.clone(),
234            path: self.path.clone(),
235            options: GetOptions::default(),
236        });
237        Box::pin(async move {
238            let get_request_clone = get_request.clone();
239            let get_result = do_with_retry(move || get_request_clone.get_range()).await?;
240            Ok(get_result.into_stream())
241        })
242    }
243
244    fn get_range_stream(&self, range: Range<usize>) -> BoxFuture<'_, OSResult<ByteStream>> {
245        let get_request = Arc::new(GetRequest {
246            object_store: self.object_store.clone(),
247            path: self.path.clone(),
248            options: GetOptions {
249                range: Some(
250                    Range {
251                        start: range.start as u64,
252                        end: range.end as u64,
253                    }
254                    .into(),
255                ),
256                ..Default::default()
257            },
258        });
259        Box::pin(async move {
260            let get_request_clone = get_request.clone();
261            let get_result = do_with_retry(move || get_request_clone.get_range()).await?;
262            Ok(get_result.into_stream())
263        })
264    }
265}
266
267#[derive(Debug)]
268pub struct SmallReaderInner {
269    path: Path,
270    size: usize,
271    state: std::sync::Mutex<SmallReaderState>,
272}
273
274/// A reader for a file so small, we just eagerly read it all into memory.
275///
276/// When created, it represents a future that will read the whole file into memory.
277///
278/// On the first read call, it will start the read. Multiple threads can call read at the same time.
279///
280/// Once the read is complete, any thread can call read again to get the result.
281#[derive(Clone, Debug)]
282pub struct SmallReader {
283    inner: Arc<SmallReaderInner>,
284}
285
286enum SmallReaderState {
287    Loading(Shared<BoxFuture<'static, std::result::Result<Bytes, CloneableError>>>),
288    Finished(std::result::Result<Bytes, CloneableError>),
289}
290
291impl std::fmt::Debug for SmallReaderState {
292    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
293        match self {
294            Self::Loading(_) => write!(f, "Loading"),
295            Self::Finished(Ok(data)) => {
296                write!(f, "Finished({} bytes)", data.len())
297            }
298            Self::Finished(Err(err)) => {
299                write!(f, "Finished({})", err.0)
300            }
301        }
302    }
303}
304
305impl SmallReader {
306    pub fn new(
307        store: Arc<dyn ObjectStore>,
308        path: Path,
309        download_retry_count: usize,
310        size: usize,
311    ) -> Self {
312        let path_ref = path.clone();
313        let state = SmallReaderState::Loading(
314            Box::pin(async move {
315                let object_reader =
316                    CloudObjectReader::new(store, path_ref, 0, None, download_retry_count)
317                        .map_err(CloneableError)?;
318                object_reader
319                    .get_all()
320                    .await
321                    .map_err(|err| CloneableError(Error::from(err)))
322            })
323            .boxed()
324            .shared(),
325        );
326        Self {
327            inner: Arc::new(SmallReaderInner {
328                path,
329                size,
330                state: std::sync::Mutex::new(state),
331            }),
332        }
333    }
334}
335
336impl SmallReaderInner {
337    async fn wait(&self) -> OSResult<Bytes> {
338        let future = {
339            let state = self.state.lock().unwrap();
340            match &*state {
341                SmallReaderState::Loading(future) => future.clone(),
342                SmallReaderState::Finished(result) => {
343                    return result.clone().map_err(|err| err.0.into());
344                }
345            }
346        };
347
348        let result = future.await;
349        let result_to_return = result.clone().map_err(|err| err.0.into());
350        let mut state = self.state.lock().unwrap();
351        if matches!(*state, SmallReaderState::Loading(_)) {
352            *state = SmallReaderState::Finished(result);
353        }
354        result_to_return
355    }
356}
357
358impl Reader for SmallReader {
359    fn path(&self) -> &Path {
360        &self.inner.path
361    }
362
363    fn block_size(&self) -> usize {
364        64 * 1024
365    }
366
367    fn io_parallelism(&self) -> usize {
368        1024
369    }
370
371    /// Object/File Size.
372    fn size(&self) -> BoxFuture<'_, OSResult<usize>> {
373        let size = self.inner.size;
374        Box::pin(async move { Ok(size) })
375    }
376
377    fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, OSResult<Bytes>> {
378        let inner = self.inner.clone();
379        Box::pin(async move {
380            let bytes = inner.wait().await?;
381            let start = range.start;
382            let end = range.end;
383            if start >= bytes.len() || end > bytes.len() {
384                return Err(object_store::Error::Generic {
385                    store: "memory",
386                    source: format!(
387                        "Invalid range {}..{} for object of size {} bytes",
388                        start,
389                        end,
390                        bytes.len()
391                    )
392                    .into(),
393                });
394            }
395            Ok(bytes.slice(range))
396        })
397    }
398
399    fn get_all(&self) -> BoxFuture<'_, OSResult<Bytes>> {
400        Box::pin(async move { self.inner.wait().await })
401    }
402}
403
404pub(crate) fn stream_local_range(
405    file: Arc<File>,
406    path: Path,
407    io_tracker: Arc<crate::utils::tracking_store::IOTracker>,
408    range: Range<usize>,
409    chunk_size: usize,
410) -> ByteStream {
411    stream::try_unfold(
412        (file, path, io_tracker, range.start, range.end),
413        move |state| async move {
414            let (file, path, io_tracker, start, end) = state;
415            if start >= end {
416                return Ok(None);
417            }
418
419            let next = (start + chunk_size).min(end);
420            let file_clone = file.clone();
421            let path_clone = path.clone();
422            let bytes = tokio::task::spawn_blocking(move || {
423                let mut buf = bytes::BytesMut::with_capacity(next - start);
424                // Safety: buffer capacity matches the exact number of bytes we read below.
425                unsafe { buf.set_len(next - start) };
426                #[cfg(unix)]
427                file_clone.read_exact_at(buf.as_mut(), start as u64)?;
428                #[cfg(windows)]
429                read_exact_at(file_clone, buf.as_mut(), start as u64)?;
430                Ok::<_, std::io::Error>(buf.freeze())
431            })
432            .await?
433            .map_err(|err: std::io::Error| object_store::Error::Generic {
434                store: "LocalFileSystem",
435                source: err.into(),
436            })?;
437
438            io_tracker.record_read(
439                "get_range_stream",
440                path_clone,
441                (next - start) as u64,
442                Some(start as u64..next as u64),
443            );
444
445            Ok(Some((bytes, (file, path, io_tracker, next, end))))
446        },
447    )
448    .boxed()
449}
450
451impl DeepSizeOf for SmallReader {
452    fn deep_size_of_children(&self, context: &mut lance_core::deepsize::Context) -> usize {
453        let mut size = self.inner.path.as_ref().deep_size_of_children(context);
454
455        if let Ok(guard) = self.inner.state.try_lock()
456            && let SmallReaderState::Finished(Ok(data)) = &*guard
457        {
458            size += data.len();
459        }
460
461        size
462    }
463}