lance_io/
object_store.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Extensions to [object_store::ObjectStore] functionality

use std::collections::HashMap;
use std::ops::Range;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use deepsize::DeepSizeOf;
use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
use futures::{FutureExt, Stream};
use lance_core::error::LanceOptionExt;
use lance_core::utils::parse::str_is_truthy;
use list_retry::ListRetryStream;
#[cfg(feature = "aws")]
use object_store::aws::AwsCredentialProvider;
use object_store::DynObjectStore;
use object_store::Error as ObjectStoreError;
use object_store::{path::Path, ObjectMeta, ObjectStore as OSObjectStore};
use providers::local::FileStoreProvider;
use providers::memory::MemoryStoreProvider;
use shellexpand::tilde;
use snafu::location;
use tokio::io::AsyncWriteExt;
use url::Url;

use super::local::LocalObjectReader;
mod list_retry;
pub mod providers;
mod tracing;
use crate::object_reader::SmallReader;
use crate::object_writer::WriteResult;
use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
use lance_core::{Error, Result};

// Local disks tend to do fine with a few threads
// Note: the number of threads here also impacts the number of files
// we need to read in some situations.  So keeping this at 8 keeps the
// RAM on our scanner down.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
// Cloud disks often need many many threads to saturate the network
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024; // 4KB block size
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024; // 64KB block size

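/// Maximum size, in bytes, of a single I/O request (IOP).
///
/// Defaults to 16 MiB and can be overridden with the `LANCE_MAX_IOP_SIZE`
/// environment variable (value in bytes), as read in the initializer below.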
pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock<u64> = std::sync::LazyLock::new(|| {
    std::env::var("LANCE_MAX_IOP_SIZE")
        .map(|val| val.parse().unwrap())
        .unwrap_or(16 * 1024 * 1024)
});

pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;

pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};

#[async_trait]
pub trait ObjectStoreExt {
    /// Returns true if the file exists.
    async fn exists(&self, path: &Path) -> Result<bool>;

    /// Read all files recursively, starting from the base directory.
    ///
    /// `unmodified_since` can be specified to only return files that have not been modified since the given time.
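    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `store` and `base` are already in scope:
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    /// // Collect everything under `base`, without a modification-time filter.
    /// let files: Vec<ObjectMeta> = store.read_dir_all(&base, None).try_collect().await?;
    /// ```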
    fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>>;
}

#[async_trait]
impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
    fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
        if let Some(unmodified_since_val) = unmodified_since {
            output
                .try_filter(move |file| future::ready(file.last_modified <= unmodified_since_val))
                .boxed()
        } else {
            output.boxed()
        }
    }

    async fn exists(&self, path: &Path) -> Result<bool> {
        match self.head(path).await {
            Ok(_) => Ok(true),
            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
            Err(e) => Err(e.into()),
        }
    }
}

/// Wraps [ObjectStore](object_store::ObjectStore)
#[derive(Debug, Clone)]
pub struct ObjectStore {
    // Inner object store
    pub inner: Arc<dyn OSObjectStore>,
    scheme: String,
    block_size: usize,
    max_iop_size: u64,
    /// Whether to use constant size upload parts for multipart uploads. This
    /// is only necessary for Cloudflare R2.
    pub use_constant_size_upload_parts: bool,
    /// Whether we can assume that the list of files is lexically ordered. This
    /// is true for object stores, but not for local filesystems.
    pub list_is_lexically_ordered: bool,
    io_parallelism: usize,
    /// Number of times to retry a failed download
    download_retry_count: usize,
}

impl DeepSizeOf for ObjectStore {
    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
        // We aren't counting `inner` here, which is problematic, but an ObjectStore
        // shouldn't be too big.  The only exception might be the write cache but, if
        // the write cache has data, it means we're using it somewhere else that isn't
        // a cache and so that doesn't really count.
        self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
    }
}

impl std::fmt::Display for ObjectStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "ObjectStore({})", self.scheme)
    }
}

pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
    /// Wrap an object store with additional functionality.
    ///
    /// The `storage_options` contain namespace information (e.g., `azure_storage_account_name`)
    /// that wrappers may need for proper isolation.
    fn wrap(
        &self,
        original: Arc<dyn OSObjectStore>,
        storage_options: Option<&HashMap<String, String>>,
    ) -> Arc<dyn OSObjectStore>;
}

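/// A [WrappingObjectStore] that applies a list of wrappers in order.
///
/// A minimal sketch, assuming `logging` and `caching` are user-defined
/// `Arc<dyn WrappingObjectStore>` values:
///
/// ```ignore
/// let chained = ChainedWrappingObjectStore::new(vec![logging, caching]);
/// // `logging` wraps the original store first, then `caching` wraps that result.
/// let wrapped = chained.wrap(inner_store, None);
/// ```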
#[derive(Debug, Clone)]
pub struct ChainedWrappingObjectStore {
    wrappers: Vec<Arc<dyn WrappingObjectStore>>,
}

impl ChainedWrappingObjectStore {
    pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
        Self { wrappers }
    }

    pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
        self.wrappers.push(wrapper);
    }
}

impl WrappingObjectStore for ChainedWrappingObjectStore {
    fn wrap(
        &self,
        original: Arc<dyn OSObjectStore>,
        storage_options: Option<&HashMap<String, String>>,
    ) -> Arc<dyn OSObjectStore> {
        self.wrappers
            .iter()
            .fold(original, |acc, wrapper| wrapper.wrap(acc, storage_options))
    }
}

/// Parameters to create an [ObjectStore]
///
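/// # Example
///
/// A minimal sketch of overriding the block size and passing storage options
/// (the `allow_http` key is one of the options recognized by [StorageOptions] below):
///
/// ```ignore
/// let params = ObjectStoreParams {
///     block_size: Some(64 * 1024),
///     storage_options: Some(HashMap::from([(
///         "allow_http".to_string(),
///         "true".to_string(),
///     )])),
///     ..Default::default()
/// };
/// ```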
#[derive(Debug, Clone)]
pub struct ObjectStoreParams {
    pub block_size: Option<usize>,
    #[deprecated(note = "Implement an ObjectStoreProvider instead")]
    pub object_store: Option<(Arc<DynObjectStore>, Url)>,
    pub s3_credentials_refresh_offset: Duration,
    #[cfg(feature = "aws")]
    pub aws_credentials: Option<AwsCredentialProvider>,
    pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
    pub storage_options: Option<HashMap<String, String>>,
    /// Use constant size upload parts for multipart uploads. Only necessary
    /// for Cloudflare R2, which doesn't support variable size parts. When this
    /// is false, max upload size is 2.5TB. When this is true, the max size is
    /// 50GB.
    pub use_constant_size_upload_parts: bool,
    pub list_is_lexically_ordered: Option<bool>,
}

impl Default for ObjectStoreParams {
    fn default() -> Self {
        #[allow(deprecated)]
        Self {
            object_store: None,
            block_size: None,
            s3_credentials_refresh_offset: Duration::from_secs(60),
            #[cfg(feature = "aws")]
            aws_credentials: None,
            object_store_wrapper: None,
            storage_options: None,
            use_constant_size_upload_parts: false,
            list_is_lexically_ordered: None,
        }
    }
}

// We implement hash for caching
impl std::hash::Hash for ObjectStoreParams {
    #[allow(deprecated)]
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        // For hashing, we use pointer values for ObjectStore, S3 credentials, and wrapper
        self.block_size.hash(state);
        if let Some((store, url)) = &self.object_store {
            Arc::as_ptr(store).hash(state);
            url.hash(state);
        }
        self.s3_credentials_refresh_offset.hash(state);
        #[cfg(feature = "aws")]
        if let Some(aws_credentials) = &self.aws_credentials {
            Arc::as_ptr(aws_credentials).hash(state);
        }
        if let Some(wrapper) = &self.object_store_wrapper {
            Arc::as_ptr(wrapper).hash(state);
        }
        if let Some(storage_options) = &self.storage_options {
            for (key, value) in storage_options {
                key.hash(state);
                value.hash(state);
            }
        }
        self.use_constant_size_upload_parts.hash(state);
        self.list_is_lexically_ordered.hash(state);
    }
}

// We implement eq for caching
impl Eq for ObjectStoreParams {}
impl PartialEq for ObjectStoreParams {
    #[allow(deprecated)]
    fn eq(&self, other: &Self) -> bool {
        #[cfg(feature = "aws")]
        if self.aws_credentials.is_some() != other.aws_credentials.is_some() {
            return false;
        }

        // For equality, we use pointer comparison for ObjectStore, S3 credentials, and wrapper
        self.block_size == other.block_size
            && self
                .object_store
                .as_ref()
                .map(|(store, url)| (Arc::as_ptr(store), url))
                == other
                    .object_store
                    .as_ref()
                    .map(|(store, url)| (Arc::as_ptr(store), url))
            && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
            && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
                == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
            && self.storage_options == other.storage_options
            && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
            && self.list_is_lexically_ordered == other.list_is_lexically_ordered
    }
}

fn uri_to_url(uri: &str) -> Result<Url> {
    match Url::parse(uri) {
        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
            // On Windows, the drive is parsed as a scheme
            local_path_to_url(uri)
        }
        Ok(url) => Ok(url),
        Err(_) => local_path_to_url(uri),
    }
}

fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
    let expanded = tilde(str_path.as_ref()).to_string();

    let mut expanded_path = path_abs::PathAbs::new(expanded)
        .unwrap()
        .as_path()
        .to_path_buf();
    // path_abs::PathAbs::new(".") returns an empty string.
    if let Some(s) = expanded_path.as_path().to_str() {
        if s.is_empty() {
            expanded_path = std::env::current_dir()?;
        }
    }

    Ok(expanded_path)
}

fn local_path_to_url(str_path: &str) -> Result<Url> {
    let expanded_path = expand_path(str_path)?;

    Url::from_directory_path(expanded_path).map_err(|_| Error::InvalidInput {
        source: format!("Invalid table location: '{}'", str_path).into(),
        location: location!(),
    })
}

impl ObjectStore {
    /// Parse from a string URI.
    ///
    /// Returns the ObjectStore instance and the absolute path to the object.
    ///
    /// This uses the default [ObjectStoreRegistry] to find the object store. To
    /// allow for potential re-use of object store instances, it's recommended to
    /// create a shared [ObjectStoreRegistry] and pass that to [Self::from_uri_and_params].
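    ///
    /// # Example
    ///
    /// A minimal sketch, assuming an async context; the `memory` scheme works
    /// without any credentials:
    ///
    /// ```ignore
    /// let (store, path) = ObjectStore::from_uri("memory:///my/table").await?;
    /// store.put(&path.child("hello"), b"world").await?;
    /// ```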
    pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
        let registry = Arc::new(ObjectStoreRegistry::default());

        Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
    }

    /// Parse from a string URI.
    ///
    /// Returns the ObjectStore instance and the absolute path to the object.
    pub async fn from_uri_and_params(
        registry: Arc<ObjectStoreRegistry>,
        uri: &str,
        params: &ObjectStoreParams,
    ) -> Result<(Arc<Self>, Path)> {
        #[allow(deprecated)]
        if let Some((store, path)) = params.object_store.as_ref() {
            let mut inner = store.clone();
            if let Some(wrapper) = params.object_store_wrapper.as_ref() {
                inner = wrapper.wrap(inner, params.storage_options.as_ref());
            }
            let store = Self {
                inner,
                scheme: path.scheme().to_string(),
                block_size: params.block_size.unwrap_or(64 * 1024),
                max_iop_size: *DEFAULT_MAX_IOP_SIZE,
                use_constant_size_upload_parts: params.use_constant_size_upload_parts,
                list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
                io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
                download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
            };
            let path = Path::parse(path.path())?;
            return Ok((Arc::new(store), path));
        }
        let url = uri_to_url(uri)?;
        let store = registry.get_store(url.clone(), params).await?;
        // We know the scheme is valid if we got a store back.
        let provider = registry.get_provider(url.scheme()).expect_ok()?;
        let path = provider.extract_path(&url)?;

        Ok((store, path))
    }

    /// Extract the path component from a URI without initializing the object store.
    ///
    /// This is a synchronous operation that only parses the URI and extracts the path,
    /// without creating or initializing any object store instance.
    ///
    /// # Arguments
    ///
    /// * `registry` - The object store registry to get the provider
    /// * `uri` - The URI to extract the path from
    ///
    /// # Returns
    ///
    /// The extracted path component
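    ///
    /// # Example
    ///
    /// A minimal sketch, mirroring the cloud-path behavior exercised in the tests below:
    ///
    /// ```ignore
    /// let registry = Arc::new(ObjectStoreRegistry::default());
    /// let path = ObjectStore::extract_path_from_uri(registry, "s3://bucket/foo.lance")?;
    /// assert_eq!(path.to_string(), "foo.lance");
    /// ```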
    pub fn extract_path_from_uri(registry: Arc<ObjectStoreRegistry>, uri: &str) -> Result<Path> {
        let url = uri_to_url(uri)?;
        let provider = registry.get_provider(url.scheme()).ok_or_else(|| {
            Error::invalid_input(format!("Unknown scheme: {}", url.scheme()), location!())
        })?;
        provider.extract_path(&url)
    }

    #[deprecated(note = "Use `from_uri` instead")]
    pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
        Self::from_uri_and_params(
            Arc::new(ObjectStoreRegistry::default()),
            str_path,
            &Default::default(),
        )
        .now_or_never()
        .unwrap()
    }

    /// Local object store.
    pub fn local() -> Self {
        let provider = FileStoreProvider;
        provider
            .new_store(Url::parse("file:///").unwrap(), &Default::default())
            .now_or_never()
            .unwrap()
            .unwrap()
    }

    /// Create an in-memory object store directly for testing.
    pub fn memory() -> Self {
        let provider = MemoryStoreProvider;
        provider
            .new_store(Url::parse("memory:///").unwrap(), &Default::default())
            .now_or_never()
            .unwrap()
            .unwrap()
    }

    /// Returns true if the object store points to a local file system.
    pub fn is_local(&self) -> bool {
        self.scheme == "file"
    }

    pub fn is_cloud(&self) -> bool {
        self.scheme != "file" && self.scheme != "memory"
    }

    pub fn scheme(&self) -> &str {
        &self.scheme
    }

    pub fn block_size(&self) -> usize {
        self.block_size
    }

    pub fn max_iop_size(&self) -> u64 {
        self.max_iop_size
    }

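    /// Number of parallel I/O operations to issue against this store.
    ///
    /// Can be overridden with the `LANCE_IO_THREADS` environment variable,
    /// as read in the body below.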
    pub fn io_parallelism(&self) -> usize {
        std::env::var("LANCE_IO_THREADS")
            .map(|val| val.parse::<usize>().unwrap())
            .unwrap_or(self.io_parallelism)
    }

    /// Open a file at the given path.
    ///
    /// Parameters
    /// - `path`: Absolute path to the file.
    pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
        match self.scheme.as_str() {
            "file" => LocalObjectReader::open(path, self.block_size, None).await,
            _ => Ok(Box::new(CloudObjectReader::new(
                self.inner.clone(),
                path.clone(),
                self.block_size,
                None,
                self.download_retry_count,
            )?)),
        }
    }

    /// Open a reader for a file with known size.
    ///
    /// This size may either have been retrieved from a list operation or
    /// cached metadata. By passing in the known size, we can skip a HEAD / metadata
    /// call.
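    ///
    /// A minimal sketch, assuming `len` was obtained from an earlier `list` call:
    ///
    /// ```ignore
    /// let reader = store.open_with_size(&path, len).await?;
    /// ```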
    pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
        // If we know the file is really small, we can read the whole thing
        // as a single request.
        if known_size <= self.block_size {
            return Ok(Box::new(SmallReader::new(
                self.inner.clone(),
                path.clone(),
                self.download_retry_count,
                known_size,
            )));
        }

        match self.scheme.as_str() {
            "file" => LocalObjectReader::open(path, self.block_size, Some(known_size)).await,
            _ => Ok(Box::new(CloudObjectReader::new(
                self.inner.clone(),
                path.clone(),
                self.block_size,
                Some(known_size),
                self.download_retry_count,
            )?)),
        }
    }

    /// Create an [ObjectWriter] from a local [std::path::Path]
    pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
        let object_store = Self::local();
        let absolute_path = expand_path(path.to_string_lossy())?;
        let os_path = Path::from_absolute_path(absolute_path)?;
        object_store.create(&os_path).await
    }

    /// Open a [Reader] from a local [std::path::Path]
    pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
        let object_store = Self::local();
        let absolute_path = expand_path(path.to_string_lossy())?;
        let os_path = Path::from_absolute_path(absolute_path)?;
        object_store.open(&os_path).await
    }

    /// Create a new file.
    pub async fn create(&self, path: &Path) -> Result<ObjectWriter> {
        ObjectWriter::new(self, path).await
    }

    /// A helper function to create a file and write content to it.
    pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
        let mut writer = self.create(path).await?;
        writer.write_all(content).await?;
        writer.shutdown().await
    }

    pub async fn delete(&self, path: &Path) -> Result<()> {
        self.inner.delete(path).await?;
        Ok(())
    }

    pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
        if self.is_local() {
            // Use std::fs::copy for local filesystem to support cross-filesystem copies
            return super::local::copy_file(from, to);
        }
        Ok(self.inner.copy(from, to).await?)
    }

    /// Read a directory (starting from the base directory) and return all sub-paths in the directory.
    pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
        let path = dir_path.into();
        let path = Path::parse(&path)?;
        let output = self.inner.list_with_delimiter(Some(&path)).await?;
        Ok(output
            .common_prefixes
            .iter()
            .chain(output.objects.iter().map(|o| &o.location))
            .map(|s| s.filename().unwrap().to_string())
            .collect())
    }

    pub fn list(
        &self,
        path: Option<Path>,
    ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
        Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
    }

    /// Read all files recursively, starting from the base directory.
    ///
    /// `unmodified_since` can be specified to only return files that have not been modified since the given time.
    pub fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        self.inner.read_dir_all(dir_path, unmodified_since)
    }

    /// Remove a directory recursively.
    pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
        let path = dir_path.into();
        let path = Path::parse(&path)?;

        if self.is_local() {
            // Local file system needs to delete directories as well.
            return super::local::remove_dir_all(&path);
        }
        let sub_entries = self
            .inner
            .list(Some(&path))
            .map(|m| m.map(|meta| meta.location))
            .boxed();
        self.inner
            .delete_stream(sub_entries)
            .try_collect::<Vec<_>>()
            .await?;
        Ok(())
    }

    pub fn remove_stream<'a>(
        &'a self,
        locations: BoxStream<'a, Result<Path>>,
    ) -> BoxStream<'a, Result<Path>> {
        self.inner
            .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
            .err_into::<Error>()
            .boxed()
    }

    /// Check if a file exists.
    pub async fn exists(&self, path: &Path) -> Result<bool> {
        match self.inner.head(path).await {
            Ok(_) => Ok(true),
            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
            Err(e) => Err(e.into()),
        }
    }

    /// Get file size.
    pub async fn size(&self, path: &Path) -> Result<u64> {
        Ok(self.inner.head(path).await?.size)
    }

    /// Convenience function to open a reader and read all the bytes
    pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
        let reader = self.open(path).await?;
        Ok(reader.get_all().await?)
    }

    /// Convenience function to open a reader and make a single request
    ///
    /// If you will be making multiple requests to the path it is more efficient to call [`Self::open`]
    /// and then call [`Reader::get_range`] multiple times.
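    ///
    /// # Example
    ///
    /// A minimal sketch reading the first four bytes of an object:
    ///
    /// ```ignore
    /// let header = store.read_one_range(&path, 0..4).await?;
    /// ```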
    pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
        let reader = self.open(path).await?;
        Ok(reader.get_range(range).await?)
    }
}

/// Options that can be set for multiple object stores
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub enum LanceConfigKey {
    /// Number of times to retry a download that fails
    DownloadRetryCount,
}

impl FromStr for LanceConfigKey {
    type Err = Error;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "download_retry_count" => Ok(Self::DownloadRetryCount),
            _ => Err(Error::InvalidInput {
                source: format!("Invalid LanceConfigKey: {}", s).into(),
                location: location!(),
            }),
        }
    }
}

#[derive(Clone, Debug, Default)]
pub struct StorageOptions(pub HashMap<String, String>);

impl StorageOptions {
    /// Create a new instance of [`StorageOptions`]
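    ///
    /// Environment variables such as `AZURE_STORAGE_ALLOW_HTTP`, `AWS_ALLOW_HTTP`,
    /// `OBJECT_STORE_CLIENT_MAX_RETRIES`, and `OBJECT_STORE_CLIENT_RETRY_TIMEOUT`
    /// are folded into the options when set (see the body below).
    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// let options = StorageOptions::new(HashMap::from([(
    ///     "allow_http".to_string(),
    ///     "true".to_string(),
    /// )]));
    /// assert!(options.allow_http());
    /// ```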
    pub fn new(options: HashMap<String, String>) -> Self {
        let mut options = options;
        if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
            options.insert("allow_http".into(), value);
        }
        if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
            options.insert("allow_http".into(), value);
        }
        if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
            options.insert("allow_http".into(), value);
        }
        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
            options.insert("client_max_retries".into(), value);
        }
        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
            options.insert("client_retry_timeout".into(), value);
        }
        Self(options)
    }

    /// Denotes if insecure connections via http are allowed
    pub fn allow_http(&self) -> bool {
        self.0.iter().any(|(key, value)| {
            key.to_ascii_lowercase().contains("allow_http") && str_is_truthy(value)
        })
    }

    /// Number of times to retry a download that fails
    pub fn download_retry_count(&self) -> usize {
        self.0
            .iter()
            .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
            .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
            .unwrap_or(3)
    }

    /// Max retry times to set in RetryConfig for object store client
    pub fn client_max_retries(&self) -> usize {
        self.0
            .iter()
            .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
            .and_then(|(_, value)| value.parse::<usize>().ok())
            .unwrap_or(10)
    }

    /// Seconds of timeout to set in RetryConfig for object store client
    pub fn client_retry_timeout(&self) -> u64 {
        self.0
            .iter()
            .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
            .and_then(|(_, value)| value.parse::<u64>().ok())
            .unwrap_or(180)
    }

    pub fn get(&self, key: &str) -> Option<&String> {
        self.0.get(key)
    }
}

impl From<HashMap<String, String>> for StorageOptions {
    fn from(value: HashMap<String, String>) -> Self {
        Self::new(value)
    }
}

impl ObjectStore {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        store: Arc<DynObjectStore>,
        location: Url,
        block_size: Option<usize>,
        wrapper: Option<Arc<dyn WrappingObjectStore>>,
        use_constant_size_upload_parts: bool,
        list_is_lexically_ordered: bool,
        io_parallelism: usize,
        download_retry_count: usize,
        storage_options: Option<&HashMap<String, String>>,
    ) -> Self {
        let scheme = location.scheme();
        let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));

        let store = match wrapper {
            Some(wrapper) => wrapper.wrap(store, storage_options),
            None => store,
        };

        Self {
            inner: store,
            scheme: scheme.into(),
            block_size,
            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
            use_constant_size_upload_parts,
            list_is_lexically_ordered,
            io_parallelism,
            download_retry_count,
        }
    }
}

fn infer_block_size(scheme: &str) -> usize {
    // Block size: On local file systems, we use 4KB block size. On cloud
    // object stores, we use 64KB block size. This is generally the largest
    // block size where we don't see a latency penalty.
    match scheme {
        "file" => 4 * 1024,
        _ => 64 * 1024,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use lance_core::utils::tempfile::{TempStdDir, TempStdFile, TempStrDir};
    use object_store::memory::InMemory;
    use rstest::rstest;
    use std::env::set_current_dir;
    use std::fs::{create_dir_all, write};
    use std::path::Path as StdPath;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Write test content to file.
    fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
        let expanded = tilde(path_str).to_string();
        let path = StdPath::new(&expanded);
        std::fs::create_dir_all(path.parent().unwrap())?;
        write(path, contents)
    }

    async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
        let test_file_store = store.open(path).await.unwrap();
        let size = test_file_store.size().await.unwrap();
        let bytes = test_file_store.get_range(0..size).await.unwrap();
        let contents = String::from_utf8(bytes.to_vec()).unwrap();
        Ok(contents)
    }

    #[tokio::test]
    async fn test_absolute_paths() {
        let tmp_path = TempStrDir::default();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "TEST_CONTENT",
        )
        .unwrap();

        // test a few variations of the same path
        for uri in &[
            format!("{tmp_path}/bar/foo.lance"),
            format!("{tmp_path}/./bar/foo.lance"),
            format!("{tmp_path}/bar/foo.lance/../foo.lance"),
        ] {
            let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &path.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "TEST_CONTENT");
        }
    }

    #[tokio::test]
    async fn test_cloud_paths() {
        let uri = "s3://bucket/foo.lance";
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "gs");
        assert_eq!(path.to_string(), "foo.lance");
    }

    async fn test_block_size_used_test_helper(
        uri: &str,
        storage_options: Option<HashMap<String, String>>,
        default_expected_block_size: usize,
    ) {
        // Test the default
        let registry = Arc::new(ObjectStoreRegistry::default());
        let params = ObjectStoreParams {
            storage_options: storage_options.clone(),
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, default_expected_block_size);

        // Ensure param is used
        let registry = Arc::new(ObjectStoreRegistry::default());
        let params = ObjectStoreParams {
            block_size: Some(1024),
            storage_options: storage_options.clone(),
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, 1024);
    }

    #[rstest]
    #[case("s3://bucket/foo.lance", None)]
    #[case("gs://bucket/foo.lance", None)]
    #[case("az://account/bucket/foo.lance",
      Some(HashMap::from([
            (String::from("account_name"), String::from("account")),
            (String::from("container_name"), String::from("container"))
           ])))]
    #[tokio::test]
    async fn test_block_size_used_cloud(
        #[case] uri: &str,
        #[case] storage_options: Option<HashMap<String, String>>,
    ) {
        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
    }

    #[rstest]
    #[case("file")]
    #[case("file-object-store")]
    #[case("memory:///bucket/foo.lance")]
    #[tokio::test]
    async fn test_block_size_used_file(#[case] prefix: &str) {
        let tmp_path = TempStrDir::default();
        let path = format!("{tmp_path}/bar/foo.lance/test_file");
        write_to_file(&path, "URL").unwrap();
        let uri = format!("{prefix}:///{path}");
        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
    }

    #[tokio::test]
    async fn test_relative_paths() {
        let tmp_path = TempStrDir::default();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "RELATIVE_URL",
        )
        .unwrap();

        set_current_dir(StdPath::new(tmp_path.as_ref())).expect("Error changing current dir");
        let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();

        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "RELATIVE_URL");
    }

    #[tokio::test]
    async fn test_tilde_expansion() {
        let uri = "~/foo.lance";
        write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "TILDE");
    }

    #[tokio::test]
    async fn test_read_directory() {
        let path = TempStdDir::default();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo").join("test_file").to_str().unwrap(),
            "read_dir",
        )
        .unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();

        let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
        assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
    }

    #[tokio::test]
    async fn test_delete_directory() {
        let path = TempStdDir::default();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo")
                .join("bar")
                .join("test_file")
                .to_str()
                .unwrap(),
            "delete",
        )
        .unwrap();
        write_to_file(path.join("foo").join("top").to_str().unwrap(), "delete_top").unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
        store.remove_dir_all(base.child("foo")).await.unwrap();

        assert!(!path.join("foo").exists());
    }

    #[derive(Debug)]
    struct TestWrapper {
        called: AtomicBool,

        return_value: Arc<dyn OSObjectStore>,
    }

    impl WrappingObjectStore for TestWrapper {
        fn wrap(
            &self,
            _original: Arc<dyn OSObjectStore>,
            _storage_options: Option<&HashMap<String, String>>,
        ) -> Arc<dyn OSObjectStore> {
            self.called.store(true, Ordering::Relaxed);

            // return a mocked value so we can check if the final store is the one we expect
            self.return_value.clone()
        }
    }

    impl TestWrapper {
        fn called(&self) -> bool {
            self.called.load(Ordering::Relaxed)
        }
    }

    #[tokio::test]
    async fn test_wrapping_object_store_option_is_used() {
        // Make a store for the inner store first
        let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
        let registry = Arc::new(ObjectStoreRegistry::default());

        assert_eq!(Arc::strong_count(&mock_inner_store), 1);

        let wrapper = Arc::new(TestWrapper {
            called: AtomicBool::new(false),
            return_value: mock_inner_store.clone(),
        });

        let params = ObjectStoreParams {
            object_store_wrapper: Some(wrapper.clone()),
            ..ObjectStoreParams::default()
        };

        // not called yet
        assert!(!wrapper.called());

        let _ = ObjectStore::from_uri_and_params(registry, "memory:///", &params)
            .await
            .unwrap();

        // called after construction
        assert!(wrapper.called());

        // hard to compare two trait pointers as they point to vtables
        // using the ref count as a proxy to make sure that the store is correctly kept
        assert_eq!(Arc::strong_count(&mock_inner_store), 2);
    }

    #[tokio::test]
    async fn test_local_paths() {
        let file_path = TempStdFile::default();
        let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        writer.shutdown().await.unwrap();

        let reader = ObjectStore::open_local(&file_path).await.unwrap();
        let buf = reader.get_range(0..5).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");
    }

    #[tokio::test]
    async fn test_read_one() {
        let file_path = TempStdFile::default();
        let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        writer.shutdown().await.unwrap();

        let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
        let obj_store = ObjectStore::local();
        let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");

        let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");
    }

    #[tokio::test]
    #[cfg(windows)]
    async fn test_windows_paths() {
        use std::path::Component;
        use std::path::Prefix;
        use std::path::Prefix::*;

        fn get_path_prefix(path: &StdPath) -> Prefix {
            match path.components().next().unwrap() {
                Component::Prefix(prefix_component) => prefix_component.kind(),
                _ => panic!(),
            }
        }

        fn get_drive_letter(prefix: Prefix) -> String {
            match prefix {
                Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
                _ => panic!(),
            }
        }

        let tmp_path = TempStdFile::default();
        let prefix = get_path_prefix(&tmp_path);
        let drive_letter = get_drive_letter(prefix);

        write_to_file(
            &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
            "WINDOWS",
        )
        .unwrap();

        for uri in &[
            format!("{drive_letter}:/test_folder/test.lance"),
            format!("{drive_letter}:\\test_folder\\test.lance"),
        ] {
            let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &base.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "WINDOWS");
        }
    }

    #[tokio::test]
    async fn test_cross_filesystem_copy() {
        // Create two temporary directories that simulate different filesystems
        let source_dir = TempStdDir::default();
        let dest_dir = TempStdDir::default();

        // Create a test file in the source directory
        let source_file_name = "test_file.txt";
        let source_file = source_dir.join(source_file_name);
        std::fs::write(&source_file, b"test content").unwrap();

        // Create ObjectStore for local filesystem
        let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
            .await
            .unwrap();

        // Create paths relative to the ObjectStore base
        let from_path = base_path.child(source_file_name);

        // Use object_store::Path::parse for the destination
        let dest_file = dest_dir.join("copied_file.txt");
        let dest_str = dest_file.to_str().unwrap();
        let to_path = object_store::path::Path::parse(dest_str).unwrap();

        // Perform the copy operation
        store.copy(&from_path, &to_path).await.unwrap();

        // Verify the file was copied correctly
        assert!(dest_file.exists());
        let copied_content = std::fs::read(&dest_file).unwrap();
        assert_eq!(copied_content, b"test content");
    }

    #[tokio::test]
    async fn test_copy_creates_parent_directories() {
        let source_dir = TempStdDir::default();
        let dest_dir = TempStdDir::default();

        // Create a test file in the source directory
        let source_file_name = "test_file.txt";
        let source_file = source_dir.join(source_file_name);
        std::fs::write(&source_file, b"test content").unwrap();

        // Create ObjectStore for local filesystem
        let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
            .await
            .unwrap();

        // Create paths
        let from_path = base_path.child(source_file_name);

        // Create destination with nested directories that don't exist yet
        let dest_file = dest_dir.join("nested").join("dirs").join("copied_file.txt");
        let dest_str = dest_file.to_str().unwrap();
        let to_path = object_store::path::Path::parse(dest_str).unwrap();

        // Perform the copy operation - should create parent directories
        store.copy(&from_path, &to_path).await.unwrap();

        // Verify the file was copied correctly and directories were created
        assert!(dest_file.exists());
        assert!(dest_file.parent().unwrap().exists());
        let copied_content = std::fs::read(&dest_file).unwrap();
        assert_eq!(copied_content, b"test content");
1145    }
1146}