lance_io/object_store.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Extend [object_store::ObjectStore] functionality

use std::collections::HashMap;
use std::ops::Range;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use deepsize::DeepSizeOf;
use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
use futures::{FutureExt, Stream};
use lance_core::error::LanceOptionExt;
use lance_core::utils::parse::str_is_truthy;
use list_retry::ListRetryStream;
#[cfg(feature = "aws")]
use object_store::aws::AwsCredentialProvider;
use object_store::DynObjectStore;
use object_store::Error as ObjectStoreError;
use object_store::{path::Path, ObjectMeta, ObjectStore as OSObjectStore};
use providers::local::FileStoreProvider;
use providers::memory::MemoryStoreProvider;
use shellexpand::tilde;
use snafu::location;
use tokio::io::AsyncWriteExt;
use url::Url;

use super::local::LocalObjectReader;
mod list_retry;
pub mod providers;
pub mod storage_options;
mod tracing;
use crate::object_reader::SmallReader;
use crate::object_writer::WriteResult;
use crate::utils::tracking_store::{IOTracker, IoStats};
use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
use lance_core::{Error, Result};

// Local disks tend to do fine with a few threads.
// Note: the number of threads here also impacts the number of files
// we need to read in some situations.  So keeping this at 8 keeps the
// RAM usage of our scanner down.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
// Cloud object stores often need many threads to saturate the network.
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024; // 4KB block size
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024; // 64KB block size

pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock<u64> = std::sync::LazyLock::new(|| {
    std::env::var("LANCE_MAX_IOP_SIZE")
        .map(|val| val.parse().unwrap())
        .unwrap_or(16 * 1024 * 1024)
});

pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;

pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
pub use storage_options::{
    LanceNamespaceStorageOptionsProvider, StorageOptionsProvider, EXPIRES_AT_MILLIS_KEY,
};

#[async_trait]
pub trait ObjectStoreExt {
    /// Returns true if the file exists.
    async fn exists(&self, path: &Path) -> Result<bool>;

    /// Read all files (starting from the base directory) recursively.
    ///
    /// `unmodified_since` can be specified to only return files that have not been modified since the given time.
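    ///
    /// A minimal usage sketch (assumes `store` is any [object_store::ObjectStore] and
    /// `cutoff` is a hypothetical `DateTime<Utc>`; not compiled here):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// let unchanged: Vec<ObjectMeta> = store
    ///     .read_dir_all(&Path::from("some/dir"), Some(cutoff))
    ///     .try_collect()
    ///     .await?;
    /// ```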
    fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>>;
}

#[async_trait]
impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
    fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
        if let Some(unmodified_since_val) = unmodified_since {
            output
                .try_filter(move |file| future::ready(file.last_modified <= unmodified_since_val))
                .boxed()
        } else {
            output.boxed()
        }
    }

    async fn exists(&self, path: &Path) -> Result<bool> {
        match self.head(path).await {
            Ok(_) => Ok(true),
            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
            Err(e) => Err(e.into()),
        }
    }
}

/// Wraps [ObjectStore](object_store::ObjectStore)
#[derive(Debug, Clone)]
pub struct ObjectStore {
    // Inner object store
    pub inner: Arc<dyn OSObjectStore>,
    scheme: String,
    block_size: usize,
    max_iop_size: u64,
    /// Whether to use constant size upload parts for multipart uploads. This
    /// is only necessary for Cloudflare R2.
    pub use_constant_size_upload_parts: bool,
    /// Whether we can assume that the list of files is lexically ordered. This
    /// is true for object stores, but not for local filesystems.
    pub list_is_lexically_ordered: bool,
    io_parallelism: usize,
    /// Number of times to retry a failed download
    download_retry_count: usize,
    /// IO tracker for monitoring read/write operations
    io_tracker: IOTracker,
}

impl DeepSizeOf for ObjectStore {
    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
        // We aren't counting `inner` here, which is problematic, but an ObjectStore
        // shouldn't be too big.  The only exception might be the write cache but, if
        // the writer cache has data, it means we're using it somewhere else that isn't
        // a cache, so that doesn't really count.
        self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
    }
}

impl std::fmt::Display for ObjectStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "ObjectStore({})", self.scheme)
    }
}

pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
    /// Wrap an object store with additional functionality
    ///
    /// The `store_prefix` is a string that uniquely identifies the object
    /// store being wrapped.
    fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
}

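/// Applies a sequence of [WrappingObjectStore]s in order.
///
/// A minimal sketch (assumes `metrics_wrapper` and `caching_wrapper` are hypothetical
/// values implementing [WrappingObjectStore]):
///
/// ```ignore
/// let chained = ChainedWrappingObjectStore::new(vec![metrics_wrapper, caching_wrapper]);
/// // Wrappers are applied in order: the first element wraps the base store directly,
/// // and the last element ends up outermost.
/// let wrapped = chained.wrap("store_prefix", base_store);
/// ```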
#[derive(Debug, Clone)]
pub struct ChainedWrappingObjectStore {
    wrappers: Vec<Arc<dyn WrappingObjectStore>>,
}

impl ChainedWrappingObjectStore {
    pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
        Self { wrappers }
    }

    pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
        self.wrappers.push(wrapper);
    }
}

impl WrappingObjectStore for ChainedWrappingObjectStore {
    fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
        self.wrappers
            .iter()
            .fold(original, |acc, wrapper| wrapper.wrap(store_prefix, acc))
    }
}

/// Parameters to create an [ObjectStore]
///
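/// A minimal construction sketch (the `region` key is illustrative; valid keys depend on
/// the target store):
///
/// ```ignore
/// use std::collections::HashMap;
///
/// let params = ObjectStoreParams {
///     block_size: Some(128 * 1024),
///     storage_options: Some(HashMap::from([(
///         "region".to_string(),
///         "us-east-1".to_string(),
///     )])),
///     ..Default::default()
/// };
/// ```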
#[derive(Debug, Clone)]
pub struct ObjectStoreParams {
    pub block_size: Option<usize>,
    #[deprecated(note = "Implement an ObjectStoreProvider instead")]
    pub object_store: Option<(Arc<DynObjectStore>, Url)>,
    pub s3_credentials_refresh_offset: Duration,
    #[cfg(feature = "aws")]
    pub aws_credentials: Option<AwsCredentialProvider>,
    pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
    pub storage_options: Option<HashMap<String, String>>,
    /// Dynamic storage options provider for automatic credential refresh
    pub storage_options_provider: Option<Arc<dyn StorageOptionsProvider>>,
    /// Use constant size upload parts for multipart uploads. Only necessary
    /// for Cloudflare R2, which doesn't support variable size parts. When this
    /// is false, max upload size is 2.5TB. When this is true, the max size is
    /// 50GB.
    pub use_constant_size_upload_parts: bool,
    pub list_is_lexically_ordered: Option<bool>,
}

impl Default for ObjectStoreParams {
    fn default() -> Self {
        #[allow(deprecated)]
        Self {
            object_store: None,
            block_size: None,
            s3_credentials_refresh_offset: Duration::from_secs(60),
            #[cfg(feature = "aws")]
            aws_credentials: None,
            object_store_wrapper: None,
            storage_options: None,
            storage_options_provider: None,
            use_constant_size_upload_parts: false,
            list_is_lexically_ordered: None,
        }
    }
}

// We implement `Hash` for caching
impl std::hash::Hash for ObjectStoreParams {
    #[allow(deprecated)]
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        // For hashing, we use pointer values for the ObjectStore, S3 credentials, and
        // wrapper, and `provider_id()` for the storage options provider
        self.block_size.hash(state);
        if let Some((store, url)) = &self.object_store {
            Arc::as_ptr(store).hash(state);
            url.hash(state);
        }
        self.s3_credentials_refresh_offset.hash(state);
        #[cfg(feature = "aws")]
        if let Some(aws_credentials) = &self.aws_credentials {
            Arc::as_ptr(aws_credentials).hash(state);
        }
        if let Some(wrapper) = &self.object_store_wrapper {
            Arc::as_ptr(wrapper).hash(state);
        }
        if let Some(storage_options) = &self.storage_options {
            for (key, value) in storage_options {
                key.hash(state);
                value.hash(state);
            }
        }
        if let Some(provider) = &self.storage_options_provider {
            provider.provider_id().hash(state);
        }
        self.use_constant_size_upload_parts.hash(state);
        self.list_is_lexically_ordered.hash(state);
    }
}

// We implement `Eq`/`PartialEq` for caching
impl Eq for ObjectStoreParams {}
impl PartialEq for ObjectStoreParams {
    #[allow(deprecated)]
    fn eq(&self, other: &Self) -> bool {
        #[cfg(feature = "aws")]
        if self.aws_credentials.is_some() != other.aws_credentials.is_some() {
            return false;
        }

        // For equality, we use pointer comparison for the ObjectStore and wrapper.
        // For storage_options_provider, we use provider_id() for semantic equality.
        self.block_size == other.block_size
            && self
                .object_store
                .as_ref()
                .map(|(store, url)| (Arc::as_ptr(store), url))
                == other
                    .object_store
                    .as_ref()
                    .map(|(store, url)| (Arc::as_ptr(store), url))
            && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
            && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
                == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
            && self.storage_options == other.storage_options
            && self
                .storage_options_provider
                .as_ref()
                .map(|p| p.provider_id())
                == other
                    .storage_options_provider
                    .as_ref()
                    .map(|p| p.provider_id())
            && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
            && self.list_is_lexically_ordered == other.list_is_lexically_ordered
    }
}

/// Convert a URI string or local path to a URL
///
/// This function handles both proper URIs (with schemes like `file://`, `s3://`, etc.)
/// and plain local filesystem paths. On Windows, it correctly handles drive letters
/// that might be parsed as URL schemes.
///
/// # Examples
///
/// ```
/// # use lance_io::object_store::uri_to_url;
/// // URIs are preserved
/// let url = uri_to_url("s3://bucket/path").unwrap();
/// assert_eq!(url.scheme(), "s3");
///
/// // Local paths are converted to file:// URIs
/// # #[cfg(unix)]
/// let url = uri_to_url("/tmp/data").unwrap();
/// # #[cfg(unix)]
/// assert_eq!(url.scheme(), "file");
/// ```
pub fn uri_to_url(uri: &str) -> Result<Url> {
    match Url::parse(uri) {
        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
            // On Windows, the drive is parsed as a scheme
            local_path_to_url(uri)
        }
        Ok(url) => Ok(url),
        Err(_) => local_path_to_url(uri),
    }
}

fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
    let expanded = tilde(str_path.as_ref()).to_string();

    let mut expanded_path = path_abs::PathAbs::new(expanded)
        .unwrap()
        .as_path()
        .to_path_buf();
    // path_abs::PathAbs::new(".") returns an empty string.
    if let Some(s) = expanded_path.as_path().to_str() {
        if s.is_empty() {
            expanded_path = std::env::current_dir()?;
        }
    }

    Ok(expanded_path)
}

fn local_path_to_url(str_path: &str) -> Result<Url> {
    let expanded_path = expand_path(str_path)?;

    Url::from_directory_path(expanded_path).map_err(|_| Error::InvalidInput {
        source: format!("Invalid table location: '{}'", str_path).into(),
        location: location!(),
    })
}

#[cfg(feature = "huggingface")]
fn parse_hf_repo_id(url: &Url) -> Result<String> {
    // Accept forms with repo type prefix (models/datasets/spaces) or legacy without.
    let mut segments: Vec<String> = Vec::new();
    if let Some(host) = url.host_str() {
        segments.push(host.to_string());
    }
    segments.extend(
        url.path()
            .trim_start_matches('/')
            .split('/')
            .map(|s| s.to_string()),
    );

    if segments.len() < 2 {
        return Err(Error::invalid_input(
            "Huggingface URL must contain at least owner and repo",
            location!(),
        ));
    }

    let repo_type_candidates = ["models", "datasets", "spaces"];
    let (owner, repo_with_rev) = if repo_type_candidates.contains(&segments[0].as_str()) {
        if segments.len() < 3 {
            return Err(Error::invalid_input(
                "Huggingface URL missing owner/repo after repo type",
                location!(),
            ));
        }
        (segments[1].as_str(), segments[2].as_str())
    } else {
        (segments[0].as_str(), segments[1].as_str())
    };

    let repo = repo_with_rev
        .split_once('@')
        .map(|(r, _)| r)
        .unwrap_or(repo_with_rev);
    Ok(format!("{owner}/{repo}"))
}

impl ObjectStore {
    /// Parse from a string URI.
    ///
    /// Returns the ObjectStore instance and the absolute path to the object.
    ///
    /// This uses the default [ObjectStoreRegistry] to find the object store. To
    /// allow for potential re-use of object store instances, it's recommended to
    /// create a shared [ObjectStoreRegistry] and pass that to [Self::from_uri_and_params].
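    ///
    /// A minimal sketch (assumes an async context; the URI is illustrative):
    ///
    /// ```ignore
    /// let (store, path) = ObjectStore::from_uri("s3://bucket/dataset.lance").await?;
    /// let exists = store.exists(&path.child("some_file")).await?;
    /// ```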
    pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
        let registry = Arc::new(ObjectStoreRegistry::default());

        Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
    }

    /// Parse from a string URI.
    ///
    /// Returns the ObjectStore instance and the absolute path to the object.
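    ///
    /// A minimal sketch showing registry re-use across URIs (the URIs are illustrative):
    ///
    /// ```ignore
    /// let registry = Arc::new(ObjectStoreRegistry::default());
    /// let params = ObjectStoreParams::default();
    /// let (store_a, path_a) =
    ///     ObjectStore::from_uri_and_params(registry.clone(), "s3://bucket/a.lance", &params).await?;
    /// let (store_b, path_b) =
    ///     ObjectStore::from_uri_and_params(registry, "s3://bucket/b.lance", &params).await?;
    /// ```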
    pub async fn from_uri_and_params(
        registry: Arc<ObjectStoreRegistry>,
        uri: &str,
        params: &ObjectStoreParams,
    ) -> Result<(Arc<Self>, Path)> {
        #[allow(deprecated)]
        if let Some((store, path)) = params.object_store.as_ref() {
            let mut inner = store.clone();
            let store_prefix =
                registry.calculate_object_store_prefix(uri, params.storage_options.as_ref())?;
            if let Some(wrapper) = params.object_store_wrapper.as_ref() {
                inner = wrapper.wrap(&store_prefix, inner);
            }

            // Always wrap with IO tracking
            let io_tracker = IOTracker::default();
            let tracked_store = io_tracker.wrap("", inner);

            let store = Self {
                inner: tracked_store,
                scheme: path.scheme().to_string(),
                block_size: params.block_size.unwrap_or(64 * 1024),
                max_iop_size: *DEFAULT_MAX_IOP_SIZE,
                use_constant_size_upload_parts: params.use_constant_size_upload_parts,
                list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
                io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
                download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
                io_tracker,
            };
            let path = Path::parse(path.path())?;
            return Ok((Arc::new(store), path));
        }
        let url = uri_to_url(uri)?;

        let store = registry.get_store(url.clone(), params).await?;
        // We know the scheme is valid if we got a store back.
        let provider = registry.get_provider(url.scheme()).expect_ok()?;
        let path = provider.extract_path(&url)?;

        Ok((store, path))
    }

    /// Extract the path component from a URI without initializing the object store.
    ///
    /// This is a synchronous operation that only parses the URI and extracts the path,
    /// without creating or initializing any object store instance.
    ///
    /// # Arguments
    ///
    /// * `registry` - The object store registry to get the provider
    /// * `uri` - The URI to extract the path from
    ///
    /// # Returns
    ///
    /// The extracted path component
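    ///
    /// # Example
    ///
    /// A minimal sketch (no store is created or initialized; the URI is illustrative):
    ///
    /// ```ignore
    /// let registry = Arc::new(ObjectStoreRegistry::default());
    /// let path = ObjectStore::extract_path_from_uri(registry, "s3://bucket/dataset.lance")?;
    /// assert_eq!(path.to_string(), "dataset.lance");
    /// ```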
    pub fn extract_path_from_uri(registry: Arc<ObjectStoreRegistry>, uri: &str) -> Result<Path> {
        let url = uri_to_url(uri)?;
        let provider = registry.get_provider(url.scheme()).ok_or_else(|| {
            Error::invalid_input(format!("Unknown scheme: {}", url.scheme()), location!())
        })?;
        provider.extract_path(&url)
    }

    #[deprecated(note = "Use `from_uri` instead")]
    pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
        Self::from_uri_and_params(
            Arc::new(ObjectStoreRegistry::default()),
            str_path,
            &Default::default(),
        )
        .now_or_never()
        .unwrap()
    }

    /// Local object store.
    pub fn local() -> Self {
        let provider = FileStoreProvider;
        provider
            .new_store(Url::parse("file:///").unwrap(), &Default::default())
            .now_or_never()
            .unwrap()
            .unwrap()
    }

    /// Create an in-memory object store directly for testing.
    pub fn memory() -> Self {
        let provider = MemoryStoreProvider;
        provider
            .new_store(Url::parse("memory:///").unwrap(), &Default::default())
            .now_or_never()
            .unwrap()
            .unwrap()
    }

    /// Returns true if the object store points to a local file system.
    pub fn is_local(&self) -> bool {
        self.scheme == "file"
    }

    pub fn is_cloud(&self) -> bool {
        self.scheme != "file" && self.scheme != "memory"
    }

    pub fn scheme(&self) -> &str {
        &self.scheme
    }

    pub fn block_size(&self) -> usize {
        self.block_size
    }

    pub fn max_iop_size(&self) -> u64 {
        self.max_iop_size
    }

    pub fn io_parallelism(&self) -> usize {
        std::env::var("LANCE_IO_THREADS")
            .map(|val| val.parse::<usize>().unwrap())
            .unwrap_or(self.io_parallelism)
    }

    /// Get the IO tracker for this object store
    ///
    /// The IO tracker can be used to get statistics about read/write operations
    /// performed on this object store.
    pub fn io_tracker(&self) -> &IOTracker {
        &self.io_tracker
    }

    /// Get a snapshot of current IO statistics without resetting counters
    ///
    /// Returns the current IO statistics without modifying the internal state.
    /// Use this when you need to check stats without resetting them.
    pub fn io_stats_snapshot(&self) -> IoStats {
        self.io_tracker.stats()
    }

    /// Get incremental IO statistics since the last call to this method
    ///
    /// Returns the accumulated statistics since the last call and resets the
    /// counters to zero. This is useful for tracking IO operations between
    /// different stages of processing.
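    ///
    /// A minimal sketch (assumes `stage_one` and `stage_two` are hypothetical async steps
    /// that read through this store):
    ///
    /// ```ignore
    /// stage_one(&store).await?;
    /// let stats_one = store.io_stats_incremental(); // counters reset here
    /// stage_two(&store).await?;
    /// let stats_two = store.io_stats_incremental(); // covers only stage_two's IO
    /// ```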
    pub fn io_stats_incremental(&self) -> IoStats {
        self.io_tracker.incremental_stats()
    }

    /// Open a reader for the given path.
    ///
    /// Parameters
    /// - `path`: Absolute path to the file.
    pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
        match self.scheme.as_str() {
            "file" => {
                LocalObjectReader::open_with_tracker(
                    path,
                    self.block_size,
                    None,
                    Arc::new(self.io_tracker.clone()),
                )
                .await
            }
            _ => Ok(Box::new(CloudObjectReader::new(
                self.inner.clone(),
                path.clone(),
                self.block_size,
                None,
                self.download_retry_count,
            )?)),
        }
    }

    /// Open a reader for a file with known size.
    ///
    /// This size may have been retrieved either from a list operation or from
    /// cached metadata. By passing in the known size, we can skip a HEAD / metadata
    /// call.
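    ///
    /// A minimal sketch (assumes the size came from an earlier list operation):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// if let Some(meta) = store.list(Some(path.clone())).try_next().await? {
    ///     let reader = store.open_with_size(&meta.location, meta.size as usize).await?;
    /// }
    /// ```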
    pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
        // If we know the file is really small, we can read the whole thing
        // as a single request.
        if known_size <= self.block_size {
            return Ok(Box::new(SmallReader::new(
                self.inner.clone(),
                path.clone(),
                self.download_retry_count,
                known_size,
            )));
        }

        match self.scheme.as_str() {
            "file" => {
                LocalObjectReader::open_with_tracker(
                    path,
                    self.block_size,
                    Some(known_size),
                    Arc::new(self.io_tracker.clone()),
                )
                .await
            }
            _ => Ok(Box::new(CloudObjectReader::new(
                self.inner.clone(),
                path.clone(),
                self.block_size,
                Some(known_size),
                self.download_retry_count,
            )?)),
        }
    }

    /// Create an [ObjectWriter] from a local [std::path::Path]
    pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
        let object_store = Self::local();
        let absolute_path = expand_path(path.to_string_lossy())?;
        let os_path = Path::from_absolute_path(absolute_path)?;
        object_store.create(&os_path).await
    }

    /// Open a [Reader] from a local [std::path::Path]
    pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
        let object_store = Self::local();
        let absolute_path = expand_path(path.to_string_lossy())?;
        let os_path = Path::from_absolute_path(absolute_path)?;
        object_store.open(&os_path).await
    }

    /// Create a new file.
    pub async fn create(&self, path: &Path) -> Result<ObjectWriter> {
        ObjectWriter::new(self, path).await
    }

    /// A helper function to create a file and write content to it.
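    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// let result = store.put(&Path::from("some/file.bin"), b"hello").await?;
    /// ```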
    pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
        let mut writer = self.create(path).await?;
        writer.write_all(content).await?;
        writer.shutdown().await
    }

    pub async fn delete(&self, path: &Path) -> Result<()> {
        self.inner.delete(path).await?;
        Ok(())
    }

    pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
        if self.is_local() {
            // Use std::fs::copy for local filesystem to support cross-filesystem copies
            return super::local::copy_file(from, to);
        }
        Ok(self.inner.copy(from, to).await?)
    }

    /// Read a directory (starting from the base directory) and return all sub-paths in the directory.
    pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
        let path = dir_path.into();
        let path = Path::parse(&path)?;
        let output = self.inner.list_with_delimiter(Some(&path)).await?;
        Ok(output
            .common_prefixes
            .iter()
            .chain(output.objects.iter().map(|o| &o.location))
            .map(|s| s.filename().unwrap().to_string())
            .collect())
    }

    pub fn list(
        &self,
        path: Option<Path>,
    ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
        Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
    }

    /// Read all files (starting from the base directory) recursively.
    ///
    /// `unmodified_since` can be specified to only return files that have not been modified since the given time.
    pub fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        self.inner.read_dir_all(dir_path, unmodified_since)
    }

    /// Remove a directory recursively.
    pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
        let path = dir_path.into();
        let path = Path::parse(&path)?;

        if self.is_local() {
            // Local file system needs to delete directories as well.
            return super::local::remove_dir_all(&path);
        }
        let sub_entries = self
            .inner
            .list(Some(&path))
            .map(|m| m.map(|meta| meta.location))
            .boxed();
        self.inner
            .delete_stream(sub_entries)
            .try_collect::<Vec<_>>()
            .await?;
        Ok(())
    }

    pub fn remove_stream<'a>(
        &'a self,
        locations: BoxStream<'a, Result<Path>>,
    ) -> BoxStream<'a, Result<Path>> {
        self.inner
            .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
            .err_into::<Error>()
            .boxed()
    }

    /// Check if a file exists.
    pub async fn exists(&self, path: &Path) -> Result<bool> {
        match self.inner.head(path).await {
            Ok(_) => Ok(true),
            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
            Err(e) => Err(e.into()),
        }
    }

    /// Get file size.
    pub async fn size(&self, path: &Path) -> Result<u64> {
        Ok(self.inner.head(path).await?.size)
    }

    /// Convenience function to open a reader and read all the bytes
    pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
        let reader = self.open(path).await?;
        Ok(reader.get_all().await?)
    }

    /// Convenience function to open a reader and make a single request
    ///
    /// If you will be making multiple requests to the path it is more efficient to call [`Self::open`]
    /// and then call [`Reader::get_range`] multiple times.
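    ///
    /// A minimal sketch reading the first 16 bytes of a file:
    ///
    /// ```ignore
    /// let header = store.read_one_range(&Path::from("some/file.bin"), 0..16).await?;
    /// ```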
    pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
        let reader = self.open(path).await?;
        Ok(reader.get_range(range).await?)
    }
}

/// Options that can be set for multiple object stores
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub enum LanceConfigKey {
    /// Number of times to retry a download that fails
    DownloadRetryCount,
}

impl FromStr for LanceConfigKey {
    type Err = Error;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "download_retry_count" => Ok(Self::DownloadRetryCount),
            _ => Err(Error::InvalidInput {
                source: format!("Invalid LanceConfigKey: {}", s).into(),
                location: location!(),
            }),
        }
    }
}

#[derive(Clone, Debug, Default)]
pub struct StorageOptions(pub HashMap<String, String>);

impl StorageOptions {
    /// Create a new instance of [`StorageOptions`]
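    ///
    /// A minimal sketch (environment overrides such as `AWS_ALLOW_HTTP`, if set, are
    /// merged in on top of the supplied map):
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    ///
    /// let options = StorageOptions::new(HashMap::from([(
    ///     "allow_http".to_string(),
    ///     "true".to_string(),
    /// )]));
    /// assert!(options.allow_http());
    /// ```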
    pub fn new(options: HashMap<String, String>) -> Self {
        let mut options = options;
        if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
            options.insert("allow_http".into(), value);
        }
        if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
            options.insert("allow_http".into(), value);
        }
        if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
            options.insert("allow_http".into(), value);
        }
        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
            options.insert("client_max_retries".into(), value);
        }
        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
            options.insert("client_retry_timeout".into(), value);
        }
        Self(options)
    }

    /// Denotes if insecure connections via HTTP are allowed
    pub fn allow_http(&self) -> bool {
        self.0.iter().any(|(key, value)| {
            key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
        })
    }

    /// Number of times to retry a download that fails
    pub fn download_retry_count(&self) -> usize {
        self.0
            .iter()
            .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
            .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
            .unwrap_or(3)
    }

    /// Maximum number of retries to set in the `RetryConfig` for the object store client
    pub fn client_max_retries(&self) -> usize {
        self.0
            .iter()
            .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
            .and_then(|(_, value)| value.parse::<usize>().ok())
            .unwrap_or(10)
    }

    /// Retry timeout in seconds to set in the `RetryConfig` for the object store client
    pub fn client_retry_timeout(&self) -> u64 {
        self.0
            .iter()
            .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
            .and_then(|(_, value)| value.parse::<u64>().ok())
            .unwrap_or(180)
    }

    pub fn get(&self, key: &str) -> Option<&String> {
        self.0.get(key)
    }

    /// Get the expiration time in milliseconds since epoch, if present
    pub fn expires_at_millis(&self) -> Option<u64> {
        self.0
            .get(EXPIRES_AT_MILLIS_KEY)
            .and_then(|s| s.parse::<u64>().ok())
    }
}

impl From<HashMap<String, String>> for StorageOptions {
    fn from(value: HashMap<String, String>) -> Self {
        Self::new(value)
    }
}

static DEFAULT_OBJECT_STORE_REGISTRY: std::sync::LazyLock<ObjectStoreRegistry> =
    std::sync::LazyLock::new(ObjectStoreRegistry::default);

impl ObjectStore {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        store: Arc<DynObjectStore>,
        location: Url,
        block_size: Option<usize>,
        wrapper: Option<Arc<dyn WrappingObjectStore>>,
        use_constant_size_upload_parts: bool,
        list_is_lexically_ordered: bool,
        io_parallelism: usize,
        download_retry_count: usize,
        storage_options: Option<&HashMap<String, String>>,
    ) -> Self {
        let scheme = location.scheme();
        let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));

        let store = match wrapper {
            Some(wrapper) => {
                let store_prefix = DEFAULT_OBJECT_STORE_REGISTRY
                    .calculate_object_store_prefix(location.as_ref(), storage_options)
                    .unwrap();
                wrapper.wrap(&store_prefix, store)
            }
            None => store,
        };

        // Always wrap with IO tracking
        let io_tracker = IOTracker::default();
        let tracked_store = io_tracker.wrap("", store);

        Self {
            inner: tracked_store,
            scheme: scheme.into(),
            block_size,
            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
            use_constant_size_upload_parts,
            list_is_lexically_ordered,
            io_parallelism,
            download_retry_count,
            io_tracker,
        }
    }
}

fn infer_block_size(scheme: &str) -> usize {
    // Block size: On local file systems, we use 4KB block size. On cloud
    // object stores, we use 64KB block size. This is generally the largest
    // block size where we don't see a latency penalty.
    match scheme {
        "file" => 4 * 1024,
        _ => 64 * 1024,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use lance_core::utils::tempfile::{TempStdDir, TempStdFile, TempStrDir};
    use object_store::memory::InMemory;
    use rstest::rstest;
    use std::env::set_current_dir;
    use std::fs::{create_dir_all, write};
    use std::path::Path as StdPath;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Write test content to file.
    fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
        let expanded = tilde(path_str).to_string();
        let path = StdPath::new(&expanded);
        std::fs::create_dir_all(path.parent().unwrap())?;
        write(path, contents)
    }

    async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
        let test_file_store = store.open(path).await.unwrap();
        let size = test_file_store.size().await.unwrap();
        let bytes = test_file_store.get_range(0..size).await.unwrap();
        let contents = String::from_utf8(bytes.to_vec()).unwrap();
        Ok(contents)
    }

    #[tokio::test]
    async fn test_absolute_paths() {
        let tmp_path = TempStrDir::default();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "TEST_CONTENT",
        )
        .unwrap();

        // test a few variations of the same path
        for uri in &[
            format!("{tmp_path}/bar/foo.lance"),
            format!("{tmp_path}/./bar/foo.lance"),
            format!("{tmp_path}/bar/foo.lance/../foo.lance"),
        ] {
            let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &path.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "TEST_CONTENT");
        }
    }

    #[tokio::test]
    async fn test_cloud_paths() {
        let uri = "s3://bucket/foo.lance";
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "gs");
        assert_eq!(path.to_string(), "foo.lance");
    }

    async fn test_block_size_used_test_helper(
        uri: &str,
        storage_options: Option<HashMap<String, String>>,
        default_expected_block_size: usize,
    ) {
        // Test the default
        let registry = Arc::new(ObjectStoreRegistry::default());
        let params = ObjectStoreParams {
            storage_options: storage_options.clone(),
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, default_expected_block_size);

        // Ensure param is used
        let registry = Arc::new(ObjectStoreRegistry::default());
        let params = ObjectStoreParams {
            block_size: Some(1024),
            storage_options: storage_options.clone(),
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, 1024);
    }

    #[rstest]
    #[case("s3://bucket/foo.lance", None)]
    #[case("gs://bucket/foo.lance", None)]
    #[case("az://account/bucket/foo.lance",
      Some(HashMap::from([
            (String::from("account_name"), String::from("account")),
            (String::from("container_name"), String::from("container"))
           ])))]
    #[tokio::test]
    async fn test_block_size_used_cloud(
        #[case] uri: &str,
        #[case] storage_options: Option<HashMap<String, String>>,
    ) {
        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
    }

    #[rstest]
    #[case("file")]
    #[case("file-object-store")]
    #[case("memory:///bucket/foo.lance")]
    #[tokio::test]
    async fn test_block_size_used_file(#[case] prefix: &str) {
        let tmp_path = TempStrDir::default();
        let path = format!("{tmp_path}/bar/foo.lance/test_file");
        write_to_file(&path, "URL").unwrap();
        let uri = format!("{prefix}:///{path}");
        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
    }

    #[tokio::test]
    async fn test_relative_paths() {
        let tmp_path = TempStrDir::default();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "RELATIVE_URL",
        )
        .unwrap();

        set_current_dir(StdPath::new(tmp_path.as_ref())).expect("Error changing current dir");
        let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();

        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "RELATIVE_URL");
    }

    #[tokio::test]
    async fn test_tilde_expansion() {
        let uri = "~/foo.lance";
        write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "TILDE");
    }

    #[tokio::test]
    async fn test_read_directory() {
        let path = TempStdDir::default();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo").join("test_file").to_str().unwrap(),
            "read_dir",
        )
        .unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();

        let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
        assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
    }

    #[tokio::test]
    async fn test_delete_directory() {
        let path = TempStdDir::default();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo")
                .join("bar")
                .join("test_file")
                .to_str()
                .unwrap(),
            "delete",
        )
        .unwrap();
        write_to_file(path.join("foo").join("top").to_str().unwrap(), "delete_top").unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
        store.remove_dir_all(base.child("foo")).await.unwrap();

        assert!(!path.join("foo").exists());
    }

    #[derive(Debug)]
    struct TestWrapper {
        called: AtomicBool,

        return_value: Arc<dyn OSObjectStore>,
    }

    impl WrappingObjectStore for TestWrapper {
        fn wrap(
            &self,
            _store_prefix: &str,
            _original: Arc<dyn OSObjectStore>,
        ) -> Arc<dyn OSObjectStore> {
            self.called.store(true, Ordering::Relaxed);

            // return a mocked value so we can check if the final store is the one we expect
            self.return_value.clone()
        }
    }

    impl TestWrapper {
        fn called(&self) -> bool {
            self.called.load(Ordering::Relaxed)
        }
    }

    #[tokio::test]
    async fn test_wrapping_object_store_option_is_used() {
        // Make a store for the inner store first
        let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
        let registry = Arc::new(ObjectStoreRegistry::default());

        assert_eq!(Arc::strong_count(&mock_inner_store), 1);

        let wrapper = Arc::new(TestWrapper {
            called: AtomicBool::new(false),
            return_value: mock_inner_store.clone(),
        });

        let params = ObjectStoreParams {
            object_store_wrapper: Some(wrapper.clone()),
            ..ObjectStoreParams::default()
        };

        // not called yet
        assert!(!wrapper.called());

        let _ = ObjectStore::from_uri_and_params(registry, "memory:///", &params)
            .await
            .unwrap();

        // called after construction
        assert!(wrapper.called());

        // It's hard to compare two trait object pointers since they point to vtables,
        // so we use the ref count as a proxy to make sure the store is correctly kept.
        assert_eq!(Arc::strong_count(&mock_inner_store), 2);
    }

    #[tokio::test]
    async fn test_local_paths() {
        let file_path = TempStdFile::default();
        let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        writer.shutdown().await.unwrap();

        let reader = ObjectStore::open_local(&file_path).await.unwrap();
        let buf = reader.get_range(0..5).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");
    }

    #[tokio::test]
    async fn test_read_one() {
        let file_path = TempStdFile::default();
        let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        writer.shutdown().await.unwrap();

        let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
        let obj_store = ObjectStore::local();
        let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");

        let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");
    }

    #[tokio::test]
    #[cfg(windows)]
    async fn test_windows_paths() {
        use std::path::Component;
        use std::path::Prefix;
        use std::path::Prefix::*;

        fn get_path_prefix(path: &StdPath) -> Prefix {
            match path.components().next().unwrap() {
                Component::Prefix(prefix_component) => prefix_component.kind(),
                _ => panic!(),
            }
        }

        fn get_drive_letter(prefix: Prefix) -> String {
            match prefix {
                Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
                _ => panic!(),
            }
        }

        let tmp_path = TempStdFile::default();
        let prefix = get_path_prefix(&tmp_path);
        let drive_letter = get_drive_letter(prefix);

        write_to_file(
            &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
            "WINDOWS",
        )
        .unwrap();

        for uri in &[
            format!("{drive_letter}:/test_folder/test.lance"),
            format!("{drive_letter}:\\test_folder\\test.lance"),
        ] {
            let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &base.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "WINDOWS");
        }
    }

    #[tokio::test]
    async fn test_cross_filesystem_copy() {
        // Create two temporary directories that simulate different filesystems
        let source_dir = TempStdDir::default();
        let dest_dir = TempStdDir::default();

        // Create a test file in the source directory
        let source_file_name = "test_file.txt";
        let source_file = source_dir.join(source_file_name);
        std::fs::write(&source_file, b"test content").unwrap();

        // Create ObjectStore for local filesystem
        let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
            .await
            .unwrap();

        // Create paths relative to the ObjectStore base
        let from_path = base_path.child(source_file_name);

        // Use object_store::Path::parse for the destination
        let dest_file = dest_dir.join("copied_file.txt");
        let dest_str = dest_file.to_str().unwrap();
        let to_path = object_store::path::Path::parse(dest_str).unwrap();

        // Perform the copy operation
        store.copy(&from_path, &to_path).await.unwrap();

        // Verify the file was copied correctly
        assert!(dest_file.exists());
        let copied_content = std::fs::read(&dest_file).unwrap();
        assert_eq!(copied_content, b"test content");
    }

    #[tokio::test]
    async fn test_copy_creates_parent_directories() {
        let source_dir = TempStdDir::default();
        let dest_dir = TempStdDir::default();

        // Create a test file in the source directory
        let source_file_name = "test_file.txt";
        let source_file = source_dir.join(source_file_name);
        std::fs::write(&source_file, b"test content").unwrap();

        // Create ObjectStore for local filesystem
        let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
            .await
            .unwrap();

        // Create paths
        let from_path = base_path.child(source_file_name);

        // Create destination with nested directories that don't exist yet
        let dest_file = dest_dir.join("nested").join("dirs").join("copied_file.txt");
        let dest_str = dest_file.to_str().unwrap();
        let to_path = object_store::path::Path::parse(dest_str).unwrap();

        // Perform the copy operation - should create parent directories
        store.copy(&from_path, &to_path).await.unwrap();

        // Verify the file was copied correctly and directories were created
        assert!(dest_file.exists());
        assert!(dest_file.parent().unwrap().exists());
        let copied_content = std::fs::read(&dest_file).unwrap();
        assert_eq!(copied_content, b"test content");
    }
}