1use std::fs::File;
5use std::ops::Range;
6use std::sync::Arc;
7
8#[cfg(windows)]
9use crate::local::read_exact_at;
10#[cfg(unix)]
11use std::os::unix::fs::FileExt;
12
13use bytes::Bytes;
14use deepsize::DeepSizeOf;
15use futures::{
16 FutureExt,
17 future::{BoxFuture, Shared},
18 stream::{self, StreamExt},
19};
20use lance_core::{Error, Result, error::CloneableError};
21use object_store::ObjectStoreExt;
22use object_store::{GetOptions, GetResult, ObjectStore, Result as OSResult, path::Path};
23use tokio::sync::OnceCell;
24use tracing::instrument;
25
26use crate::{
27 object_store::DEFAULT_CLOUD_IO_PARALLELISM,
28 traits::{ByteStream, Reader},
29};
30
/// A get request that can be issued repeatedly, each time producing a
/// `'static` future.
///
/// Because the returned future does not borrow `self`, a retry loop can call
/// `get_range` again after a failed attempt.
trait StaticGetRange {
    /// Path of the object being requested.
    fn path(&self) -> &Path;
    /// Issues the request; the future resolves to the object store's response.
    fn get_range(&self) -> BoxFuture<'static, OSResult<GetResult>>;
}
35
/// Everything needed to issue one `get_opts` call against an object store.
///
/// Kept behind an `Arc` so retry closures can cheaply clone it and build a
/// fresh request future per attempt.
struct GetRequest {
    /// Store the object is read from.
    object_store: Arc<dyn ObjectStore>,
    /// Location of the object within `object_store`.
    path: Path,
    /// Options (e.g. byte range) passed to `get_opts` on every attempt.
    options: GetOptions,
}
43
44impl StaticGetRange for Arc<GetRequest> {
45 fn path(&self) -> &Path {
46 &self.path
47 }
48
49 fn get_range(&self) -> BoxFuture<'static, OSResult<GetResult>> {
50 let store_and_path = self.clone();
51 Box::pin(async move {
52 store_and_path
53 .object_store
54 .get_opts(&store_and_path.path, store_and_path.options.clone())
55 .await
56 })
57 }
58}
59
/// A [`Reader`] that fetches data from an [`ObjectStore`] on demand.
///
/// Range reads and whole-object reads re-attempt failed downloads, and the
/// object size is cached once it is known.
#[derive(Debug)]
pub struct CloudObjectReader {
    /// Store holding the object.
    pub object_store: Arc<dyn ObjectStore>,
    /// Location of the object within `object_store`.
    pub path: Path,
    /// Object size in bytes. Seeded from `known_size` at construction;
    /// otherwise filled by a HEAD request the first time `size()` is called.
    size: OnceCell<usize>,

