lance_io/
object_store.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Extend [object_store::ObjectStore] functionalities
5
6use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use deepsize::DeepSizeOf;
17use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
18use futures::{FutureExt, Stream};
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22#[cfg(feature = "aws")]
23use object_store::aws::AwsCredentialProvider;
24use object_store::DynObjectStore;
25use object_store::Error as ObjectStoreError;
26use object_store::{path::Path, ObjectMeta, ObjectStore as OSObjectStore};
27use providers::local::FileStoreProvider;
28use providers::memory::MemoryStoreProvider;
29use shellexpand::tilde;
30use snafu::location;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35mod list_retry;
36pub mod providers;
37mod tracing;
38use crate::object_reader::SmallReader;
39use crate::object_writer::WriteResult;
40use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
41use lance_core::{Error, Result};
42
/// Default I/O parallelism for local stores.
///
/// Local disks tend to do fine with a few threads.
/// Note: the number of threads here also impacts the number of files
/// we need to read in some situations.  So keeping this at 8 keeps the
/// RAM on our scanner down.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
/// Default I/O parallelism for cloud stores.
///
/// Cloud disks often need many many threads to saturate the network.
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

// Default block size for local file systems.
const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024; // 4KB block size
// Default block size for cloud stores; only compiled when at least one
// cloud backend feature is enabled.
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024; // 64KB block size
54
55lazy_static::lazy_static! {
56    pub static ref DEFAULT_MAX_IOP_SIZE: u64 = std::env::var("LANCE_MAX_IOP_SIZE")
57        .map(|val| val.parse().unwrap()).unwrap_or(16 * 1024 * 1024);
58}
59
/// Default number of times a failed download is retried.
pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
61
62pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
63
/// Convenience operations layered on top of an [object_store::ObjectStore].
///
/// A blanket implementation in this file provides these methods for every
/// type implementing [object_store::ObjectStore].
#[async_trait]
pub trait ObjectStoreExt {
    /// Returns true if the file exists.
    async fn exists(&self, path: &Path) -> Result<bool>;

