1use std::fs::File;
5use std::ops::Range;
6use std::sync::Arc;
7
8#[cfg(windows)]
9use crate::local::read_exact_at;
10#[cfg(unix)]
11use std::os::unix::fs::FileExt;
12
13use bytes::Bytes;
14use deepsize::DeepSizeOf;
15use futures::{
16 FutureExt,
17 future::{BoxFuture, Shared},
18 stream::{self, StreamExt},
19};
20use lance_core::{Error, Result, error::CloneableError};
21use object_store::{GetOptions, GetResult, ObjectStore, Result as OSResult, path::Path};
22use tokio::sync::OnceCell;
23use tracing::instrument;
24
25use crate::{
26 object_store::DEFAULT_CLOUD_IO_PARALLELISM,
27 traits::{ByteStream, Reader},
28};
29
/// A `get` request that can be issued repeatedly (e.g. by retry loops), each
/// time producing a `'static` future that does not borrow the request.
trait StaticGetRange {
    /// Path of the object this request targets.
    fn path(&self) -> &Path;
    /// Issue the request; the returned future owns everything it needs.
    fn get_range(&self) -> BoxFuture<'static, OSResult<GetResult>>;
}
34
/// Everything needed to issue one `ObjectStore::get_opts` call.
///
/// Bundled together so it can be shared behind an `Arc` and re-issued on retry.
struct GetRequest {
    /// Store the request is sent to.
    object_store: Arc<dyn ObjectStore>,
    /// Object being read.
    path: Path,
    /// Options (e.g. byte range) passed to `get_opts`; cloned on every attempt.
    options: GetOptions,
}
42
43impl StaticGetRange for Arc<GetRequest> {
44 fn path(&self) -> &Path {
45 &self.path
46 }
47
48 fn get_range(&self) -> BoxFuture<'static, OSResult<GetResult>> {
49 let store_and_path = self.clone();
50 Box::pin(async move {
51 store_and_path
52 .object_store
53 .get_opts(&store_and_path.path, store_and_path.options.clone())
54 .await
55 })
56 }
57}
58
/// [`Reader`] that reads an object from an [`ObjectStore`].
///
/// Issuing a request is retried by `do_with_retry`. In `get_range` and
/// `get_all`, reading the response body is additionally retried up to
/// `download_retry_count` times.
#[derive(Debug)]
pub struct CloudObjectReader {
    /// Store the object is read from.
    pub object_store: Arc<dyn ObjectStore>,
    /// Location of the object within `object_store`.
    pub path: Path,
    /// Object size in bytes. It is seeded from `known_size` in `new`, or fetched
    /// with a `head` request the first time `size()` is called.
    size: OnceCell<usize>,

