// lance_io/object_store.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Extend [object_store::ObjectStore] functionalities
5
6use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use deepsize::DeepSizeOf;
17use futures::{FutureExt, Stream};
18use futures::{StreamExt, TryStreamExt, future, stream::BoxStream};
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22use object_store::DynObjectStore;
23use object_store::Error as ObjectStoreError;
24#[cfg(feature = "aws")]
25use object_store::aws::AwsCredentialProvider;
26#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
27use object_store::{ClientOptions, HeaderMap, HeaderValue};
28use object_store::{ObjectMeta, ObjectStore as OSObjectStore, path::Path};
29use providers::local::FileStoreProvider;
30use providers::memory::MemoryStoreProvider;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35#[cfg(target_os = "linux")]
36use crate::uring::{UringCurrentThreadReader, UringReader};
37#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
38pub(crate) mod dynamic_credentials;
39#[cfg(any(feature = "oss", feature = "huggingface"))]
40pub(crate) mod dynamic_opendal;
41mod list_retry;
42pub mod providers;
43pub mod storage_options;
44#[cfg(test)]
45pub(crate) mod test_utils;
46pub mod throttle;
47mod tracing;
48use crate::object_reader::SmallReader;
49use crate::object_writer::{LocalWriter, WriteResult};
50use crate::traits::Writer;
51use crate::utils::tracking_store::{IOTracker, IoStats};
52use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
53use lance_core::{Error, Result};
54
// Local disks tend to do fine with a few threads.
// Note: the number of threads here also impacts the number of files
// we need to read in some situations.  So keeping this at 8 keeps the
// RAM on our scanner down.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
// Cloud disks often need many many threads to saturate the network
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

// Default read block sizes: small (4 KiB) blocks for local reads, larger
// (64 KiB) blocks for cloud reads to amortize per-request overhead.
const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024; // 4KB block size
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024; // 64KB block size
66
/// Maximum size, in bytes, of a single IO operation.
///
/// Defaults to 16 MiB; override with the `LANCE_MAX_IOP_SIZE` environment
/// variable (value in bytes).
///
/// # Panics
///
/// Panics at first use if `LANCE_MAX_IOP_SIZE` is set but is not a valid
/// integer, with a message naming the variable (the previous bare `unwrap`
/// produced an unactionable parse error).
pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock<u64> = std::sync::LazyLock::new(|| {
    std::env::var("LANCE_MAX_IOP_SIZE")
        .map(|val| {
            val.parse()
                .expect("LANCE_MAX_IOP_SIZE must be a valid integer (bytes)")
        })
        .unwrap_or(16 * 1024 * 1024)
});
72
/// Number of times to retry a failed download before giving up.
pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
74
75pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
76pub use storage_options::{
77    EXPIRES_AT_MILLIS_KEY, LanceNamespaceStorageOptionsProvider, REFRESH_OFFSET_MILLIS_KEY,
78    StorageOptionsAccessor, StorageOptionsProvider,
79};
80
/// Extension methods available on any [object_store::ObjectStore].
#[async_trait]
pub trait ObjectStoreExt {
    /// Returns true if the file exists.
    ///
    /// Errors other than "not found" are propagated to the caller.
    async fn exists(&self, path: &Path) -> Result<bool>;

    /// Read all files (start from base directory) recursively
    ///
    /// unmodified_since can be specified to only return files that have not been modified since the given time.
    fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>>;
}
95
96#[async_trait]
97impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
98    fn read_dir_all<'a, 'b>(
99        &'a self,
100        dir_path: impl Into<&'b Path> + Send,
101        unmodified_since: Option<DateTime<Utc>>,
102    ) -> BoxStream<'a, Result<ObjectMeta>> {
103        let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
104        if let Some(unmodified_since_val) = unmodified_since {
105            output
106                .try_filter(move |file| future::ready(file.last_modified <= unmodified_since_val))
107                .boxed()
108        } else {
109            output.boxed()
110        }
111    }
112
113    async fn exists(&self, path: &Path) -> Result<bool> {
114        match self.head(path).await {
115            Ok(_) => Ok(true),
116            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
117            Err(e) => Err(e.into()),
118        }
119    }
120}
121
/// Wraps [ObjectStore](object_store::ObjectStore)
#[derive(Debug, Clone)]
pub struct ObjectStore {
    // Inner object store
    pub inner: Arc<dyn OSObjectStore>,
    // URI scheme this store serves (e.g. "file", "file+uring", "memory").
    scheme: String,
    // Preferred IO block size in bytes for readers created by this store.
    block_size: usize,
    // Maximum size, in bytes, of a single IO request.
    max_iop_size: u64,
    /// Whether to use constant size upload parts for multipart uploads. This
    /// is only necessary for Cloudflare R2.
    pub use_constant_size_upload_parts: bool,
    /// Whether we can assume that the list of files is lexically ordered. This
    /// is true for object stores, but not for local filesystems.
    pub list_is_lexically_ordered: bool,
    // Default number of concurrent IO operations for this store (can be
    // overridden with the LANCE_IO_THREADS env var; see io_parallelism()).
    io_parallelism: usize,
    /// Number of times to retry a failed download
    download_retry_count: usize,
    /// IO tracker for monitoring read/write operations
    io_tracker: IOTracker,
    /// The datastore prefix that uniquely identifies this object store. It encodes information
    /// which usually cannot be found in the URL such as Azure account name. The prefix plus the
    /// path uniquely identifies any object inside the store.
    pub store_prefix: String,
}
146
impl DeepSizeOf for ObjectStore {
    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
        // We aren't counting `inner` here which is problematic but an ObjectStore
        // shouldn't be too big.  The only exception might be the write cache but, if
        // the writer cache has data, it means we're using it somewhere else that isn't
        // a cache and so that doesn't really count.
        // The remaining fields (flags, counters, tracker handle) are small
        // and not counted either.
        self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
    }
}
156
157impl std::fmt::Display for ObjectStore {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        write!(f, "ObjectStore({})", self.scheme)
160    }
161}
162
/// A hook for layering additional behavior (caching, tracing, throttling,
/// etc.) on top of an existing object store.
pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
    /// Wrap an object store with additional functionality
    ///
    /// The store_prefix is a string which uniquely identifies the object
    /// store being wrapped.
    fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
}
170
/// A [WrappingObjectStore] that applies a sequence of wrappers in order.
///
/// The first wrapper in the list is applied first (innermost); later
/// wrappers wrap the result.
#[derive(Debug, Clone)]
pub struct ChainedWrappingObjectStore {
    // Ordered list of wrappers to apply.
    wrappers: Vec<Arc<dyn WrappingObjectStore>>,
}

