lance_io/
object_store.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Extend [object_store::ObjectStore] functionalities
5
6use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use deepsize::DeepSizeOf;
17use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
18use futures::{FutureExt, Stream};
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22#[cfg(feature = "aws")]
23use object_store::aws::AwsCredentialProvider;
24use object_store::DynObjectStore;
25use object_store::Error as ObjectStoreError;
26use object_store::{path::Path, ObjectMeta, ObjectStore as OSObjectStore};
27use providers::local::FileStoreProvider;
28use providers::memory::MemoryStoreProvider;
29use shellexpand::tilde;
30use snafu::location;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35mod list_retry;
36pub mod providers;
37mod tracing;
38use crate::object_reader::SmallReader;
39use crate::object_writer::WriteResult;
40use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
41use lance_core::{Error, Result};
42
/// Default I/O parallelism for local disks.
///
/// Local disks tend to do fine with a few threads. The thread count also
/// influences how many files we may need to read at once in some situations,
/// so keeping it at 8 keeps the scanner's RAM usage down.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
/// Default I/O parallelism for cloud object stores, which often need many,
/// many threads to saturate the network.
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

/// Block size for local file systems: 4 KiB.
const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4096;
/// Block size for cloud object stores: 64 KiB.
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 65536;
54
55lazy_static::lazy_static! {
56    pub static ref DEFAULT_MAX_IOP_SIZE: u64 = std::env::var("LANCE_MAX_IOP_SIZE")
57        .map(|val| val.parse().unwrap()).unwrap_or(16 * 1024 * 1024);
58}
59
/// Number of times a failed download is retried when no override is configured.
pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
61
62pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
63
64#[async_trait]
65pub trait ObjectStoreExt {
66    /// Returns true if the file exists.
67    async fn exists(&self, path: &Path) -> Result<bool>;
68
69    /// Read all files (start from base directory) recursively
70    ///
71    /// unmodified_since can be specified to only return files that have not been modified since the given time.
72    async fn read_dir_all<'a>(
73        &'a self,
74        dir_path: impl Into<&Path> + Send,
75        unmodified_since: Option<DateTime<Utc>>,
76    ) -> Result<BoxStream<'a, Result<ObjectMeta>>>;
77}
78
79#[async_trait]
80impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
81    async fn read_dir_all<'a>(
82        &'a self,
83        dir_path: impl Into<&Path> + Send,
84        unmodified_since: Option<DateTime<Utc>>,
85    ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
86        let mut output = self.list(Some(dir_path.into()));
87        if let Some(unmodified_since_val) = unmodified_since {
88            output = output
89                .try_filter(move |file| future::ready(file.last_modified < unmodified_since_val))
90                .boxed();
91        }
92        Ok(output.map_err(|e| e.into()).boxed())
93    }
94
95    async fn exists(&self, path: &Path) -> Result<bool> {
96        match self.head(path).await {
97            Ok(_) => Ok(true),
98            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
99            Err(e) => Err(e.into()),
100        }
101    }
102}
103
104/// Wraps [ObjectStore](object_store::ObjectStore)
105#[derive(Debug, Clone)]
106pub struct ObjectStore {
107    // Inner object store
108    pub inner: Arc<dyn OSObjectStore>,
109    scheme: String,
110    block_size: usize,
111    max_iop_size: u64,
112    /// Whether to use constant size upload parts for multipart uploads. This
113    /// is only necessary for Cloudflare R2.
114    pub use_constant_size_upload_parts: bool,
115    /// Whether we can assume that the list of files is lexically ordered. This
116    /// is true for object stores, but not for local filesystems.
117    pub list_is_lexically_ordered: bool,
118    io_parallelism: usize,
119    /// Number of times to retry a failed download
120    download_retry_count: usize,
121}
122
123impl DeepSizeOf for ObjectStore {
124    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
125        // We aren't counting `inner` here which is problematic but an ObjectStore
126        // shouldn't be too big.  The only exception might be the write cache but, if
127        // the writer cache has data, it means we're using it somewhere else that isn't
128        // a cache and so that doesn't really count.
129        self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
130    }
131}
132
133impl std::fmt::Display for ObjectStore {
134    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135        write!(f, "ObjectStore({})", self.scheme)
136    }
137}
138
139pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
140    fn wrap(&self, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
141}
142
143/// Parameters to create an [ObjectStore]
144///
145#[derive(Debug, Clone)]
146pub struct ObjectStoreParams {
147    pub block_size: Option<usize>,
148    #[deprecated(note = "Implement an ObjectStoreProvider instead")]
149    pub object_store: Option<(Arc<DynObjectStore>, Url)>,
150    pub s3_credentials_refresh_offset: Duration,
151    #[cfg(feature = "aws")]
152    pub aws_credentials: Option<AwsCredentialProvider>,
153    pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
154    pub storage_options: Option<HashMap<String, String>>,
155    /// Use constant size upload parts for multipart uploads. Only necessary
156    /// for Cloudflare R2, which doesn't support variable size parts. When this
157    /// is false, max upload size is 2.5TB. When this is true, the max size is
158    /// 50GB.
159    pub use_constant_size_upload_parts: bool,
160    pub list_is_lexically_ordered: Option<bool>,
161}
162
163impl Default for ObjectStoreParams {
164    fn default() -> Self {
165        #[allow(deprecated)]
166        Self {
167            object_store: None,
168            block_size: None,
169            s3_credentials_refresh_offset: Duration::from_secs(60),
170            #[cfg(feature = "aws")]
171            aws_credentials: None,
172            object_store_wrapper: None,
173            storage_options: None,
174            use_constant_size_upload_parts: false,
175            list_is_lexically_ordered: None,
176        }
177    }
178}
179
180// We implement hash for caching
181impl std::hash::Hash for ObjectStoreParams {
182    #[allow(deprecated)]
183    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
184        // For hashing, we use pointer values for ObjectStore, S3 credentials, and wrapper
185        self.block_size.hash(state);
186        if let Some((store, url)) = &self.object_store {
187            Arc::as_ptr(store).hash(state);
188            url.hash(state);
189        }
190        self.s3_credentials_refresh_offset.hash(state);
191        #[cfg(feature = "aws")]
192        if let Some(aws_credentials) = &self.aws_credentials {
193            Arc::as_ptr(aws_credentials).hash(state);
194        }
195        if let Some(wrapper) = &self.object_store_wrapper {
196            Arc::as_ptr(wrapper).hash(state);
197        }
198        if let Some(storage_options) = &self.storage_options {
199            for (key, value) in storage_options {
200                key.hash(state);
201                value.hash(state);
202            }
203        }
204        self.use_constant_size_upload_parts.hash(state);
205        self.list_is_lexically_ordered.hash(state);
206    }
207}
208
209// We implement eq for caching
210impl Eq for ObjectStoreParams {}
211impl PartialEq for ObjectStoreParams {
212    #[allow(deprecated)]
213    fn eq(&self, other: &Self) -> bool {
214        // For equality, we use pointer comparison for ObjectStore, S3 credentials, and wrapper
215        self.block_size == other.block_size
216            && self
217                .object_store
218                .as_ref()
219                .map(|(store, url)| (Arc::as_ptr(store), url))
220                == other
221                    .object_store
222                    .as_ref()
223                    .map(|(store, url)| (Arc::as_ptr(store), url))
224            && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
225            && self.aws_credentials.as_ref().map(Arc::as_ptr)
226                == other.aws_credentials.as_ref().map(Arc::as_ptr)
227            && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
228                == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
229            && self.storage_options == other.storage_options
230            && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
231            && self.list_is_lexically_ordered == other.list_is_lexically_ordered
232    }
233}
234
235fn uri_to_url(uri: &str) -> Result<Url> {
236    match Url::parse(uri) {
237        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
238            // On Windows, the drive is parsed as a scheme
239            local_path_to_url(uri)
240        }
241        Ok(url) => Ok(url),
242        Err(_) => local_path_to_url(uri),
243    }
244}
245
246fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
247    let expanded = tilde(str_path.as_ref()).to_string();
248
249    let mut expanded_path = path_abs::PathAbs::new(expanded)
250        .unwrap()
251        .as_path()
252        .to_path_buf();
253    // path_abs::PathAbs::new(".") returns an empty string.
254    if let Some(s) = expanded_path.as_path().to_str() {
255        if s.is_empty() {
256            expanded_path = std::env::current_dir()?;
257        }
258    }
259
260    Ok(expanded_path)
261}
262
263fn local_path_to_url(str_path: &str) -> Result<Url> {
264    let expanded_path = expand_path(str_path)?;
265
266    Url::from_directory_path(expanded_path).map_err(|_| Error::InvalidInput {
267        source: format!("Invalid table location: '{}'", str_path).into(),
268        location: location!(),
269    })
270}
271
272impl ObjectStore {
273    /// Parse from a string URI.
274    ///
275    /// Returns the ObjectStore instance and the absolute path to the object.
276    ///
277    /// This uses the default [ObjectStoreRegistry] to find the object store. To
278    /// allow for potential re-use of object store instances, it's recommended to
279    /// create a shared [ObjectStoreRegistry] and pass that to [Self::from_uri_and_params].
280    pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
281        let registry = Arc::new(ObjectStoreRegistry::default());
282
283        Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
284    }
285
286    /// Parse from a string URI.
287    ///
288    /// Returns the ObjectStore instance and the absolute path to the object.
289    pub async fn from_uri_and_params(
290        registry: Arc<ObjectStoreRegistry>,
291        uri: &str,
292        params: &ObjectStoreParams,
293    ) -> Result<(Arc<Self>, Path)> {
294        #[allow(deprecated)]
295        if let Some((store, path)) = params.object_store.as_ref() {
296            let mut inner = store.clone();
297            if let Some(wrapper) = params.object_store_wrapper.as_ref() {
298                inner = wrapper.wrap(inner);
299            }
300            let store = Self {
301                inner,
302                scheme: path.scheme().to_string(),
303                block_size: params.block_size.unwrap_or(64 * 1024),
304                max_iop_size: *DEFAULT_MAX_IOP_SIZE,
305                use_constant_size_upload_parts: params.use_constant_size_upload_parts,
306                list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
307                io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
308                download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
309            };
310            let path = Path::from(path.path());
311            return Ok((Arc::new(store), path));
312        }
313        let url = uri_to_url(uri)?;
314        let store = registry.get_store(url.clone(), params).await?;
315        // We know the scheme is valid if we got a store back.
316        let provider = registry.get_provider(url.scheme()).expect_ok()?;
317        let path = provider.extract_path(&url);
318
319        Ok((store, path))
320    }
321
322    #[deprecated(note = "Use `from_uri` instead")]
323    pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
324        Self::from_uri_and_params(
325            Arc::new(ObjectStoreRegistry::default()),
326            str_path,
327            &Default::default(),
328        )
329        .now_or_never()
330        .unwrap()
331    }
332
333    /// Local object store.
334    pub fn local() -> Self {
335        let provider = FileStoreProvider;
336        provider
337            .new_store(Url::parse("file:///").unwrap(), &Default::default())
338            .now_or_never()
339            .unwrap()
340            .unwrap()
341    }
342
343    /// Create a in-memory object store directly for testing.
344    pub fn memory() -> Self {
345        let provider = MemoryStoreProvider;
346        provider
347            .new_store(Url::parse("memory:///").unwrap(), &Default::default())
348            .now_or_never()
349            .unwrap()
350            .unwrap()
351    }
352
353    /// Returns true if the object store pointed to a local file system.
354    pub fn is_local(&self) -> bool {
355        self.scheme == "file"
356    }
357
358    pub fn is_cloud(&self) -> bool {
359        self.scheme != "file" && self.scheme != "memory"
360    }
361
362    pub fn scheme(&self) -> &str {
363        &self.scheme
364    }
365
366    pub fn block_size(&self) -> usize {
367        self.block_size
368    }
369
370    pub fn max_iop_size(&self) -> u64 {
371        self.max_iop_size
372    }
373
374    pub fn io_parallelism(&self) -> usize {
375        std::env::var("LANCE_IO_THREADS")
376            .map(|val| val.parse::<usize>().unwrap())
377            .unwrap_or(self.io_parallelism)
378    }
379
380    /// Open a file for path.
381    ///
382    /// Parameters
383    /// - ``path``: Absolute path to the file.
384    pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
385        match self.scheme.as_str() {
386            "file" => LocalObjectReader::open(path, self.block_size, None).await,
387            _ => Ok(Box::new(CloudObjectReader::new(
388                self.inner.clone(),
389                path.clone(),
390                self.block_size,
391                None,
392                self.download_retry_count,
393            )?)),
394        }
395    }
396
397    /// Open a reader for a file with known size.
398    ///
399    /// This size may either have been retrieved from a list operation or
400    /// cached metadata. By passing in the known size, we can skip a HEAD / metadata
401    /// call.
402    pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
403        // If we know the file is really small, we can read the whole thing
404        // as a single request.
405        if known_size <= self.block_size {
406            return Ok(Box::new(SmallReader::new(
407                self.inner.clone(),
408                path.clone(),
409                self.download_retry_count,
410                known_size,
411            )));
412        }
413
414        match self.scheme.as_str() {
415            "file" => LocalObjectReader::open(path, self.block_size, Some(known_size)).await,
416            _ => Ok(Box::new(CloudObjectReader::new(
417                self.inner.clone(),
418                path.clone(),
419                self.block_size,
420                Some(known_size),
421                self.download_retry_count,
422            )?)),
423        }
424    }
425
426    /// Create an [ObjectWriter] from local [std::path::Path]
427    pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
428        let object_store = Self::local();
429        let absolute_path = expand_path(path.to_string_lossy())?;
430        let os_path = Path::from_absolute_path(absolute_path)?;
431        object_store.create(&os_path).await
432    }
433
434    /// Open an [Reader] from local [std::path::Path]
435    pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
436        let object_store = Self::local();
437        let absolute_path = expand_path(path.to_string_lossy())?;
438        let os_path = Path::from_absolute_path(absolute_path)?;
439        object_store.open(&os_path).await
440    }
441
442    /// Create a new file.
443    pub async fn create(&self, path: &Path) -> Result<ObjectWriter> {
444        ObjectWriter::new(self, path).await
445    }
446
447    /// A helper function to create a file and write content to it.
448    pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
449        let mut writer = self.create(path).await?;
450        writer.write_all(content).await?;
451        writer.shutdown().await
452    }
453
454    pub async fn delete(&self, path: &Path) -> Result<()> {
455        self.inner.delete(path).await?;
456        Ok(())
457    }
458
459    pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
460        Ok(self.inner.copy(from, to).await?)
461    }
462
463    /// Read a directory (start from base directory) and returns all sub-paths in the directory.
464    pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
465        let path = dir_path.into();
466        let path = Path::parse(&path)?;
467        let output = self.inner.list_with_delimiter(Some(&path)).await?;
468        Ok(output
469            .common_prefixes
470            .iter()
471            .chain(output.objects.iter().map(|o| &o.location))
472            .map(|s| s.filename().unwrap().to_string())
473            .collect())
474    }
475
476    pub fn list(
477        &self,
478        path: Option<Path>,
479    ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
480        Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
481    }
482
483    /// Read all files (start from base directory) recursively
484    ///
485    /// unmodified_since can be specified to only return files that have not been modified since the given time.
486    pub async fn read_dir_all(
487        &self,
488        dir_path: impl Into<&Path> + Send,
489        unmodified_since: Option<DateTime<Utc>>,
490    ) -> Result<BoxStream<Result<ObjectMeta>>> {
491        self.inner.read_dir_all(dir_path, unmodified_since).await
492    }
493
494    /// Remove a directory recursively.
495    pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
496        let path = dir_path.into();
497        let path = Path::parse(&path)?;
498
499        if self.is_local() {
500            // Local file system needs to delete directories as well.
501            return super::local::remove_dir_all(&path);
502        }
503        let sub_entries = self
504            .inner
505            .list(Some(&path))
506            .map(|m| m.map(|meta| meta.location))
507            .boxed();
508        self.inner
509            .delete_stream(sub_entries)
510            .try_collect::<Vec<_>>()
511            .await?;
512        Ok(())
513    }
514
515    pub fn remove_stream<'a>(
516        &'a self,
517        locations: BoxStream<'a, Result<Path>>,
518    ) -> BoxStream<'a, Result<Path>> {
519        self.inner
520            .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
521            .err_into::<Error>()
522            .boxed()
523    }
524
525    /// Check a file exists.
526    pub async fn exists(&self, path: &Path) -> Result<bool> {
527        match self.inner.head(path).await {
528            Ok(_) => Ok(true),
529            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
530            Err(e) => Err(e.into()),
531        }
532    }
533
534    /// Get file size.
535    pub async fn size(&self, path: &Path) -> Result<usize> {
536        Ok(self.inner.head(path).await?.size)
537    }
538
539    /// Convenience function to open a reader and read all the bytes
540    pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
541        let reader = self.open(path).await?;
542        Ok(reader.get_all().await?)
543    }
544
545    /// Convenience function open a reader and make a single request
546    ///
547    /// If you will be making multiple requests to the path it is more efficient to call [`Self::open`]
548    /// and then call [`Reader::get_range`] multiple times.
549    pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
550        let reader = self.open(path).await?;
551        Ok(reader.get_range(range).await?)
552    }
553}
554
/// Configuration keys that apply to every kind of object store.
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub enum LanceConfigKey {
    /// How many times a failed download is retried.
    DownloadRetryCount,
}
561
562impl FromStr for LanceConfigKey {
563    type Err = Error;
564
565    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
566        match s.to_ascii_lowercase().as_str() {
567            "download_retry_count" => Ok(Self::DownloadRetryCount),
568            _ => Err(Error::InvalidInput {
569                source: format!("Invalid LanceConfigKey: {}", s).into(),
570                location: location!(),
571            }),
572        }
573    }
574}
575
/// String key/value storage options, with typed accessors for the settings
/// Lance interprets itself.
#[derive(Clone, Debug, Default)]
pub struct StorageOptions(pub HashMap<String, String>);
578
579impl StorageOptions {
580    /// Create a new instance of [`StorageOptions`]
581    pub fn new(options: HashMap<String, String>) -> Self {
582        let mut options = options;
583        if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
584            options.insert("allow_http".into(), value);
585        }
586        if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
587            options.insert("allow_http".into(), value);
588        }
589        if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
590            options.insert("allow_http".into(), value);
591        }
592        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
593            options.insert("client_max_retries".into(), value);
594        }
595        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
596            options.insert("client_retry_timeout".into(), value);
597        }
598        Self(options)
599    }
600
601    /// Denotes if unsecure connections via http are allowed
602    pub fn allow_http(&self) -> bool {
603        self.0.iter().any(|(key, value)| {
604            key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
605        })
606    }
607
608    /// Number of times to retry a download that fails
609    pub fn download_retry_count(&self) -> usize {
610        self.0
611            .iter()
612            .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
613            .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
614            .unwrap_or(3)
615    }
616
617    /// Max retry times to set in RetryConfig for object store client
618    pub fn client_max_retries(&self) -> usize {
619        self.0
620            .iter()
621            .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
622            .and_then(|(_, value)| value.parse::<usize>().ok())
623            .unwrap_or(10)
624    }
625
626    /// Seconds of timeout to set in RetryConfig for object store client
627    pub fn client_retry_timeout(&self) -> u64 {
628        self.0
629            .iter()
630            .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
631            .and_then(|(_, value)| value.parse::<u64>().ok())
632            .unwrap_or(180)
633    }
634
635    pub fn get(&self, key: &str) -> Option<&String> {
636        self.0.get(key)
637    }
638}
639
640impl From<HashMap<String, String>> for StorageOptions {
641    fn from(value: HashMap<String, String>) -> Self {
642        Self::new(value)
643    }
644}
645
646impl ObjectStore {
647    #[allow(clippy::too_many_arguments)]
648    pub fn new(
649        store: Arc<DynObjectStore>,
650        location: Url,
651        block_size: Option<usize>,
652        wrapper: Option<Arc<dyn WrappingObjectStore>>,
653        use_constant_size_upload_parts: bool,
654        list_is_lexically_ordered: bool,
655        io_parallelism: usize,
656        download_retry_count: usize,
657    ) -> Self {
658        let scheme = location.scheme();
659        let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
660
661        let store = match wrapper {
662            Some(wrapper) => wrapper.wrap(store),
663            None => store,
664        };
665
666        Self {
667            inner: store,
668            scheme: scheme.into(),
669            block_size,
670            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
671            use_constant_size_upload_parts,
672            list_is_lexically_ordered,
673            io_parallelism,
674            download_retry_count,
675        }
676    }
677}
678
/// Default block size, in bytes, for a store with the given URL scheme.
///
/// Local file systems (`file`) use 4 KiB. Every other scheme is treated as a
/// cloud object store and uses 64 KiB, generally the largest block size that
/// doesn't incur a latency penalty.
fn infer_block_size(scheme: &str) -> usize {
    const KIB: usize = 1024;
    if scheme == "file" {
        4 * KIB
    } else {
        64 * KIB
    }
}
688
689#[cfg(test)]
690mod tests {
691    use super::*;
692    use object_store::memory::InMemory;
693    use parquet::data_type::AsBytes;
694    use rstest::rstest;
695    use std::env::set_current_dir;
696    use std::fs::{create_dir_all, write};
697    use std::path::Path as StdPath;
698    use std::sync::atomic::{AtomicBool, Ordering};
699
700    /// Write test content to file.
701    fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
702        let expanded = tilde(path_str).to_string();
703        let path = StdPath::new(&expanded);
704        std::fs::create_dir_all(path.parent().unwrap())?;
705        write(path, contents)
706    }
707
708    async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
709        let test_file_store = store.open(path).await.unwrap();
710        let size = test_file_store.size().await.unwrap();
711        let bytes = test_file_store.get_range(0..size).await.unwrap();
712        let contents = String::from_utf8(bytes.to_vec()).unwrap();
713        Ok(contents)
714    }
715
716    #[tokio::test]
717    async fn test_absolute_paths() {
718        let tmp_dir = tempfile::tempdir().unwrap();
719        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
720        write_to_file(
721            &format!("{tmp_path}/bar/foo.lance/test_file"),
722            "TEST_CONTENT",
723        )
724        .unwrap();
725
726        // test a few variations of the same path
727        for uri in &[
728            format!("{tmp_path}/bar/foo.lance"),
729            format!("{tmp_path}/./bar/foo.lance"),
730            format!("{tmp_path}/bar/foo.lance/../foo.lance"),
731        ] {
732            let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
733            let contents = read_from_store(store.as_ref(), &path.child("test_file"))
734                .await
735                .unwrap();
736            assert_eq!(contents, "TEST_CONTENT");
737        }
738    }
739
740    #[tokio::test]
741    async fn test_cloud_paths() {
742        let uri = "s3://bucket/foo.lance";
743        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
744        assert_eq!(store.scheme, "s3");
745        assert_eq!(path.to_string(), "foo.lance");
746
747        let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
748            .await
749            .unwrap();
750        assert_eq!(store.scheme, "s3");
751        assert_eq!(path.to_string(), "foo.lance");
752
753        let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
754            .await
755            .unwrap();
756        assert_eq!(store.scheme, "gs");
757        assert_eq!(path.to_string(), "foo.lance");
758    }
759
760    async fn test_block_size_used_test_helper(
761        uri: &str,
762        storage_options: Option<HashMap<String, String>>,
763        default_expected_block_size: usize,
764    ) {
765        // Test the default
766        let registry = Arc::new(ObjectStoreRegistry::default());
767        let params = ObjectStoreParams {
768            storage_options: storage_options.clone(),
769            ..ObjectStoreParams::default()
770        };
771        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
772            .await
773            .unwrap();
774        assert_eq!(store.block_size, default_expected_block_size);
775
776        // Ensure param is used
777        let registry = Arc::new(ObjectStoreRegistry::default());
778        let params = ObjectStoreParams {
779            block_size: Some(1024),
780            storage_options: storage_options.clone(),
781            ..ObjectStoreParams::default()
782        };
783        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
784            .await
785            .unwrap();
786        assert_eq!(store.block_size, 1024);
787    }
788
789    #[rstest]
790    #[case("s3://bucket/foo.lance", None)]
791    #[case("gs://bucket/foo.lance", None)]
792    #[case("az://account/bucket/foo.lance",
793      Some(HashMap::from([
794            (String::from("account_name"), String::from("account")),
795            (String::from("container_name"), String::from("container"))
796           ])))]
797    #[tokio::test]
798    async fn test_block_size_used_cloud(
799        #[case] uri: &str,
800        #[case] storage_options: Option<HashMap<String, String>>,
801    ) {
802        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
803    }
804
805    #[rstest]
806    #[case("file")]
807    #[case("file-object-store")]
808    #[case("memory:///bucket/foo.lance")]
809    #[tokio::test]
810    async fn test_block_size_used_file(#[case] prefix: &str) {
811        let tmp_dir = tempfile::tempdir().unwrap();
812        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
813        let path = format!("{tmp_path}/bar/foo.lance/test_file");
814        write_to_file(&path, "URL").unwrap();
815        let uri = format!("{prefix}:///{path}");
816        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
817    }
818
819    #[tokio::test]
820    async fn test_relative_paths() {
821        let tmp_dir = tempfile::tempdir().unwrap();
822        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
823        write_to_file(
824            &format!("{tmp_path}/bar/foo.lance/test_file"),
825            "RELATIVE_URL",
826        )
827        .unwrap();
828
829        set_current_dir(StdPath::new(&tmp_path)).expect("Error changing current dir");
830        let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();
831
832        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
833            .await
834            .unwrap();
835        assert_eq!(contents, "RELATIVE_URL");
836    }
837
838    #[tokio::test]
839    async fn test_tilde_expansion() {
840        let uri = "~/foo.lance";
841        write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
842        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
843        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
844            .await
845            .unwrap();
846        assert_eq!(contents, "TILDE");
847    }
848
849    #[tokio::test]
850    async fn test_read_directory() {
851        let tmp_dir = tempfile::tempdir().unwrap();
852        let path = tmp_dir.path();
853        create_dir_all(path.join("foo").join("bar")).unwrap();
854        create_dir_all(path.join("foo").join("zoo")).unwrap();
855        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
856        write_to_file(
857            path.join("foo").join("test_file").to_str().unwrap(),
858            "read_dir",
859        )
860        .unwrap();
861        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
862
863        let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
864        assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
865    }
866
867    #[tokio::test]
868    async fn test_delete_directory() {
869        let tmp_dir = tempfile::tempdir().unwrap();
870        let path = tmp_dir.path();
871        create_dir_all(path.join("foo").join("bar")).unwrap();
872        create_dir_all(path.join("foo").join("zoo")).unwrap();
873        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
874        write_to_file(
875            path.join("foo")
876                .join("bar")
877                .join("test_file")
878                .to_str()
879                .unwrap(),
880            "delete",
881        )
882        .unwrap();
883        write_to_file(path.join("foo").join("top").to_str().unwrap(), "delete_top").unwrap();
884        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
885        store.remove_dir_all(base.child("foo")).await.unwrap();
886
887        assert!(!path.join("foo").exists());
888    }
889
    /// Test double for `WrappingObjectStore`. It records that `wrap` was invoked and hands
    /// back a fixed store instead of wrapping the one it receives.
    #[derive(Debug)]
    struct TestWrapper {
        // Flipped to `true` by `wrap`; read through `called()`.
        called: AtomicBool,

        // Store returned from every `wrap` call, so a test can check which store was kept.
        return_value: Arc<dyn OSObjectStore>,
    }
