Skip to main content

lance_io/
object_store.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Extend [object_store::ObjectStore] functionalities
5
6use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use futures::{FutureExt, Stream};
17use futures::{StreamExt, TryStreamExt, future, stream::BoxStream};
18use lance_core::deepsize::DeepSizeOf;
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22use object_store::DynObjectStore;
23use object_store::ObjectStoreExt as OSObjectStoreExt;
24#[cfg(feature = "aws")]
25use object_store::aws::AwsCredentialProvider;
26#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
27use object_store::{ClientOptions, HeaderMap, HeaderValue};
28use object_store::{ObjectMeta, ObjectStore as OSObjectStore, path::Path};
29use providers::local::FileStoreProvider;
30use providers::memory::MemoryStoreProvider;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35#[cfg(target_os = "linux")]
36use crate::uring::{UringCurrentThreadReader, UringReader};
37#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
38pub(crate) mod dynamic_credentials;
39#[cfg(any(feature = "oss", feature = "huggingface", feature = "tos"))]
40pub(crate) mod dynamic_opendal;
41mod list_retry;
42pub mod providers;
43pub mod storage_options;
44#[cfg(test)]
45pub(crate) mod test_utils;
46pub mod throttle;
47mod tracing;
48use crate::object_reader::SmallReader;
49use crate::object_writer::{LocalWriter, WriteResult};
50use crate::traits::{WriteExt, Writer};
51use crate::utils::tracking_store::{IOTracker, IoStats};
52use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
53use lance_core::{Error, Result};
54
/// Default IO parallelism for local disks.
///
/// Local disks tend to do fine with a few threads. The number of threads also
/// impacts the number of files we need to read in some situations, so keeping
/// this at 8 keeps the RAM usage of our scanner down.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
/// Default IO parallelism for cloud object stores, which often need many
/// concurrent requests to saturate the network.
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

/// Block size used for local file systems (4 KiB).
const DEFAULT_LOCAL_BLOCK_SIZE: usize = 1 << 12;
/// Block size used for cloud object stores (64 KiB).
#[cfg(any(
    feature = "aws",
    feature = "gcp",
    feature = "azure",
    feature = "oss",
    feature = "tencent",
    feature = "huggingface",
    feature = "tos",
))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 1 << 16;
74
/// Value of [`DEFAULT_MAX_IOP_SIZE`] when `LANCE_MAX_IOP_SIZE` is unset or invalid (16 MiB).
const FALLBACK_MAX_IOP_SIZE: u64 = 16 * 1024 * 1024;

/// Parses an optional environment-variable value as a `u64`.
///
/// Surrounding whitespace is ignored. Returns `fallback` when `raw` is `None`
/// or does not parse as an unsigned integer.
fn parse_env_u64(raw: Option<&str>, fallback: u64) -> u64 {
    raw.and_then(|value| value.trim().parse().ok())
        .unwrap_or(fallback)
}

/// Maximum size of a single IO operation.
///
/// Read once from the `LANCE_MAX_IOP_SIZE` environment variable and defaults
/// to 16 MiB. A value that cannot be parsed falls back to the default instead
/// of panicking. A panic inside `LazyLock` initialization would poison the
/// static and make every later access panic as well.
pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock<u64> = std::sync::LazyLock::new(|| {
    parse_env_u64(
        std::env::var("LANCE_MAX_IOP_SIZE").ok().as_deref(),
        FALLBACK_MAX_IOP_SIZE,
    )
});

