lance_io/
object_store.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Extend [object_store::ObjectStore] functionality

use std::collections::HashMap;
use std::ops::Range;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use deepsize::DeepSizeOf;
use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
use futures::{FutureExt, Stream};
use lance_core::error::LanceOptionExt;
use lance_core::utils::parse::str_is_truthy;
use list_retry::ListRetryStream;
#[cfg(feature = "aws")]
use object_store::aws::AwsCredentialProvider;
use object_store::DynObjectStore;
use object_store::Error as ObjectStoreError;
use object_store::{path::Path, ObjectMeta, ObjectStore as OSObjectStore};
use providers::local::FileStoreProvider;
use providers::memory::MemoryStoreProvider;
use shellexpand::tilde;
use snafu::location;
use tokio::io::AsyncWriteExt;
use url::Url;

use super::local::LocalObjectReader;
mod list_retry;
pub mod providers;
pub mod storage_options;
mod tracing;
use crate::object_reader::SmallReader;
use crate::object_writer::WriteResult;
use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
use lance_core::{Error, Result};

// Local disks tend to do fine with a few threads
// Note: the number of threads here also impacts the number of files
// we need to read in some situations.  So keeping this at 8 keeps the
// RAM on our scanner down.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
// Cloud disks often need many threads to saturate the network
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024; // 4KB block size
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024; // 64KB block size

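// Maximum size of a single IO request, in bytes. Can be overridden with the
// `LANCE_MAX_IOP_SIZE` environment variable; defaults to 16 MiB.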
pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock<u64> = std::sync::LazyLock::new(|| {
    std::env::var("LANCE_MAX_IOP_SIZE")
        .map(|val| val.parse().unwrap())
        .unwrap_or(16 * 1024 * 1024)
});

pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;

pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
pub use storage_options::{
    LanceNamespaceStorageOptionsProvider, StorageOptionsProvider, EXPIRES_AT_MILLIS_KEY,
};

#[async_trait]
pub trait ObjectStoreExt {
    /// Returns true if the file exists.
    async fn exists(&self, path: &Path) -> Result<bool>;

    /// Read all files (starting from the base directory) recursively.
    ///
    /// unmodified_since can be specified to only return files that have not been modified since the given time.
    fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>>;
}

#[async_trait]
impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
    fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
        if let Some(unmodified_since_val) = unmodified_since {
            output
                .try_filter(move |file| future::ready(file.last_modified <= unmodified_since_val))
                .boxed()
        } else {
            output.boxed()
        }
    }

    async fn exists(&self, path: &Path) -> Result<bool> {
        match self.head(path).await {
            Ok(_) => Ok(true),
            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
            Err(e) => Err(e.into()),
        }
    }
}

/// Wraps [ObjectStore](object_store::ObjectStore)
#[derive(Debug, Clone)]
pub struct ObjectStore {
    // Inner object store
    pub inner: Arc<dyn OSObjectStore>,
    scheme: String,
    block_size: usize,
    max_iop_size: u64,
    /// Whether to use constant size upload parts for multipart uploads. This
    /// is only necessary for Cloudflare R2.
    pub use_constant_size_upload_parts: bool,
    /// Whether we can assume that the list of files is lexically ordered. This
    /// is true for object stores, but not for local filesystems.
    pub list_is_lexically_ordered: bool,
    io_parallelism: usize,
    /// Number of times to retry a failed download
    download_retry_count: usize,
}

impl DeepSizeOf for ObjectStore {
    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
        // We aren't counting `inner` here, which is problematic, but an ObjectStore
        // shouldn't be too big. The only exception might be the write cache, but if
        // the write cache has data, it means we're using it somewhere else that isn't
        // really a cache, so it doesn't count.
        self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
    }
}

impl std::fmt::Display for ObjectStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "ObjectStore({})", self.scheme)
    }
}

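/// Hook for layering additional behavior (caching, metrics, and so on) on top
/// of the inner [object_store::ObjectStore].
///
/// A minimal no-op sketch (`NoopWrapper` is an illustrative name, not part of
/// this crate):
///
/// ```ignore
/// #[derive(Debug)]
/// struct NoopWrapper;
///
/// impl WrappingObjectStore for NoopWrapper {
///     fn wrap(
///         &self,
///         original: Arc<dyn OSObjectStore>,
///         _storage_options: Option<&HashMap<String, String>>,
///     ) -> Arc<dyn OSObjectStore> {
///         // Pass the inner store through unchanged
///         original
///     }
/// }
/// ```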
pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
    /// Wrap an object store with additional functionality.
    ///
    /// The storage_options contain namespace information (e.g., azure_storage_account_name)
    /// that wrappers may need for proper isolation.
    fn wrap(
        &self,
        original: Arc<dyn OSObjectStore>,
        storage_options: Option<&HashMap<String, String>>,
    ) -> Arc<dyn OSObjectStore>;
}

#[derive(Debug, Clone)]
pub struct ChainedWrappingObjectStore {
    wrappers: Vec<Arc<dyn WrappingObjectStore>>,
}

impl ChainedWrappingObjectStore {
    pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
        Self { wrappers }
    }

    pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
        self.wrappers.push(wrapper);
    }
}

impl WrappingObjectStore for ChainedWrappingObjectStore {
    fn wrap(
        &self,
        original: Arc<dyn OSObjectStore>,
        storage_options: Option<&HashMap<String, String>>,
    ) -> Arc<dyn OSObjectStore> {
        self.wrappers
            .iter()
            .fold(original, |acc, wrapper| wrapper.wrap(acc, storage_options))
    }
}

/// Parameters to create an [ObjectStore]
///
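/// A hedged construction sketch (the values are illustrative only):
///
/// ```ignore
/// let params = ObjectStoreParams {
///     block_size: Some(128 * 1024),
///     storage_options: Some(HashMap::from([(
///         "allow_http".to_string(),
///         "true".to_string(),
///     )])),
///     ..Default::default()
/// };
/// ```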
#[derive(Debug, Clone)]
pub struct ObjectStoreParams {
    pub block_size: Option<usize>,
    #[deprecated(note = "Implement an ObjectStoreProvider instead")]
    pub object_store: Option<(Arc<DynObjectStore>, Url)>,
    pub s3_credentials_refresh_offset: Duration,
    #[cfg(feature = "aws")]
    pub aws_credentials: Option<AwsCredentialProvider>,
    pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
    pub storage_options: Option<HashMap<String, String>>,
    /// Dynamic storage options provider for automatic credential refresh
    pub storage_options_provider: Option<Arc<dyn StorageOptionsProvider>>,
    /// Use constant size upload parts for multipart uploads. Only necessary
    /// for Cloudflare R2, which doesn't support variable size parts. When this
    /// is false, max upload size is 2.5TB. When this is true, the max size is
    /// 50GB.
    pub use_constant_size_upload_parts: bool,
    pub list_is_lexically_ordered: Option<bool>,
}

impl Default for ObjectStoreParams {
    fn default() -> Self {
        #[allow(deprecated)]
        Self {
            object_store: None,
            block_size: None,
            s3_credentials_refresh_offset: Duration::from_secs(60),
            #[cfg(feature = "aws")]
            aws_credentials: None,
            object_store_wrapper: None,
            storage_options: None,
            storage_options_provider: None,
            use_constant_size_upload_parts: false,
            list_is_lexically_ordered: None,
        }
    }
}

// We implement Hash for caching
impl std::hash::Hash for ObjectStoreParams {
    #[allow(deprecated)]
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        // For hashing, we use pointer values for the ObjectStore, S3 credentials, and
        // wrapper, and provider_id() for the storage options provider
        self.block_size.hash(state);
        if let Some((store, url)) = &self.object_store {
            Arc::as_ptr(store).hash(state);
            url.hash(state);
        }
        self.s3_credentials_refresh_offset.hash(state);
        #[cfg(feature = "aws")]
        if let Some(aws_credentials) = &self.aws_credentials {
            Arc::as_ptr(aws_credentials).hash(state);
        }
        if let Some(wrapper) = &self.object_store_wrapper {
            Arc::as_ptr(wrapper).hash(state);
        }
        if let Some(storage_options) = &self.storage_options {
            for (key, value) in storage_options {
                key.hash(state);
                value.hash(state);
            }
        }
        if let Some(provider) = &self.storage_options_provider {
            provider.provider_id().hash(state);
        }
        self.use_constant_size_upload_parts.hash(state);
        self.list_is_lexically_ordered.hash(state);
    }
}

// We implement Eq/PartialEq for caching
impl Eq for ObjectStoreParams {}
impl PartialEq for ObjectStoreParams {
    #[allow(deprecated)]
    fn eq(&self, other: &Self) -> bool {
        #[cfg(feature = "aws")]
        if self.aws_credentials.is_some() != other.aws_credentials.is_some() {
            return false;
        }

        // For equality, we use pointer comparison for the ObjectStore and wrapper,
        // presence for the S3 credentials, and provider_id() (semantic equality) for
        // the storage options provider
        self.block_size == other.block_size
            && self
                .object_store
                .as_ref()
                .map(|(store, url)| (Arc::as_ptr(store), url))
                == other
                    .object_store
                    .as_ref()
                    .map(|(store, url)| (Arc::as_ptr(store), url))
            && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
            && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
                == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
            && self.storage_options == other.storage_options
            && self
                .storage_options_provider
                .as_ref()
                .map(|p| p.provider_id())
                == other
                    .storage_options_provider
                    .as_ref()
                    .map(|p| p.provider_id())
            && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
            && self.list_is_lexically_ordered == other.list_is_lexically_ordered
    }
}

fn uri_to_url(uri: &str) -> Result<Url> {
    match Url::parse(uri) {
        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
            // On Windows, the drive is parsed as a scheme
            local_path_to_url(uri)
        }
        Ok(url) => Ok(url),
        Err(_) => local_path_to_url(uri),
    }
}

fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
    let expanded = tilde(str_path.as_ref()).to_string();

    let mut expanded_path = path_abs::PathAbs::new(expanded)
        .unwrap()
        .as_path()
        .to_path_buf();
    // path_abs::PathAbs::new(".") returns an empty string.
    if let Some(s) = expanded_path.as_path().to_str() {
        if s.is_empty() {
            expanded_path = std::env::current_dir()?;
        }
    }

    Ok(expanded_path)
}

fn local_path_to_url(str_path: &str) -> Result<Url> {
    let expanded_path = expand_path(str_path)?;

    Url::from_directory_path(expanded_path).map_err(|_| Error::InvalidInput {
        source: format!("Invalid table location: '{}'", str_path).into(),
        location: location!(),
    })
}

impl ObjectStore {
    /// Parse from a string URI.
    ///
    /// Returns the ObjectStore instance and the absolute path to the object.
    ///
    /// This uses the default [ObjectStoreRegistry] to find the object store. To
    /// allow for potential re-use of object store instances, it's recommended to
    /// create a shared [ObjectStoreRegistry] and pass that to [Self::from_uri_and_params].
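    ///
    /// A minimal usage sketch (assumes a readable local dataset path; for
    /// illustration only):
    ///
    /// ```ignore
    /// let (store, path) = ObjectStore::from_uri("/tmp/example.lance").await?;
    /// assert!(store.is_local());
    /// let entries = store.read_dir(path).await?;
    /// ```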
    pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
        let registry = Arc::new(ObjectStoreRegistry::default());

        Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
    }

    /// Parse from a string URI.
    ///
    /// Returns the ObjectStore instance and the absolute path to the object.
    pub async fn from_uri_and_params(
        registry: Arc<ObjectStoreRegistry>,
        uri: &str,
        params: &ObjectStoreParams,
    ) -> Result<(Arc<Self>, Path)> {
        #[allow(deprecated)]
        if let Some((store, path)) = params.object_store.as_ref() {
            let mut inner = store.clone();
            if let Some(wrapper) = params.object_store_wrapper.as_ref() {
                inner = wrapper.wrap(inner, params.storage_options.as_ref());
            }
            let store = Self {
                inner,
                scheme: path.scheme().to_string(),
                block_size: params.block_size.unwrap_or(64 * 1024),
                max_iop_size: *DEFAULT_MAX_IOP_SIZE,
                use_constant_size_upload_parts: params.use_constant_size_upload_parts,
                list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
                io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
                download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
            };
            let path = Path::parse(path.path())?;
            return Ok((Arc::new(store), path));
        }
        let url = uri_to_url(uri)?;
        let store = registry.get_store(url.clone(), params).await?;
        // We know the scheme is valid if we got a store back.
        let provider = registry.get_provider(url.scheme()).expect_ok()?;
        let path = provider.extract_path(&url)?;

        Ok((store, path))
    }

    /// Extract the path component from a URI without initializing the object store.
    ///
    /// This is a synchronous operation that only parses the URI and extracts the path,
    /// without creating or initializing any object store instance.
    ///
    /// # Arguments
    ///
    /// * `registry` - The object store registry to get the provider
    /// * `uri` - The URI to extract the path from
    ///
    /// # Returns
    ///
    /// The extracted path component
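    ///
    /// # Example
    ///
    /// A sketch of the expected behavior (the bucket and URI below are illustrative):
    ///
    /// ```ignore
    /// let registry = Arc::new(ObjectStoreRegistry::default());
    /// let path = ObjectStore::extract_path_from_uri(registry, "s3://bucket/foo.lance")?;
    /// assert_eq!(path.to_string(), "foo.lance");
    /// ```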
    pub fn extract_path_from_uri(registry: Arc<ObjectStoreRegistry>, uri: &str) -> Result<Path> {
        let url = uri_to_url(uri)?;
        let provider = registry.get_provider(url.scheme()).ok_or_else(|| {
            Error::invalid_input(format!("Unknown scheme: {}", url.scheme()), location!())
        })?;
        provider.extract_path(&url)
    }

    #[deprecated(note = "Use `from_uri` instead")]
    pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
        Self::from_uri_and_params(
            Arc::new(ObjectStoreRegistry::default()),
            str_path,
            &Default::default(),
        )
        .now_or_never()
        .unwrap()
    }

    /// Local object store.
    pub fn local() -> Self {
        let provider = FileStoreProvider;
        provider
            .new_store(Url::parse("file:///").unwrap(), &Default::default())
            .now_or_never()
            .unwrap()
            .unwrap()
    }

    /// Create an in-memory object store, primarily for testing.
    pub fn memory() -> Self {
        let provider = MemoryStoreProvider;
        provider
            .new_store(Url::parse("memory:///").unwrap(), &Default::default())
            .now_or_never()
            .unwrap()
            .unwrap()
    }

    /// Returns true if the object store points to a local file system.
    pub fn is_local(&self) -> bool {
        self.scheme == "file"
    }

    pub fn is_cloud(&self) -> bool {
        self.scheme != "file" && self.scheme != "memory"
    }

    pub fn scheme(&self) -> &str {
        &self.scheme
    }

    pub fn block_size(&self) -> usize {
        self.block_size
    }

    pub fn max_iop_size(&self) -> u64 {
        self.max_iop_size
    }

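    /// Number of parallel IO operations to use.
    ///
    /// Can be overridden with the `LANCE_IO_THREADS` environment variable.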
    pub fn io_parallelism(&self) -> usize {
        std::env::var("LANCE_IO_THREADS")
            .map(|val| val.parse::<usize>().unwrap())
            .unwrap_or(self.io_parallelism)
    }

    /// Open a file at the given path.
    ///
    /// Parameters
    /// - ``path``: Absolute path to the file.
    pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
        match self.scheme.as_str() {
            "file" => LocalObjectReader::open(path, self.block_size, None).await,
            _ => Ok(Box::new(CloudObjectReader::new(
                self.inner.clone(),
                path.clone(),
                self.block_size,
                None,
                self.download_retry_count,
            )?)),
        }
    }

    /// Open a reader for a file with known size.
    ///
    /// This size may either have been retrieved from a list operation or
    /// cached metadata. By passing in the known size, we can skip a HEAD / metadata
    /// call.
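    ///
    /// A sketch assuming the size came from an earlier listing or head call:
    ///
    /// ```ignore
    /// let size = store.size(&path).await?;
    /// let reader = store.open_with_size(&path, size as usize).await?;
    /// ```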
    pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
        // If we know the file is really small, we can read the whole thing
        // as a single request.
        if known_size <= self.block_size {
            return Ok(Box::new(SmallReader::new(
                self.inner.clone(),
                path.clone(),
                self.download_retry_count,
                known_size,
            )));
        }

        match self.scheme.as_str() {
            "file" => LocalObjectReader::open(path, self.block_size, Some(known_size)).await,
            _ => Ok(Box::new(CloudObjectReader::new(
                self.inner.clone(),
                path.clone(),
                self.block_size,
                Some(known_size),
                self.download_retry_count,
            )?)),
        }
    }

    /// Create an [ObjectWriter] from a local [std::path::Path]
    pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
        let object_store = Self::local();
        let absolute_path = expand_path(path.to_string_lossy())?;
        let os_path = Path::from_absolute_path(absolute_path)?;
        object_store.create(&os_path).await
    }

    /// Open a [Reader] from a local [std::path::Path]
    pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
        let object_store = Self::local();
        let absolute_path = expand_path(path.to_string_lossy())?;
        let os_path = Path::from_absolute_path(absolute_path)?;
        object_store.open(&os_path).await
    }

    /// Create a new file.
    pub async fn create(&self, path: &Path) -> Result<ObjectWriter> {
        ObjectWriter::new(self, path).await
    }

    /// A helper function to create a file and write content to it.
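    ///
    /// A small round-trip sketch (the path and contents are illustrative):
    ///
    /// ```ignore
    /// let store = ObjectStore::memory();
    /// let path = Path::from("data/blob.bin");
    /// store.put(&path, b"hello").await?;
    /// assert_eq!(store.read_one_all(&path).await?.as_ref(), b"hello");
    /// ```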
    pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
        let mut writer = self.create(path).await?;
        writer.write_all(content).await?;
        writer.shutdown().await
    }

    pub async fn delete(&self, path: &Path) -> Result<()> {
        self.inner.delete(path).await?;
        Ok(())
    }

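    /// Copy an object from `from` to `to`.
    ///
    /// On local filesystems this goes through `std::fs::copy`, so the source
    /// and destination may live on different filesystems.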
    pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
        if self.is_local() {
            // Use std::fs::copy for local filesystem to support cross-filesystem copies
            return super::local::copy_file(from, to);
        }
        Ok(self.inner.copy(from, to).await?)
    }

    /// Read a directory (starting from the base directory) and return all sub-paths in the directory.
    pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
        let path = dir_path.into();
        let path = Path::parse(&path)?;
        let output = self.inner.list_with_delimiter(Some(&path)).await?;
        Ok(output
            .common_prefixes
            .iter()
            .chain(output.objects.iter().map(|o| &o.location))
            .map(|s| s.filename().unwrap().to_string())
            .collect())
    }

    pub fn list(
        &self,
        path: Option<Path>,
    ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
        Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
    }

    /// Read all files (starting from the base directory) recursively.
    ///
    /// unmodified_since can be specified to only return files that have not been modified since the given time.
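    ///
    /// A filtering sketch (assumes `store` and `base_path` are already in scope):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// let cutoff = Utc::now() - chrono::Duration::days(7);
    /// let old_files: Vec<ObjectMeta> = store
    ///     .read_dir_all(&base_path, Some(cutoff))
    ///     .try_collect()
    ///     .await?;
    /// ```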
    pub fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        self.inner.read_dir_all(dir_path, unmodified_since)
    }

    /// Remove a directory recursively.
    pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
        let path = dir_path.into();
        let path = Path::parse(&path)?;

        if self.is_local() {
            // Local file system needs to delete directories as well.
            return super::local::remove_dir_all(&path);
        }
        let sub_entries = self
            .inner
            .list(Some(&path))
            .map(|m| m.map(|meta| meta.location))
            .boxed();
        self.inner
            .delete_stream(sub_entries)
            .try_collect::<Vec<_>>()
            .await?;
        Ok(())
    }

    pub fn remove_stream<'a>(
        &'a self,
        locations: BoxStream<'a, Result<Path>>,
    ) -> BoxStream<'a, Result<Path>> {
        self.inner
            .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
            .err_into::<Error>()
            .boxed()
    }

    /// Check whether a file exists.
    pub async fn exists(&self, path: &Path) -> Result<bool> {
        match self.inner.head(path).await {
            Ok(_) => Ok(true),
            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
            Err(e) => Err(e.into()),
        }
    }

    /// Get file size.
    pub async fn size(&self, path: &Path) -> Result<u64> {
        Ok(self.inner.head(path).await?.size)
    }

    /// Convenience function to open a reader and read all the bytes
    pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
        let reader = self.open(path).await?;
        Ok(reader.get_all().await?)
    }

    /// Convenience function to open a reader and make a single request
    ///
    /// If you will be making multiple requests to the path it is more efficient to call [`Self::open`]
    /// and then call [`Reader::get_range`] multiple times.
    pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
        let reader = self.open(path).await?;
        Ok(reader.get_range(range).await?)
    }
}

/// Options that can be set for multiple object stores
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub enum LanceConfigKey {
    /// Number of times to retry a download that fails
    DownloadRetryCount,
}

impl FromStr for LanceConfigKey {
    type Err = Error;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "download_retry_count" => Ok(Self::DownloadRetryCount),
            _ => Err(Error::InvalidInput {
                source: format!("Invalid LanceConfigKey: {}", s).into(),
                location: location!(),
            }),
        }
    }
}

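/// String key/value storage options with typed accessors.
///
/// A small usage sketch (keys and values are illustrative):
///
/// ```ignore
/// let opts = StorageOptions::new(HashMap::from([
///     ("allow_http".to_string(), "true".to_string()),
///     ("client_max_retries".to_string(), "5".to_string()),
/// ]));
/// assert!(opts.allow_http());
/// assert_eq!(opts.client_max_retries(), 5);
/// ```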
#[derive(Clone, Debug, Default)]
pub struct StorageOptions(pub HashMap<String, String>);

impl StorageOptions {
    /// Create a new instance of [`StorageOptions`]
    pub fn new(options: HashMap<String, String>) -> Self {
        let mut options = options;
        if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
            options.insert("allow_http".into(), value);
        }
        if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
            options.insert("allow_http".into(), value);
        }
        if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
            options.insert("allow_http".into(), value);
        }
        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
            options.insert("client_max_retries".into(), value);
        }
        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
            options.insert("client_retry_timeout".into(), value);
        }
        Self(options)
    }

    /// Denotes whether insecure connections via HTTP are allowed
    pub fn allow_http(&self) -> bool {
        self.0.iter().any(|(key, value)| {
            key.to_ascii_lowercase().contains("allow_http") && str_is_truthy(value)
        })
    }

    /// Number of times to retry a download that fails
    pub fn download_retry_count(&self) -> usize {
        self.0
            .iter()
            .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
            .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
            .unwrap_or(3)
    }

    /// Maximum number of retries to set in the RetryConfig for the object store client
    pub fn client_max_retries(&self) -> usize {
        self.0
            .iter()
            .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
            .and_then(|(_, value)| value.parse::<usize>().ok())
            .unwrap_or(10)
    }

    /// Retry timeout, in seconds, to set in the RetryConfig for the object store client
    pub fn client_retry_timeout(&self) -> u64 {
        self.0
            .iter()
            .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
            .and_then(|(_, value)| value.parse::<u64>().ok())
            .unwrap_or(180)
    }

    pub fn get(&self, key: &str) -> Option<&String> {
        self.0.get(key)
    }

    /// Get the expiration time in milliseconds since epoch, if present
    pub fn expires_at_millis(&self) -> Option<u64> {
        self.0
            .get(EXPIRES_AT_MILLIS_KEY)
            .and_then(|s| s.parse::<u64>().ok())
    }
}

impl From<HashMap<String, String>> for StorageOptions {
    fn from(value: HashMap<String, String>) -> Self {
        Self::new(value)
    }
}

impl ObjectStore {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        store: Arc<DynObjectStore>,
        location: Url,
        block_size: Option<usize>,
        wrapper: Option<Arc<dyn WrappingObjectStore>>,
        use_constant_size_upload_parts: bool,
        list_is_lexically_ordered: bool,
        io_parallelism: usize,
        download_retry_count: usize,
        storage_options: Option<&HashMap<String, String>>,
    ) -> Self {
        let scheme = location.scheme();
        let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));

        let store = match wrapper {
            Some(wrapper) => wrapper.wrap(store, storage_options),
            None => store,
        };

        Self {
            inner: store,
            scheme: scheme.into(),
            block_size,
            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
            use_constant_size_upload_parts,
            list_is_lexically_ordered,
            io_parallelism,
            download_retry_count,
        }
    }
}

fn infer_block_size(scheme: &str) -> usize {
    // Block size: On local file systems, we use 4KB block size. On cloud
    // object stores, we use 64KB block size. This is generally the largest
    // block size where we don't see a latency penalty.
    match scheme {
        "file" => 4 * 1024,
        _ => 64 * 1024,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use lance_core::utils::tempfile::{TempStdDir, TempStdFile, TempStrDir};
    use object_store::memory::InMemory;
    use rstest::rstest;
    use std::env::set_current_dir;
    use std::fs::{create_dir_all, write};
    use std::path::Path as StdPath;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Write test content to file.
    fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
        let expanded = tilde(path_str).to_string();
        let path = StdPath::new(&expanded);
        std::fs::create_dir_all(path.parent().unwrap())?;
        write(path, contents)
    }

    async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
        let test_file_store = store.open(path).await.unwrap();
        let size = test_file_store.size().await.unwrap();
        let bytes = test_file_store.get_range(0..size).await.unwrap();
        let contents = String::from_utf8(bytes.to_vec()).unwrap();
        Ok(contents)
    }

    #[tokio::test]
    async fn test_absolute_paths() {
        let tmp_path = TempStrDir::default();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "TEST_CONTENT",
        )
        .unwrap();

        // test a few variations of the same path
        for uri in &[
            format!("{tmp_path}/bar/foo.lance"),
            format!("{tmp_path}/./bar/foo.lance"),
            format!("{tmp_path}/bar/foo.lance/../foo.lance"),
        ] {
            let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &path.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "TEST_CONTENT");
        }
    }

    #[tokio::test]
    async fn test_cloud_paths() {
        let uri = "s3://bucket/foo.lance";
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "gs");
        assert_eq!(path.to_string(), "foo.lance");
    }

    async fn test_block_size_used_test_helper(
        uri: &str,
        storage_options: Option<HashMap<String, String>>,
        default_expected_block_size: usize,
    ) {
        // Test the default
        let registry = Arc::new(ObjectStoreRegistry::default());
        let params = ObjectStoreParams {
            storage_options: storage_options.clone(),
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, default_expected_block_size);

        // Ensure param is used
        let registry = Arc::new(ObjectStoreRegistry::default());
        let params = ObjectStoreParams {
            block_size: Some(1024),
            storage_options: storage_options.clone(),
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, 1024);
    }

    #[rstest]
    #[case("s3://bucket/foo.lance", None)]
    #[case("gs://bucket/foo.lance", None)]
    #[case("az://account/bucket/foo.lance",
      Some(HashMap::from([
            (String::from("account_name"), String::from("account")),
            (String::from("container_name"), String::from("container"))
           ])))]
    #[tokio::test]
    async fn test_block_size_used_cloud(
        #[case] uri: &str,
        #[case] storage_options: Option<HashMap<String, String>>,
    ) {
        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
    }

    #[rstest]
    #[case("file")]
    #[case("file-object-store")]
    #[case("memory:///bucket/foo.lance")]
    #[tokio::test]
    async fn test_block_size_used_file(#[case] prefix: &str) {
        let tmp_path = TempStrDir::default();
        let path = format!("{tmp_path}/bar/foo.lance/test_file");
        write_to_file(&path, "URL").unwrap();
        let uri = format!("{prefix}:///{path}");
        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
    }

    #[tokio::test]
    async fn test_relative_paths() {
        let tmp_path = TempStrDir::default();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "RELATIVE_URL",
        )
        .unwrap();

        set_current_dir(StdPath::new(tmp_path.as_ref())).expect("Error changing current dir");
        let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();

        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "RELATIVE_URL");
    }

    #[tokio::test]
    async fn test_tilde_expansion() {
        let uri = "~/foo.lance";
        write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "TILDE");
    }

    #[tokio::test]
    async fn test_read_directory() {
        let path = TempStdDir::default();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo").join("test_file").to_str().unwrap(),
            "read_dir",
        )
        .unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();

        let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
        assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
    }

    #[tokio::test]
    async fn test_delete_directory() {
        let path = TempStdDir::default();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo")
                .join("bar")
                .join("test_file")
                .to_str()
                .unwrap(),
            "delete",
        )
        .unwrap();
        write_to_file(path.join("foo").join("top").to_str().unwrap(), "delete_top").unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
        store.remove_dir_all(base.child("foo")).await.unwrap();

        assert!(!path.join("foo").exists());
    }

    #[derive(Debug)]
    struct TestWrapper {
        called: AtomicBool,

        return_value: Arc<dyn OSObjectStore>,
    }

    impl WrappingObjectStore for TestWrapper {
        fn wrap(
            &self,
            _original: Arc<dyn OSObjectStore>,
            _storage_options: Option<&HashMap<String, String>>,
        ) -> Arc<dyn OSObjectStore> {
            self.called.store(true, Ordering::Relaxed);

            // return a mocked value so we can check if the final store is the one we expect
            self.return_value.clone()
        }
    }

    impl TestWrapper {
        fn called(&self) -> bool {
            self.called.load(Ordering::Relaxed)
        }
    }

    #[tokio::test]
    async fn test_wrapping_object_store_option_is_used() {
        // Make the inner store first
        let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
        let registry = Arc::new(ObjectStoreRegistry::default());

        assert_eq!(Arc::strong_count(&mock_inner_store), 1);

        let wrapper = Arc::new(TestWrapper {
            called: AtomicBool::new(false),
            return_value: mock_inner_store.clone(),
        });

        let params = ObjectStoreParams {
            object_store_wrapper: Some(wrapper.clone()),
            ..ObjectStoreParams::default()
        };

        // not called yet
        assert!(!wrapper.called());

        let _ = ObjectStore::from_uri_and_params(registry, "memory:///", &params)
            .await
            .unwrap();

        // called after construction
        assert!(wrapper.called());

        // hard to compare two trait object pointers as they point to vtables;
        // using the ref count as a proxy to make sure that the store is correctly kept
        assert_eq!(Arc::strong_count(&mock_inner_store), 2);
    }

    #[tokio::test]
    async fn test_local_paths() {
        let file_path = TempStdFile::default();
        let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        writer.shutdown().await.unwrap();

        let reader = ObjectStore::open_local(&file_path).await.unwrap();
        let buf = reader.get_range(0..5).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");
    }

    #[tokio::test]
    async fn test_read_one() {
        let file_path = TempStdFile::default();
        let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        writer.shutdown().await.unwrap();

        let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
        let obj_store = ObjectStore::local();
        let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");

        let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");
    }

    #[tokio::test]
    #[cfg(windows)]
    async fn test_windows_paths() {
        use std::path::Component;
        use std::path::Prefix;
        use std::path::Prefix::*;

        fn get_path_prefix(path: &StdPath) -> Prefix {
            match path.components().next().unwrap() {
                Component::Prefix(prefix_component) => prefix_component.kind(),
                _ => panic!(),
            }
        }

        fn get_drive_letter(prefix: Prefix) -> String {
            match prefix {
                Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
                _ => panic!(),
            }
        }

        let tmp_path = TempStdFile::default();
        let prefix = get_path_prefix(&tmp_path);
        let drive_letter = get_drive_letter(prefix);

        write_to_file(
            &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
            "WINDOWS",
        )
        .unwrap();

        for uri in &[
            format!("{drive_letter}:/test_folder/test.lance"),
            format!("{drive_letter}:\\test_folder\\test.lance"),
        ] {
            let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &base.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "WINDOWS");
        }
    }

    #[tokio::test]
    async fn test_cross_filesystem_copy() {
        // Create two temporary directories that simulate different filesystems
        let source_dir = TempStdDir::default();
        let dest_dir = TempStdDir::default();

        // Create a test file in the source directory
        let source_file_name = "test_file.txt";
        let source_file = source_dir.join(source_file_name);
        std::fs::write(&source_file, b"test content").unwrap();

        // Create ObjectStore for local filesystem
        let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
            .await
            .unwrap();

        // Create paths relative to the ObjectStore base
        let from_path = base_path.child(source_file_name);

        // Use object_store::Path::parse for the destination
        let dest_file = dest_dir.join("copied_file.txt");
        let dest_str = dest_file.to_str().unwrap();
        let to_path = object_store::path::Path::parse(dest_str).unwrap();

        // Perform the copy operation
        store.copy(&from_path, &to_path).await.unwrap();

        // Verify the file was copied correctly
        assert!(dest_file.exists());
        let copied_content = std::fs::read(&dest_file).unwrap();
        assert_eq!(copied_content, b"test content");
    }

    #[tokio::test]
    async fn test_copy_creates_parent_directories() {
        let source_dir = TempStdDir::default();
        let dest_dir = TempStdDir::default();

        // Create a test file in the source directory
        let source_file_name = "test_file.txt";
        let source_file = source_dir.join(source_file_name);
        std::fs::write(&source_file, b"test content").unwrap();

        // Create ObjectStore for local filesystem
        let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
            .await
            .unwrap();

        // Create paths
        let from_path = base_path.child(source_file_name);

        // Create destination with nested directories that don't exist yet
        let dest_file = dest_dir.join("nested").join("dirs").join("copied_file.txt");
        let dest_str = dest_file.to_str().unwrap();
        let to_path = object_store::path::Path::parse(dest_str).unwrap();

        // Perform the copy operation - should create parent directories
        store.copy(&from_path, &to_path).await.unwrap();

        // Verify the file was copied correctly and directories were created
        assert!(dest_file.exists());
        assert!(dest_file.parent().unwrap().exists());
        let copied_content = std::fs::read(&dest_file).unwrap();
        assert_eq!(copied_content, b"test content");
    }
}