impl ChainedWrappingObjectStore {
    /// Create a chain from an ordered list of wrappers.
    pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
        Self { wrappers }
    }

    /// Append a wrapper to the end of the chain (applied outermost).
    pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
        self.wrappers.push(wrapper);
    }
}
185
186impl WrappingObjectStore for ChainedWrappingObjectStore {
187    fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
188        self.wrappers
189            .iter()
190            .fold(original, |acc, wrapper| wrapper.wrap(store_prefix, acc))
191    }
192}
193
/// Parameters to create an [ObjectStore]
///
#[derive(Debug, Clone)]
pub struct ObjectStoreParams {
    // Override for the store's IO block size, in bytes.
    pub block_size: Option<usize>,
    #[deprecated(note = "Implement an ObjectStoreProvider instead")]
    pub object_store: Option<(Arc<DynObjectStore>, Url)>,
    /// Refresh offset for AWS credentials when using the legacy AWS credentials path.
    /// For StorageOptionsAccessor, use `refresh_offset_millis` storage option instead.
    pub s3_credentials_refresh_offset: Duration,
    // Legacy AWS credential provider (only with the "aws" feature).
    #[cfg(feature = "aws")]
    pub aws_credentials: Option<AwsCredentialProvider>,
    // Optional hook to layer extra behavior over the created store.
    pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
    /// Unified storage options accessor with caching and automatic refresh
    ///
    /// Provides storage options and optionally a dynamic provider for automatic
    /// credential refresh. Use `StorageOptionsAccessor::with_static_options()` for static
    /// options or `StorageOptionsAccessor::with_initial_and_provider()` for dynamic refresh.
    pub storage_options_accessor: Option<Arc<StorageOptionsAccessor>>,
    /// Use constant size upload parts for multipart uploads. Only necessary
    /// for Cloudflare R2, which doesn't support variable size parts. When this
    /// is false, max upload size is 2.5TB. When this is true, the max size is
    /// 50GB.
    pub use_constant_size_upload_parts: bool,
    // Override for whether listings are lexically ordered; `None` lets the
    // provider decide.
    pub list_is_lexically_ordered: Option<bool>,
}
220
221impl Default for ObjectStoreParams {
222    fn default() -> Self {
223        #[allow(deprecated)]
224        Self {
225            object_store: None,
226            block_size: None,
227            s3_credentials_refresh_offset: Duration::from_secs(60),
228            #[cfg(feature = "aws")]
229            aws_credentials: None,
230            object_store_wrapper: None,
231            storage_options_accessor: None,
232            use_constant_size_upload_parts: false,
233            list_is_lexically_ordered: None,
234        }
235    }
236}
237
238impl ObjectStoreParams {
239    /// Get the StorageOptionsAccessor from the params
240    pub fn get_accessor(&self) -> Option<Arc<StorageOptionsAccessor>> {
241        self.storage_options_accessor.clone()
242    }
243
244    /// Get storage options from the accessor, if any
245    ///
246    /// Returns the initial storage options from the accessor without triggering refresh.
247    pub fn storage_options(&self) -> Option<&HashMap<String, String>> {
248        self.storage_options_accessor
249            .as_ref()
250            .and_then(|a| a.initial_storage_options())
251    }
252}
253
// We implement hash for caching
impl std::hash::Hash for ObjectStoreParams {
    #[allow(deprecated)]
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        // For hashing, we use pointer values for ObjectStore, S3 credentials, wrapper
        self.block_size.hash(state);
        if let Some((store, url)) = &self.object_store {
            Arc::as_ptr(store).hash(state);
            url.hash(state);
        }
        self.s3_credentials_refresh_offset.hash(state);
        // Identity-hash the credential provider via its Arc pointer.
        #[cfg(feature = "aws")]
        if let Some(aws_credentials) = &self.aws_credentials {
            Arc::as_ptr(aws_credentials).hash(state);
        }
        if let Some(wrapper) = &self.object_store_wrapper {
            Arc::as_ptr(wrapper).hash(state);
        }
        // The accessor hashes by its stable id rather than by pointer,
        // giving semantic (not identity) hashing for that field.
        if let Some(accessor) = &self.storage_options_accessor {
            accessor.accessor_id().hash(state);
        }
        self.use_constant_size_upload_parts.hash(state);
        self.list_is_lexically_ordered.hash(state);
    }
}
279
280// We implement eq for caching
281impl Eq for ObjectStoreParams {}
282impl PartialEq for ObjectStoreParams {
283    #[allow(deprecated)]
284    fn eq(&self, other: &Self) -> bool {
285        #[cfg(feature = "aws")]
286        if self.aws_credentials.is_some() != other.aws_credentials.is_some() {
287            return false;
288        }
289
290        // For equality, we use pointer comparison for ObjectStore, S3 credentials, wrapper
291        // For accessor, we use accessor_id() for semantic equality
292        self.block_size == other.block_size
293            && self
294                .object_store
295                .as_ref()
296                .map(|(store, url)| (Arc::as_ptr(store), url))
297                == other
298                    .object_store
299                    .as_ref()
300                    .map(|(store, url)| (Arc::as_ptr(store), url))
301            && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
302            && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
303                == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
304            && self
305                .storage_options_accessor
306                .as_ref()
307                .map(|a| a.accessor_id())
308                == other
309                    .storage_options_accessor
310                    .as_ref()
311                    .map(|a| a.accessor_id())
312            && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
313            && self.list_is_lexically_ordered == other.list_is_lexically_ordered
314    }
315}
316
317/// Convert a URI string or local path to a URL
318///
319/// This function handles both proper URIs (with schemes like `file://`, `s3://`, etc.)
320/// and plain local filesystem paths. On Windows, it correctly handles drive letters
321/// that might be parsed as URL schemes.
322///
323/// # Examples
324///
325/// ```
326/// # use lance_io::object_store::uri_to_url;
327/// // URIs are preserved
328/// let url = uri_to_url("s3://bucket/path").unwrap();
329/// assert_eq!(url.scheme(), "s3");
330///
331/// // Local paths are converted to file:// URIs
332/// # #[cfg(unix)]
333/// let url = uri_to_url("/tmp/data").unwrap();
334/// # #[cfg(unix)]
335/// assert_eq!(url.scheme(), "file");
336/// ```
337pub fn uri_to_url(uri: &str) -> Result<Url> {
338    match Url::parse(uri) {
339        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
340            // On Windows, the drive is parsed as a scheme
341            local_path_to_url(uri)
342        }
343        Ok(url) => Ok(url),
344        Err(_) => local_path_to_url(uri),
345    }
346}
347
348fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
349    let str_path = str_path.as_ref();
350    let expanded = expand_tilde_path(str_path).unwrap_or_else(|| str_path.into());
351
352    let mut expanded_path = path_abs::PathAbs::new(expanded)
353        .unwrap()
354        .as_path()
355        .to_path_buf();
356    // path_abs::PathAbs::new(".") returns an empty string.
357    if let Some(s) = expanded_path.as_path().to_str()
358        && s.is_empty()
359    {
360        expanded_path = std::env::current_dir()?;
361    }
362
363    Ok(expanded_path)
364}
365
/// Expand a leading `~` to the user's home directory.
///
/// Returns `None` when the path has no `~` prefix to expand, when the home
/// directory is unknown, or for `~user`-style paths (not supported).
fn expand_tilde_path(path: &str) -> Option<std::path::PathBuf> {
    // Without a home directory there is nothing to expand to.
    let home = std::env::home_dir()?;

    // A bare "~" is the home directory itself.
    if path == "~" {
        return Some(home);
    }

    // On Windows, also accept a backslash separator after the tilde.
    #[cfg(windows)]
    if let Some(rest) = path.strip_prefix("~\\") {
        return Some(home.join(rest));
    }

    // "~/rest" expands relative to home; anything else is left alone.
    path.strip_prefix("~/").map(|rest| home.join(rest))
}
381
382fn local_path_to_url(str_path: &str) -> Result<Url> {
383    let expanded_path = expand_path(str_path)?;
384
385    Url::from_directory_path(expanded_path).map_err(|_| {
386        Error::invalid_input_source(format!("Invalid table location: '{}'", str_path).into())
387    })
388}
389
/// Extract the `owner/repo` identifier from a Huggingface URL.
///
/// Accepts URLs with a repo-type prefix (`models`/`datasets`/`spaces`) as
/// well as the legacy form without one. A revision suffix (`repo@rev`) is
/// stripped from the repo name.
///
/// # Errors
///
/// Returns an invalid-input error when the URL does not contain at least an
/// owner and a repo.
#[cfg(feature = "huggingface")]
fn parse_hf_repo_id(url: &Url) -> Result<String> {
    // Accept forms with repo type prefix (models/datasets/spaces) or legacy without.
    let mut segments: Vec<String> = Vec::new();
    // Skip an empty host (e.g. `hf:///owner/repo`) so it doesn't shift the
    // segment positions.
    if let Some(host) = url.host_str().filter(|h| !h.is_empty()) {
        segments.push(host.to_string());
    }
    // Filter out empty path segments (leading/trailing/duplicate slashes) so
    // that e.g. `hf://owner/repo/` cannot yield an empty repo name.
    segments.extend(
        url.path()
            .split('/')
            .filter(|s| !s.is_empty())
            .map(|s| s.to_string()),
    );

    if segments.len() < 2 {
        return Err(Error::invalid_input(
            "Huggingface URL must contain at least owner and repo",
        ));
    }

    let repo_type_candidates = ["models", "datasets", "spaces"];
    let (owner, repo_with_rev) = if repo_type_candidates.contains(&segments[0].as_str()) {
        if segments.len() < 3 {
            return Err(Error::invalid_input(
                "Huggingface URL missing owner/repo after repo type",
            ));
        }
        (segments[1].as_str(), segments[2].as_str())
    } else {
        (segments[0].as_str(), segments[1].as_str())
    };

    // Drop any `@revision` suffix from the repo name.
    let repo = repo_with_rev
        .split_once('@')
        .map(|(r, _)| r)
        .unwrap_or(repo_with_rev);
    Ok(format!("{owner}/{repo}"))
}
428
429impl ObjectStore {
430    /// Parse from a string URI.
431    ///
432    /// Returns the ObjectStore instance and the absolute path to the object.
433    ///
434    /// This uses the default [ObjectStoreRegistry] to find the object store. To
435    /// allow for potential re-use of object store instances, it's recommended to
436    /// create a shared [ObjectStoreRegistry] and pass that to [Self::from_uri_and_params].
437    pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
438        let registry = Arc::new(ObjectStoreRegistry::default());
439
440        Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
441    }
442
    /// Parse from a string URI.
    ///
    /// Returns the ObjectStore instance and the absolute path to the object.
    pub async fn from_uri_and_params(
        registry: Arc<ObjectStoreRegistry>,
        uri: &str,
        params: &ObjectStoreParams,
    ) -> Result<(Arc<Self>, Path)> {
        // Deprecated escape hatch: callers may supply a pre-built store plus
        // its URL directly; this bypasses the provider registry.
        #[allow(deprecated)]
        if let Some((store, path)) = params.object_store.as_ref() {
            let mut inner = store.clone();
            let store_prefix =
                registry.calculate_object_store_prefix(uri, params.storage_options())?;
            if let Some(wrapper) = params.object_store_wrapper.as_ref() {
                inner = wrapper.wrap(&store_prefix, inner);
            }

            // Always wrap with IO tracking
            let io_tracker = IOTracker::default();
            let tracked_store = io_tracker.wrap("", inner);

            let store = Self {
                inner: tracked_store,
                // `path` here is the URL supplied alongside the deprecated
                // store; its scheme identifies the backend.
                scheme: path.scheme().to_string(),
                // Default block size of 64 KiB when unspecified.
                block_size: params.block_size.unwrap_or(64 * 1024),
                max_iop_size: *DEFAULT_MAX_IOP_SIZE,
                use_constant_size_upload_parts: params.use_constant_size_upload_parts,
                list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
                io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
                download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
                io_tracker,
                store_prefix,
            };
            let path = Path::parse(path.path())?;
            return Ok((Arc::new(store), path));
        }
        // Normal path: resolve the URI through the provider registry.
        let url = uri_to_url(uri)?;

        let store = registry.get_store(url.clone(), params).await?;
        // We know the scheme is valid if we got a store back.
        let provider = registry.get_provider(url.scheme()).expect_ok()?;
        let path = provider.extract_path(&url)?;

        Ok((store, path))
    }