/// Default number of times a failed download is retried.
pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
82
83pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
84pub use storage_options::{
85    EXPIRES_AT_MILLIS_KEY, LanceNamespaceStorageOptionsProvider, REFRESH_OFFSET_MILLIS_KEY,
86    StorageOptionsAccessor, StorageOptionsProvider,
87};
88
89#[async_trait]
90pub trait ObjectStoreExt {
91    /// Returns true if the file exists.
92    async fn exists(&self, path: &Path) -> Result<bool>;
93
94    /// Read all files (start from base directory) recursively
95    ///
96    /// unmodified_since can be specified to only return files that have not been modified since the given time.
97    fn read_dir_all<'a, 'b>(
98        &'a self,
99        dir_path: impl Into<&'b Path> + Send,
100        unmodified_since: Option<DateTime<Utc>>,
101    ) -> BoxStream<'a, Result<ObjectMeta>>;
102}
103
104#[async_trait]
105impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
106    fn read_dir_all<'a, 'b>(
107        &'a self,
108        dir_path: impl Into<&'b Path> + Send,
109        unmodified_since: Option<DateTime<Utc>>,
110    ) -> BoxStream<'a, Result<ObjectMeta>> {
111        let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
112        if let Some(unmodified_since_val) = unmodified_since {
113            output
114                .try_filter(move |file| future::ready(file.last_modified <= unmodified_since_val))
115                .boxed()
116        } else {
117            output.boxed()
118        }
119    }
120
121    async fn exists(&self, path: &Path) -> Result<bool> {
122        match self.head(path).await {
123            Ok(_) => Ok(true),
124            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
125            Err(e) => Err(e.into()),
126        }
127    }
128}
129
/// Wraps [ObjectStore](object_store::ObjectStore)
///
/// Alongside the underlying store it carries the settings used by readers and
/// writers created through it: URL scheme, block size, maximum IO operation
/// size, IO parallelism, download retry count and an [IOTracker].
#[derive(Debug, Clone)]
pub struct ObjectStore {
    /// The underlying [object_store::ObjectStore] that requests are issued to.
    pub inner: Arc<dyn OSObjectStore>,
    /// URL scheme of the store (e.g. `file`, `file+uring`, `memory`). It selects
    /// between the local and remote reader/writer implementations.
    scheme: String,
    /// Block size in bytes. It is passed to readers. Files whose known size is
    /// at most this value are read with a single request.
    block_size: usize,
    /// Maximum size of a single IO operation; see [DEFAULT_MAX_IOP_SIZE].
    max_iop_size: u64,
    /// Whether to use constant size upload parts for multipart uploads. This
    /// is only necessary for Cloudflare R2.
    pub use_constant_size_upload_parts: bool,
    /// Whether we can assume that the list of files is lexically ordered. This
    /// is true for object stores, but not for local filesystems.
    pub list_is_lexically_ordered: bool,
    /// Default number of concurrent IO operations. The `LANCE_IO_THREADS`
    /// environment variable can override it in [ObjectStore::io_parallelism].
    io_parallelism: usize,
    /// Number of times to retry a failed download
    download_retry_count: usize,
    /// IO tracker for monitoring read/write operations
    io_tracker: IOTracker,
    /// The datastore prefix that uniquely identifies this object store. It encodes information
    /// which usually cannot be found in the URL such as Azure account name. The prefix plus the
    /// path uniquely identifies any object inside the store.
    pub store_prefix: String,
}
149    /// The datastore prefix that uniquely identifies this object store. It encodes information
150    /// which usually cannot be found in the URL such as Azure account name. The prefix plus the
151    /// path uniquely identifies any object inside the store.
152    pub store_prefix: String,
153}
154
155impl DeepSizeOf for ObjectStore {
156    fn deep_size_of_children(&self, context: &mut lance_core::deepsize::Context) -> usize {
157        // We aren't counting `inner` here which is problematic but an ObjectStore
158        // shouldn't be too big.  The only exception might be the write cache but, if
159        // the writer cache has data, it means we're using it somewhere else that isn't
160        // a cache and so that doesn't really count.
161        self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
162    }
163}
164
165impl std::fmt::Display for ObjectStore {
166    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167        write!(f, "ObjectStore({})", self.scheme)
168    }
169}
170
171pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
172    /// Wrap an object store with additional functionality
173    ///
174    /// The store_prefix is a string which uniquely identifies the object
175    /// store being wrapped.
176    fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
177}
178
179#[derive(Debug, Clone)]
180pub struct ChainedWrappingObjectStore {
181    wrappers: Vec<Arc<dyn WrappingObjectStore>>,
182}
183
184impl ChainedWrappingObjectStore {
185    pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
186        Self { wrappers }
187    }
188
189    pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
190        self.wrappers.push(wrapper);
191    }
192}
193
194impl WrappingObjectStore for ChainedWrappingObjectStore {
195    fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
196        self.wrappers
197            .iter()
198            .fold(original, |acc, wrapper| wrapper.wrap(store_prefix, acc))
199    }
200}
201
202/// Parameters to create an [ObjectStore]
203///
204#[derive(Debug, Clone)]
205pub struct ObjectStoreParams {
206    pub block_size: Option<usize>,
207    #[deprecated(note = "Implement an ObjectStoreProvider instead")]
208    pub object_store: Option<(Arc<DynObjectStore>, Url)>,
209    /// Refresh offset for AWS credentials when using the legacy AWS credentials path.
210    /// For StorageOptionsAccessor, use `refresh_offset_millis` storage option instead.
211    pub s3_credentials_refresh_offset: Duration,
212    #[cfg(feature = "aws")]
213    pub aws_credentials: Option<AwsCredentialProvider>,
214    pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
215    /// Unified storage options accessor with caching and automatic refresh
216    ///
217    /// Provides storage options and optionally a dynamic provider for automatic
218    /// credential refresh. Use `StorageOptionsAccessor::with_static_options()` for static
219    /// options or `StorageOptionsAccessor::with_initial_and_provider()` for dynamic refresh.
220    pub storage_options_accessor: Option<Arc<StorageOptionsAccessor>>,
221    /// Use constant size upload parts for multipart uploads. Only necessary
222    /// for Cloudflare R2, which doesn't support variable size parts. When this
223    /// is false, max upload size is 2.5TB. When this is true, the max size is
224    /// 50GB.
225    pub use_constant_size_upload_parts: bool,
226    pub list_is_lexically_ordered: Option<bool>,
227}
228
229impl Default for ObjectStoreParams {
230    fn default() -> Self {
231        #[allow(deprecated)]
232        Self {
233            object_store: None,
234            block_size: None,
235            s3_credentials_refresh_offset: Duration::from_secs(60),
236            #[cfg(feature = "aws")]
237            aws_credentials: None,
238            object_store_wrapper: None,
239            storage_options_accessor: None,
240            use_constant_size_upload_parts: false,
241            list_is_lexically_ordered: None,
242        }
243    }
244}
245
246impl ObjectStoreParams {
247    /// Get the StorageOptionsAccessor from the params
248    pub fn get_accessor(&self) -> Option<Arc<StorageOptionsAccessor>> {
249        self.storage_options_accessor.clone()
250    }
251
252    /// Get storage options from the accessor, if any
253    ///
254    /// Returns the initial storage options from the accessor without triggering refresh.
255    pub fn storage_options(&self) -> Option<&HashMap<String, String>> {
256        self.storage_options_accessor
257            .as_ref()
258            .and_then(|a| a.initial_storage_options())
259    }
260}
261
262// We implement hash for caching
263impl std::hash::Hash for ObjectStoreParams {
264    #[allow(deprecated)]
265    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
266        // For hashing, we use pointer values for ObjectStore, S3 credentials, wrapper
267        self.block_size.hash(state);
268        if let Some((store, url)) = &self.object_store {
269            Arc::as_ptr(store).hash(state);
270            url.hash(state);
271        }
272        self.s3_credentials_refresh_offset.hash(state);
273        #[cfg(feature = "aws")]
274        if let Some(aws_credentials) = &self.aws_credentials {
275            Arc::as_ptr(aws_credentials).hash(state);
276        }
277        if let Some(wrapper) = &self.object_store_wrapper {
278            Arc::as_ptr(wrapper).hash(state);
279        }
280        if let Some(accessor) = &self.storage_options_accessor {
281            accessor.accessor_id().hash(state);
282        }
283        self.use_constant_size_upload_parts.hash(state);
284        self.list_is_lexically_ordered.hash(state);
285    }
286}
287
288// We implement eq for caching
289impl Eq for ObjectStoreParams {}
290impl PartialEq for ObjectStoreParams {
291    #[allow(deprecated)]
292    fn eq(&self, other: &Self) -> bool {
293        #[cfg(feature = "aws")]
294        if self.aws_credentials.is_some() != other.aws_credentials.is_some() {
295            return false;
296        }
297
298        // For equality, we use pointer comparison for ObjectStore, S3 credentials, wrapper
299        // For accessor, we use accessor_id() for semantic equality
300        self.block_size == other.block_size
301            && self
302                .object_store
303                .as_ref()
304                .map(|(store, url)| (Arc::as_ptr(store), url))
305                == other
306                    .object_store
307                    .as_ref()
308                    .map(|(store, url)| (Arc::as_ptr(store), url))
309            && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
310            && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
311                == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
312            && self
313                .storage_options_accessor
314                .as_ref()
315                .map(|a| a.accessor_id())
316                == other
317                    .storage_options_accessor
318                    .as_ref()
319                    .map(|a| a.accessor_id())
320            && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
321            && self.list_is_lexically_ordered == other.list_is_lexically_ordered
322    }
323}
324
325/// Convert a URI string or local path to a URL
326///
327/// This function handles both proper URIs (with schemes like `file://`, `s3://`, etc.)
328/// and plain local filesystem paths. On Windows, it correctly handles drive letters
329/// that might be parsed as URL schemes.
330///
331/// # Examples
332///
333/// ```
334/// # use lance_io::object_store::uri_to_url;
335/// // URIs are preserved
336/// let url = uri_to_url("s3://bucket/path").unwrap();
337/// assert_eq!(url.scheme(), "s3");
338///
339/// // Local paths are converted to file:// URIs
340/// # #[cfg(unix)]
341/// let url = uri_to_url("/tmp/data").unwrap();
342/// # #[cfg(unix)]
343/// assert_eq!(url.scheme(), "file");
344/// ```
345pub fn uri_to_url(uri: &str) -> Result<Url> {
346    match Url::parse(uri) {
347        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
348            // On Windows, the drive is parsed as a scheme
349            local_path_to_url(uri)
350        }
351        Ok(url) => Ok(url),
352        Err(_) => local_path_to_url(uri),
353    }
354}
355
356fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
357    let str_path = str_path.as_ref();
358    let expanded = expand_tilde_path(str_path).unwrap_or_else(|| str_path.into());
359
360    let mut expanded_path = path_abs::PathAbs::new(expanded)
361        .unwrap()
362        .as_path()
363        .to_path_buf();
364    // path_abs::PathAbs::new(".") returns an empty string.
365    if let Some(s) = expanded_path.as_path().to_str()
366        && s.is_empty()
367    {
368        expanded_path = std::env::current_dir()?;
369    }
370
371    Ok(expanded_path)
372}
373
374fn expand_tilde_path(path: &str) -> Option<std::path::PathBuf> {
375    let home_dir = std::env::home_dir()?;
376    if path == "~" {
377        return Some(home_dir);
378    }
379    if let Some(stripped) = path.strip_prefix("~/") {
380        return Some(home_dir.join(stripped));
381    }
382    #[cfg(windows)]
383    if let Some(stripped) = path.strip_prefix("~\\") {
384        return Some(home_dir.join(stripped));
385    }
386
387    None
388}
389
390fn local_path_to_url(str_path: &str) -> Result<Url> {
391    let expanded_path = expand_path(str_path)?;
392
393    Url::from_directory_path(expanded_path).map_err(|_| {
394        Error::invalid_input_source(format!("Invalid table location: '{}'", str_path).into())
395    })
396}
397
398#[cfg(feature = "huggingface")]
399fn parse_hf_repo_id(url: &Url) -> Result<String> {
400    // Accept forms with repo type prefix (models/datasets/spaces) or legacy without.
401    let mut segments: Vec<String> = Vec::new();
402    if let Some(host) = url.host_str() {
403        segments.push(host.to_string());
404    }
405    segments.extend(
406        url.path()
407            .trim_start_matches('/')
408            .split('/')
409            .map(|s| s.to_string()),
410    );
411
412    if segments.len() < 2 {
413        return Err(Error::invalid_input(
414            "Huggingface URL must contain at least owner and repo",
415        ));
416    }
417
418    let repo_type_candidates = ["models", "datasets", "spaces"];
419    let (owner, repo_with_rev) = if repo_type_candidates.contains(&segments[0].as_str()) {
420        if segments.len() < 3 {
421            return Err(Error::invalid_input(
422                "Huggingface URL missing owner/repo after repo type",
423            ));
424        }
425        (segments[1].as_str(), segments[2].as_str())
426    } else {
427        (segments[0].as_str(), segments[1].as_str())
428    };
429
430    let repo = repo_with_rev
431        .split_once('@')
432        .map(|(r, _)| r)
433        .unwrap_or(repo_with_rev);
434    Ok(format!("{owner}/{repo}"))
435}
436
437impl ObjectStore {
438    /// Parse from a string URI.
439    ///
440    /// Returns the ObjectStore instance and the absolute path to the object.
441    ///
442    /// This uses the default [ObjectStoreRegistry] to find the object store. To
443    /// allow for potential re-use of object store instances, it's recommended to
444    /// create a shared [ObjectStoreRegistry] and pass that to [Self::from_uri_and_params].
445    pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
446        let registry = Arc::new(ObjectStoreRegistry::default());
447
448        Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
449    }
450
451    /// Parse from a string URI.
452    ///
453    /// Returns the ObjectStore instance and the absolute path to the object.
454    pub async fn from_uri_and_params(
455        registry: Arc<ObjectStoreRegistry>,
456        uri: &str,
457        params: &ObjectStoreParams,
458    ) -> Result<(Arc<Self>, Path)> {
459        #[allow(deprecated)]
460        if let Some((store, path)) = params.object_store.as_ref() {
461            let mut inner = store.clone();
462            let store_prefix =
463                registry.calculate_object_store_prefix(uri, params.storage_options())?;
464            if let Some(wrapper) = params.object_store_wrapper.as_ref() {
465                inner = wrapper.wrap(&store_prefix, inner);
466            }
467
468            // Always wrap with IO tracking
469            let io_tracker = IOTracker::default();
470            let tracked_store = io_tracker.wrap("", inner);
471
472            let store = Self {
473                inner: tracked_store,
474                scheme: path.scheme().to_string(),
475                block_size: params.block_size.unwrap_or(64 * 1024),
476                max_iop_size: *DEFAULT_MAX_IOP_SIZE,
477                use_constant_size_upload_parts: params.use_constant_size_upload_parts,
478                list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
479                io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
480                download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
481                io_tracker,
482                store_prefix,
483            };
484            let path = Path::parse(path.path())?;
485            return Ok((Arc::new(store), path));
486        }
487        let url = uri_to_url(uri)?;
488
489        let store = registry.get_store(url.clone(), params).await?;
490        // We know the scheme is valid if we got a store back.
491        let provider = registry.get_provider(url.scheme()).expect_ok()?;
492        let path = provider.extract_path(&url)?;
493
494        Ok((store, path))
495    }
496
497    /// Extract the path component from a URI without initializing the object store.
498    ///
499    /// This is a synchronous operation that only parses the URI and extracts the path,
500    /// without creating or initializing any object store instance.
501    ///
502    /// # Arguments
503    ///
504    /// * `registry` - The object store registry to get the provider
505    /// * `uri` - The URI to extract the path from
506    ///
507    /// # Returns
508    ///
509    /// The extracted path component
510    pub fn extract_path_from_uri(registry: Arc<ObjectStoreRegistry>, uri: &str) -> Result<Path> {
511        let url = uri_to_url(uri)?;
512        let provider = registry
513            .get_provider(url.scheme())
514            .ok_or_else(|| Error::invalid_input(format!("Unknown scheme: {}", url.scheme())))?;
515        provider.extract_path(&url)
516    }
517
518    #[deprecated(note = "Use `from_uri` instead")]
519    pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
520        Self::from_uri_and_params(
521            Arc::new(ObjectStoreRegistry::default()),
522            str_path,
523            &Default::default(),
524        )
525        .now_or_never()
526        .unwrap()
527    }
528
529    /// Local object store.
530    pub fn local() -> Self {
531        let provider = FileStoreProvider;
532        provider
533            .new_store(Url::parse("file:///").unwrap(), &Default::default())
534            .now_or_never()
535            .unwrap()
536            .unwrap()
537    }
538
539    /// Create a in-memory object store directly for testing.
540    pub fn memory() -> Self {
541        let provider = MemoryStoreProvider;
542        provider
543            .new_store(Url::parse("memory:///").unwrap(), &Default::default())
544            .now_or_never()
545            .unwrap()
546            .unwrap()
547    }
548
549    /// Returns true if the object store pointed to a local file system.
550    pub fn is_local(&self) -> bool {
551        self.scheme == "file" || self.scheme == "file+uring"
552    }
553
554    pub fn is_cloud(&self) -> bool {
555        if self.is_local() || self.scheme == "memory" || self.scheme == "shared-memory" {
556            return false;
557        }
558        true
559    }
560
561    /// Whether this object store prefers the lite scheduler.
562    ///
563    /// The lite scheduler is designed for backends like io_uring where
564    /// tasks should only be polled when the consumer polls them.
565    pub fn prefers_lite_scheduler(&self) -> bool {
566        self.scheme == "file+uring"
567    }
568
569    pub fn scheme(&self) -> &str {
570        &self.scheme
571    }
572
573    pub fn block_size(&self) -> usize {
574        self.block_size
575    }
576
577    pub fn max_iop_size(&self) -> u64 {
578        self.max_iop_size
579    }
580
581    pub fn io_parallelism(&self) -> usize {
582        std::env::var("LANCE_IO_THREADS")
583            .map(|val| val.parse::<usize>().unwrap())
584            .unwrap_or(self.io_parallelism)
585    }
586
587    /// Get the IO tracker for this object store
588    ///
589    /// The IO tracker can be used to get statistics about read/write operations
590    /// performed on this object store.
591    pub fn io_tracker(&self) -> &IOTracker {
592        &self.io_tracker
593    }
594
595    /// Get a snapshot of current IO statistics without resetting counters
596    ///
597    /// Returns the current IO statistics without modifying the internal state.
598    /// Use this when you need to check stats without resetting them.
599    pub fn io_stats_snapshot(&self) -> IoStats {
600        self.io_tracker.stats()
601    }
602
603    /// Get incremental IO statistics since the last call to this method
604    ///
605    /// Returns the accumulated statistics since the last call and resets the
606    /// counters to zero. This is useful for tracking IO operations between
607    /// different stages of processing.
608    pub fn io_stats_incremental(&self) -> IoStats {
609        self.io_tracker.incremental_stats()
610    }
611
612    /// Open a file for path.
613    ///
614    /// Parameters
615    /// - ``path``: Absolute path to the file.
616    pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
617        match self.scheme.as_str() {
618            "file" => {
619                LocalObjectReader::open_with_tracker(
620                    path,
621                    self.block_size,
622                    None,
623                    Arc::new(self.io_tracker.clone()),
624                )
625                .await
626            }
627            #[cfg(target_os = "linux")]
628            "file+uring" => {
629                // Check if current-thread mode enabled
630                let use_current_thread = std::env::var("LANCE_URING_CURRENT_THREAD")
631                    .map(|v| str_is_truthy(&v))
632                    .unwrap_or(false);
633
634                if use_current_thread {
635                    UringCurrentThreadReader::open(
636                        path,
637                        self.block_size,
638                        None,
639                        Arc::new(self.io_tracker.clone()),
640                    )
641                    .await
642                } else {
643                    UringReader::open(
644                        path,
645                        self.block_size,
646                        None,
647                        Arc::new(self.io_tracker.clone()),
648                    )
649                    .await
650                }
651            }
652            _ => Ok(Box::new(CloudObjectReader::new(
653                self.inner.clone(),
654                path.clone(),
655                self.block_size,
656                None,
657                self.download_retry_count,
658            )?)),
659        }
660    }
661
662    /// Open a reader for a file with known size.
663    ///
664    /// This size may either have been retrieved from a list operation or
665    /// cached metadata. By passing in the known size, we can skip a HEAD / metadata
666    /// call.
667    pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
668        // If we know the file is really small, we can read the whole thing
669        // as a single request.
670        if known_size <= self.block_size {
671            return Ok(Box::new(SmallReader::new(
672                self.inner.clone(),
673                path.clone(),
674                self.download_retry_count,
675                known_size,
676            )));
677        }
678
679        match self.scheme.as_str() {
680            "file" => {
681                LocalObjectReader::open_with_tracker(
682                    path,
683                    self.block_size,
684                    Some(known_size),
685                    Arc::new(self.io_tracker.clone()),
686                )
687                .await
688            }
689            #[cfg(target_os = "linux")]
690            "file+uring" => {
691                // Check if current-thread mode enabled
692                let use_current_thread = std::env::var("LANCE_URING_CURRENT_THREAD")
693                    .map(|v| str_is_truthy(&v))
694                    .unwrap_or(false);
695
696                if use_current_thread {
697                    UringCurrentThreadReader::open(
698                        path,
699                        self.block_size,
700                        Some(known_size),
701                        Arc::new(self.io_tracker.clone()),
702                    )
703                    .await
704                } else {
705                    UringReader::open(
706                        path,
707                        self.block_size,
708                        Some(known_size),
709                        Arc::new(self.io_tracker.clone()),
710                    )
711                    .await
712                }
713            }
714            _ => Ok(Box::new(CloudObjectReader::new(
715                self.inner.clone(),
716                path.clone(),
717                self.block_size,
718                Some(known_size),
719                self.download_retry_count,
720            )?)),
721        }
722    }
723
724    /// Create an [ObjectWriter] from local [std::path::Path]
725    pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
726        let object_store = Self::local();
727        let absolute_path = expand_path(path.to_string_lossy())?;
728        let os_path = Path::from_absolute_path(absolute_path)?;
729        ObjectWriter::new(&object_store, &os_path).await
730    }
731
732    /// Open an [Reader] from local [std::path::Path]
733    pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
734        let object_store = Self::local();
735        let absolute_path = expand_path(path.to_string_lossy())?;
736        let os_path = Path::from_absolute_path(absolute_path)?;
737        object_store.open(&os_path).await
738    }
739
740    /// Create a new file.
741    pub async fn create(&self, path: &Path) -> Result<Box<dyn Writer>> {
742        match self.scheme.as_str() {
743            "file" => {
744                let local_path = super::local::to_local_path(path);
745                let local_path = std::path::PathBuf::from(&local_path);
746                if let Some(parent) = local_path.parent() {
747                    tokio::fs::create_dir_all(parent).await?;
748                }
749                let parent = local_path
750                    .parent()
751                    .expect("file path must have parent")
752                    .to_owned();
753                let named_temp =
754                    tokio::task::spawn_blocking(move || tempfile::NamedTempFile::new_in(parent))
755                        .await
756                        .map_err(|e| Error::io(format!("spawn_blocking failed: {}", e)))??;
757                let (std_file, temp_path) = named_temp.into_parts();
758                let file = tokio::fs::File::from_std(std_file);
759                Ok(Box::new(LocalWriter::new(
760                    file,
761                    path.clone(),
762                    temp_path,
763                    Arc::new(self.io_tracker.clone()),
764                )))
765            }
766            _ => Ok(Box::new(ObjectWriter::new(self, path).await?)),
767        }
768    }
769
770    /// A helper function to create a file and write content to it.
771    pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
772        let mut writer = self.create(path).await?;
773        writer.write_all(content).await?;
774        Writer::shutdown(writer.as_mut()).await
775    }
776
777    pub async fn delete(&self, path: &Path) -> Result<()> {
778        self.inner.delete(path).await?;
779        Ok(())
780    }
781
782    /// AWS S3 and GCS reject a single-shot server-side copy whose source is
783    /// larger than this; such sources are streamed through a multipart write.
784    const MAX_SINGLE_COPY_BYTES: u64 = 5 * 1024 * 1024 * 1024; // 5 GiB
785
786    pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
787        // S3 and GCS cap single-shot server-side copies at 5 GiB and object_store
788        // does not fall back to a multipart copy for larger sources
789        // (https://github.com/apache/arrow-rs-object-store/issues/563). Azure and
790        // other blob stores don't have this limit, so we only pay for the fallback
791        // (an extra size lookup) on S3 and GCS.
792        let multipart_copy_fallback = matches!(self.scheme.as_str(), "s3" | "s3+ddb" | "gs");
793        self.copy_impl(
794            from,
795            to,
796            multipart_copy_fallback,
797            Self::MAX_SINGLE_COPY_BYTES,
798        )
799        .await
800    }
801
802    /// Copy `from` to `to`. When `multipart_copy_fallback` is set, a source
803    /// larger than `max_single_copy` is streamed through a multipart write
804    /// instead of a single-shot server-side copy. Both are parameters so tests
805    /// can drive the streaming path without a multi-gigabyte fixture or an S3
806    /// endpoint.
807    async fn copy_impl(
808        &self,
809        from: &Path,
810        to: &Path,
811        multipart_copy_fallback: bool,
812        max_single_copy: u64,
813    ) -> Result<()> {
814        if self.is_local() {
815            // Use std::fs::copy for local filesystem to support cross-filesystem copies
816            return super::local::copy_file(from, to);
817        }
818        if multipart_copy_fallback {
819            // Reuse the reader for both the size lookup (a single cached HEAD)
820            // and the streamed copy, avoiding a separate HEAD request.
821            let reader = self.open(from).await?;
822            if reader.size().await? as u64 > max_single_copy {
823                let mut writer = self.create(to).await?;
824                writer.copy_from_reader(reader.as_ref()).await?;
825                Writer::shutdown(writer.as_mut()).await?;
826                return Ok(());
827            }
828        }
829        Ok(self.inner.copy(from, to).await?)
830    }
831
832    /// Read a directory (start from base directory) and returns all sub-paths in the directory.
833    pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
834        let path = dir_path.into();
835        let path = Path::parse(&path)?;
836        let output = self.inner.list_with_delimiter(Some(&path)).await?;
837        Ok(output
838            .common_prefixes
839            .iter()
840            .chain(output.objects.iter().map(|o| &o.location))
841            .filter_map(|s| s.filename().map(|f| f.to_string()))
842            .collect())
843    }
844
845    pub fn list(
846        &self,
847        path: Option<Path>,
848    ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
849        Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
850    }
851
    /// Recursively list every object under `dir_path`.
    ///
    /// When `unmodified_since` is given, only objects that have not been modified
    /// since that time are returned. This delegates to the `read_dir_all`
    /// extension method on the inner store; the returned stream borrows `self`.
    pub fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        self.inner.read_dir_all(dir_path, unmodified_since)
    }