    /// Read all files (start from base directory) recursively
    ///
    /// unmodified_since can be specified to only return files that have not been modified since the given time.
    fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>>;
}
78
79#[async_trait]
80impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
81    fn read_dir_all<'a, 'b>(
82        &'a self,
83        dir_path: impl Into<&'b Path> + Send,
84        unmodified_since: Option<DateTime<Utc>>,
85    ) -> BoxStream<'a, Result<ObjectMeta>> {
86        let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
87        if let Some(unmodified_since_val) = unmodified_since {
88            output
89                .try_filter(move |file| future::ready(file.last_modified < unmodified_since_val))
90                .boxed()
91        } else {
92            output.boxed()
93        }
94    }
95
96    async fn exists(&self, path: &Path) -> Result<bool> {
97        match self.head(path).await {
98            Ok(_) => Ok(true),
99            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
100            Err(e) => Err(e.into()),
101        }
102    }
103}
104
/// Wraps [ObjectStore](object_store::ObjectStore)
///
/// Bundles the underlying store with the URL scheme it was created for and
/// the I/O tuning values (block size, parallelism, retry count) that the
/// readers opened through this type receive.
#[derive(Debug, Clone)]
pub struct ObjectStore {
    // Inner object store
    pub inner: Arc<dyn OSObjectStore>,
    // URL scheme of the store location; `is_local` / `is_cloud` and `open`
    // branch on whether this is "file" (or "memory").
    scheme: String,
    // Block size handed to the readers created by `open` / `open_with_size`.
    block_size: usize,
    // Initialised from `DEFAULT_MAX_IOP_SIZE`; exposed through `max_iop_size()`.
    max_iop_size: u64,
    /// Whether to use constant size upload parts for multipart uploads. This
    /// is only necessary for Cloudflare R2.
    pub use_constant_size_upload_parts: bool,
    /// Whether we can assume that the list of files is lexically ordered. This
    /// is true for object stores, but not for local filesystems.
    pub list_is_lexically_ordered: bool,
    // Value returned by `io_parallelism()` when `LANCE_IO_THREADS` is not set.
    io_parallelism: usize,
    /// Number of times to retry a failed download
    download_retry_count: usize,
}
123
124impl DeepSizeOf for ObjectStore {
125    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
126        // We aren't counting `inner` here which is problematic but an ObjectStore
127        // shouldn't be too big.  The only exception might be the write cache but, if
128        // the writer cache has data, it means we're using it somewhere else that isn't
129        // a cache and so that doesn't really count.
130        self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
131    }
132}
133
134impl std::fmt::Display for ObjectStore {
135    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136        write!(f, "ObjectStore({})", self.scheme)
137    }
138}
139
/// A hook that takes an object store and returns another object store to use
/// in its place.
pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
    /// Consumes `original` and returns the store that should be used instead.
    fn wrap(&self, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
}
143
/// A [WrappingObjectStore] made of several wrappers that are applied one
/// after another.
#[derive(Debug, Clone)]
pub struct ChainedWrappingObjectStore {
    // Wrappers in application order: the first entry wraps the original
    // store, each later entry wraps the result of the previous one.
    wrappers: Vec<Arc<dyn WrappingObjectStore>>,
}
148
149impl ChainedWrappingObjectStore {
150    pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
151        Self { wrappers }
152    }
153
154    pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
155        self.wrappers.push(wrapper);
156    }
157}
158
159impl WrappingObjectStore for ChainedWrappingObjectStore {
160    fn wrap(&self, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
161        self.wrappers
162            .iter()
163            .fold(original, |acc, wrapper| wrapper.wrap(acc))
164    }
165}
166
/// Parameters to create an [ObjectStore]
///
/// `Hash` and `Eq` are implemented by hand in this file so that params can be
/// used as a cache key.
#[derive(Debug, Clone)]
pub struct ObjectStoreParams {
    /// Block size override. When `None`, a default is chosen by the code that
    /// builds the store.
    pub block_size: Option<usize>,
    /// Pre-built store together with its URL. When set,
    /// `ObjectStore::from_uri_and_params` uses it directly and does not
    /// consult the registry.
    #[deprecated(note = "Implement an ObjectStoreProvider instead")]
    pub object_store: Option<(Arc<DynObjectStore>, Url)>,
    /// Defaults to 60 seconds. NOTE(review): presumably how far ahead of
    /// expiry S3 credentials are refreshed; the consumer is not in this file.
    pub s3_credentials_refresh_offset: Duration,
    /// Explicit AWS credential provider. Only present with the `aws` feature.
    #[cfg(feature = "aws")]
    pub aws_credentials: Option<AwsCredentialProvider>,
    /// Optional wrapper applied to the underlying store when it is built.
    pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
    /// Free-form string options. NOTE(review): presumably forwarded to the
    /// store provider; see [StorageOptions] for the keys read in this file.
    pub storage_options: Option<HashMap<String, String>>,
    /// Use constant size upload parts for multipart uploads. Only necessary
    /// for Cloudflare R2, which doesn't support variable size parts. When this
    /// is false, max upload size is 2.5TB. When this is true, the max size is
    /// 50GB.
    pub use_constant_size_upload_parts: bool,
    /// Overrides whether listings may be assumed to be lexically ordered.
    /// With the deprecated `object_store` field, `None` is treated as `false`.
    pub list_is_lexically_ordered: Option<bool>,
}
186
187impl Default for ObjectStoreParams {
188    fn default() -> Self {
189        #[allow(deprecated)]
190        Self {
191            object_store: None,
192            block_size: None,
193            s3_credentials_refresh_offset: Duration::from_secs(60),
194            #[cfg(feature = "aws")]
195            aws_credentials: None,
196            object_store_wrapper: None,
197            storage_options: None,
198            use_constant_size_upload_parts: false,
199            list_is_lexically_ordered: None,
200        }
201    }
202}
203
204// We implement hash for caching
205impl std::hash::Hash for ObjectStoreParams {
206    #[allow(deprecated)]
207    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
208        // For hashing, we use pointer values for ObjectStore, S3 credentials, and wrapper
209        self.block_size.hash(state);
210        if let Some((store, url)) = &self.object_store {
211            Arc::as_ptr(store).hash(state);
212            url.hash(state);
213        }
214        self.s3_credentials_refresh_offset.hash(state);
215        #[cfg(feature = "aws")]
216        if let Some(aws_credentials) = &self.aws_credentials {
217            Arc::as_ptr(aws_credentials).hash(state);
218        }
219        if let Some(wrapper) = &self.object_store_wrapper {
220            Arc::as_ptr(wrapper).hash(state);
221        }
222        if let Some(storage_options) = &self.storage_options {
223            for (key, value) in storage_options {
224                key.hash(state);
225                value.hash(state);
226            }
227        }
228        self.use_constant_size_upload_parts.hash(state);
229        self.list_is_lexically_ordered.hash(state);
230    }
231}
232
233// We implement eq for caching
234impl Eq for ObjectStoreParams {}
235impl PartialEq for ObjectStoreParams {
236    #[allow(deprecated)]
237    fn eq(&self, other: &Self) -> bool {
238        // For equality, we use pointer comparison for ObjectStore, S3 credentials, and wrapper
239        self.block_size == other.block_size
240            && self
241                .object_store
242                .as_ref()
243                .map(|(store, url)| (Arc::as_ptr(store), url))
244                == other
245                    .object_store
246                    .as_ref()
247                    .map(|(store, url)| (Arc::as_ptr(store), url))
248            && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
249            && self.aws_credentials.as_ref().map(Arc::as_ptr)
250                == other.aws_credentials.as_ref().map(Arc::as_ptr)
251            && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
252                == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
253            && self.storage_options == other.storage_options
254            && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
255            && self.list_is_lexically_ordered == other.list_is_lexically_ordered
256    }
257}
258
259fn uri_to_url(uri: &str) -> Result<Url> {
260    match Url::parse(uri) {
261        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
262            // On Windows, the drive is parsed as a scheme
263            local_path_to_url(uri)
264        }
265        Ok(url) => Ok(url),
266        Err(_) => local_path_to_url(uri),
267    }
268}
269
270fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
271    let expanded = tilde(str_path.as_ref()).to_string();
272
273    let mut expanded_path = path_abs::PathAbs::new(expanded)
274        .unwrap()
275        .as_path()
276        .to_path_buf();
277    // path_abs::PathAbs::new(".") returns an empty string.
278    if let Some(s) = expanded_path.as_path().to_str() {
279        if s.is_empty() {
280            expanded_path = std::env::current_dir()?;
281        }
282    }
283
284    Ok(expanded_path)
285}
286
287fn local_path_to_url(str_path: &str) -> Result<Url> {
288    let expanded_path = expand_path(str_path)?;
289
290    Url::from_directory_path(expanded_path).map_err(|_| Error::InvalidInput {
291        source: format!("Invalid table location: '{}'", str_path).into(),
292        location: location!(),
293    })
294}
295
296impl ObjectStore {
297    /// Parse from a string URI.
298    ///
299    /// Returns the ObjectStore instance and the absolute path to the object.
300    ///
301    /// This uses the default [ObjectStoreRegistry] to find the object store. To
302    /// allow for potential re-use of object store instances, it's recommended to
303    /// create a shared [ObjectStoreRegistry] and pass that to [Self::from_uri_and_params].
304    pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
305        let registry = Arc::new(ObjectStoreRegistry::default());
306
307        Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
308    }
309
310    /// Parse from a string URI.
311    ///
312    /// Returns the ObjectStore instance and the absolute path to the object.
313    pub async fn from_uri_and_params(
314        registry: Arc<ObjectStoreRegistry>,
315        uri: &str,
316        params: &ObjectStoreParams,
317    ) -> Result<(Arc<Self>, Path)> {
318        #[allow(deprecated)]
319        if let Some((store, path)) = params.object_store.as_ref() {
320            let mut inner = store.clone();
321            if let Some(wrapper) = params.object_store_wrapper.as_ref() {
322                inner = wrapper.wrap(inner);
323            }
324            let store = Self {
325                inner,
326                scheme: path.scheme().to_string(),
327                block_size: params.block_size.unwrap_or(64 * 1024),
328                max_iop_size: *DEFAULT_MAX_IOP_SIZE,
329                use_constant_size_upload_parts: params.use_constant_size_upload_parts,
330                list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
331                io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
332                download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
333            };
334            let path = Path::from(path.path());
335            return Ok((Arc::new(store), path));
336        }
337        let url = uri_to_url(uri)?;
338        let store = registry.get_store(url.clone(), params).await?;
339        // We know the scheme is valid if we got a store back.
340        let provider = registry.get_provider(url.scheme()).expect_ok()?;
341        let path = provider.extract_path(&url);
342
343        Ok((store, path))
344    }
345
346    #[deprecated(note = "Use `from_uri` instead")]
347    pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
348        Self::from_uri_and_params(
349            Arc::new(ObjectStoreRegistry::default()),
350            str_path,
351            &Default::default(),
352        )
353        .now_or_never()
354        .unwrap()
355    }
356
357    /// Local object store.
358    pub fn local() -> Self {
359        let provider = FileStoreProvider;
360        provider
361            .new_store(Url::parse("file:///").unwrap(), &Default::default())
362            .now_or_never()
363            .unwrap()
364            .unwrap()
365    }
366
367    /// Create a in-memory object store directly for testing.
368    pub fn memory() -> Self {
369        let provider = MemoryStoreProvider;
370        provider
371            .new_store(Url::parse("memory:///").unwrap(), &Default::default())
372            .now_or_never()
373            .unwrap()
374            .unwrap()
375    }
376
377    /// Returns true if the object store pointed to a local file system.
378    pub fn is_local(&self) -> bool {
379        self.scheme == "file"
380    }
381
382    pub fn is_cloud(&self) -> bool {
383        self.scheme != "file" && self.scheme != "memory"
384    }
385
386    pub fn scheme(&self) -> &str {
387        &self.scheme
388    }
389
390    pub fn block_size(&self) -> usize {
391        self.block_size
392    }
393
394    pub fn max_iop_size(&self) -> u64 {
395        self.max_iop_size
396    }
397
398    pub fn io_parallelism(&self) -> usize {
399        std::env::var("LANCE_IO_THREADS")
400            .map(|val| val.parse::<usize>().unwrap())
401            .unwrap_or(self.io_parallelism)
402    }
403
404    /// Open a file for path.
405    ///
406    /// Parameters
407    /// - ``path``: Absolute path to the file.
408    pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
409        match self.scheme.as_str() {
410            "file" => LocalObjectReader::open(path, self.block_size, None).await,
411            _ => Ok(Box::new(CloudObjectReader::new(
412                self.inner.clone(),
413                path.clone(),
414                self.block_size,
415                None,
416                self.download_retry_count,
417            )?)),
418        }
419    }
420
421    /// Open a reader for a file with known size.
422    ///
423    /// This size may either have been retrieved from a list operation or
424    /// cached metadata. By passing in the known size, we can skip a HEAD / metadata
425    /// call.
426    pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
427        // If we know the file is really small, we can read the whole thing
428        // as a single request.
429        if known_size <= self.block_size {
430            return Ok(Box::new(SmallReader::new(
431                self.inner.clone(),
432                path.clone(),
433                self.download_retry_count,
434                known_size,
435            )));
436        }
437
438        match self.scheme.as_str() {
439            "file" => LocalObjectReader::open(path, self.block_size, Some(known_size)).await,
440            _ => Ok(Box::new(CloudObjectReader::new(
441                self.inner.clone(),
442                path.clone(),
443                self.block_size,
444                Some(known_size),
445                self.download_retry_count,
446            )?)),
447        }
448    }
449
450    /// Create an [ObjectWriter] from local [std::path::Path]
451    pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
452        let object_store = Self::local();
453        let absolute_path = expand_path(path.to_string_lossy())?;
454        let os_path = Path::from_absolute_path(absolute_path)?;
455        object_store.create(&os_path).await
456    }
457
458    /// Open an [Reader] from local [std::path::Path]
459    pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
460        let object_store = Self::local();
461        let absolute_path = expand_path(path.to_string_lossy())?;
462        let os_path = Path::from_absolute_path(absolute_path)?;
463        object_store.open(&os_path).await
464    }
465
466    /// Create a new file.
467    pub async fn create(&self, path: &Path) -> Result<ObjectWriter> {
468        ObjectWriter::new(self, path).await
469    }
470
471    /// A helper function to create a file and write content to it.
472    pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
473        let mut writer = self.create(path).await?;
474        writer.write_all(content).await?;
475        writer.shutdown().await
476    }
477
478    pub async fn delete(&self, path: &Path) -> Result<()> {
479        self.inner.delete(path).await?;
480        Ok(())
481    }
482
483    pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
484        Ok(self.inner.copy(from, to).await?)
485    }
486
487    /// Read a directory (start from base directory) and returns all sub-paths in the directory.
488    pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
489        let path = dir_path.into();
490        let path = Path::parse(&path)?;
491        let output = self.inner.list_with_delimiter(Some(&path)).await?;
492        Ok(output
493            .common_prefixes
494            .iter()
495            .chain(output.objects.iter().map(|o| &o.location))
496            .map(|s| s.filename().unwrap().to_string())
497            .collect())
498    }
499
500    pub fn list(
501        &self,
502        path: Option<Path>,
503    ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
504        Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
505    }
506
507    /// Read all files (start from base directory) recursively
508    ///
509    /// unmodified_since can be specified to only return files that have not been modified since the given time.
510    pub fn read_dir_all<'a, 'b>(
511        &'a self,
512        dir_path: impl Into<&'b Path> + Send,
513        unmodified_since: Option<DateTime<Utc>>,
514    ) -> BoxStream<'a, Result<ObjectMeta>> {
515        self.inner.read_dir_all(dir_path, unmodified_since)
516    }
517
518    /// Remove a directory recursively.
519    pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
520        let path = dir_path.into();
521        let path = Path::parse(&path)?;
522
523        if self.is_local() {
524            // Local file system needs to delete directories as well.
525            return super::local::remove_dir_all(&path);
526        }
527        let sub_entries = self
528            .inner
529            .list(Some(&path))
530            .map(|m| m.map(|meta| meta.location))
531            .boxed();
532        self.inner
533            .delete_stream(sub_entries)
534            .try_collect::<Vec<_>>()
535            .await?;
536        Ok(())
537    }
538
539    pub fn remove_stream<'a>(
540        &'a self,
541        locations: BoxStream<'a, Result<Path>>,
542    ) -> BoxStream<'a, Result<Path>> {
543        self.inner
544            .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
545            .err_into::<Error>()
546            .boxed()
547    }
548
549    /// Check a file exists.
550    pub async fn exists(&self, path: &Path) -> Result<bool> {
551        match self.inner.head(path).await {
552            Ok(_) => Ok(true),
553            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
554            Err(e) => Err(e.into()),
555        }
556    }
557
558    /// Get file size.
559    pub async fn size(&self, path: &Path) -> Result<usize> {
560        Ok(self.inner.head(path).await?.size)
561    }
562
563    /// Convenience function to open a reader and read all the bytes
564    pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
565        let reader = self.open(path).await?;
566        Ok(reader.get_all().await?)
567    }
568
569    /// Convenience function open a reader and make a single request
570    ///
571    /// If you will be making multiple requests to the path it is more efficient to call [`Self::open`]
572    /// and then call [`Reader::get_range`] multiple times.
573    pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
574        let reader = self.open(path).await?;
575        Ok(reader.get_range(range).await?)
576    }
577}
578
/// Options that can be set for multiple object stores
///
/// Parsed case-insensitively from strings through the [FromStr]
/// implementation in this file.
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub enum LanceConfigKey {
    /// Number of times to retry a download that fails
    ///
    /// String form: `download_retry_count`.
    DownloadRetryCount,
}
585
586impl FromStr for LanceConfigKey {
587    type Err = Error;
588
589    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
590        match s.to_ascii_lowercase().as_str() {
591            "download_retry_count" => Ok(Self::DownloadRetryCount),
592            _ => Err(Error::InvalidInput {
593                source: format!("Invalid LanceConfigKey: {}", s).into(),
594                location: location!(),
595            }),
596        }
597    }
598}
599
/// String key/value options for an object store, with typed accessors for
/// the keys this module reads.
///
/// Building one through [`StorageOptions::new`] (or `From<HashMap>`) also
/// folds in a handful of environment variables.
#[derive(Clone, Debug, Default)]
pub struct StorageOptions(pub HashMap<String, String>);
602
603impl StorageOptions {
604    /// Create a new instance of [`StorageOptions`]
605    pub fn new(options: HashMap<String, String>) -> Self {
606        let mut options = options;
607        if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
608            options.insert("allow_http".into(), value);
609        }
610        if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
611            options.insert("allow_http".into(), value);
612        }
613        if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
614            options.insert("allow_http".into(), value);
615        }
616        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
617            options.insert("client_max_retries".into(), value);
618        }
619        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
620            options.insert("client_retry_timeout".into(), value);
621        }
622        Self(options)
623    }
624
625    /// Denotes if unsecure connections via http are allowed
626    pub fn allow_http(&self) -> bool {
627        self.0.iter().any(|(key, value)| {
628            key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
629        })
630    }
631
632    /// Number of times to retry a download that fails
633    pub fn download_retry_count(&self) -> usize {
634        self.0
635            .iter()
636            .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
637            .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
638            .unwrap_or(3)
639    }
640
641    /// Max retry times to set in RetryConfig for object store client
642    pub fn client_max_retries(&self) -> usize {
643        self.0
644            .iter()
645            .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
646            .and_then(|(_, value)| value.parse::<usize>().ok())
647            .unwrap_or(10)
648    }
649
650    /// Seconds of timeout to set in RetryConfig for object store client
651    pub fn client_retry_timeout(&self) -> u64 {
652        self.0
653            .iter()
654            .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
655            .and_then(|(_, value)| value.parse::<u64>().ok())
656            .unwrap_or(180)
657    }
658
659    pub fn get(&self, key: &str) -> Option<&String> {
660        self.0.get(key)
661    }
662}
663
664impl From<HashMap<String, String>> for StorageOptions {
665    fn from(value: HashMap<String, String>) -> Self {
666        Self::new(value)
667    }
668}
669
670impl ObjectStore {
671    #[allow(clippy::too_many_arguments)]
672    pub fn new(
673        store: Arc<DynObjectStore>,
674        location: Url,
675        block_size: Option<usize>,
676        wrapper: Option<Arc<dyn WrappingObjectStore>>,
677        use_constant_size_upload_parts: bool,
678        list_is_lexically_ordered: bool,
679        io_parallelism: usize,
680        download_retry_count: usize,
681    ) -> Self {
682        let scheme = location.scheme();
683        let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
684
685        let store = match wrapper {
686            Some(wrapper) => wrapper.wrap(store),
687            None => store,
688        };
689
690        Self {
691            inner: store,
692            scheme: scheme.into(),
693            block_size,
694            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
695            use_constant_size_upload_parts,
696            list_is_lexically_ordered,
697            io_parallelism,
698            download_retry_count,
699        }
700    }
701}
702
/// Picks the default block size for a URL scheme.
///
/// The `file` scheme gets a 4KB block; every other scheme gets 64KB, which
/// is generally the largest block size where we don't see a latency penalty.
fn infer_block_size(scheme: &str) -> usize {
    const KIB: usize = 1024;
    if scheme == "file" {
        4 * KIB
    } else {
        64 * KIB
    }
}
712
713#[cfg(test)]
714mod tests {
715    use super::*;
716    use object_store::memory::InMemory;
717    use rstest::rstest;
718    use std::env::set_current_dir;
719    use std::fs::{create_dir_all, write};
720    use std::path::Path as StdPath;
721    use std::sync::atomic::{AtomicBool, Ordering};
722
723    /// Write test content to file.
724    fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
725        let expanded = tilde(path_str).to_string();
726        let path = StdPath::new(&expanded);
727        std::fs::create_dir_all(path.parent().unwrap())?;
728        write(path, contents)
729    }
730
731    async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
732        let test_file_store = store.open(path).await.unwrap();
733        let size = test_file_store.size().await.unwrap();
734        let bytes = test_file_store.get_range(0..size).await.unwrap();
735        let contents = String::from_utf8(bytes.to_vec()).unwrap();
736        Ok(contents)
737    }
738
739    #[tokio::test]
740    async fn test_absolute_paths() {
741        let tmp_dir = tempfile::tempdir().unwrap();
742        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
743        write_to_file(
744            &format!("{tmp_path}/bar/foo.lance/test_file"),
745            "TEST_CONTENT",
746        )
747        .unwrap();
748
749        // test a few variations of the same path
750        for uri in &[
751            format!("{tmp_path}/bar/foo.lance"),
752            format!("{tmp_path}/./bar/foo.lance"),
753            format!("{tmp_path}/bar/foo.lance/../foo.lance"),
754        ] {
755            let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
756            let contents = read_from_store(store.as_ref(), &path.child("test_file"))
757                .await
758                .unwrap();
759            assert_eq!(contents, "TEST_CONTENT");
760        }
761    }
762
763    #[tokio::test]
764    async fn test_cloud_paths() {
765        let uri = "s3://bucket/foo.lance";
766        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
767        assert_eq!(store.scheme, "s3");
768        assert_eq!(path.to_string(), "foo.lance");
769
770        let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
771            .await
772            .unwrap();
773        assert_eq!(store.scheme, "s3");
774        assert_eq!(path.to_string(), "foo.lance");
775
776        let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
777            .await
778            .unwrap();
779        assert_eq!(store.scheme, "gs");
780        assert_eq!(path.to_string(), "foo.lance");
781    }
782
783    async fn test_block_size_used_test_helper(
784        uri: &str,
785        storage_options: Option<HashMap<String, String>>,
786        default_expected_block_size: usize,
787    ) {
788        // Test the default
789        let registry = Arc::new(ObjectStoreRegistry::default());
790        let params = ObjectStoreParams {
791            storage_options: storage_options.clone(),
792            ..ObjectStoreParams::default()
793        };
794        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
795            .await
796            .unwrap();
797        assert_eq!(store.block_size, default_expected_block_size);
798
799        // Ensure param is used
800        let registry = Arc::new(ObjectStoreRegistry::default());
801        let params = ObjectStoreParams {
802            block_size: Some(1024),
803            storage_options: storage_options.clone(),
804            ..ObjectStoreParams::default()
805        };
806        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
807            .await
808            .unwrap();
809        assert_eq!(store.block_size, 1024);
810    }
811
812    #[rstest]
813    #[case("s3://bucket/foo.lance", None)]
814    #[case("gs://bucket/foo.lance", None)]
815    #[case("az://account/bucket/foo.lance",
816      Some(HashMap::from([
817            (String::from("account_name"), String::from("account")),
818            (String::from("container_name"), String::from("container"))
819           ])))]
820    #[tokio::test]
821    async fn test_block_size_used_cloud(
822        #[case] uri: &str,
823        #[case] storage_options: Option<HashMap<String, String>>,
824    ) {
825        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
826    }
827
828    #[rstest]
829    #[case("file")]
830    #[case("file-object-store")]
831    #[case("memory:///bucket/foo.lance")]
832    #[tokio::test]
833    async fn test_block_size_used_file(#[case] prefix: &str) {
834        let tmp_dir = tempfile::tempdir().unwrap();
835        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
836        let path = format!("{tmp_path}/bar/foo.lance/test_file");
837        write_to_file(&path, "URL").unwrap();
838        let uri = format!("{prefix}:///{path}");
839        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
840    }
841
842    #[tokio::test]
843    async fn test_relative_paths() {
844        let tmp_dir = tempfile::tempdir().unwrap();
845        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
846        write_to_file(
847            &format!("{tmp_path}/bar/foo.lance/test_file"),
848            "RELATIVE_URL",
849        )
850        .unwrap();
851
852        set_current_dir(StdPath::new(&tmp_path)).expect("Error changing current dir");
853        let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();
854
855        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
856            .await
857            .unwrap();
858        assert_eq!(contents, "RELATIVE_URL");
859    }
860
861    #[tokio::test]
862    async fn test_tilde_expansion() {
863        let uri = "~/foo.lance";
864        write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
865        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
866        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
867            .await
868            .unwrap();
869        assert_eq!(contents, "TILDE");
870    }
871
872    #[tokio::test]
873    async fn test_read_directory() {
874        let tmp_dir = tempfile::tempdir().unwrap();
875        let path = tmp_dir.path();
876        create_dir_all(path.join("foo").join("bar")).unwrap();
877        create_dir_all(path.join("foo").join("zoo")).unwrap();
878        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
879        write_to_file(
880            path.join("foo").join("test_file").to_str().unwrap(),
881            "read_dir",
882        )
883        .unwrap();
884        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
885
886        let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
887        assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
888    }
889
890    #[tokio::test]
891    async fn test_delete_directory() {
892        let tmp_dir = tempfile::tempdir().unwrap();
893        let path = tmp_dir.path();
894        create_dir_all(path.join("foo").join("bar")).unwrap();
895        create_dir_all(path.join("foo").join("zoo")).unwrap();
896        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
897        write_to_file(
898            path.join("foo")
899                .join("bar")
900                .join("test_file")
901                .to_str()
902                .unwrap(),
903            "delete",
904        )
905        .unwrap();
906        write_to_file(path.join("foo").join("top").to_str().unwrap(), "delete_top").unwrap();
907        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
908        store.remove_dir_all(base.child("foo")).await.unwrap();
909
910        assert!(!path.join("foo").exists());
911    }
912
913    #[derive(Debug)]
914    struct TestWrapper {
915        called: AtomicBool,
916
917        return_value: Arc<dyn OSObjectStore>,
918    }
919
920    impl WrappingObjectStore for TestWrapper {
921        fn wrap(&self, _original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
922            self.called.store(true, Ordering::Relaxed);
923
924            // return a mocked value so we can check if the final store is the one we expect
925            self.return_value.clone()
926        }
927    }
928
929    impl TestWrapper {
930        fn called(&self) -> bool {
931            self.called.load(Ordering::Relaxed)
932        }
933    }
934
935    #[tokio::test]
936    async fn test_wrapping_object_store_option_is_used() {
937        // Make a store for the inner store first
938        let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
939        let registry = Arc::new(ObjectStoreRegistry::default());
940
941        assert_eq!(Arc::strong_count(&mock_inner_store), 1);
942
943        let wrapper = Arc::new(TestWrapper {
944            called: AtomicBool::new(false),
945            return_value: mock_inner_store.clone(),
946        });
947
948        let params = ObjectStoreParams {
949            object_store_wrapper: Some(wrapper.clone()),
950            ..ObjectStoreParams::default()
951        };
952
953        // not called yet
954        assert!(!wrapper.called());
955
956        let _ = ObjectStore::from_uri_and_params(registry, "memory:///", &params)
957            .await
958            .unwrap();
959
960        // called after construction
961        assert!(wrapper.called());
962
963        // hard to compare two trait pointers as the point to vtables
964        // using the ref count as a proxy to make sure that the store is correctly kept
965        assert_eq!(Arc::strong_count(&mock_inner_store), 2);
966    }
967
968    #[tokio::test]
969    async fn test_local_paths() {
970        let temp_dir = tempfile::tempdir().unwrap();
971
972        let file_path = temp_dir.path().join("test_file");
973        let mut writer = ObjectStore::create_local_writer(file_path.as_path())
974            .await
975            .unwrap();
976        writer.write_all(b"LOCAL").await.unwrap();
977        writer.shutdown().await.unwrap();
978
979        let reader = ObjectStore::open_local(file_path.as_path()).await.unwrap();
980        let buf = reader.get_range(0..5).await.unwrap();
981        assert_eq!(buf.as_ref(), b"LOCAL");
982    }
983
984    #[tokio::test]
985    async fn test_read_one() {
986        let temp_dir = tempfile::tempdir().unwrap();
987
988        let file_path = temp_dir.path().join("test_file");
989        let mut writer = ObjectStore::create_local_writer(file_path.as_path())
990            .await
991            .unwrap();
992        writer.write_all(b"LOCAL").await.unwrap();
993        writer.shutdown().await.unwrap();
994
995        let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
996        let obj_store = ObjectStore::local();
997        let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
998        assert_eq!(buf.as_ref(), b"LOCAL");
999
1000        let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
1001        assert_eq!(buf.as_ref(), b"LOCAL");
1002    }
1003
1004    #[tokio::test]
1005    #[cfg(windows)]
1006    async fn test_windows_paths() {
1007        use std::path::Component;
1008        use std::path::Prefix;
1009        use std::path::Prefix::*;
1010
1011        fn get_path_prefix(path: &StdPath) -> Prefix {
1012            match path.components().next().unwrap() {
1013                Component::Prefix(prefix_component) => prefix_component.kind(),
1014                _ => panic!(),
1015            }
1016        }
1017
1018        fn get_drive_letter(prefix: Prefix) -> String {
1019            match prefix {
1020                Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
1021                _ => panic!(),
1022            }
1023        }
1024
1025        let tmp_dir = tempfile::tempdir().unwrap();
1026        let tmp_path = tmp_dir.path();
1027        let prefix = get_path_prefix(tmp_path);
1028        let drive_letter = get_drive_letter(prefix);
1029
1030        write_to_file(
1031            &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
1032            "WINDOWS",
1033        )
1034        .unwrap();
1035
1036        for uri in &[
1037            format!("{drive_letter}:/test_folder/test.lance"),
1038            format!("{drive_letter}:\\test_folder\\test.lance"),
1039        ] {
1040            let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
1041            let contents = read_from_store(store.as_ref(), &base.child("test_file"))
1042                .await
1043                .unwrap();
1044            assert_eq!(contents, "WINDOWS");
1045        }
1046    }
1047}