488
489    /// Extract the path component from a URI without initializing the object store.
490    ///
491    /// This is a synchronous operation that only parses the URI and extracts the path,
492    /// without creating or initializing any object store instance.
493    ///
494    /// # Arguments
495    ///
496    /// * `registry` - The object store registry to get the provider
497    /// * `uri` - The URI to extract the path from
498    ///
499    /// # Returns
500    ///
501    /// The extracted path component
502    pub fn extract_path_from_uri(registry: Arc<ObjectStoreRegistry>, uri: &str) -> Result<Path> {
503        let url = uri_to_url(uri)?;
504        let provider = registry
505            .get_provider(url.scheme())
506            .ok_or_else(|| Error::invalid_input(format!("Unknown scheme: {}", url.scheme())))?;
507        provider.extract_path(&url)
508    }
509
    #[deprecated(note = "Use `from_uri` instead")]
    pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
        // NOTE(review): `now_or_never().unwrap()` panics if the future is not
        // immediately ready. This relies on `from_uri_and_params` completing
        // synchronously for local paths — TODO confirm.
        Self::from_uri_and_params(
            Arc::new(ObjectStoreRegistry::default()),
            str_path,
            &Default::default(),
        )
        .now_or_never()
        .unwrap()
    }
520
521    /// Local object store.
522    pub fn local() -> Self {
523        let provider = FileStoreProvider;
524        provider
525            .new_store(Url::parse("file:///").unwrap(), &Default::default())
526            .now_or_never()
527            .unwrap()
528            .unwrap()
529    }
530
531    /// Create a in-memory object store directly for testing.
532    pub fn memory() -> Self {
533        let provider = MemoryStoreProvider;
534        provider
535            .new_store(Url::parse("memory:///").unwrap(), &Default::default())
536            .now_or_never()
537            .unwrap()
538            .unwrap()
539    }
540
541    /// Returns true if the object store pointed to a local file system.
542    pub fn is_local(&self) -> bool {
543        self.scheme == "file" || self.scheme == "file+uring"
544    }
545
546    pub fn is_cloud(&self) -> bool {
547        !self.is_local() && self.scheme != "memory"
548    }
549
550    /// Whether this object store prefers the lite scheduler.
551    ///
552    /// The lite scheduler is designed for backends like io_uring where
553    /// tasks should only be polled when the consumer polls them.
554    pub fn prefers_lite_scheduler(&self) -> bool {
555        self.scheme == "file+uring"
556    }
557
    /// The URI scheme this store serves (e.g. "file", "memory", "s3").
    pub fn scheme(&self) -> &str {
        &self.scheme
    }

    /// Preferred IO block size, in bytes, for readers created by this store.
    pub fn block_size(&self) -> usize {
        self.block_size
    }

    /// Maximum size, in bytes, of a single IO request.
    pub fn max_iop_size(&self) -> u64 {
        self.max_iop_size
    }