862
863    /// Remove a directory recursively.
864    pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
865        let path = dir_path.into();
866        let path = Path::parse(&path)?;
867
868        if self.is_local() {
869            // The local file system provider needs to delete both files and directories.
870            return super::local::remove_dir_all(&path);
871        }
872        let sub_entries = self
873            .inner
874            .list(Some(&path))
875            .map(|m| m.map(|meta| meta.location))
876            .boxed();
877        self.inner
878            .delete_stream(sub_entries)
879            .try_collect::<Vec<_>>()
880            .await?;
881        if self.scheme == "file-object-store" {
882            // file-object-store tries to do everything as similarly as possible to the remote
883            // object stores. But we still have to delete the directory entries afterwards.
884            return super::local::remove_dir_all(&path);
885        }
886        Ok(())
887    }
888
889    pub fn remove_stream<'a>(
890        &'a self,
891        locations: BoxStream<'a, Result<Path>>,
892    ) -> BoxStream<'a, Result<Path>> {
893        let store = Arc::clone(&self.inner);
894        locations
895            .and_then(move |location| {
896                let store = Arc::clone(&store);
897                async move {
898                    store.delete(&location).await?;
899                    Ok(location)
900                }
901            })
902            .boxed()
903    }
904
905    /// Check a file exists.
906    pub async fn exists(&self, path: &Path) -> Result<bool> {
907        match self.inner.head(path).await {
908            Ok(_) => Ok(true),
909            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
910            Err(e) => Err(e.into()),
911        }
912    }
913
914    /// Get file size.
915    pub async fn size(&self, path: &Path) -> Result<u64> {
916        Ok(self.inner.head(path).await?.size)
917    }
918
919    /// Convenience function to open a reader and read all the bytes
920    pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
921        let reader = self.open(path).await?;
922        Ok(reader.get_all().await?)
923    }
924
925    /// Convenience function open a reader and make a single request
926    ///
927    /// If you will be making multiple requests to the path it is more efficient to call [`Self::open`]
928    /// and then call [`Reader::get_range`] multiple times.
929    pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
930        let reader = self.open(path).await?;
931        Ok(reader.get_range(range).await?)
932    }
933}
934
/// Configuration keys that apply to every object store backend.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum LanceConfigKey {
    /// Number of times to retry a failed download (`download_retry_count`).
    DownloadRetryCount,
}
941
942impl FromStr for LanceConfigKey {
943    type Err = Error;
944
945    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
946        match s.to_ascii_lowercase().as_str() {
947            "download_retry_count" => Ok(Self::DownloadRetryCount),
948            _ => Err(Error::invalid_input_source(
949                format!("Invalid LanceConfigKey: {}", s).into(),
950            )),
951        }
952    }
953}
954
/// Free-form string options used to configure an object store, keyed by option
/// name.
#[derive(Debug, Default, Clone)]
pub struct StorageOptions(pub HashMap<String, String>);
957
958impl StorageOptions {
959    /// Create a new instance of [`StorageOptions`]
960    pub fn new(options: HashMap<String, String>) -> Self {
961        let mut options = options;
962        if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
963            options.insert("allow_http".into(), value);
964        }
965        if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
966            options.insert("allow_http".into(), value);
967        }
968        if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
969            options.insert("allow_http".into(), value);
970        }
971        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
972            options.insert("client_max_retries".into(), value);
973        }
974        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
975            options.insert("client_retry_timeout".into(), value);
976        }
977        Self(options)
978    }
979
980    /// Denotes if unsecure connections via http are allowed
981    pub fn allow_http(&self) -> bool {
982        self.0.iter().any(|(key, value)| {
983            key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
984        })
985    }
986
987    /// Number of times to retry a download that fails
988    pub fn download_retry_count(&self) -> usize {
989        self.0
990            .iter()
991            .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
992            .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
993            .unwrap_or(3)
994    }
995
996    /// Max retry times to set in RetryConfig for object store client
997    pub fn client_max_retries(&self) -> usize {
998        self.0
999            .iter()
1000            .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
1001            .and_then(|(_, value)| value.parse::<usize>().ok())
1002            .unwrap_or(3)
1003    }
1004
1005    /// Seconds of timeout to set in RetryConfig for object store client
1006    pub fn client_retry_timeout(&self) -> u64 {
1007        self.0
1008            .iter()
1009            .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
1010            .and_then(|(_, value)| value.parse::<u64>().ok())
1011            .unwrap_or(180)
1012    }
1013
1014    pub fn get(&self, key: &str) -> Option<&String> {
1015        self.0.get(key)
1016    }
1017
1018    /// Build [`ClientOptions`] with default headers extracted from `headers.*` keys.
1019    ///
1020    /// Keys prefixed with `headers.` are parsed into HTTP headers. For example,
1021    /// `headers.x-ms-version = 2023-11-03` results in a default header
1022    /// `x-ms-version: 2023-11-03`.
1023    ///
1024    /// Returns an error if any `headers.*` key has an invalid header name or value.
1025    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1026    pub fn client_options(&self) -> Result<ClientOptions> {
1027        let mut headers = HeaderMap::new();
1028        for (key, value) in &self.0 {
1029            if let Some(header_name) = key.strip_prefix("headers.") {
1030                let name = header_name
1031                    .parse::<http::header::HeaderName>()
1032                    .map_err(|e| {
1033                        Error::invalid_input(format!("invalid header name '{header_name}': {e}"))
1034                    })?;
1035                let val = HeaderValue::from_str(value).map_err(|e| {
1036                    Error::invalid_input(format!("invalid header value for '{header_name}': {e}"))
1037                })?;
1038                headers.insert(name, val);
1039            }
1040        }
1041        let mut client_options = ClientOptions::default();
1042        if !headers.is_empty() {
1043            client_options = client_options.with_default_headers(headers);
1044        }
1045        Ok(client_options)
1046    }
1047
1048    /// Get the expiration time in milliseconds since epoch, if present
1049    pub fn expires_at_millis(&self) -> Option<u64> {
1050        self.0
1051            .get(EXPIRES_AT_MILLIS_KEY)
1052            .and_then(|s| s.parse::<u64>().ok())
1053    }
1054}
1055
1056impl From<HashMap<String, String>> for StorageOptions {
1057    fn from(value: HashMap<String, String>) -> Self {
1058        Self::new(value)
1059    }
1060}
1061
/// Registry that [`ObjectStore::new`] consults to find a scheme's provider when
/// computing the store prefix. It is built lazily, on first access, from
/// `ObjectStoreRegistry::default`.
static DEFAULT_OBJECT_STORE_REGISTRY: std::sync::LazyLock<ObjectStoreRegistry> =
    std::sync::LazyLock::new(ObjectStoreRegistry::default);
