Skip to main content

lance_io/
object_reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::ops::Range;
5use std::sync::Arc;
6
7use bytes::Bytes;
8use deepsize::DeepSizeOf;
9use futures::{
10    FutureExt,
11    future::{BoxFuture, Shared},
12};
13use lance_core::{Error, Result, error::CloneableError};
14use object_store::{GetOptions, GetResult, ObjectStore, Result as OSResult, path::Path};
15use tokio::sync::OnceCell;
16use tracing::instrument;
17
18use crate::{object_store::DEFAULT_CLOUD_IO_PARALLELISM, traits::Reader};
19
20trait StaticGetRange {
21    fn path(&self) -> &Path;
22    fn get_range(&self) -> BoxFuture<'static, OSResult<GetResult>>;
23}
24
25/// A wrapper around an object store and a path that implements a static
26/// get_range method by assuming self is stored in an Arc.
27struct GetRequest {
28    object_store: Arc<dyn ObjectStore>,
29    path: Path,
30    options: GetOptions,
31}
32
33impl StaticGetRange for Arc<GetRequest> {
34    fn path(&self) -> &Path {
35        &self.path
36    }
37
38    fn get_range(&self) -> BoxFuture<'static, OSResult<GetResult>> {
39        let store_and_path = self.clone();
40        Box::pin(async move {
41            store_and_path
42                .object_store
43                .get_opts(&store_and_path.path, store_and_path.options.clone())
44                .await
45        })
46    }
47}
48
49/// Object Reader
50///
51/// Object Store + Base Path
52#[derive(Debug)]
53pub struct CloudObjectReader {
54    // Object Store.
55    pub object_store: Arc<dyn ObjectStore>,
56    // File path
57    pub path: Path,
58    // File size, if known.
59    size: OnceCell<usize>,
60
61    block_size: usize,
62    download_retry_count: usize,
63}
64
65impl DeepSizeOf for CloudObjectReader {
66    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
67        // Skipping object_store because there is no easy way to do that and it shouldn't be too big
68        self.path.as_ref().deep_size_of_children(context)
69    }
70}
71
72impl CloudObjectReader {
73    /// Create an ObjectReader from URI
74    pub fn new(
75        object_store: Arc<dyn ObjectStore>,
76        path: Path,
77        block_size: usize,
78        known_size: Option<usize>,
79        download_retry_count: usize,
80    ) -> Result<Self> {
81        Ok(Self {
82            object_store,
83            path,
84            size: OnceCell::new_with(known_size),
85            block_size,
86            download_retry_count,
87        })
88    }
89}
90
91// Retries for the initial request are handled by object store, but
92// there are no retries for failures that occur during the streaming
93// of the response body. Thus we add an outer retry loop here.
94async fn do_with_retry<'a, O>(f: impl Fn() -> BoxFuture<'a, OSResult<O>> + Clone) -> OSResult<O> {
95    let mut retries = 3;
96    loop {
97        let f = f.clone();
98        match f().await {
99            Ok(val) => return Ok(val),
100            Err(err) => {
101                if retries == 0 {
102                    return Err(err);
103                }
104                retries -= 1;
105            }
106        }
107    }
108}
109
110// We have a separate retry loop here.  This is because object_store does not
111// attempt retries on downloads that fail during streaming of the response body.
112//
113// However, this failure is pretty common (e.g. timeout) and we want to retry in these
114// situations.  In addition, we provide additional logging information in these
115// failures cases.
116async fn do_get_with_outer_retry(
117    download_retry_count: usize,
118    get_request: Arc<GetRequest>,
119    desc: impl Fn() -> String,
120) -> OSResult<Bytes> {
121    let mut retries = download_retry_count;
122    loop {
123        let get_request_clone = get_request.clone();
124        let get_result = do_with_retry(move || get_request_clone.get_range()).await?;
125        match get_result.bytes().await {
126            Ok(bytes) => return Ok(bytes),
127            Err(err) => {
128                if retries == 0 {
129                    log::warn!(
130                        "Failed to download {} from {} after {} attempts.  This may indicate that cloud storage is overloaded or your timeout settings are too restrictive.  Error details: {:?}",
131                        desc(),
132                        get_request.path(),
133                        download_retry_count,
134                        err
135                    );
136                    return Err(err);
137                }
138                log::debug!(
139                    "Retrying {} from {} (remaining retries: {}).  Error details: {:?}",
140                    desc(),
141                    get_request.path(),
142                    retries,
143                    err
144                );
145                retries -= 1;
146            }
147        }
148    }
149}
150
151impl Reader for CloudObjectReader {
152    fn path(&self) -> &Path {
153        &self.path
154    }
155
156    fn block_size(&self) -> usize {
157        self.block_size
158    }
159
160    fn io_parallelism(&self) -> usize {
161        DEFAULT_CLOUD_IO_PARALLELISM
162    }
163
164    /// Object/File Size.
165    fn size(&self) -> BoxFuture<'_, object_store::Result<usize>> {
166        Box::pin(async move {
167            self.size
168                .get_or_try_init(|| async move {
169                    let meta = do_with_retry(|| self.object_store.head(&self.path)).await?;
170                    Ok(meta.size as usize)
171                })
172                .await
173                .cloned()
174        })
175    }
176
177    #[instrument(level = "debug", skip(self))]
178    fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, OSResult<Bytes>> {
179        let get_request = Arc::new(GetRequest {
180            object_store: self.object_store.clone(),
181            path: self.path.clone(),
182            options: GetOptions {
183                range: Some(
184                    Range {
185                        start: range.start as u64,
186                        end: range.end as u64,
187                    }
188                    .into(),
189                ),
190                ..Default::default()
191            },
192        });
193        Box::pin(do_get_with_outer_retry(
194            self.download_retry_count,
195            get_request,
196            move || format!("range {:?}", range),
197        ))
198    }
199
200    #[instrument(level = "debug", skip_all)]
201    fn get_all(&self) -> BoxFuture<'_, OSResult<Bytes>> {
202        let get_request = Arc::new(GetRequest {
203            object_store: self.object_store.clone(),
204            path: self.path.clone(),
205            options: GetOptions::default(),
206        });
207        Box::pin(async move {
208            do_get_with_outer_retry(self.download_retry_count, get_request, || {
209                "read_all".to_string()
210            })
211            .await
212        })
213    }
214}
215
216#[derive(Debug)]
217pub struct SmallReaderInner {
218    path: Path,
219    size: usize,
220    state: std::sync::Mutex<SmallReaderState>,
221}
222
223/// A reader for a file so small, we just eagerly read it all into memory.
224///
225/// When created, it represents a future that will read the whole file into memory.
226///
227/// On the first read call, it will start the read. Multiple threads can call read at the same time.
228///
229/// Once the read is complete, any thread can call read again to get the result.
230#[derive(Clone, Debug)]
231pub struct SmallReader {
232    inner: Arc<SmallReaderInner>,
233}
234
235enum SmallReaderState {
236    Loading(Shared<BoxFuture<'static, std::result::Result<Bytes, CloneableError>>>),
237    Finished(std::result::Result<Bytes, CloneableError>),
238}
239
240impl std::fmt::Debug for SmallReaderState {
241    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242        match self {
243            Self::Loading(_) => write!(f, "Loading"),
244            Self::Finished(Ok(data)) => {
245                write!(f, "Finished({} bytes)", data.len())
246            }
247            Self::Finished(Err(err)) => {
248                write!(f, "Finished({})", err.0)
249            }
250        }
251    }
252}
253
254impl SmallReader {
255    pub fn new(
256        store: Arc<dyn ObjectStore>,
257        path: Path,
258        download_retry_count: usize,
259        size: usize,
260    ) -> Self {
261        let path_ref = path.clone();
262        let state = SmallReaderState::Loading(
263            Box::pin(async move {
264                let object_reader =
265                    CloudObjectReader::new(store, path_ref, 0, None, download_retry_count)
266                        .map_err(CloneableError)?;
267                object_reader
268                    .get_all()
269                    .await
270                    .map_err(|err| CloneableError(Error::from(err)))
271            })
272            .boxed()
273            .shared(),
274        );
275        Self {
276            inner: Arc::new(SmallReaderInner {
277                path,
278                size,
279                state: std::sync::Mutex::new(state),
280            }),
281        }
282    }
283}
284
285impl SmallReaderInner {
286    async fn wait(&self) -> OSResult<Bytes> {
287        let future = {
288            let state = self.state.lock().unwrap();
289            match &*state {
290                SmallReaderState::Loading(future) => future.clone(),
291                SmallReaderState::Finished(result) => {
292                    return result.clone().map_err(|err| err.0.into());
293                }
294            }
295        };
296
297        let result = future.await;
298        let result_to_return = result.clone().map_err(|err| err.0.into());
299        let mut state = self.state.lock().unwrap();
300        if matches!(*state, SmallReaderState::Loading(_)) {
301            *state = SmallReaderState::Finished(result);
302        }
303        result_to_return
304    }
305}
306
307impl Reader for SmallReader {
308    fn path(&self) -> &Path {
309        &self.inner.path
310    }
311
312    fn block_size(&self) -> usize {
313        64 * 1024
314    }
315
316    fn io_parallelism(&self) -> usize {
317        1024
318    }
319
320    /// Object/File Size.
321    fn size(&self) -> BoxFuture<'_, OSResult<usize>> {
322        let size = self.inner.size;
323        Box::pin(async move { Ok(size) })
324    }
325
326    fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, OSResult<Bytes>> {
327        let inner = self.inner.clone();
328        Box::pin(async move {
329            let bytes = inner.wait().await?;
330            let start = range.start;
331            let end = range.end;
332            if start >= bytes.len() || end > bytes.len() {
333                return Err(object_store::Error::Generic {
334                    store: "memory",
335                    source: format!(
336                        "Invalid range {}..{} for object of size {} bytes",
337                        start,
338                        end,
339                        bytes.len()
340                    )
341                    .into(),
342                });
343            }
344            Ok(bytes.slice(range))
345        })
346    }
347
348    fn get_all(&self) -> BoxFuture<'_, OSResult<Bytes>> {
349        Box::pin(async move { self.inner.wait().await })
350    }
351}
352
353impl DeepSizeOf for SmallReader {
354    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
355        let mut size = self.inner.path.as_ref().deep_size_of_children(context);
356
357        if let Ok(guard) = self.inner.state.try_lock()
358            && let SmallReaderState::Finished(Ok(data)) = &*guard
359        {
360            size += data.len();
361        }
362
363        size
364    }
365}