569
570    pub fn io_parallelism(&self) -> usize {
571        std::env::var("LANCE_IO_THREADS")
572            .map(|val| val.parse::<usize>().unwrap())
573            .unwrap_or(self.io_parallelism)
574    }
575
    /// Get the IO tracker for this object store
    ///
    /// The IO tracker can be used to get statistics about read/write operations
    /// performed on this object store.
    pub fn io_tracker(&self) -> &IOTracker {
        &self.io_tracker
    }

    /// Get a snapshot of current IO statistics without resetting counters
    ///
    /// Returns the current IO statistics without modifying the internal state.
    /// Use this when you need to check stats without resetting them.
    pub fn io_stats_snapshot(&self) -> IoStats {
        self.io_tracker.stats()
    }

    /// Get incremental IO statistics since the last call to this method
    ///
    /// Returns the accumulated statistics since the last call and resets the
    /// counters to zero. This is useful for tracking IO operations between
    /// different stages of processing.
    pub fn io_stats_incremental(&self) -> IoStats {
        self.io_tracker.incremental_stats()
    }
600
    /// Open a file for path.
    ///
    /// Parameters
    /// - ``path``: Absolute path to the file.
    ///
    /// The reader implementation depends on the scheme:
    /// - `file`: [LocalObjectReader]
    /// - `file+uring` (Linux only): io_uring based readers; the
    ///   `LANCE_URING_CURRENT_THREAD` env var selects the current-thread
    ///   variant
    /// - anything else: [CloudObjectReader] over the inner store
    pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
        match self.scheme.as_str() {
            "file" => {
                LocalObjectReader::open_with_tracker(
                    path,
                    self.block_size,
                    // Size not known up front; the reader determines it.
                    None,
                    Arc::new(self.io_tracker.clone()),
                )
                .await
            }
            #[cfg(target_os = "linux")]
            "file+uring" => {
                // Check if current-thread mode enabled
                let use_current_thread = std::env::var("LANCE_URING_CURRENT_THREAD")
                    .map(|v| str_is_truthy(&v))
                    .unwrap_or(false);

                if use_current_thread {
                    UringCurrentThreadReader::open(
                        path,
                        self.block_size,
                        None,
                        Arc::new(self.io_tracker.clone()),
                    )
                    .await
                } else {
                    UringReader::open(
                        path,
                        self.block_size,
                        None,
                        Arc::new(self.io_tracker.clone()),
                    )
                    .await
                }
            }
            _ => Ok(Box::new(CloudObjectReader::new(
                self.inner.clone(),
                path.clone(),
                self.block_size,
                // Size not known up front; see `open_with_size` to skip the
                // metadata lookup when the size is already known.
                None,
                self.download_retry_count,
            )?)),
        }
    }
650
    /// Open a reader for a file with known size.
    ///
    /// This size may either have been retrieved from a list operation or
    /// cached metadata. By passing in the known size, we can skip a HEAD / metadata
    /// call.
    pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
        // If we know the file is really small, we can read the whole thing
        // as a single request.
        if known_size <= self.block_size {
            return Ok(Box::new(SmallReader::new(
                self.inner.clone(),
                path.clone(),
                self.download_retry_count,
                known_size,
            )));
        }

        // Otherwise dispatch on scheme, mirroring `open` but passing the
        // known size through so readers skip the metadata lookup.
        match self.scheme.as_str() {
            "file" => {
                LocalObjectReader::open_with_tracker(
                    path,
                    self.block_size,
                    Some(known_size),
                    Arc::new(self.io_tracker.clone()),
                )
                .await
            }
            #[cfg(target_os = "linux")]
            "file+uring" => {
                // Check if current-thread mode enabled
                let use_current_thread = std::env::var("LANCE_URING_CURRENT_THREAD")
                    .map(|v| str_is_truthy(&v))
                    .unwrap_or(false);

                if use_current_thread {
                    UringCurrentThreadReader::open(
                        path,
                        self.block_size,
                        Some(known_size),
                        Arc::new(self.io_tracker.clone()),
                    )
                    .await
                } else {
                    UringReader::open(
                        path,
                        self.block_size,
                        Some(known_size),
                        Arc::new(self.io_tracker.clone()),
                    )
                    .await
                }
            }
            _ => Ok(Box::new(CloudObjectReader::new(
                self.inner.clone(),
                path.clone(),
                self.block_size,
                Some(known_size),
                self.download_retry_count,
            )?)),
        }
    }