1064
1065impl ObjectStore {
1066    #[allow(clippy::too_many_arguments)]
1067    pub fn new(
1068        store: Arc<DynObjectStore>,
1069        location: Url,
1070        block_size: Option<usize>,
1071        wrapper: Option<Arc<dyn WrappingObjectStore>>,
1072        use_constant_size_upload_parts: bool,
1073        list_is_lexically_ordered: bool,
1074        io_parallelism: usize,
1075        download_retry_count: usize,
1076        storage_options: Option<&HashMap<String, String>>,
1077    ) -> Self {
1078        let scheme = location.scheme();
1079        let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
1080        let store_prefix = match DEFAULT_OBJECT_STORE_REGISTRY.get_provider(scheme) {
1081            Some(provider) => provider
1082                .calculate_object_store_prefix(&location, storage_options)
1083                .unwrap(),
1084            None => {
1085                let store_prefix = format!("{}${}", location.scheme(), location.authority());
1086                log::warn!(
1087                    "Guessing that object store prefix is {}, since object store scheme is not found in registry.",
1088                    store_prefix
1089                );
1090                store_prefix
1091            }
1092        };
1093        let store = match wrapper {
1094            Some(wrapper) => wrapper.wrap(&store_prefix, store),
1095            None => store,
1096        };
1097
1098        // Always wrap with IO tracking
1099        let io_tracker = IOTracker::default();
1100        let tracked_store = io_tracker.wrap("", store);
1101
1102        Self {
1103            inner: tracked_store,
1104            scheme: scheme.into(),
1105            block_size,
1106            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
1107            use_constant_size_upload_parts,
1108            list_is_lexically_ordered,
1109            io_parallelism,
1110            download_retry_count,
1111            io_tracker,
1112            store_prefix,
1113        }
1114    }
1115}
1116
/// Default block size for a URL scheme when none is configured explicitly.
///
/// Only the `file` scheme gets 4 KiB. Every other scheme is treated as a cloud
/// object store and gets 64 KiB, generally the largest block size that shows no
/// latency penalty.
fn infer_block_size(scheme: &str) -> usize {
    const LOCAL_FS_BLOCK_SIZE: usize = 4 * 1024;
    const CLOUD_BLOCK_SIZE: usize = 64 * 1024;
    if scheme == "file" {
        LOCAL_FS_BLOCK_SIZE
    } else {
        CLOUD_BLOCK_SIZE
    }
}
1126
1127#[cfg(test)]
1128mod tests {
1129    use super::*;
1130    use async_trait::async_trait;
1131    use bytes::Bytes;
1132    use lance_core::utils::tempfile::{TempStdDir, TempStdFile, TempStrDir};
1133    use object_store::memory::InMemory;
1134    use object_store::{
1135        CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, PutMultipartOptions,
1136        PutOptions, PutPayload, PutResult, Result as OSResult,
1137    };
1138    use rstest::rstest;
1139    use std::env::set_current_dir;
1140    use std::fmt::{Display, Formatter};
1141    use std::fs::{create_dir_all, write};
1142    use std::ops::Range;
1143    use std::path::Path as StdPath;
1144    use std::sync::atomic::{AtomicBool, Ordering};
1145
1146    /// Write test content to file.
1147    fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
1148        let path = expand_path(path_str).map_err(std::io::Error::other)?;
1149        std::fs::create_dir_all(path.parent().unwrap())?;
1150        write(path, contents)
1151    }
1152
1153    async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
1154        let test_file_store = store.open(path).await.unwrap();
1155        let size = test_file_store.size().await.unwrap();
1156        let bytes = test_file_store.get_range(0..size).await.unwrap();
1157        let contents = String::from_utf8(bytes.to_vec()).unwrap();
1158        Ok(contents)
1159    }
1160
1161    #[tokio::test]
1162    async fn test_absolute_paths() {
1163        let tmp_path = TempStrDir::default();
1164        write_to_file(
1165            &format!("{tmp_path}/bar/foo.lance/test_file"),
1166            "TEST_CONTENT",
1167        )
1168        .unwrap();
1169
1170        // test a few variations of the same path
1171        for uri in &[
1172            format!("{tmp_path}/bar/foo.lance"),
1173            format!("{tmp_path}/./bar/foo.lance"),
1174            format!("{tmp_path}/bar/foo.lance/../foo.lance"),
1175        ] {
1176            let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
1177            let contents = read_from_store(store.as_ref(), &path.clone().join("test_file"))
1178                .await
1179                .unwrap();
1180            assert_eq!(contents, "TEST_CONTENT");
1181        }
1182    }
1183
1184    #[tokio::test]
1185    async fn test_cloud_paths() {
1186        let uri = "s3://bucket/foo.lance";
1187        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
1188        assert_eq!(store.scheme, "s3");
1189        assert_eq!(path.to_string(), "foo.lance");
1190
1191        let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
1192            .await
1193            .unwrap();
1194        assert_eq!(store.scheme, "s3");
1195        assert_eq!(path.to_string(), "foo.lance");
1196
1197        let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
1198            .await
1199            .unwrap();
1200        assert_eq!(store.scheme, "gs");
1201        assert_eq!(path.to_string(), "foo.lance");
1202
1203        let (store, path) =
1204            ObjectStore::from_uri("abfss://filesystem@account.dfs.core.windows.net/foo.lance")
1205                .await
1206                .unwrap();
1207        assert_eq!(store.scheme, "abfss");
1208        assert_eq!(path.to_string(), "foo.lance");
1209    }
1210
1211    async fn test_block_size_used_test_helper(
1212        uri: &str,
1213        storage_options: Option<HashMap<String, String>>,
1214        default_expected_block_size: usize,
1215    ) {
1216        // Test the default
1217        let registry = Arc::new(ObjectStoreRegistry::default());
1218        let accessor = storage_options
1219            .clone()
1220            .map(|opts| Arc::new(StorageOptionsAccessor::with_static_options(opts)));
1221        let params = ObjectStoreParams {
1222            storage_options_accessor: accessor.clone(),
1223            ..ObjectStoreParams::default()
1224        };
1225        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
1226            .await
1227            .unwrap();
1228        assert_eq!(store.block_size, default_expected_block_size);
1229
1230        // Ensure param is used
1231        let registry = Arc::new(ObjectStoreRegistry::default());
1232        let params = ObjectStoreParams {
1233            block_size: Some(1024),
1234            storage_options_accessor: accessor,
1235            ..ObjectStoreParams::default()
1236        };
1237        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
1238            .await
1239            .unwrap();
1240        assert_eq!(store.block_size, 1024);
1241    }
1242
1243    #[rstest]
1244    #[case("s3://bucket/foo.lance", None)]
1245    #[case("gs://bucket/foo.lance", None)]
1246    #[case("az://account/bucket/foo.lance",
1247      Some(HashMap::from([
1248            (String::from("account_name"), String::from("account")),
1249            (String::from("container_name"), String::from("container"))
1250           ])))]
1251    #[case("abfss://filesystem@account.dfs.core.windows.net/foo.lance",
1252      Some(HashMap::from([
1253            (String::from("account_name"), String::from("account")),
1254            (String::from("container_name"), String::from("filesystem"))
1255           ])))]
1256    #[tokio::test]
1257    async fn test_block_size_used_cloud(
1258        #[case] uri: &str,
1259        #[case] storage_options: Option<HashMap<String, String>>,
1260    ) {
1261        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
1262    }
1263
1264    #[rstest]
1265    #[case("file")]
1266    #[case("file-object-store")]
1267    #[case("memory:///bucket/foo.lance")]
1268    #[tokio::test]
1269    async fn test_block_size_used_file(#[case] prefix: &str) {
1270        let tmp_path = TempStrDir::default();
1271        let path = format!("{tmp_path}/bar/foo.lance/test_file");
1272        write_to_file(&path, "URL").unwrap();
1273        let uri = format!("{prefix}:///{path}");
1274        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
1275    }
1276
1277    #[tokio::test]
1278    async fn test_relative_paths() {
1279        let tmp_path = TempStrDir::default();
1280        write_to_file(
1281            &format!("{tmp_path}/bar/foo.lance/test_file"),
1282            "RELATIVE_URL",
1283        )
1284        .unwrap();
1285
1286        set_current_dir(StdPath::new(tmp_path.as_ref())).expect("Error changing current dir");
1287        let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();
1288
1289        let contents = read_from_store(store.as_ref(), &path.clone().join("test_file"))
1290            .await
1291            .unwrap();
1292        assert_eq!(contents, "RELATIVE_URL");
1293    }
1294
1295    #[tokio::test]
1296    async fn test_tilde_expansion() {
1297        let uri = "~/foo.lance";
1298        write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
1299        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
1300        let contents = read_from_store(store.as_ref(), &path.clone().join("test_file"))
1301            .await
1302            .unwrap();
1303        assert_eq!(contents, "TILDE");
1304    }
1305
1306    #[tokio::test]
1307    async fn test_read_directory() {
1308        let path = TempStdDir::default();
1309        create_dir_all(path.join("foo").join("bar")).unwrap();
1310        create_dir_all(path.join("foo").join("zoo")).unwrap();
1311        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
1312        write_to_file(
1313            path.join("foo").join("test_file").to_str().unwrap(),
1314            "read_dir",
1315        )
1316        .unwrap();
1317        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
1318
1319        let sub_dirs = store.read_dir(base.clone().join("foo")).await.unwrap();
1320        assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
1321    }
1322
    /// `remove_dir_all` through a plain `file://` URL (the local store).
    #[tokio::test]
    async fn test_delete_directory_local_store() {
        test_delete_directory("").await;
    }

    /// `remove_dir_all` through the `file-object-store` scheme.
    #[tokio::test]
    async fn test_delete_directory_file_object_store() {
        test_delete_directory("file-object-store").await;
    }