    /// Value reported by `Reader::block_size`.
    block_size: usize,
    /// Number of extra attempts made when downloading a response body fails.
    download_retry_count: usize,
}
74
75impl DeepSizeOf for CloudObjectReader {
76 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
77 self.path.as_ref().deep_size_of_children(context)
79 }
80}
81
82impl CloudObjectReader {
83 pub fn new(
85 object_store: Arc<dyn ObjectStore>,
86 path: Path,
87 block_size: usize,
88 known_size: Option<usize>,
89 download_retry_count: usize,
90 ) -> Result<Self> {
91 Ok(Self {
92 object_store,
93 path,
94 size: OnceCell::new_with(known_size),
95 block_size,
96 download_retry_count,
97 })
98 }
99}
100
101async fn do_with_retry<'a, O>(f: impl Fn() -> BoxFuture<'a, OSResult<O>> + Clone) -> OSResult<O> {
105 let mut retries = 3;
106 loop {
107 let f = f.clone();
108 match f().await {
109 Ok(val) => return Ok(val),
110 Err(err) => {
111 if retries == 0 {
112 return Err(err);
113 }
114 retries -= 1;
115 }
116 }
117 }
118}
119
120async fn do_get_with_outer_retry(
127 download_retry_count: usize,
128 get_request: Arc<GetRequest>,
129 desc: impl Fn() -> String,
130) -> OSResult<Bytes> {
131 let mut retries = download_retry_count;
132 loop {
133 let get_request_clone = get_request.clone();
134 let get_result = do_with_retry(move || get_request_clone.get_range()).await?;
135 match get_result.bytes().await {
136 Ok(bytes) => return Ok(bytes),
137 Err(err) => {
138 if retries == 0 {
139 log::warn!(
140 "Failed to download {} from {} after {} attempts. This may indicate that cloud storage is overloaded or your timeout settings are too restrictive. Error details: {:?}",
141 desc(),
142 get_request.path(),
143 download_retry_count,
144 err
145 );
146 return Err(err);
147 }
148 log::debug!(
149 "Retrying {} from {} (remaining retries: {}). Error details: {:?}",
150 desc(),
151 get_request.path(),
152 retries,
153 err
154 );
155 retries -= 1;
156 }
157 }
158 }
159}
160
161impl Reader for CloudObjectReader {
162 fn path(&self) -> &Path {
163 &self.path
164 }
165
166 fn block_size(&self) -> usize {
167 self.block_size
168 }
169
170 fn io_parallelism(&self) -> usize {
171 DEFAULT_CLOUD_IO_PARALLELISM
172 }
173
174 fn size(&self) -> BoxFuture<'_, object_store::Result<usize>> {
176 Box::pin(async move {
177 self.size
178 .get_or_try_init(|| async move {
179 let meta = do_with_retry(|| self.object_store.head(&self.path)).await?;
180 Ok(meta.size as usize)
181 })
182 .await
183 .cloned()
184 })
185 }
186
187 #[instrument(level = "debug", skip(self))]
188 fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, OSResult<Bytes>> {
189 let get_request = Arc::new(GetRequest {
190 object_store: self.object_store.clone(),
191 path: self.path.clone(),
192 options: GetOptions {
193 range: Some(
194 Range {
195 start: range.start as u64,
196 end: range.end as u64,
197 }
198 .into(),
199 ),
200 ..Default::default()
201 },
202 });
203 Box::pin(do_get_with_outer_retry(
204 self.download_retry_count,
205 get_request,
206 move || format!("range {:?}", range),
207 ))
208 }
209
210 #[instrument(level = "debug", skip_all)]
211 fn get_all(&self) -> BoxFuture<'_, OSResult<Bytes>> {
212 let get_request = Arc::new(GetRequest {
213 object_store: self.object_store.clone(),
214 path: self.path.clone(),
215 options: GetOptions::default(),
216 });
217 Box::pin(async move {
218 do_get_with_outer_retry(self.download_retry_count, get_request, || {
219 "read_all".to_string()
220 })
221 .await
222 })
223 }
224
225 fn get_stream(&self) -> BoxFuture<'_, OSResult<ByteStream>> {
226 let get_request = Arc::new(GetRequest {
227 object_store: self.object_store.clone(),
228 path: self.path.clone(),
229 options: GetOptions::default(),
230 });
231 Box::pin(async move {
232 let get_request_clone = get_request.clone();
233 let get_result = do_with_retry(move || get_request_clone.get_range()).await?;
234 Ok(get_result.into_stream())
235 })
236 }
237
238 fn get_range_stream(&self, range: Range<usize>) -> BoxFuture<'_, OSResult<ByteStream>> {
239 let get_request = Arc::new(GetRequest {
240 object_store: self.object_store.clone(),
241 path: self.path.clone(),
242 options: GetOptions {
243 range: Some(
244 Range {
245 start: range.start as u64,
246 end: range.end as u64,
247 }
248 .into(),
249 ),
250 ..Default::default()
251 },
252 });
253 Box::pin(async move {
254 let get_request_clone = get_request.clone();
255 let get_result = do_with_retry(move || get_request_clone.get_range()).await?;
256 Ok(get_result.into_stream())
257 })
258 }
259}
260
/// State shared by all clones of a [`SmallReader`].
#[derive(Debug)]
pub struct SmallReaderInner {
    /// Location of the object being read.
    path: Path,
    /// Object size in bytes, as supplied to `SmallReader::new`. It is reported
    /// by `size()` as-is and is not checked against the downloaded data.
    size: usize,
    /// Progress of the one-time download of the whole object.
    state: std::sync::Mutex<SmallReaderState>,
}
267
/// [`Reader`] for small objects.
///
/// The whole object is downloaded once, when first awaited, and all reads are
/// then served from the cached bytes. Cloning is cheap: clones share the same
/// download and cache.
#[derive(Clone, Debug)]
pub struct SmallReader {
    inner: Arc<SmallReaderInner>,
}
279
/// Download progress of a [`SmallReader`].
enum SmallReaderState {
    /// Download not finished. The `Shared` future lets concurrent readers
    /// await the same download.
    Loading(Shared<BoxFuture<'static, std::result::Result<Bytes, CloneableError>>>),
    /// Download finished. The outcome is cached, including errors, so a
    /// failed download is not retried by later reads.
    Finished(std::result::Result<Bytes, CloneableError>),
}
284
285impl std::fmt::Debug for SmallReaderState {
286 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
287 match self {
288 Self::Loading(_) => write!(f, "Loading"),
289 Self::Finished(Ok(data)) => {
290 write!(f, "Finished({} bytes)", data.len())
291 }
292 Self::Finished(Err(err)) => {
293 write!(f, "Finished({})", err.0)
294 }
295 }
296 }
297}
298
299impl SmallReader {
300 pub fn new(
301 store: Arc<dyn ObjectStore>,
302 path: Path,
303 download_retry_count: usize,
304 size: usize,
305 ) -> Self {
306 let path_ref = path.clone();
307 let state = SmallReaderState::Loading(
308 Box::pin(async move {
309 let object_reader =
310 CloudObjectReader::new(store, path_ref, 0, None, download_retry_count)
311 .map_err(CloneableError)?;
312 object_reader
313 .get_all()
314 .await
315 .map_err(|err| CloneableError(Error::from(err)))
316 })
317 .boxed()
318 .shared(),
319 );
320 Self {
321 inner: Arc::new(SmallReaderInner {
322 path,
323 size,
324 state: std::sync::Mutex::new(state),
325 }),
326 }
327 }
328}
329
330impl SmallReaderInner {
331 async fn wait(&self) -> OSResult<Bytes> {
332 let future = {
333 let state = self.state.lock().unwrap();
334 match &*state {
335 SmallReaderState::Loading(future) => future.clone(),
336 SmallReaderState::Finished(result) => {
337 return result.clone().map_err(|err| err.0.into());
338 }
339 }
340 };
341
342 let result = future.await;
343 let result_to_return = result.clone().map_err(|err| err.0.into());
344 let mut state = self.state.lock().unwrap();
345 if matches!(*state, SmallReaderState::Loading(_)) {
346 *state = SmallReaderState::Finished(result);
347 }
348 result_to_return
349 }
350}
351
352impl Reader for SmallReader {
353 fn path(&self) -> &Path {
354 &self.inner.path
355 }
356
357 fn block_size(&self) -> usize {
358 64 * 1024
359 }
360
361 fn io_parallelism(&self) -> usize {
362 1024
363 }
364
365 fn size(&self) -> BoxFuture<'_, OSResult<usize>> {
367 let size = self.inner.size;
368 Box::pin(async move { Ok(size) })
369 }
370
371 fn get_range(&self, range: Range<usize>) -> BoxFuture<'static, OSResult<Bytes>> {
372 let inner = self.inner.clone();
373 Box::pin(async move {
374 let bytes = inner.wait().await?;
375 let start = range.start;
376 let end = range.end;
377 if start >= bytes.len() || end > bytes.len() {
378 return Err(object_store::Error::Generic {
379 store: "memory",
380 source: format!(
381 "Invalid range {}..{} for object of size {} bytes",
382 start,
383 end,
384 bytes.len()
385 )
386 .into(),
387 });
388 }
389 Ok(bytes.slice(range))
390 })
391 }
392
393 fn get_all(&self) -> BoxFuture<'_, OSResult<Bytes>> {
394 Box::pin(async move { self.inner.wait().await })
395 }
396}
397
398pub(crate) fn stream_local_range(
399 file: Arc<File>,
400 path: Path,
401 io_tracker: Arc<crate::utils::tracking_store::IOTracker>,
402 range: Range<usize>,
403 chunk_size: usize,
404) -> ByteStream {
405 stream::try_unfold(
406 (file, path, io_tracker, range.start, range.end),
407 move |state| async move {
408 let (file, path, io_tracker, start, end) = state;
409 if start >= end {
410 return Ok(None);
411 }
412
413 let next = (start + chunk_size).min(end);
414 let file_clone = file.clone();
415 let path_clone = path.clone();
416 let bytes = tokio::task::spawn_blocking(move || {
417 let mut buf = bytes::BytesMut::with_capacity(next - start);
418 unsafe { buf.set_len(next - start) };
420 #[cfg(unix)]
421 file_clone.read_exact_at(buf.as_mut(), start as u64)?;
422 #[cfg(windows)]
423 read_exact_at(file_clone, buf.as_mut(), start as u64)?;
424 Ok::<_, std::io::Error>(buf.freeze())
425 })
426 .await?
427 .map_err(|err: std::io::Error| object_store::Error::Generic {
428 store: "LocalFileSystem",
429 source: err.into(),
430 })?;
431
432 io_tracker.record_read(
433 "get_range_stream",
434 path_clone,
435 (next - start) as u64,
436 Some(start as u64..next as u64),
437 );
438
439 Ok(Some((bytes, (file, path, io_tracker, next, end))))
440 },
441 )
442 .boxed()
443}
444
445impl DeepSizeOf for SmallReader {
446 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
447 let mut size = self.inner.path.as_ref().deep_size_of_children(context);
448
449 if let Ok(guard) = self.inner.state.try_lock()
450 && let SmallReaderState::Finished(Ok(data)) = &*guard
451 {
452 size += data.len();
453 }
454
455 size
456 }
457}