712
713    /// Create an [ObjectWriter] from local [std::path::Path]
714    pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
715        let object_store = Self::local();
716        let absolute_path = expand_path(path.to_string_lossy())?;
717        let os_path = Path::from_absolute_path(absolute_path)?;
718        ObjectWriter::new(&object_store, &os_path).await
719    }
720
721    /// Open an [Reader] from local [std::path::Path]
722    pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
723        let object_store = Self::local();
724        let absolute_path = expand_path(path.to_string_lossy())?;
725        let os_path = Path::from_absolute_path(absolute_path)?;
726        object_store.open(&os_path).await
727    }
728
729    /// Create a new file.
730    pub async fn create(&self, path: &Path) -> Result<Box<dyn Writer>> {
731        match self.scheme.as_str() {
732            "file" => {
733                let local_path = super::local::to_local_path(path);
734                let local_path = std::path::PathBuf::from(&local_path);
735                if let Some(parent) = local_path.parent() {
736                    tokio::fs::create_dir_all(parent).await?;
737                }
738                let parent = local_path
739                    .parent()
740                    .expect("file path must have parent")
741                    .to_owned();
742                let named_temp =
743                    tokio::task::spawn_blocking(move || tempfile::NamedTempFile::new_in(parent))
744                        .await
745                        .map_err(|e| Error::io(format!("spawn_blocking failed: {}", e)))??;
746                let (std_file, temp_path) = named_temp.into_parts();
747                let file = tokio::fs::File::from_std(std_file);
748                Ok(Box::new(LocalWriter::new(
749                    file,
750                    path.clone(),
751                    temp_path,
752                    Arc::new(self.io_tracker.clone()),
753                )))
754            }
755            _ => Ok(Box::new(ObjectWriter::new(self, path).await?)),
756        }
757    }
758
759    /// A helper function to create a file and write content to it.
760    pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
761        let mut writer = self.create(path).await?;
762        writer.write_all(content).await?;
763        Writer::shutdown(writer.as_mut()).await
764    }
765
766    pub async fn delete(&self, path: &Path) -> Result<()> {
767        self.inner.delete(path).await?;
768        Ok(())
769    }
770
771    pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
772        if self.is_local() {
773            // Use std::fs::copy for local filesystem to support cross-filesystem copies
774            return super::local::copy_file(from, to);
775        }
776        Ok(self.inner.copy(from, to).await?)
777    }
778
779    /// Read a directory (start from base directory) and returns all sub-paths in the directory.
780    pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
781        let path = dir_path.into();
782        let path = Path::parse(&path)?;
783        let output = self.inner.list_with_delimiter(Some(&path)).await?;
784        Ok(output
785            .common_prefixes
786            .iter()
787            .chain(output.objects.iter().map(|o| &o.location))
788            .map(|s| s.filename().unwrap().to_string())
789            .collect())
790    }
791
    /// List objects under `path` (or the store root if `None`).
    ///
    /// The listing is performed through a [ListRetryStream] with a retry
    /// budget of 5, so transient listing failures are retried.
    pub fn list(
        &self,
        path: Option<Path>,
    ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
        Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
    }
798
    /// Read all files (start from base directory) recursively
    ///
    /// unmodified_since can be specified to only return files that have not been modified since the given time.
    pub fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        // Delegates to the blanket ObjectStoreExt implementation.
        self.inner.read_dir_all(dir_path, unmodified_since)
    }
809
    /// Remove a directory recursively.
    pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
        let path = dir_path.into();
        let path = Path::parse(&path)?;

        if self.is_local() {
            // The local file system provider needs to delete both files and directories.
            return super::local::remove_dir_all(&path);
        }
        // Remote stores have no real directories: delete every object that
        // lives under the prefix.
        let sub_entries = self
            .inner
            .list(Some(&path))
            .map(|m| m.map(|meta| meta.location))
            .boxed();
        self.inner
            .delete_stream(sub_entries)
            .try_collect::<Vec<_>>()
            .await?;
        if self.scheme == "file-object-store" {
            // file-object-store tries to do everything as similarly as possible to the remote
            // object stores. But we still have to delete the directory entries afterwards.
            return super::local::remove_dir_all(&path);
        }
        Ok(())
    }