1332
1333    async fn test_delete_directory(scheme: &str) {
1334        let path = TempStdDir::default();
1335        create_dir_all(path.join("foo").join("bar")).unwrap();
1336        create_dir_all(path.join("foo").join("zoo")).unwrap();
1337        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
1338        write_to_file(
1339            path.join("foo")
1340                .join("bar")
1341                .join("test_file")
1342                .to_str()
1343                .unwrap(),
1344            "delete",
1345        )
1346        .unwrap();
1347        let file_url = Url::from_directory_path(&path).unwrap();
1348        let url = if scheme.is_empty() {
1349            file_url
1350        } else {
1351            let mut url = Url::parse(&format!("{scheme}:///")).unwrap();
1352            // Use the file:// URL's normalized path so this works on Windows too.
1353            url.set_path(file_url.path());
1354            url
1355        };
1356        let (store, base) = ObjectStore::from_uri(url.as_ref()).await.unwrap();
1357        store
1358            .remove_dir_all(base.clone().join("foo"))
1359            .await
1360            .unwrap();
1361
1362        assert!(!path.join("foo").exists());
1363    }
1364
1365    #[derive(Debug)]
1366    struct TestWrapper {
1367        called: AtomicBool,
1368
1369        return_value: Arc<dyn OSObjectStore>,
1370    }
1371
1372    impl WrappingObjectStore for TestWrapper {
1373        fn wrap(
1374            &self,
1375            _store_prefix: &str,
1376            _original: Arc<dyn OSObjectStore>,
1377        ) -> Arc<dyn OSObjectStore> {
1378            self.called.store(true, Ordering::Relaxed);
1379
1380            // return a mocked value so we can check if the final store is the one we expect
1381            self.return_value.clone()
1382        }
1383    }
1384
1385    impl TestWrapper {
1386        fn called(&self) -> bool {
1387            self.called.load(Ordering::Relaxed)
1388        }
1389    }
1390
1391    #[tokio::test]
1392    async fn test_wrapping_object_store_option_is_used() {
1393        // Make a store for the inner store first
1394        let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
1395        let registry = Arc::new(ObjectStoreRegistry::default());
1396
1397        assert_eq!(Arc::strong_count(&mock_inner_store), 1);
1398
1399        let wrapper = Arc::new(TestWrapper {
1400            called: AtomicBool::new(false),
1401            return_value: mock_inner_store.clone(),
1402        });
1403
1404        let params = ObjectStoreParams {
1405            object_store_wrapper: Some(wrapper.clone()),
1406            ..ObjectStoreParams::default()
1407        };
1408
1409        // not called yet
1410        assert!(!wrapper.called());
1411
1412        let _ = ObjectStore::from_uri_and_params(registry, "memory:///", &params)
1413            .await
1414            .unwrap();
1415
1416        // called after construction
1417        assert!(wrapper.called());
1418
1419        // hard to compare two trait pointers as the point to vtables
1420        // using the ref count as a proxy to make sure that the store is correctly kept
1421        assert_eq!(Arc::strong_count(&mock_inner_store), 2);
1422    }
1423
1424    #[tokio::test]
1425    async fn test_local_paths() {
1426        let file_path = TempStdFile::default();
1427        let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
1428        writer.write_all(b"LOCAL").await.unwrap();
1429        Writer::shutdown(&mut writer).await.unwrap();
1430
1431        let reader = ObjectStore::open_local(&file_path).await.unwrap();
1432        let buf = reader.get_range(0..5).await.unwrap();
1433        assert_eq!(buf.as_ref(), b"LOCAL");
1434    }
1435
1436    #[tokio::test]
1437    async fn test_read_one() {
1438        let file_path = TempStdFile::default();
1439        let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
1440        writer.write_all(b"LOCAL").await.unwrap();
1441        Writer::shutdown(&mut writer).await.unwrap();
1442
1443        let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
1444        let obj_store = ObjectStore::local();
1445        let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
1446        assert_eq!(buf.as_ref(), b"LOCAL");
1447
1448        let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
1449        assert_eq!(buf.as_ref(), b"LOCAL");
1450    }
1451
    /// Windows-only: a dataset on the temp file's drive must be readable through
    /// both forward-slash and backslash forms of a drive-letter path.
    #[tokio::test]
    #[cfg(windows)]
    async fn test_windows_paths() {
        use std::path::Component;
        use std::path::Prefix;
        use std::path::Prefix::*;

        // First component of an absolute Windows path; panics if it is not a prefix.
        fn get_path_prefix(path: &StdPath) -> Prefix<'_> {
            match path.components().next().unwrap() {
                Component::Prefix(prefix_component) => prefix_component.kind(),
                _ => panic!(),
            }
        }

        // Drive letter of a `Disk` prefix (e.g. "C"); panics for UNC/verbatim prefixes.
        fn get_drive_letter(prefix: Prefix) -> String {
            match prefix {
                Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
                _ => panic!(),
            }
        }

        // Use the temp file only to discover which drive to write on.
        let tmp_path = TempStdFile::default();
        let prefix = get_path_prefix(&tmp_path);
        let drive_letter = get_drive_letter(prefix);

        write_to_file(
            &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
            "WINDOWS",
        )
        .unwrap();

        // Forward-slash and backslash spellings of the same dataset path.
        for uri in &[
            format!("{drive_letter}:/test_folder/test.lance"),
            format!("{drive_letter}:\\test_folder\\test.lance"),
        ] {
            let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &base.clone().join("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "WINDOWS");
        }
    }