    /// Value reported by `Reader::block_size`.
    block_size: usize,
    /// How many times a failed download of a response body is re-attempted.
    download_retry_count: usize,
}
75
76impl DeepSizeOf for CloudObjectReader {
77 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
78 self.path.as_ref().deep_size_of_children(context)
80 }
81}
82
83impl CloudObjectReader {
84 pub fn new(
86 object_store: Arc<dyn ObjectStore>,
87 path: Path,
88 block_size: usize,
89 known_size: Option<usize>,
90 download_retry_count: usize,
91 ) -> Result<Self> {
92 Ok(Self {
93 object_store,
94 path,
95 size: OnceCell::new_with(known_size),
96 block_size,
97 download_retry_count,
98 })
99 }
100}
101
102async fn do_with_retry<'a, O>(f: impl Fn() -> BoxFuture<'a, OSResult<O>> + Clone) -> OSResult<O> {
106 let mut retries = 3;
107 loop {
108 let f = f.clone();
109 match f().await {
110 Ok(val) => return Ok(val),
111 Err(err) => {
112 if retries == 0 {
113 return Err(err);
114 }
115 retries -= 1;
116 }
117 }
118 }
119}
120
121async fn do_get_with_outer_retry(
128 download_retry_count: usize,
129 get_request: Arc<GetRequest>,
130 desc: impl Fn() -> String,
131) -> OSResult<Bytes> {
132 let mut retries = download_retry_count;
133 loop {
134 let get_request_clone = get_request.clone();
135 let get_result = do_with_retry(move || get_request_clone.get_range()).await?;
136 match get_result.bytes().await {
137 Ok(bytes) => return Ok(bytes),
138 Err(err) => {
139 if retries == 0 {
140 log::warn!(
141 "Failed to download {} from {} after {} attempts. This may indicate that cloud storage is overloaded or your timeout settings are too restrictive. Error details: {:?}",
142 desc(),
143 get_request.path(),
144 download_retry_count,
145 err
146 );
147 return Err(err);
148 }
149 log::debug!(
150 "Retrying {} from {} (remaining retries: {}). Error details: {:?}",
151 desc(),
152 get_request.path(),
153 retries,
154 err
155 );
156 retries -= 1;
157 }
158 }
159 }
160}
161
162impl Reader for CloudObjectReader {
163 fn path(&self) -> &Path {
164 &self.path
165 }
166
167 fn block_size(&self) -> usize {
168 self.block_size
169 }
170
171 fn io_parallelism(&self) -> usize {
172 DEFAULT_CLOUD_IO_PARALLELISM
173 }
174
175 fn size(&self) -> BoxFuture<'_, object_store::Result<usize>> {
177 Box::pin(async move {
178 self.size
179 .get_or_try_init(|| async move {
180 let meta =
181 do_with_retry(|| Box::pin(self.object_store.head(&self.path))).await?;
182 Ok(meta.size as usize)
183 })
184 .await
185 .cloned()
186 })
187 }
188
189 #[instrument(level = "debug", skip(self))]
190 fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, OSResult<Bytes>> {
191 let get_request = Arc::new(GetRequest {
192 object_store: self.object_store.clone(),
193 path: self.path.clone(),
194 options: GetOptions {
195 range: Some(
196 Range {
197 start: range.start as u64,
198 end: range.end as u64,
199 }
200 .into(),
201 ),
202 ..Default::default()
203 },
204 });
205 Box::pin(do_get_with_outer_retry(
206 self.download_retry_count,
207 get_request,
208 move || format!("range {:?}", range),
209 ))
210 }
211
212 #[instrument(level = "debug", skip_all)]
213 fn get_all(&self) -> BoxFuture<'_, OSResult<Bytes>> {
214 let get_request = Arc::new(GetRequest {
215 object_store: self.object_store.clone(),
216 path: self.path.clone(),
217 options: GetOptions::default(),
218 });
219 Box::pin(async move {
220 do_get_with_outer_retry(self.download_retry_count, get_request, || {
221 "read_all".to_string()
222 })
223 .await
224 })
225 }
226
227 fn get_stream(&self) -> BoxFuture<'_, OSResult<ByteStream>> {
228 let get_request = Arc::new(GetRequest {
229 object_store: self.object_store.clone(),
230 path: self.path.clone(),
231 options: GetOptions::default(),
232 });
233 Box::pin(async move {
234 let get_request_clone = get_request.clone();
235 let get_result = do_with_retry(move || get_request_clone.get_range()).await?;
236 Ok(get_result.into_stream())
237 })
238 }
239
240 fn get_range_stream(&self, range: Range<usize>) -> BoxFuture<'_, OSResult<ByteStream>> {
241 let get_request = Arc::new(GetRequest {
242 object_store: self.object_store.clone(),
243 path: self.path.clone(),
244 options: GetOptions {
245 range: Some(
246 Range {
247 start: range.start as u64,
248 end: range.end as u64,
249 }
250 .into(),
251 ),
252 ..Default::default()
253 },
254 });
255 Box::pin(async move {
256 let get_request_clone = get_request.clone();
257 let get_result = do_with_retry(move || get_request_clone.get_range()).await?;
258 Ok(get_result.into_stream())
259 })
260 }
261}
262
/// State shared by every clone of a [`SmallReader`].
#[derive(Debug)]
pub struct SmallReaderInner {
    /// Path of the object being read.
    path: Path,
    /// Object size in bytes as supplied to `SmallReader::new`; it is not
    /// checked against the downloaded length.
    size: usize,
    /// Progress of the one-time whole-object download.
    state: std::sync::Mutex<SmallReaderState>,
}
269
/// A [`Reader`] that downloads a whole object once and serves every read
/// from the in-memory copy.
///
/// Cloning is cheap: clones share the same `Arc`'d download and buffer.
#[derive(Clone, Debug)]
pub struct SmallReader {
    inner: Arc<SmallReaderInner>,
}
281
/// Download state of a [`SmallReader`].
enum SmallReaderState {
    /// Download not finished yet. The future is `Shared`, so every concurrent
    /// waiter awaits the same single download.
    Loading(Shared<BoxFuture<'static, std::result::Result<Bytes, CloneableError>>>),
    /// Download completed. Holds the data or the error; a failure is cached
    /// too, so later reads get the same error without re-downloading.
    Finished(std::result::Result<Bytes, CloneableError>),
}
286
287impl std::fmt::Debug for SmallReaderState {
288 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
289 match self {
290 Self::Loading(_) => write!(f, "Loading"),
291 Self::Finished(Ok(data)) => {
292 write!(f, "Finished({} bytes)", data.len())
293 }
294 Self::Finished(Err(err)) => {
295 write!(f, "Finished({})", err.0)
296 }
297 }
298 }
299}
300
301impl SmallReader {
302 pub fn new(
303 store: Arc<dyn ObjectStore>,
304 path: Path,
305 download_retry_count: usize,
306 size: usize,
307 ) -> Self {
308 let path_ref = path.clone();
309 let state = SmallReaderState::Loading(
310 Box::pin(async move {
311 let object_reader =
312 CloudObjectReader::new(store, path_ref, 0, None, download_retry_count)
313 .map_err(CloneableError)?;
314 object_reader
315 .get_all()
316 .await
317 .map_err(|err| CloneableError(Error::from(err)))
318 })
319 .boxed()
320 .shared(),
321 );
322 Self {
323 inner: Arc::new(SmallReaderInner {
324 path,
325 size,
326 state: std::sync::Mutex::new(state),
327 }),
328 }
329 }
330}
331
impl SmallReaderInner {
    /// Returns the object's bytes, awaiting the shared download if needed.
    ///
    /// The first waiter to finish moves the state from `Loading` to
    /// `Finished`; afterwards the cached result (data or error) is returned
    /// without awaiting. Errors are converted back into `object_store::Error`.
    /// Panics if the state mutex is poisoned.
    async fn wait(&self) -> OSResult<Bytes> {
        // Take the lock inside its own block so the (non-`Send`) std mutex guard
        // is dropped before the `.await` below.
        let future = {
            let state = self.state.lock().unwrap();
            match &*state {
                SmallReaderState::Loading(future) => future.clone(),
                SmallReaderState::Finished(result) => {
                    return result.clone().map_err(|err| err.0.into());
                }
            }
        };

        let result = future.await;
        let result_to_return = result.clone().map_err(|err| err.0.into());
        let mut state = self.state.lock().unwrap();
        // Concurrent waiters all finish the same shared future; only the first
        // one to get here stores the result.
        if matches!(*state, SmallReaderState::Loading(_)) {
            *state = SmallReaderState::Finished(result);
        }
        result_to_return
    }
}
353
354impl Reader for SmallReader {
355 fn path(&self) -> &Path {
356 &self.inner.path
357 }
358
359 fn block_size(&self) -> usize {
360 64 * 1024
361 }
362
363 fn io_parallelism(&self) -> usize {
364 1024
365 }
366
367 fn size(&self) -> BoxFuture<'_, OSResult<usize>> {
369 let size = self.inner.size;
370 Box::pin(async move { Ok(size) })
371 }
372
373 fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, OSResult<Bytes>> {
374 let inner = self.inner.clone();
375 Box::pin(async move {
376 let bytes = inner.wait().await?;
377 let start = range.start;
378 let end = range.end;
379 if start >= bytes.len() || end > bytes.len() {
380 return Err(object_store::Error::Generic {
381 store: "memory",
382 source: format!(
383 "Invalid range {}..{} for object of size {} bytes",
384 start,
385 end,
386 bytes.len()
387 )
388 .into(),
389 });
390 }
391 Ok(bytes.slice(range))
392 })
393 }
394
395 fn get_all(&self) -> BoxFuture<'_, OSResult<Bytes>> {
396 Box::pin(async move { self.inner.wait().await })
397 }
398}
399
400pub(crate) fn stream_local_range(
401 file: Arc<File>,
402 path: Path,
403 io_tracker: Arc<crate::utils::tracking_store::IOTracker>,
404 range: Range<usize>,
405 chunk_size: usize,
406) -> ByteStream {
407 stream::try_unfold(
408 (file, path, io_tracker, range.start, range.end),
409 move |state| async move {
410 let (file, path, io_tracker, start, end) = state;
411 if start >= end {
412 return Ok(None);
413 }
414
415 let next = (start + chunk_size).min(end);
416 let file_clone = file.clone();
417 let path_clone = path.clone();
418 let bytes = tokio::task::spawn_blocking(move || {
419 let mut buf = bytes::BytesMut::with_capacity(next - start);
420 unsafe { buf.set_len(next - start) };
422 #[cfg(unix)]
423 file_clone.read_exact_at(buf.as_mut(), start as u64)?;
424 #[cfg(windows)]
425 read_exact_at(file_clone, buf.as_mut(), start as u64)?;
426 Ok::<_, std::io::Error>(buf.freeze())
427 })
428 .await?
429 .map_err(|err: std::io::Error| object_store::Error::Generic {
430 store: "LocalFileSystem",
431 source: err.into(),
432 })?;
433
434 io_tracker.record_read(
435 "get_range_stream",
436 path_clone,
437 (next - start) as u64,
438 Some(start as u64..next as u64),
439 );
440
441 Ok(Some((bytes, (file, path, io_tracker, next, end))))
442 },
443 )
444 .boxed()
445}
446
447impl DeepSizeOf for SmallReader {
448 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
449 let mut size = self.inner.path.as_ref().deep_size_of_children(context);
450
451 if let Ok(guard) = self.inner.state.try_lock()
452 && let SmallReaderState::Finished(Ok(data)) = &*guard
453 {
454 size += data.len();
455 }
456
457 size
458 }
459}