835
836    pub fn remove_stream<'a>(
837        &'a self,
838        locations: BoxStream<'a, Result<Path>>,
839    ) -> BoxStream<'a, Result<Path>> {
840        self.inner
841            .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
842            .err_into::<Error>()
843            .boxed()
844    }
845
846    /// Check a file exists.
847    pub async fn exists(&self, path: &Path) -> Result<bool> {
848        match self.inner.head(path).await {
849            Ok(_) => Ok(true),
850            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
851            Err(e) => Err(e.into()),
852        }
853    }
854
855    /// Get file size.
856    pub async fn size(&self, path: &Path) -> Result<u64> {
857        Ok(self.inner.head(path).await?.size)
858    }
859
860    /// Convenience function to open a reader and read all the bytes
861    pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
862        let reader = self.open(path).await?;
863        Ok(reader.get_all().await?)
864    }
865
866    /// Convenience function open a reader and make a single request
867    ///
868    /// If you will be making multiple requests to the path it is more efficient to call [`Self::open`]
869    /// and then call [`Reader::get_range`] multiple times.
870    pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
871        let reader = self.open(path).await?;
872        Ok(reader.get_range(range).await?)
873    }
874}
875
/// Options that can be set for multiple object stores
///
/// Parsed case-insensitively from string keys via its [`FromStr`] impl.
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub enum LanceConfigKey {
    /// Number of times to retry a download that fails
    DownloadRetryCount,
}
882
883impl FromStr for LanceConfigKey {
884    type Err = Error;
885
886    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
887        match s.to_ascii_lowercase().as_str() {
888            "download_retry_count" => Ok(Self::DownloadRetryCount),
889            _ => Err(Error::invalid_input_source(
890                format!("Invalid LanceConfigKey: {}", s).into(),
891            )),
892        }
893    }
894}
895
/// String key/value configuration options passed through to the object store.
#[derive(Clone, Debug, Default)]
pub struct StorageOptions(pub HashMap<String, String>);
898
899impl StorageOptions {
900    /// Create a new instance of [`StorageOptions`]
901    pub fn new(options: HashMap<String, String>) -> Self {
902        let mut options = options;
903        if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
904            options.insert("allow_http".into(), value);
905        }
906        if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
907            options.insert("allow_http".into(), value);
908        }
909        if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
910            options.insert("allow_http".into(), value);
911        }
912        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
913            options.insert("client_max_retries".into(), value);
914        }
915        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
916            options.insert("client_retry_timeout".into(), value);
917        }
918        Self(options)
919    }
920
921    /// Denotes if unsecure connections via http are allowed
922    pub fn allow_http(&self) -> bool {
923        self.0.iter().any(|(key, value)| {
924            key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
925        })
926    }
927
928    /// Number of times to retry a download that fails
929    pub fn download_retry_count(&self) -> usize {
930        self.0
931            .iter()
932            .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
933            .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
934            .unwrap_or(3)
935    }
936
937    /// Max retry times to set in RetryConfig for object store client
938    pub fn client_max_retries(&self) -> usize {
939        self.0
940            .iter()
941            .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
942            .and_then(|(_, value)| value.parse::<usize>().ok())
943            .unwrap_or(3)
944    }
945
946    /// Seconds of timeout to set in RetryConfig for object store client
947    pub fn client_retry_timeout(&self) -> u64 {
948        self.0
949            .iter()
950            .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
951            .and_then(|(_, value)| value.parse::<u64>().ok())
952            .unwrap_or(180)
953    }
954
955    pub fn get(&self, key: &str) -> Option<&String> {
956        self.0.get(key)
957    }
958
959    /// Build [`ClientOptions`] with default headers extracted from `headers.*` keys.
960    ///
961    /// Keys prefixed with `headers.` are parsed into HTTP headers. For example,
962    /// `headers.x-ms-version = 2023-11-03` results in a default header
963    /// `x-ms-version: 2023-11-03`.
964    ///
965    /// Returns an error if any `headers.*` key has an invalid header name or value.
966    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
967    pub fn client_options(&self) -> Result<ClientOptions> {
968        let mut headers = HeaderMap::new();
969        for (key, value) in &self.0 {
970            if let Some(header_name) = key.strip_prefix("headers.") {
971                let name = header_name
972                    .parse::<http::header::HeaderName>()
973                    .map_err(|e| {
974                        Error::invalid_input(format!("invalid header name '{header_name}': {e}"))
975                    })?;
976                let val = HeaderValue::from_str(value).map_err(|e| {
977                    Error::invalid_input(format!("invalid header value for '{header_name}': {e}"))
978                })?;
979                headers.insert(name, val);
980            }
981        }
982        let mut client_options = ClientOptions::default();
983        if !headers.is_empty() {
984            client_options = client_options.with_default_headers(headers);
985        }
986        Ok(client_options)
987    }
988
989    /// Get the expiration time in milliseconds since epoch, if present
990    pub fn expires_at_millis(&self) -> Option<u64> {
991        self.0
992            .get(EXPIRES_AT_MILLIS_KEY)
993            .and_then(|s| s.parse::<u64>().ok())
994    }
995}
996
impl From<HashMap<String, String>> for StorageOptions {
    // Delegates to `new` so env-var overrides are applied on conversion too.
    fn from(value: HashMap<String, String>) -> Self {
        Self::new(value)
    }
}
1002
/// Process-wide default registry, used by [`ObjectStore::new`] to look up the
/// provider for a URL scheme when computing the store prefix.
static DEFAULT_OBJECT_STORE_REGISTRY: std::sync::LazyLock<ObjectStoreRegistry> =
    std::sync::LazyLock::new(ObjectStoreRegistry::default);