1494
1495    #[tokio::test]
1496    async fn test_cross_filesystem_copy() {
1497        // Create two temporary directories that simulate different filesystems
1498        let source_dir = TempStdDir::default();
1499        let dest_dir = TempStdDir::default();
1500
1501        // Create a test file in the source directory
1502        let source_file_name = "test_file.txt";
1503        let source_file = source_dir.join(source_file_name);
1504        std::fs::write(&source_file, b"test content").unwrap();
1505
1506        // Create ObjectStore for local filesystem
1507        let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
1508            .await
1509            .unwrap();
1510
1511        // Create paths relative to the ObjectStore base
1512        let from_path = base_path.clone().join(source_file_name);
1513
1514        // Use object_store::Path::parse for the destination
1515        let dest_file = dest_dir.join("copied_file.txt");
1516        let dest_str = dest_file.to_str().unwrap();
1517        let to_path = object_store::path::Path::parse(dest_str).unwrap();
1518
1519        // Perform the copy operation
1520        store.copy(&from_path, &to_path).await.unwrap();
1521
1522        // Verify the file was copied correctly
1523        assert!(dest_file.exists());
1524        let copied_content = std::fs::read(&dest_file).unwrap();
1525        assert_eq!(copied_content, b"test content");
1526    }
1527
1528    #[tokio::test]
1529    async fn test_copy_creates_parent_directories() {
1530        let source_dir = TempStdDir::default();
1531        let dest_dir = TempStdDir::default();
1532
1533        // Create a test file in the source directory
1534        let source_file_name = "test_file.txt";
1535        let source_file = source_dir.join(source_file_name);
1536        std::fs::write(&source_file, b"test content").unwrap();
1537
1538        // Create ObjectStore for local filesystem
1539        let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
1540            .await
1541            .unwrap();
1542
1543        // Create paths
1544        let from_path = base_path.clone().join(source_file_name);
1545
1546        // Create destination with nested directories that don't exist yet
1547        let dest_file = dest_dir.join("nested").join("dirs").join("copied_file.txt");
1548        let dest_str = dest_file.to_str().unwrap();
1549        let to_path = object_store::path::Path::parse(dest_str).unwrap();
1550
1551        // Perform the copy operation - should create parent directories
1552        store.copy(&from_path, &to_path).await.unwrap();
1553
1554        // Verify the file was copied correctly and directories were created
1555        assert!(dest_file.exists());
1556        assert!(dest_file.parent().unwrap().exists());
1557        let copied_content = std::fs::read(&dest_file).unwrap();
1558        assert_eq!(copied_content, b"test content");
1559    }
1560
    /// Inner store that forwards everything to `InMemory` except single-shot
    /// server-side copy (`copy_opts`), which always fails. This lets a test
    /// prove that `ObjectStore::copy` fell back to a streaming multipart copy
    /// for an oversized source rather than issuing a single `CopyObject`.
    #[derive(Debug)]
    struct CopyFailingStore {
        // Backing store that receives every forwarded call.
        inner: InMemory,
    }

    // `OSObjectStore` requires `Display`; a fixed name is enough for tests.
    impl Display for CopyFailingStore {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            write!(f, "CopyFailingStore")
        }
    }

    // Every method forwards verbatim to `inner`, except `copy_opts` at the end.
    #[async_trait]
    impl OSObjectStore for CopyFailingStore {
        async fn put_opts(
            &self,
            location: &Path,
            bytes: PutPayload,
            opts: PutOptions,
        ) -> OSResult<PutResult> {
            self.inner.put_opts(location, bytes, opts).await
        }
        // Multipart writes must still work: the streaming copy path relies on them.
        async fn put_multipart_opts(
            &self,
            location: &Path,
            opts: PutMultipartOptions,
        ) -> OSResult<Box<dyn MultipartUpload>> {
            self.inner.put_multipart_opts(location, opts).await
        }
        async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
            self.inner.get_opts(location, options).await
        }
        async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> OSResult<Vec<Bytes>> {
            self.inner.get_ranges(location, ranges).await
        }
        fn delete_stream(
            &self,
            locations: BoxStream<'static, OSResult<Path>>,
        ) -> BoxStream<'static, OSResult<Path>> {
            self.inner.delete_stream(locations)
        }
        fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult<ObjectMeta>> {
            self.inner.list(prefix)
        }
        fn list_with_offset(
            &self,
            prefix: Option<&Path>,
            offset: &Path,
        ) -> BoxStream<'static, OSResult<ObjectMeta>> {
            self.inner.list_with_offset(prefix, offset)
        }
        async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
            self.inner.list_with_delimiter(prefix).await
        }
        // Deliberately fail, so any single-shot server-side copy surfaces as an error.
        async fn copy_opts(&self, _from: &Path, _to: &Path, _opts: CopyOptions) -> OSResult<()> {
            Err(object_store::Error::Generic {
                store: "CopyFailingStore",
                source: "single-shot copy disabled in test".into(),
            })
        }
    }