896
897    impl WrappingObjectStore for TestWrapper {
898        fn wrap(&self, _original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
899            self.called.store(true, Ordering::Relaxed);
900
901            // return a mocked value so we can check if the final store is the one we expect
902            self.return_value.clone()
903        }
904    }
905
906    impl TestWrapper {
907        fn called(&self) -> bool {
908            self.called.load(Ordering::Relaxed)
909        }
910    }
911
912    #[tokio::test]
913    async fn test_wrapping_object_store_option_is_used() {
914        // Make a store for the inner store first
915        let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
916        let registry = Arc::new(ObjectStoreRegistry::default());
917
918        assert_eq!(Arc::strong_count(&mock_inner_store), 1);
919
920        let wrapper = Arc::new(TestWrapper {
921            called: AtomicBool::new(false),
922            return_value: mock_inner_store.clone(),
923        });
924
925        let params = ObjectStoreParams {
926            object_store_wrapper: Some(wrapper.clone()),
927            ..ObjectStoreParams::default()
928        };
929
930        // not called yet
931        assert!(!wrapper.called());
932
933        let _ = ObjectStore::from_uri_and_params(registry, "memory:///", &params)
934            .await
935            .unwrap();
936
937        // called after construction
938        assert!(wrapper.called());
939
940        // hard to compare two trait pointers as the point to vtables
941        // using the ref count as a proxy to make sure that the store is correctly kept
942        assert_eq!(Arc::strong_count(&mock_inner_store), 2);
943    }
944
945    #[tokio::test]
946    async fn test_local_paths() {
947        let temp_dir = tempfile::tempdir().unwrap();
948
949        let file_path = temp_dir.path().join("test_file");
950        let mut writer = ObjectStore::create_local_writer(file_path.as_path())
951            .await
952            .unwrap();
953        writer.write_all(b"LOCAL").await.unwrap();
954        writer.shutdown().await.unwrap();
955
956        let reader = ObjectStore::open_local(file_path.as_path()).await.unwrap();
957        let buf = reader.get_range(0..5).await.unwrap();
958        assert_eq!(buf.as_bytes(), b"LOCAL");
959    }
960
961    #[tokio::test]
962    async fn test_read_one() {
963        let temp_dir = tempfile::tempdir().unwrap();
964
965        let file_path = temp_dir.path().join("test_file");
966        let mut writer = ObjectStore::create_local_writer(file_path.as_path())
967            .await
968            .unwrap();
969        writer.write_all(b"LOCAL").await.unwrap();
970        writer.shutdown().await.unwrap();
971
972        let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
973        let obj_store = ObjectStore::local();
974        let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
975        assert_eq!(buf.as_bytes(), b"LOCAL");
976
977        let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
978        assert_eq!(buf.as_bytes(), b"LOCAL");
979    }
980
981    #[tokio::test]
982    #[cfg(windows)]
983    async fn test_windows_paths() {
984        use std::path::Component;
985        use std::path::Prefix;
986        use std::path::Prefix::*;
987
988        fn get_path_prefix(path: &StdPath) -> Prefix {
989            match path.components().next().unwrap() {
990                Component::Prefix(prefix_component) => prefix_component.kind(),
991                _ => panic!(),
992            }
993        }
994
995        fn get_drive_letter(prefix: Prefix) -> String {
996            match prefix {
997                Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
998                _ => panic!(),
999            }
1000        }
1001
1002        let tmp_dir = tempfile::tempdir().unwrap();
1003        let tmp_path = tmp_dir.path();
1004        let prefix = get_path_prefix(tmp_path);
1005        let drive_letter = get_drive_letter(prefix);
1006
1007        write_to_file(
1008            &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
1009            "WINDOWS",
1010        )
1011        .unwrap();
1012
1013        for uri in &[
1014            format!("{drive_letter}:/test_folder/test.lance"),
1015            format!("{drive_letter}:\\test_folder\\test.lance"),
1016        ] {
1017            let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
1018            let contents = read_from_store(store.as_ref(), &base.child("test_file"))
1019                .await
1020                .unwrap();
1021            assert_eq!(contents, "WINDOWS");
1022        }
1023    }
1024}