1005
1006impl ObjectStore {
1007    #[allow(clippy::too_many_arguments)]
1008    pub fn new(
1009        store: Arc<DynObjectStore>,
1010        location: Url,
1011        block_size: Option<usize>,
1012        wrapper: Option<Arc<dyn WrappingObjectStore>>,
1013        use_constant_size_upload_parts: bool,
1014        list_is_lexically_ordered: bool,
1015        io_parallelism: usize,
1016        download_retry_count: usize,
1017        storage_options: Option<&HashMap<String, String>>,
1018    ) -> Self {
1019        let scheme = location.scheme();
1020        let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
1021        let store_prefix = match DEFAULT_OBJECT_STORE_REGISTRY.get_provider(scheme) {
1022            Some(provider) => provider
1023                .calculate_object_store_prefix(&location, storage_options)
1024                .unwrap(),
1025            None => {
1026                let store_prefix = format!("{}${}", location.scheme(), location.authority());
1027                log::warn!(
1028                    "Guessing that object store prefix is {}, since object store scheme is not found in registry.",
1029                    store_prefix
1030                );
1031                store_prefix
1032            }
1033        };
1034        let store = match wrapper {
1035            Some(wrapper) => wrapper.wrap(&store_prefix, store),
1036            None => store,
1037        };
1038
1039        // Always wrap with IO tracking
1040        let io_tracker = IOTracker::default();
1041        let tracked_store = io_tracker.wrap("", store);
1042
1043        Self {
1044            inner: tracked_store,
1045            scheme: scheme.into(),
1046            block_size,
1047            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
1048            use_constant_size_upload_parts,
1049            list_is_lexically_ordered,
1050            io_parallelism,
1051            download_retry_count,
1052            io_tracker,
1053            store_prefix,
1054        }
1055    }
1056}
1057
/// Pick the default read block size for a URL scheme.
///
/// Local file systems get a 4KB block size; cloud object stores get 64KB,
/// which is generally the largest block size where we don't see a latency
/// penalty.
fn infer_block_size(scheme: &str) -> usize {
    if scheme == "file" {
        4 * 1024
    } else {
        64 * 1024
    }
}
1067
#[cfg(test)]
mod tests {
    use super::*;
    use lance_core::utils::tempfile::{TempStdDir, TempStdFile, TempStrDir};
    use object_store::memory::InMemory;
    use rstest::rstest;
    use std::env::set_current_dir;
    use std::fs::{create_dir_all, write};
    use std::path::Path as StdPath;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Write test content to file.
    fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
        let path = expand_path(path_str).map_err(std::io::Error::other)?;
        std::fs::create_dir_all(path.parent().unwrap())?;
        write(path, contents)
    }

    /// Read the whole object at `path` back as a UTF-8 string.
    async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
        let test_file_store = store.open(path).await.unwrap();
        let size = test_file_store.size().await.unwrap();
        let bytes = test_file_store.get_range(0..size).await.unwrap();
        let contents = String::from_utf8(bytes.to_vec()).unwrap();
        Ok(contents)
    }

    #[tokio::test]
    async fn test_absolute_paths() {
        let tmp_path = TempStrDir::default();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "TEST_CONTENT",
        )
        .unwrap();

        // test a few variations of the same path
        for uri in &[
            format!("{tmp_path}/bar/foo.lance"),
            format!("{tmp_path}/./bar/foo.lance"),
            format!("{tmp_path}/bar/foo.lance/../foo.lance"),
        ] {
            let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &path.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "TEST_CONTENT");
        }
    }

    #[tokio::test]
    async fn test_cloud_paths() {
        // Verifies scheme detection and path extraction for cloud URIs; no
        // network access is performed (stores are built lazily).
        let uri = "s3://bucket/foo.lance";
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "gs");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) =
            ObjectStore::from_uri("abfss://filesystem@account.dfs.core.windows.net/foo.lance")
                .await
                .unwrap();
        assert_eq!(store.scheme, "abfss");
        assert_eq!(path.to_string(), "foo.lance");
    }

    /// Assert the default block size for `uri`, then assert an explicit
    /// `block_size` param overrides it.
    async fn test_block_size_used_test_helper(
        uri: &str,
        storage_options: Option<HashMap<String, String>>,
        default_expected_block_size: usize,
    ) {
        // Test the default
        let registry = Arc::new(ObjectStoreRegistry::default());
        let accessor = storage_options
            .clone()
            .map(|opts| Arc::new(StorageOptionsAccessor::with_static_options(opts)));
        let params = ObjectStoreParams {
            storage_options_accessor: accessor.clone(),
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, default_expected_block_size);

        // Ensure param is used
        let registry = Arc::new(ObjectStoreRegistry::default());
        let params = ObjectStoreParams {
            block_size: Some(1024),
            storage_options_accessor: accessor,
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, 1024);
    }

    #[rstest]
    #[case("s3://bucket/foo.lance", None)]
    #[case("gs://bucket/foo.lance", None)]
    #[case("az://account/bucket/foo.lance",
      Some(HashMap::from([
            (String::from("account_name"), String::from("account")),
            (String::from("container_name"), String::from("container"))
           ])))]
    #[case("abfss://filesystem@account.dfs.core.windows.net/foo.lance",
      Some(HashMap::from([
            (String::from("account_name"), String::from("account")),
            (String::from("container_name"), String::from("filesystem"))
           ])))]
    #[tokio::test]
    async fn test_block_size_used_cloud(
        #[case] uri: &str,
        #[case] storage_options: Option<HashMap<String, String>>,
    ) {
        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
    }

    // NOTE(review): the "memory" case passes a full URI as the "prefix",
    // producing "memory:///bucket/foo.lance:///<tmp>/..." below — appears
    // intentional but worth confirming.
    #[rstest]
    #[case("file")]
    #[case("file-object-store")]
    #[case("memory:///bucket/foo.lance")]
    #[tokio::test]
    async fn test_block_size_used_file(#[case] prefix: &str) {
        let tmp_path = TempStrDir::default();
        let path = format!("{tmp_path}/bar/foo.lance/test_file");
        write_to_file(&path, "URL").unwrap();
        let uri = format!("{prefix}:///{path}");
        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
    }

    #[tokio::test]
    async fn test_relative_paths() {
        let tmp_path = TempStrDir::default();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "RELATIVE_URL",
        )
        .unwrap();

        // NOTE(review): set_current_dir mutates process-global state and could
        // interfere with concurrently-running tests that touch the cwd.
        set_current_dir(StdPath::new(tmp_path.as_ref())).expect("Error changing current dir");
        let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();

        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "RELATIVE_URL");
    }

    #[tokio::test]
    async fn test_tilde_expansion() {
        let uri = "~/foo.lance";
        write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "TILDE");
    }

    #[tokio::test]
    async fn test_read_directory() {
        let path = TempStdDir::default();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo").join("test_file").to_str().unwrap(),
            "read_dir",
        )
        .unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();

        // Only immediate children are returned: directories first, then files.
        let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
        assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
    }

    #[tokio::test]
    async fn test_delete_directory_local_store() {
        test_delete_directory("").await;
    }

    #[tokio::test]
    async fn test_delete_directory_file_object_store() {
        test_delete_directory("file-object-store").await;
    }

    /// Shared body for the two delete-directory tests above; `scheme` selects
    /// the local path or the file-object-store emulation.
    async fn test_delete_directory(scheme: &str) {
        let path = TempStdDir::default();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo")
                .join("bar")
                .join("test_file")
                .to_str()
                .unwrap(),
            "delete",
        )
        .unwrap();
        let file_url = Url::from_directory_path(&path).unwrap();
        let url = if scheme.is_empty() {
            file_url
        } else {
            let mut url = Url::parse(&format!("{scheme}:///")).unwrap();
            // Use the file:// URL's normalized path so this works on Windows too.
            url.set_path(file_url.path());
            url
        };
        let (store, base) = ObjectStore::from_uri(url.as_ref()).await.unwrap();
        store.remove_dir_all(base.child("foo")).await.unwrap();

        assert!(!path.join("foo").exists());
    }

    /// Records whether `wrap` was invoked and substitutes a canned store.
    #[derive(Debug)]
    struct TestWrapper {
        called: AtomicBool,

        return_value: Arc<dyn OSObjectStore>,
    }

    impl WrappingObjectStore for TestWrapper {
        fn wrap(
            &self,
            _store_prefix: &str,
            _original: Arc<dyn OSObjectStore>,
        ) -> Arc<dyn OSObjectStore> {
            self.called.store(true, Ordering::Relaxed);

            // return a mocked value so we can check if the final store is the one we expect
            self.return_value.clone()
        }
    }

    impl TestWrapper {
        fn called(&self) -> bool {
            self.called.load(Ordering::Relaxed)
        }
    }

    #[tokio::test]
    async fn test_wrapping_object_store_option_is_used() {
        // Make a store for the inner store first
        let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
        let registry = Arc::new(ObjectStoreRegistry::default());

        assert_eq!(Arc::strong_count(&mock_inner_store), 1);

        let wrapper = Arc::new(TestWrapper {
            called: AtomicBool::new(false),
            return_value: mock_inner_store.clone(),
        });

        let params = ObjectStoreParams {
            object_store_wrapper: Some(wrapper.clone()),
            ..ObjectStoreParams::default()
        };

        // not called yet
        assert!(!wrapper.called());

        let _ = ObjectStore::from_uri_and_params(registry, "memory:///", &params)
            .await
            .unwrap();

        // called after construction
        assert!(wrapper.called());

        // hard to compare two trait pointers as the point to vtables
        // using the ref count as a proxy to make sure that the store is correctly kept
        assert_eq!(Arc::strong_count(&mock_inner_store), 2);
    }

    #[tokio::test]
    async fn test_local_paths() {
        let file_path = TempStdFile::default();
        let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        Writer::shutdown(&mut writer).await.unwrap();

        let reader = ObjectStore::open_local(&file_path).await.unwrap();
        let buf = reader.get_range(0..5).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");
    }

    #[tokio::test]
    async fn test_read_one() {
        let file_path = TempStdFile::default();
        let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        Writer::shutdown(&mut writer).await.unwrap();

        let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
        let obj_store = ObjectStore::local();
        let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");

        let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");
    }

    #[tokio::test]
    #[cfg(windows)]
    async fn test_windows_paths() {
        use std::path::Component;
        use std::path::Prefix;
        use std::path::Prefix::*;

        // Extract the drive prefix (e.g. C:) from a Windows path.
        fn get_path_prefix(path: &StdPath) -> Prefix {
            match path.components().next().unwrap() {
                Component::Prefix(prefix_component) => prefix_component.kind(),
                _ => panic!(),
            }
        }

        // Convert a Disk prefix into its drive letter as a String.
        fn get_drive_letter(prefix: Prefix) -> String {
            match prefix {
                Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
                _ => panic!(),
            }
        }

        let tmp_path = TempStdFile::default();
        let prefix = get_path_prefix(&tmp_path);
        let drive_letter = get_drive_letter(prefix);

        write_to_file(
            &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
            "WINDOWS",
        )
        .unwrap();

        // Both forward-slash and backslash spellings must resolve identically.
        for uri in &[
            format!("{drive_letter}:/test_folder/test.lance"),
            format!("{drive_letter}:\\test_folder\\test.lance"),
        ] {
            let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &base.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "WINDOWS");
        }
    }

    #[tokio::test]
    async fn test_cross_filesystem_copy() {
        // Create two temporary directories that simulate different filesystems
        let source_dir = TempStdDir::default();
        let dest_dir = TempStdDir::default();

        // Create a test file in the source directory
        let source_file_name = "test_file.txt";
        let source_file = source_dir.join(source_file_name);
        std::fs::write(&source_file, b"test content").unwrap();

        // Create ObjectStore for local filesystem
        let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
            .await
            .unwrap();

        // Create paths relative to the ObjectStore base
        let from_path = base_path.child(source_file_name);

        // Use object_store::Path::parse for the destination
        let dest_file = dest_dir.join("copied_file.txt");
        let dest_str = dest_file.to_str().unwrap();
        let to_path = object_store::path::Path::parse(dest_str).unwrap();

        // Perform the copy operation
        store.copy(&from_path, &to_path).await.unwrap();

        // Verify the file was copied correctly
        assert!(dest_file.exists());
        let copied_content = std::fs::read(&dest_file).unwrap();
        assert_eq!(copied_content, b"test content");
    }

    #[tokio::test]
    async fn test_copy_creates_parent_directories() {
        let source_dir = TempStdDir::default();
        let dest_dir = TempStdDir::default();

        // Create a test file in the source directory
        let source_file_name = "test_file.txt";
        let source_file = source_dir.join(source_file_name);
        std::fs::write(&source_file, b"test content").unwrap();

        // Create ObjectStore for local filesystem
        let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
            .await
            .unwrap();

        // Create paths
        let from_path = base_path.child(source_file_name);

        // Create destination with nested directories that don't exist yet
        let dest_file = dest_dir.join("nested").join("dirs").join("copied_file.txt");
        let dest_str = dest_file.to_str().unwrap();
        let to_path = object_store::path::Path::parse(dest_str).unwrap();

        // Perform the copy operation - should create parent directories
        store.copy(&from_path, &to_path).await.unwrap();

        // Verify the file was copied correctly and directories were created
        assert!(dest_file.exists());
        assert!(dest_file.parent().unwrap().exists());
        let copied_content = std::fs::read(&dest_file).unwrap();
        assert_eq!(copied_content, b"test content");
    }

    #[test]
    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
    fn test_client_options_extracts_headers() {
        let opts = StorageOptions(HashMap::from([
            ("headers.x-custom-foo".to_string(), "bar".to_string()),
            ("headers.x-ms-version".to_string(), "2023-11-03".to_string()),
            ("region".to_string(), "us-west-2".to_string()),
        ]));
        let client_options = opts.client_options().unwrap();

        // Verify non-header keys are not consumed as headers by creating
        // another StorageOptions with no headers.* keys.
        let opts_no_headers = StorageOptions(HashMap::from([(
            "region".to_string(),
            "us-west-2".to_string(),
        )]));
        opts_no_headers.client_options().unwrap();

        // Smoke test: the client_options with headers should be usable
        // in a builder (we can't inspect the headers directly, but building
        // should not fail).
        #[cfg(feature = "gcp")]
        {
            use object_store::gcp::GoogleCloudStorageBuilder;
            let _builder = GoogleCloudStorageBuilder::new()
                .with_client_options(client_options)
                .with_url("gs://test-bucket");
        }
    }

    #[test]
    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
    fn test_client_options_rejects_invalid_header_name() {
        // Header names may not contain spaces.
        let opts = StorageOptions(HashMap::from([(
            "headers.bad header".to_string(),
            "value".to_string(),
        )]));
        let err = opts.client_options().unwrap_err();
        assert!(err.to_string().contains("invalid header name"));
    }

    #[test]
    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
    fn test_client_options_rejects_invalid_header_value() {
        // Control characters are not valid in header values.
        let opts = StorageOptions(HashMap::from([(
            "headers.x-good-name".to_string(),
            "bad\x01value".to_string(),
        )]));
        let err = opts.client_options().unwrap_err();
        assert!(err.to_string().contains("invalid header value"));
    }

    #[test]
    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
    fn test_client_options_empty_when_no_header_keys() {
        let opts = StorageOptions(HashMap::from([
            ("region".to_string(), "us-east-1".to_string()),
            ("access_key_id".to_string(), "AKID".to_string()),
        ]));
        opts.client_options().unwrap();
    }
}