1625
1626    #[tokio::test]
1627    async fn test_copy_streams_objects_larger_than_threshold() {
1628        // memory:// is non-local but isn't an S3/GCS scheme, so copy() wouldn't
1629        // enable the fallback on its own. Drive copy_impl directly with
1630        // multipart_copy_fallback = true to exercise the streaming path. The
1631        // inner store rejects any single-shot copy, so a successful copy can only
1632        // have gone through the streaming branch.
1633        let mut store = ObjectStore::memory();
1634        store.inner = Arc::new(CopyFailingStore {
1635            inner: InMemory::new(),
1636        });
1637
1638        let from = Path::from("source.bin");
1639        let contents = b"streaming multipart copy payload well past the tiny threshold";
1640        store.put(&from, contents).await.unwrap();
1641
1642        // Source size (61 bytes) exceeds the threshold -> must stream via a
1643        // multipart write rather than a single-shot server-side copy.
1644        let streamed = Path::from("streamed.bin");
1645        store.copy_impl(&from, &streamed, true, 8).await.unwrap();
1646        let copied = store.read_one_all(&streamed).await.unwrap();
1647        assert_eq!(copied.as_ref(), contents.as_slice());
1648
1649        // Source size below the threshold -> single-shot copy, which the inner
1650        // store rejects, confirming that the streaming branch (not native copy)
1651        // is what made the first copy succeed.
1652        let native = Path::from("native.bin");
1653        assert!(
1654            store
1655                .copy_impl(&from, &native, true, u64::MAX)
1656                .await
1657                .is_err()
1658        );
1659    }
1660
1661    #[test]
1662    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1663    fn test_client_options_extracts_headers() {
1664        let opts = StorageOptions(HashMap::from([
1665            ("headers.x-custom-foo".to_string(), "bar".to_string()),
1666            ("headers.x-ms-version".to_string(), "2023-11-03".to_string()),
1667            ("region".to_string(), "us-west-2".to_string()),
1668        ]));
1669        let client_options = opts.client_options().unwrap();
1670
1671        // Verify non-header keys are not consumed as headers by creating
1672        // another StorageOptions with no headers.* keys.
1673        let opts_no_headers = StorageOptions(HashMap::from([(
1674            "region".to_string(),
1675            "us-west-2".to_string(),
1676        )]));
1677        opts_no_headers.client_options().unwrap();
1678
1679        // Smoke test: the client_options with headers should be usable
1680        // in a builder (we can't inspect the headers directly, but building
1681        // should not fail).
1682        #[cfg(feature = "gcp")]
1683        {
1684            use object_store::gcp::GoogleCloudStorageBuilder;
1685            let _builder = GoogleCloudStorageBuilder::new()
1686                .with_client_options(client_options)
1687                .with_url("gs://test-bucket");
1688        }
1689    }
1690
1691    #[test]
1692    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1693    fn test_client_options_rejects_invalid_header_name() {
1694        let opts = StorageOptions(HashMap::from([(
1695            "headers.bad header".to_string(),
1696            "value".to_string(),
1697        )]));
1698        let err = opts.client_options().unwrap_err();
1699        assert!(err.to_string().contains("invalid header name"));
1700    }
1701
1702    #[test]
1703    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1704    fn test_client_options_rejects_invalid_header_value() {
1705        let opts = StorageOptions(HashMap::from([(
1706            "headers.x-good-name".to_string(),
1707            "bad\x01value".to_string(),
1708        )]));
1709        let err = opts.client_options().unwrap_err();
1710        assert!(err.to_string().contains("invalid header value"));
1711    }
1712
1713    #[test]
1714    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1715    fn test_client_options_empty_when_no_header_keys() {
1716        let opts = StorageOptions(HashMap::from([
1717            ("region".to_string(), "us-east-1".to_string()),
1718            ("access_key_id".to_string(), "AKID".to_string()),
1719        ]));
1720        opts.client_options().unwrap();
1721    }
1722}