Skip to main content

lance_io/
object_store.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Extend [object_store::ObjectStore] functionalities
5
6use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use deepsize::DeepSizeOf;
17use futures::{FutureExt, Stream};
18use futures::{StreamExt, TryStreamExt, future, stream::BoxStream};
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22use object_store::DynObjectStore;
23use object_store::ObjectStoreExt as OSObjectStoreExt;
24#[cfg(feature = "aws")]
25use object_store::aws::AwsCredentialProvider;
26#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
27use object_store::{ClientOptions, HeaderMap, HeaderValue};
28use object_store::{ObjectMeta, ObjectStore as OSObjectStore, path::Path};
29use providers::local::FileStoreProvider;
30use providers::memory::MemoryStoreProvider;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35#[cfg(target_os = "linux")]
36use crate::uring::{UringCurrentThreadReader, UringReader};
37#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
38pub(crate) mod dynamic_credentials;
39#[cfg(any(feature = "oss", feature = "huggingface"))]
40pub(crate) mod dynamic_opendal;
41mod list_retry;
42pub mod providers;
43pub mod storage_options;
44#[cfg(test)]
45pub(crate) mod test_utils;
46pub mod throttle;
47mod tracing;
48use crate::object_reader::SmallReader;
49use crate::object_writer::{LocalWriter, WriteResult};
50use crate::traits::Writer;
51use crate::utils::tracking_store::{IOTracker, IoStats};
52use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
53use lance_core::{Error, Result};
54
/// Default IO parallelism for local disks.
///
/// Local disks tend to do fine with a few threads.
/// Note: the number of threads here also impacts the number of files
/// we need to read in some situations.  So keeping this at 8 keeps the
/// RAM on our scanner down.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
/// Default IO parallelism for cloud stores.
///
/// Cloud disks often need many many threads to saturate the network.
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

/// Default block size for local storage.
const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024; // 4KB block size
/// Default block size for cloud storage; only compiled when a cloud feature
/// (`aws`, `gcp` or `azure`) is enabled.
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024; // 64KB block size
66
/// Default maximum size of a single IO operation, resolved once on first access.
///
/// The value is read from the `LANCE_MAX_IOP_SIZE` environment variable. When
/// the variable cannot be read (for example because it is not set) the
/// default of `16 * 1024 * 1024` is used.
///
/// # Panics
///
/// Panics on first access if `LANCE_MAX_IOP_SIZE` is set but does not parse
/// as a `u64`. The panic message names the variable and the offending value
/// so that the misconfiguration is easy to locate.
pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock<u64> = std::sync::LazyLock::new(|| {
    match std::env::var("LANCE_MAX_IOP_SIZE") {
        Ok(val) => val.parse::<u64>().unwrap_or_else(|e| {
            panic!(
                "LANCE_MAX_IOP_SIZE must be a valid u64, got {:?}: {}",
                val, e
            )
        }),
        Err(_) => 16 * 1024 * 1024,
    }
});
72
/// Default number of times a failed download is retried.
pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
74
75pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
76pub use storage_options::{
77    EXPIRES_AT_MILLIS_KEY, LanceNamespaceStorageOptionsProvider, REFRESH_OFFSET_MILLIS_KEY,
78    StorageOptionsAccessor, StorageOptionsProvider,
79};
80
/// Extra convenience operations for object stores.
#[async_trait]
pub trait ObjectStoreExt {
    /// Returns true if the file exists.
    async fn exists(&self, path: &Path) -> Result<bool>;

    /// Read all files (start from base directory) recursively.
    ///
    /// `unmodified_since` can be specified to only return files that have not
    /// been modified since the given time.
    fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>>;
}
95
96#[async_trait]
97impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
98    fn read_dir_all<'a, 'b>(
99        &'a self,
100        dir_path: impl Into<&'b Path> + Send,
101        unmodified_since: Option<DateTime<Utc>>,
102    ) -> BoxStream<'a, Result<ObjectMeta>> {
103        let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
104        if let Some(unmodified_since_val) = unmodified_since {
105            output
106                .try_filter(move |file| future::ready(file.last_modified <= unmodified_since_val))
107                .boxed()
108        } else {
109            output.boxed()
110        }
111    }
112
113    async fn exists(&self, path: &Path) -> Result<bool> {
114        match self.head(path).await {
115            Ok(_) => Ok(true),
116            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
117            Err(e) => Err(e.into()),
118        }
119    }
120}
121
/// Wraps [ObjectStore](object_store::ObjectStore)
#[derive(Debug, Clone)]
pub struct ObjectStore {
    /// Inner object store
    pub inner: Arc<dyn OSObjectStore>,
    /// URL scheme of this store. The methods on this type compare it against
    /// values such as `"file"`, `"file+uring"` and `"memory"`.
    scheme: String,
    /// Block size handed to the readers opened from this store.
    block_size: usize,
    /// Maximum size of a single IO operation (presumably in bytes — TODO confirm).
    max_iop_size: u64,
    /// Whether to use constant size upload parts for multipart uploads. This
    /// is only necessary for Cloudflare R2.
    pub use_constant_size_upload_parts: bool,
    /// Whether we can assume that the list of files is lexically ordered. This
    /// is true for object stores, but not for local filesystems.
    pub list_is_lexically_ordered: bool,
    /// Configured IO parallelism; `io_parallelism()` lets the
    /// `LANCE_IO_THREADS` environment variable override it.
    io_parallelism: usize,
    /// Number of times to retry a failed download
    download_retry_count: usize,
    /// IO tracker for monitoring read/write operations
    io_tracker: IOTracker,
    /// The datastore prefix that uniquely identifies this object store. It encodes information
    /// which usually cannot be found in the URL such as Azure account name. The prefix plus the
    /// path uniquely identifies any object inside the store.
    pub store_prefix: String,
}
146
147impl DeepSizeOf for ObjectStore {
148    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
149        // We aren't counting `inner` here which is problematic but an ObjectStore
150        // shouldn't be too big.  The only exception might be the write cache but, if
151        // the writer cache has data, it means we're using it somewhere else that isn't
152        // a cache and so that doesn't really count.
153        self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
154    }
155}
156
157impl std::fmt::Display for ObjectStore {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        write!(f, "ObjectStore({})", self.scheme)
160    }
161}
162
/// A decorator that adds behaviour on top of an existing object store.
pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
    /// Wrap an object store with additional functionality
    ///
    /// The store_prefix is a string which uniquely identifies the object
    /// store being wrapped.
    fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
}
170
/// A [`WrappingObjectStore`] made of several wrappers applied one after another.
#[derive(Debug, Clone)]
pub struct ChainedWrappingObjectStore {
    /// The wrappers, in the order in which they are applied.
    wrappers: Vec<Arc<dyn WrappingObjectStore>>,
}
175
176impl ChainedWrappingObjectStore {
177    pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
178        Self { wrappers }
179    }
180
181    pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
182        self.wrappers.push(wrapper);
183    }
184}
185
186impl WrappingObjectStore for ChainedWrappingObjectStore {
187    fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
188        self.wrappers
189            .iter()
190            .fold(original, |acc, wrapper| wrapper.wrap(store_prefix, acc))
191    }
192}
193
/// Parameters to create an [ObjectStore]
#[derive(Debug, Clone)]
pub struct ObjectStoreParams {
    /// Block size override; `None` leaves the choice of a default to the
    /// code that builds the store.
    pub block_size: Option<usize>,
    /// A pre-built object store together with its URL. When set,
    /// `ObjectStore::from_uri_and_params` uses it instead of looking a store
    /// up in the registry.
    #[deprecated(note = "Implement an ObjectStoreProvider instead")]
    pub object_store: Option<(Arc<DynObjectStore>, Url)>,
    /// Refresh offset for AWS credentials when using the legacy AWS credentials path.
    /// For StorageOptionsAccessor, use `refresh_offset_millis` storage option instead.
    pub s3_credentials_refresh_offset: Duration,
    /// Explicit AWS credential provider (only with the `aws` feature).
    #[cfg(feature = "aws")]
    pub aws_credentials: Option<AwsCredentialProvider>,
    /// Optional wrapper applied on top of the created object store.
    pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
    /// Unified storage options accessor with caching and automatic refresh
    ///
    /// Provides storage options and optionally a dynamic provider for automatic
    /// credential refresh. Use `StorageOptionsAccessor::with_static_options()` for static
    /// options or `StorageOptionsAccessor::with_initial_and_provider()` for dynamic refresh.
    pub storage_options_accessor: Option<Arc<StorageOptionsAccessor>>,
    /// Use constant size upload parts for multipart uploads. Only necessary
    /// for Cloudflare R2, which doesn't support variable size parts. When this
    /// is false, max upload size is 2.5TB. When this is true, the max size is
    /// 50GB.
    pub use_constant_size_upload_parts: bool,
    /// Whether listings may be assumed to be lexically ordered; `None` means
    /// no explicit choice was made.
    pub list_is_lexically_ordered: Option<bool>,
}
220
221impl Default for ObjectStoreParams {
222    fn default() -> Self {
223        #[allow(deprecated)]
224        Self {
225            object_store: None,
226            block_size: None,
227            s3_credentials_refresh_offset: Duration::from_secs(60),
228            #[cfg(feature = "aws")]
229            aws_credentials: None,
230            object_store_wrapper: None,
231            storage_options_accessor: None,
232            use_constant_size_upload_parts: false,
233            list_is_lexically_ordered: None,
234        }
235    }
236}
237
238impl ObjectStoreParams {
239    /// Get the StorageOptionsAccessor from the params
240    pub fn get_accessor(&self) -> Option<Arc<StorageOptionsAccessor>> {
241        self.storage_options_accessor.clone()
242    }
243
244    /// Get storage options from the accessor, if any
245    ///
246    /// Returns the initial storage options from the accessor without triggering refresh.
247    pub fn storage_options(&self) -> Option<&HashMap<String, String>> {
248        self.storage_options_accessor
249            .as_ref()
250            .and_then(|a| a.initial_storage_options())
251    }
252}
253
254// We implement hash for caching
255impl std::hash::Hash for ObjectStoreParams {
256    #[allow(deprecated)]
257    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
258        // For hashing, we use pointer values for ObjectStore, S3 credentials, wrapper
259        self.block_size.hash(state);
260        if let Some((store, url)) = &self.object_store {
261            Arc::as_ptr(store).hash(state);
262            url.hash(state);
263        }
264        self.s3_credentials_refresh_offset.hash(state);
265        #[cfg(feature = "aws")]
266        if let Some(aws_credentials) = &self.aws_credentials {
267            Arc::as_ptr(aws_credentials).hash(state);
268        }
269        if let Some(wrapper) = &self.object_store_wrapper {
270            Arc::as_ptr(wrapper).hash(state);
271        }
272        if let Some(accessor) = &self.storage_options_accessor {
273            accessor.accessor_id().hash(state);
274        }
275        self.use_constant_size_upload_parts.hash(state);
276        self.list_is_lexically_ordered.hash(state);
277    }
278}
279
280// We implement eq for caching
281impl Eq for ObjectStoreParams {}
282impl PartialEq for ObjectStoreParams {
283    #[allow(deprecated)]
284    fn eq(&self, other: &Self) -> bool {
285        #[cfg(feature = "aws")]
286        if self.aws_credentials.is_some() != other.aws_credentials.is_some() {
287            return false;
288        }
289
290        // For equality, we use pointer comparison for ObjectStore, S3 credentials, wrapper
291        // For accessor, we use accessor_id() for semantic equality
292        self.block_size == other.block_size
293            && self
294                .object_store
295                .as_ref()
296                .map(|(store, url)| (Arc::as_ptr(store), url))
297                == other
298                    .object_store
299                    .as_ref()
300                    .map(|(store, url)| (Arc::as_ptr(store), url))
301            && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
302            && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
303                == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
304            && self
305                .storage_options_accessor
306                .as_ref()
307                .map(|a| a.accessor_id())
308                == other
309                    .storage_options_accessor
310                    .as_ref()
311                    .map(|a| a.accessor_id())
312            && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
313            && self.list_is_lexically_ordered == other.list_is_lexically_ordered
314    }
315}
316
317/// Convert a URI string or local path to a URL
318///
319/// This function handles both proper URIs (with schemes like `file://`, `s3://`, etc.)
320/// and plain local filesystem paths. On Windows, it correctly handles drive letters
321/// that might be parsed as URL schemes.
322///
323/// # Examples
324///
325/// ```
326/// # use lance_io::object_store::uri_to_url;
327/// // URIs are preserved
328/// let url = uri_to_url("s3://bucket/path").unwrap();
329/// assert_eq!(url.scheme(), "s3");
330///
331/// // Local paths are converted to file:// URIs
332/// # #[cfg(unix)]
333/// let url = uri_to_url("/tmp/data").unwrap();
334/// # #[cfg(unix)]
335/// assert_eq!(url.scheme(), "file");
336/// ```
337pub fn uri_to_url(uri: &str) -> Result<Url> {
338    match Url::parse(uri) {
339        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
340            // On Windows, the drive is parsed as a scheme
341            local_path_to_url(uri)
342        }
343        Ok(url) => Ok(url),
344        Err(_) => local_path_to_url(uri),
345    }
346}
347
348fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
349    let str_path = str_path.as_ref();
350    let expanded = expand_tilde_path(str_path).unwrap_or_else(|| str_path.into());
351
352    let mut expanded_path = path_abs::PathAbs::new(expanded)
353        .unwrap()
354        .as_path()
355        .to_path_buf();
356    // path_abs::PathAbs::new(".") returns an empty string.
357    if let Some(s) = expanded_path.as_path().to_str()
358        && s.is_empty()
359    {
360        expanded_path = std::env::current_dir()?;
361    }
362
363    Ok(expanded_path)
364}
365
/// Resolves a leading `~` against the current user's home directory.
///
/// Returns `None` when the home directory is unknown or when `path` is
/// neither exactly `~` nor prefixed with `~/` (on Windows also `~\`).
fn expand_tilde_path(path: &str) -> Option<std::path::PathBuf> {
    let home = std::env::home_dir()?;
    if path == "~" {
        return Some(home);
    }

    #[cfg(windows)]
    let tilde_prefixes: &[&str] = &["~/", "~\\"];
    #[cfg(not(windows))]
    let tilde_prefixes: &[&str] = &["~/"];

    tilde_prefixes
        .iter()
        .find_map(|prefix| path.strip_prefix(*prefix))
        .map(|rest| home.join(rest))
}
381
382fn local_path_to_url(str_path: &str) -> Result<Url> {
383    let expanded_path = expand_path(str_path)?;
384
385    Url::from_directory_path(expanded_path).map_err(|_| {
386        Error::invalid_input_source(format!("Invalid table location: '{}'", str_path).into())
387    })
388}
389
/// Extracts the `owner/repo` identifier from a Huggingface URL.
///
/// The host and the path segments are looked at together. Both the form with
/// a repo type prefix (`models`, `datasets`, `spaces`) and the legacy form
/// without one are accepted. A revision suffix (`repo@rev`) is dropped.
///
/// Empty segments (as produced by a missing path, a trailing slash or a
/// doubled slash) are ignored, so that they can never be mistaken for an
/// owner or a repo name.
///
/// # Errors
///
/// Returns an invalid-input error when the owner or the repo is missing.
#[cfg(feature = "huggingface")]
fn parse_hf_repo_id(url: &Url) -> Result<String> {
    const REPO_TYPES: [&str; 3] = ["models", "datasets", "spaces"];

    // Accept forms with repo type prefix (models/datasets/spaces) or legacy without.
    let segments: Vec<&str> = url
        .host_str()
        .into_iter()
        .chain(url.path().split('/'))
        .filter(|segment| !segment.is_empty())
        .collect();

    if segments.len() < 2 {
        return Err(Error::invalid_input(
            "Huggingface URL must contain at least owner and repo",
        ));
    }

    let (owner, repo_with_rev) = if REPO_TYPES.contains(&segments[0]) {
        if segments.len() < 3 {
            return Err(Error::invalid_input(
                "Huggingface URL missing owner/repo after repo type",
            ));
        }
        (segments[1], segments[2])
    } else {
        (segments[0], segments[1])
    };

    // Drop an optional `@revision` suffix.
    let repo = repo_with_rev
        .split_once('@')
        .map_or(repo_with_rev, |(name, _revision)| name);
    if repo.is_empty() {
        return Err(Error::invalid_input(
            "Huggingface URL must contain at least owner and repo",
        ));
    }
    Ok(format!("{owner}/{repo}"))
}
428
429impl ObjectStore {
430    /// Parse from a string URI.
431    ///
432    /// Returns the ObjectStore instance and the absolute path to the object.
433    ///
434    /// This uses the default [ObjectStoreRegistry] to find the object store. To
435    /// allow for potential re-use of object store instances, it's recommended to
436    /// create a shared [ObjectStoreRegistry] and pass that to [Self::from_uri_and_params].
437    pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
438        let registry = Arc::new(ObjectStoreRegistry::default());
439
440        Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
441    }
442
443    /// Parse from a string URI.
444    ///
445    /// Returns the ObjectStore instance and the absolute path to the object.
446    pub async fn from_uri_and_params(
447        registry: Arc<ObjectStoreRegistry>,
448        uri: &str,
449        params: &ObjectStoreParams,
450    ) -> Result<(Arc<Self>, Path)> {
451        #[allow(deprecated)]
452        if let Some((store, path)) = params.object_store.as_ref() {
453            let mut inner = store.clone();
454            let store_prefix =
455                registry.calculate_object_store_prefix(uri, params.storage_options())?;
456            if let Some(wrapper) = params.object_store_wrapper.as_ref() {
457                inner = wrapper.wrap(&store_prefix, inner);
458            }
459
460            // Always wrap with IO tracking
461            let io_tracker = IOTracker::default();
462            let tracked_store = io_tracker.wrap("", inner);
463
464            let store = Self {
465                inner: tracked_store,
466                scheme: path.scheme().to_string(),
467                block_size: params.block_size.unwrap_or(64 * 1024),
468                max_iop_size: *DEFAULT_MAX_IOP_SIZE,
469                use_constant_size_upload_parts: params.use_constant_size_upload_parts,
470                list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
471                io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
472                download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
473                io_tracker,
474                store_prefix,
475            };
476            let path = Path::parse(path.path())?;
477            return Ok((Arc::new(store), path));
478        }
479        let url = uri_to_url(uri)?;
480
481        let store = registry.get_store(url.clone(), params).await?;
482        // We know the scheme is valid if we got a store back.
483        let provider = registry.get_provider(url.scheme()).expect_ok()?;
484        let path = provider.extract_path(&url)?;
485
486        Ok((store, path))
487    }
488
489    /// Extract the path component from a URI without initializing the object store.
490    ///
491    /// This is a synchronous operation that only parses the URI and extracts the path,
492    /// without creating or initializing any object store instance.
493    ///
494    /// # Arguments
495    ///
496    /// * `registry` - The object store registry to get the provider
497    /// * `uri` - The URI to extract the path from
498    ///
499    /// # Returns
500    ///
501    /// The extracted path component
502    pub fn extract_path_from_uri(registry: Arc<ObjectStoreRegistry>, uri: &str) -> Result<Path> {
503        let url = uri_to_url(uri)?;
504        let provider = registry
505            .get_provider(url.scheme())
506            .ok_or_else(|| Error::invalid_input(format!("Unknown scheme: {}", url.scheme())))?;
507        provider.extract_path(&url)
508    }
509
510    #[deprecated(note = "Use `from_uri` instead")]
511    pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
512        Self::from_uri_and_params(
513            Arc::new(ObjectStoreRegistry::default()),
514            str_path,
515            &Default::default(),
516        )
517        .now_or_never()
518        .unwrap()
519    }
520
521    /// Local object store.
522    pub fn local() -> Self {
523        let provider = FileStoreProvider;
524        provider
525            .new_store(Url::parse("file:///").unwrap(), &Default::default())
526            .now_or_never()
527            .unwrap()
528            .unwrap()
529    }
530
531    /// Create a in-memory object store directly for testing.
532    pub fn memory() -> Self {
533        let provider = MemoryStoreProvider;
534        provider
535            .new_store(Url::parse("memory:///").unwrap(), &Default::default())
536            .now_or_never()
537            .unwrap()
538            .unwrap()
539    }
540
541    /// Returns true if the object store pointed to a local file system.
542    pub fn is_local(&self) -> bool {
543        self.scheme == "file" || self.scheme == "file+uring"
544    }
545
546    pub fn is_cloud(&self) -> bool {
547        if self.is_local() || self.scheme == "memory" || self.scheme == "shared-memory" {
548            return false;
549        }
550        true
551    }
552
553    /// Whether this object store prefers the lite scheduler.
554    ///
555    /// The lite scheduler is designed for backends like io_uring where
556    /// tasks should only be polled when the consumer polls them.
557    pub fn prefers_lite_scheduler(&self) -> bool {
558        self.scheme == "file+uring"
559    }
560
561    pub fn scheme(&self) -> &str {
562        &self.scheme
563    }
564
/// The block size configured for this store; it is passed to the readers
/// this store opens.
pub fn block_size(&self) -> usize {
    self.block_size
}
568
/// The maximum size of a single IO operation configured for this store
/// (presumably in bytes — TODO confirm).
pub fn max_iop_size(&self) -> u64 {
    self.max_iop_size
}
572
573    pub fn io_parallelism(&self) -> usize {
574        std::env::var("LANCE_IO_THREADS")
575            .map(|val| val.parse::<usize>().unwrap())
576            .unwrap_or(self.io_parallelism)
577    }
578
/// Get the IO tracker for this object store
///
/// The IO tracker can be used to get statistics about read/write operations
/// performed on this object store. The tracker is returned by reference.
pub fn io_tracker(&self) -> &IOTracker {
    &self.io_tracker
}
586
/// Get a snapshot of current IO statistics without resetting counters
///
/// Returns the current IO statistics without modifying the internal state.
/// Use this when you need to check stats without resetting them.
/// Delegates to the tracker's `stats()`.
pub fn io_stats_snapshot(&self) -> IoStats {
    self.io_tracker.stats()
}
594
/// Get incremental IO statistics since the last call to this method
///
/// Returns the accumulated statistics since the last call and resets the
/// counters to zero. This is useful for tracking IO operations between
/// different stages of processing. Delegates to the tracker's
/// `incremental_stats()`.
pub fn io_stats_incremental(&self) -> IoStats {
    self.io_tracker.incremental_stats()
}
603
604    /// Open a file for path.
605    ///
606    /// Parameters
607    /// - ``path``: Absolute path to the file.
608    pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
609        match self.scheme.as_str() {
610            "file" => {
611                LocalObjectReader::open_with_tracker(
612                    path,
613                    self.block_size,
614                    None,
615                    Arc::new(self.io_tracker.clone()),
616                )
617                .await
618            }
619            #[cfg(target_os = "linux")]
620            "file+uring" => {
621                // Check if current-thread mode enabled
622                let use_current_thread = std::env::var("LANCE_URING_CURRENT_THREAD")
623                    .map(|v| str_is_truthy(&v))
624                    .unwrap_or(false);
625
626                if use_current_thread {
627                    UringCurrentThreadReader::open(
628                        path,
629                        self.block_size,
630                        None,
631                        Arc::new(self.io_tracker.clone()),
632                    )
633                    .await
634                } else {
635                    UringReader::open(
636                        path,
637                        self.block_size,
638                        None,
639                        Arc::new(self.io_tracker.clone()),
640                    )
641                    .await
642                }
643            }
644            _ => Ok(Box::new(CloudObjectReader::new(
645                self.inner.clone(),
646                path.clone(),
647                self.block_size,
648                None,
649                self.download_retry_count,
650            )?)),
651        }
652    }
653
654    /// Open a reader for a file with known size.
655    ///
656    /// This size may either have been retrieved from a list operation or
657    /// cached metadata. By passing in the known size, we can skip a HEAD / metadata
658    /// call.
659    pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
660        // If we know the file is really small, we can read the whole thing
661        // as a single request.
662        if known_size <= self.block_size {
663            return Ok(Box::new(SmallReader::new(
664                self.inner.clone(),
665                path.clone(),
666                self.download_retry_count,
667                known_size,
668            )));
669        }
670
671        match self.scheme.as_str() {
672            "file" => {
673                LocalObjectReader::open_with_tracker(
674                    path,
675                    self.block_size,
676                    Some(known_size),
677                    Arc::new(self.io_tracker.clone()),
678                )
679                .await
680            }
681            #[cfg(target_os = "linux")]
682            "file+uring" => {
683                // Check if current-thread mode enabled
684                let use_current_thread = std::env::var("LANCE_URING_CURRENT_THREAD")
685                    .map(|v| str_is_truthy(&v))
686                    .unwrap_or(false);
687
688                if use_current_thread {
689                    UringCurrentThreadReader::open(
690                        path,
691                        self.block_size,
692                        Some(known_size),
693                        Arc::new(self.io_tracker.clone()),
694                    )
695                    .await
696                } else {
697                    UringReader::open(
698                        path,
699                        self.block_size,
700                        Some(known_size),
701                        Arc::new(self.io_tracker.clone()),
702                    )
703                    .await
704                }
705            }
706            _ => Ok(Box::new(CloudObjectReader::new(
707                self.inner.clone(),
708                path.clone(),
709                self.block_size,
710                Some(known_size),
711                self.download_retry_count,
712            )?)),
713        }
714    }
715
716    /// Create an [ObjectWriter] from local [std::path::Path]
717    pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
718        let object_store = Self::local();
719        let absolute_path = expand_path(path.to_string_lossy())?;
720        let os_path = Path::from_absolute_path(absolute_path)?;
721        ObjectWriter::new(&object_store, &os_path).await
722    }
723
724    /// Open an [Reader] from local [std::path::Path]
725    pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
726        let object_store = Self::local();
727        let absolute_path = expand_path(path.to_string_lossy())?;
728        let os_path = Path::from_absolute_path(absolute_path)?;
729        object_store.open(&os_path).await
730    }
731
732    /// Create a new file.
733    pub async fn create(&self, path: &Path) -> Result<Box<dyn Writer>> {
734        match self.scheme.as_str() {
735            "file" => {
736                let local_path = super::local::to_local_path(path);
737                let local_path = std::path::PathBuf::from(&local_path);
738                if let Some(parent) = local_path.parent() {
739                    tokio::fs::create_dir_all(parent).await?;
740                }
741                let parent = local_path
742                    .parent()
743                    .expect("file path must have parent")
744                    .to_owned();
745                let named_temp =
746                    tokio::task::spawn_blocking(move || tempfile::NamedTempFile::new_in(parent))
747                        .await
748                        .map_err(|e| Error::io(format!("spawn_blocking failed: {}", e)))??;
749                let (std_file, temp_path) = named_temp.into_parts();
750                let file = tokio::fs::File::from_std(std_file);
751                Ok(Box::new(LocalWriter::new(
752                    file,
753                    path.clone(),
754                    temp_path,
755                    Arc::new(self.io_tracker.clone()),
756                )))
757            }
758            _ => Ok(Box::new(ObjectWriter::new(self, path).await?)),
759        }
760    }
761
762    /// A helper function to create a file and write content to it.
763    pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
764        let mut writer = self.create(path).await?;
765        writer.write_all(content).await?;
766        Writer::shutdown(writer.as_mut()).await
767    }
768
769    pub async fn delete(&self, path: &Path) -> Result<()> {
770        self.inner.delete(path).await?;
771        Ok(())
772    }
773
774    pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
775        if self.is_local() {
776            // Use std::fs::copy for local filesystem to support cross-filesystem copies
777            return super::local::copy_file(from, to);
778        }
779        Ok(self.inner.copy(from, to).await?)
780    }
781
782    /// Read a directory (start from base directory) and returns all sub-paths in the directory.
783    pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
784        let path = dir_path.into();
785        let path = Path::parse(&path)?;
786        let output = self.inner.list_with_delimiter(Some(&path)).await?;
787        Ok(output
788            .common_prefixes
789            .iter()
790            .chain(output.objects.iter().map(|o| &o.location))
791            .map(|s| s.filename().unwrap().to_string())
792            .collect())
793    }
794
795    pub fn list(
796        &self,
797        path: Option<Path>,
798    ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
799        Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
800    }
801
/// Read all files (start from base directory) recursively.
///
/// `unmodified_since` can be specified to only return files that have not
/// been modified since the given time.
///
/// Delegates to the `read_dir_all` extension method of the inner store.
pub fn read_dir_all<'a, 'b>(
    &'a self,
    dir_path: impl Into<&'b Path> + Send,
    unmodified_since: Option<DateTime<Utc>>,
) -> BoxStream<'a, Result<ObjectMeta>> {
    self.inner.read_dir_all(dir_path, unmodified_since)
}
812
813    /// Remove a directory recursively.
814    pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
815        let path = dir_path.into();
816        let path = Path::parse(&path)?;
817
818        if self.is_local() {
819            // The local file system provider needs to delete both files and directories.
820            return super::local::remove_dir_all(&path);
821        }
822        let sub_entries = self
823            .inner
824            .list(Some(&path))
825            .map(|m| m.map(|meta| meta.location))
826            .boxed();
827        self.inner
828            .delete_stream(sub_entries)
829            .try_collect::<Vec<_>>()
830            .await?;
831        if self.scheme == "file-object-store" {
832            // file-object-store tries to do everything as similarly as possible to the remote
833            // object stores. But we still have to delete the directory entries afterwards.
834            return super::local::remove_dir_all(&path);
835        }
836        Ok(())
837    }
838
839    pub fn remove_stream<'a>(
840        &'a self,
841        locations: BoxStream<'a, Result<Path>>,
842    ) -> BoxStream<'a, Result<Path>> {
843        let store = Arc::clone(&self.inner);
844        locations
845            .and_then(move |location| {
846                let store = Arc::clone(&store);
847                async move {
848                    store.delete(&location).await?;
849                    Ok(location)
850                }
851            })
852            .boxed()
853    }
854
855    /// Check a file exists.
856    pub async fn exists(&self, path: &Path) -> Result<bool> {
857        match self.inner.head(path).await {
858            Ok(_) => Ok(true),
859            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
860            Err(e) => Err(e.into()),
861        }
862    }
863
864    /// Get file size.
865    pub async fn size(&self, path: &Path) -> Result<u64> {
866        Ok(self.inner.head(path).await?.size)
867    }
868
869    /// Convenience function to open a reader and read all the bytes
870    pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
871        let reader = self.open(path).await?;
872        Ok(reader.get_all().await?)
873    }
874
875    /// Convenience function open a reader and make a single request
876    ///
877    /// If you will be making multiple requests to the path it is more efficient to call [`Self::open`]
878    /// and then call [`Reader::get_range`] multiple times.
879    pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
880        let reader = self.open(path).await?;
881        Ok(reader.get_range(range).await?)
882    }
883}
884
/// Lance-specific configuration keys that apply to multiple object stores.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LanceConfigKey {
    /// How many times a failed download is retried.
    DownloadRetryCount,
}
891
892impl FromStr for LanceConfigKey {
893    type Err = Error;
894
895    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
896        match s.to_ascii_lowercase().as_str() {
897            "download_retry_count" => Ok(Self::DownloadRetryCount),
898            _ => Err(Error::invalid_input_source(
899                format!("Invalid LanceConfigKey: {}", s).into(),
900            )),
901        }
902    }
903}
904
/// String key/value options used to configure an object store.
///
/// The wrapped map is public, so callers can read or modify entries directly.
#[derive(Debug, Default, Clone)]
pub struct StorageOptions(pub HashMap<String, String>);
907
908impl StorageOptions {
909    /// Create a new instance of [`StorageOptions`]
910    pub fn new(options: HashMap<String, String>) -> Self {
911        let mut options = options;
912        if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
913            options.insert("allow_http".into(), value);
914        }
915        if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
916            options.insert("allow_http".into(), value);
917        }
918        if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
919            options.insert("allow_http".into(), value);
920        }
921        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
922            options.insert("client_max_retries".into(), value);
923        }
924        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
925            options.insert("client_retry_timeout".into(), value);
926        }
927        Self(options)
928    }
929
930    /// Denotes if unsecure connections via http are allowed
931    pub fn allow_http(&self) -> bool {
932        self.0.iter().any(|(key, value)| {
933            key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
934        })
935    }
936
937    /// Number of times to retry a download that fails
938    pub fn download_retry_count(&self) -> usize {
939        self.0
940            .iter()
941            .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
942            .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
943            .unwrap_or(3)
944    }
945
946    /// Max retry times to set in RetryConfig for object store client
947    pub fn client_max_retries(&self) -> usize {
948        self.0
949            .iter()
950            .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
951            .and_then(|(_, value)| value.parse::<usize>().ok())
952            .unwrap_or(3)
953    }
954
955    /// Seconds of timeout to set in RetryConfig for object store client
956    pub fn client_retry_timeout(&self) -> u64 {
957        self.0
958            .iter()
959            .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
960            .and_then(|(_, value)| value.parse::<u64>().ok())
961            .unwrap_or(180)
962    }
963
964    pub fn get(&self, key: &str) -> Option<&String> {
965        self.0.get(key)
966    }
967
968    /// Build [`ClientOptions`] with default headers extracted from `headers.*` keys.
969    ///
970    /// Keys prefixed with `headers.` are parsed into HTTP headers. For example,
971    /// `headers.x-ms-version = 2023-11-03` results in a default header
972    /// `x-ms-version: 2023-11-03`.
973    ///
974    /// Returns an error if any `headers.*` key has an invalid header name or value.
975    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
976    pub fn client_options(&self) -> Result<ClientOptions> {
977        let mut headers = HeaderMap::new();
978        for (key, value) in &self.0 {
979            if let Some(header_name) = key.strip_prefix("headers.") {
980                let name = header_name
981                    .parse::<http::header::HeaderName>()
982                    .map_err(|e| {
983                        Error::invalid_input(format!("invalid header name '{header_name}': {e}"))
984                    })?;
985                let val = HeaderValue::from_str(value).map_err(|e| {
986                    Error::invalid_input(format!("invalid header value for '{header_name}': {e}"))
987                })?;
988                headers.insert(name, val);
989            }
990        }
991        let mut client_options = ClientOptions::default();
992        if !headers.is_empty() {
993            client_options = client_options.with_default_headers(headers);
994        }
995        Ok(client_options)
996    }
997
998    /// Get the expiration time in milliseconds since epoch, if present
999    pub fn expires_at_millis(&self) -> Option<u64> {
1000        self.0
1001            .get(EXPIRES_AT_MILLIS_KEY)
1002            .and_then(|s| s.parse::<u64>().ok())
1003    }
1004}
1005
1006impl From<HashMap<String, String>> for StorageOptions {
1007    fn from(value: HashMap<String, String>) -> Self {
1008        Self::new(value)
1009    }
1010}
1011
1012static DEFAULT_OBJECT_STORE_REGISTRY: std::sync::LazyLock<ObjectStoreRegistry> =
1013    std::sync::LazyLock::new(ObjectStoreRegistry::default);
1014
1015impl ObjectStore {
1016    #[allow(clippy::too_many_arguments)]
1017    pub fn new(
1018        store: Arc<DynObjectStore>,
1019        location: Url,
1020        block_size: Option<usize>,
1021        wrapper: Option<Arc<dyn WrappingObjectStore>>,
1022        use_constant_size_upload_parts: bool,
1023        list_is_lexically_ordered: bool,
1024        io_parallelism: usize,
1025        download_retry_count: usize,
1026        storage_options: Option<&HashMap<String, String>>,
1027    ) -> Self {
1028        let scheme = location.scheme();
1029        let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
1030        let store_prefix = match DEFAULT_OBJECT_STORE_REGISTRY.get_provider(scheme) {
1031            Some(provider) => provider
1032                .calculate_object_store_prefix(&location, storage_options)
1033                .unwrap(),
1034            None => {
1035                let store_prefix = format!("{}${}", location.scheme(), location.authority());
1036                log::warn!(
1037                    "Guessing that object store prefix is {}, since object store scheme is not found in registry.",
1038                    store_prefix
1039                );
1040                store_prefix
1041            }
1042        };
1043        let store = match wrapper {
1044            Some(wrapper) => wrapper.wrap(&store_prefix, store),
1045            None => store,
1046        };
1047
1048        // Always wrap with IO tracking
1049        let io_tracker = IOTracker::default();
1050        let tracked_store = io_tracker.wrap("", store);
1051
1052        Self {
1053            inner: tracked_store,
1054            scheme: scheme.into(),
1055            block_size,
1056            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
1057            use_constant_size_upload_parts,
1058            list_is_lexically_ordered,
1059            io_parallelism,
1060            download_retry_count,
1061            io_tracker,
1062            store_prefix,
1063        }
1064    }
1065}
1066
/// Pick the default block size for a URL scheme.
///
/// The `file` scheme gets 4KB blocks. Every other scheme is treated as a cloud
/// object store and gets 64KB blocks, which is generally the largest block size
/// where we don't see a latency penalty.
fn infer_block_size(scheme: &str) -> usize {
    const LOCAL_BLOCK_SIZE: usize = 4 * 1024;
    const CLOUD_BLOCK_SIZE: usize = 64 * 1024;

    if scheme == "file" {
        LOCAL_BLOCK_SIZE
    } else {
        CLOUD_BLOCK_SIZE
    }
}
1076
1077#[cfg(test)]
1078mod tests {
1079    use super::*;
1080    use lance_core::utils::tempfile::{TempStdDir, TempStdFile, TempStrDir};
1081    use object_store::memory::InMemory;
1082    use rstest::rstest;
1083    use std::env::set_current_dir;
1084    use std::fs::{create_dir_all, write};
1085    use std::path::Path as StdPath;
1086    use std::sync::atomic::{AtomicBool, Ordering};
1087
1088    /// Write test content to file.
1089    fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
1090        let path = expand_path(path_str).map_err(std::io::Error::other)?;
1091        std::fs::create_dir_all(path.parent().unwrap())?;
1092        write(path, contents)
1093    }
1094
1095    async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
1096        let test_file_store = store.open(path).await.unwrap();
1097        let size = test_file_store.size().await.unwrap();
1098        let bytes = test_file_store.get_range(0..size).await.unwrap();
1099        let contents = String::from_utf8(bytes.to_vec()).unwrap();
1100        Ok(contents)
1101    }
1102
1103    #[tokio::test]
1104    async fn test_absolute_paths() {
1105        let tmp_path = TempStrDir::default();
1106        write_to_file(
1107            &format!("{tmp_path}/bar/foo.lance/test_file"),
1108            "TEST_CONTENT",
1109        )
1110        .unwrap();
1111
1112        // test a few variations of the same path
1113        for uri in &[
1114            format!("{tmp_path}/bar/foo.lance"),
1115            format!("{tmp_path}/./bar/foo.lance"),
1116            format!("{tmp_path}/bar/foo.lance/../foo.lance"),
1117        ] {
1118            let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
1119            let contents = read_from_store(store.as_ref(), &path.clone().join("test_file"))
1120                .await
1121                .unwrap();
1122            assert_eq!(contents, "TEST_CONTENT");
1123        }
1124    }
1125
1126    #[tokio::test]
1127    async fn test_cloud_paths() {
1128        let uri = "s3://bucket/foo.lance";
1129        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
1130        assert_eq!(store.scheme, "s3");
1131        assert_eq!(path.to_string(), "foo.lance");
1132
1133        let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
1134            .await
1135            .unwrap();
1136        assert_eq!(store.scheme, "s3");
1137        assert_eq!(path.to_string(), "foo.lance");
1138
1139        let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
1140            .await
1141            .unwrap();
1142        assert_eq!(store.scheme, "gs");
1143        assert_eq!(path.to_string(), "foo.lance");
1144
1145        let (store, path) =
1146            ObjectStore::from_uri("abfss://filesystem@account.dfs.core.windows.net/foo.lance")
1147                .await
1148                .unwrap();
1149        assert_eq!(store.scheme, "abfss");
1150        assert_eq!(path.to_string(), "foo.lance");
1151    }
1152
1153    async fn test_block_size_used_test_helper(
1154        uri: &str,
1155        storage_options: Option<HashMap<String, String>>,
1156        default_expected_block_size: usize,
1157    ) {
1158        // Test the default
1159        let registry = Arc::new(ObjectStoreRegistry::default());
1160        let accessor = storage_options
1161            .clone()
1162            .map(|opts| Arc::new(StorageOptionsAccessor::with_static_options(opts)));
1163        let params = ObjectStoreParams {
1164            storage_options_accessor: accessor.clone(),
1165            ..ObjectStoreParams::default()
1166        };
1167        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
1168            .await
1169            .unwrap();
1170        assert_eq!(store.block_size, default_expected_block_size);
1171
1172        // Ensure param is used
1173        let registry = Arc::new(ObjectStoreRegistry::default());
1174        let params = ObjectStoreParams {
1175            block_size: Some(1024),
1176            storage_options_accessor: accessor,
1177            ..ObjectStoreParams::default()
1178        };
1179        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
1180            .await
1181            .unwrap();
1182        assert_eq!(store.block_size, 1024);
1183    }
1184
1185    #[rstest]
1186    #[case("s3://bucket/foo.lance", None)]
1187    #[case("gs://bucket/foo.lance", None)]
1188    #[case("az://account/bucket/foo.lance",
1189      Some(HashMap::from([
1190            (String::from("account_name"), String::from("account")),
1191            (String::from("container_name"), String::from("container"))
1192           ])))]
1193    #[case("abfss://filesystem@account.dfs.core.windows.net/foo.lance",
1194      Some(HashMap::from([
1195            (String::from("account_name"), String::from("account")),
1196            (String::from("container_name"), String::from("filesystem"))
1197           ])))]
1198    #[tokio::test]
1199    async fn test_block_size_used_cloud(
1200        #[case] uri: &str,
1201        #[case] storage_options: Option<HashMap<String, String>>,
1202    ) {
1203        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
1204    }
1205
1206    #[rstest]
1207    #[case("file")]
1208    #[case("file-object-store")]
1209    #[case("memory:///bucket/foo.lance")]
1210    #[tokio::test]
1211    async fn test_block_size_used_file(#[case] prefix: &str) {
1212        let tmp_path = TempStrDir::default();
1213        let path = format!("{tmp_path}/bar/foo.lance/test_file");
1214        write_to_file(&path, "URL").unwrap();
1215        let uri = format!("{prefix}:///{path}");
1216        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
1217    }
1218
1219    #[tokio::test]
1220    async fn test_relative_paths() {
1221        let tmp_path = TempStrDir::default();
1222        write_to_file(
1223            &format!("{tmp_path}/bar/foo.lance/test_file"),
1224            "RELATIVE_URL",
1225        )
1226        .unwrap();
1227
1228        set_current_dir(StdPath::new(tmp_path.as_ref())).expect("Error changing current dir");
1229        let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();
1230
1231        let contents = read_from_store(store.as_ref(), &path.clone().join("test_file"))
1232            .await
1233            .unwrap();
1234        assert_eq!(contents, "RELATIVE_URL");
1235    }
1236
1237    #[tokio::test]
1238    async fn test_tilde_expansion() {
1239        let uri = "~/foo.lance";
1240        write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
1241        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
1242        let contents = read_from_store(store.as_ref(), &path.clone().join("test_file"))
1243            .await
1244            .unwrap();
1245        assert_eq!(contents, "TILDE");
1246    }
1247
1248    #[tokio::test]
1249    async fn test_read_directory() {
1250        let path = TempStdDir::default();
1251        create_dir_all(path.join("foo").join("bar")).unwrap();
1252        create_dir_all(path.join("foo").join("zoo")).unwrap();
1253        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
1254        write_to_file(
1255            path.join("foo").join("test_file").to_str().unwrap(),
1256            "read_dir",
1257        )
1258        .unwrap();
1259        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
1260
1261        let sub_dirs = store.read_dir(base.clone().join("foo")).await.unwrap();
1262        assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
1263    }
1264
1265    #[tokio::test]
1266    async fn test_delete_directory_local_store() {
1267        test_delete_directory("").await;
1268    }
1269
1270    #[tokio::test]
1271    async fn test_delete_directory_file_object_store() {
1272        test_delete_directory("file-object-store").await;
1273    }
1274
1275    async fn test_delete_directory(scheme: &str) {
1276        let path = TempStdDir::default();
1277        create_dir_all(path.join("foo").join("bar")).unwrap();
1278        create_dir_all(path.join("foo").join("zoo")).unwrap();
1279        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
1280        write_to_file(
1281            path.join("foo")
1282                .join("bar")
1283                .join("test_file")
1284                .to_str()
1285                .unwrap(),
1286            "delete",
1287        )
1288        .unwrap();
1289        let file_url = Url::from_directory_path(&path).unwrap();
1290        let url = if scheme.is_empty() {
1291            file_url
1292        } else {
1293            let mut url = Url::parse(&format!("{scheme}:///")).unwrap();
1294            // Use the file:// URL's normalized path so this works on Windows too.
1295            url.set_path(file_url.path());
1296            url
1297        };
1298        let (store, base) = ObjectStore::from_uri(url.as_ref()).await.unwrap();
1299        store
1300            .remove_dir_all(base.clone().join("foo"))
1301            .await
1302            .unwrap();
1303
1304        assert!(!path.join("foo").exists());
1305    }
1306
1307    #[derive(Debug)]
1308    struct TestWrapper {
1309        called: AtomicBool,
1310
1311        return_value: Arc<dyn OSObjectStore>,
1312    }
1313
1314    impl WrappingObjectStore for TestWrapper {
1315        fn wrap(
1316            &self,
1317            _store_prefix: &str,
1318            _original: Arc<dyn OSObjectStore>,
1319        ) -> Arc<dyn OSObjectStore> {
1320            self.called.store(true, Ordering::Relaxed);
1321
1322            // return a mocked value so we can check if the final store is the one we expect
1323            self.return_value.clone()
1324        }
1325    }
1326
1327    impl TestWrapper {
1328        fn called(&self) -> bool {
1329            self.called.load(Ordering::Relaxed)
1330        }
1331    }
1332
1333    #[tokio::test]
1334    async fn test_wrapping_object_store_option_is_used() {
1335        // Make a store for the inner store first
1336        let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
1337        let registry = Arc::new(ObjectStoreRegistry::default());
1338
1339        assert_eq!(Arc::strong_count(&mock_inner_store), 1);
1340
1341        let wrapper = Arc::new(TestWrapper {
1342            called: AtomicBool::new(false),
1343            return_value: mock_inner_store.clone(),
1344        });
1345
1346        let params = ObjectStoreParams {
1347            object_store_wrapper: Some(wrapper.clone()),
1348            ..ObjectStoreParams::default()
1349        };
1350
1351        // not called yet
1352        assert!(!wrapper.called());
1353
1354        let _ = ObjectStore::from_uri_and_params(registry, "memory:///", &params)
1355            .await
1356            .unwrap();
1357
1358        // called after construction
1359        assert!(wrapper.called());
1360
1361        // hard to compare two trait pointers as the point to vtables
1362        // using the ref count as a proxy to make sure that the store is correctly kept
1363        assert_eq!(Arc::strong_count(&mock_inner_store), 2);
1364    }
1365
1366    #[tokio::test]
1367    async fn test_local_paths() {
1368        let file_path = TempStdFile::default();
1369        let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
1370        writer.write_all(b"LOCAL").await.unwrap();
1371        Writer::shutdown(&mut writer).await.unwrap();
1372
1373        let reader = ObjectStore::open_local(&file_path).await.unwrap();
1374        let buf = reader.get_range(0..5).await.unwrap();
1375        assert_eq!(buf.as_ref(), b"LOCAL");
1376    }
1377
1378    #[tokio::test]
1379    async fn test_read_one() {
1380        let file_path = TempStdFile::default();
1381        let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
1382        writer.write_all(b"LOCAL").await.unwrap();
1383        Writer::shutdown(&mut writer).await.unwrap();
1384
1385        let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
1386        let obj_store = ObjectStore::local();
1387        let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
1388        assert_eq!(buf.as_ref(), b"LOCAL");
1389
1390        let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
1391        assert_eq!(buf.as_ref(), b"LOCAL");
1392    }
1393
1394    #[tokio::test]
1395    #[cfg(windows)]
1396    async fn test_windows_paths() {
1397        use std::path::Component;
1398        use std::path::Prefix;
1399        use std::path::Prefix::*;
1400
1401        fn get_path_prefix(path: &StdPath) -> Prefix<'_> {
1402            match path.components().next().unwrap() {
1403                Component::Prefix(prefix_component) => prefix_component.kind(),
1404                _ => panic!(),
1405            }
1406        }
1407
1408        fn get_drive_letter(prefix: Prefix) -> String {
1409            match prefix {
1410                Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
1411                _ => panic!(),
1412            }
1413        }
1414
1415        let tmp_path = TempStdFile::default();
1416        let prefix = get_path_prefix(&tmp_path);
1417        let drive_letter = get_drive_letter(prefix);
1418
1419        write_to_file(
1420            &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
1421            "WINDOWS",
1422        )
1423        .unwrap();
1424
1425        for uri in &[
1426            format!("{drive_letter}:/test_folder/test.lance"),
1427            format!("{drive_letter}:\\test_folder\\test.lance"),
1428        ] {
1429            let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
1430            let contents = read_from_store(store.as_ref(), &base.clone().join("test_file"))
1431                .await
1432                .unwrap();
1433            assert_eq!(contents, "WINDOWS");
1434        }
1435    }
1436
1437    #[tokio::test]
1438    async fn test_cross_filesystem_copy() {
1439        // Create two temporary directories that simulate different filesystems
1440        let source_dir = TempStdDir::default();
1441        let dest_dir = TempStdDir::default();
1442
1443        // Create a test file in the source directory
1444        let source_file_name = "test_file.txt";
1445        let source_file = source_dir.join(source_file_name);
1446        std::fs::write(&source_file, b"test content").unwrap();
1447
1448        // Create ObjectStore for local filesystem
1449        let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
1450            .await
1451            .unwrap();
1452
1453        // Create paths relative to the ObjectStore base
1454        let from_path = base_path.clone().join(source_file_name);
1455
1456        // Use object_store::Path::parse for the destination
1457        let dest_file = dest_dir.join("copied_file.txt");
1458        let dest_str = dest_file.to_str().unwrap();
1459        let to_path = object_store::path::Path::parse(dest_str).unwrap();
1460
1461        // Perform the copy operation
1462        store.copy(&from_path, &to_path).await.unwrap();
1463
1464        // Verify the file was copied correctly
1465        assert!(dest_file.exists());
1466        let copied_content = std::fs::read(&dest_file).unwrap();
1467        assert_eq!(copied_content, b"test content");
1468    }
1469
1470    #[tokio::test]
1471    async fn test_copy_creates_parent_directories() {
1472        let source_dir = TempStdDir::default();
1473        let dest_dir = TempStdDir::default();
1474
1475        // Create a test file in the source directory
1476        let source_file_name = "test_file.txt";
1477        let source_file = source_dir.join(source_file_name);
1478        std::fs::write(&source_file, b"test content").unwrap();
1479
1480        // Create ObjectStore for local filesystem
1481        let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
1482            .await
1483            .unwrap();
1484
1485        // Create paths
1486        let from_path = base_path.clone().join(source_file_name);
1487
1488        // Create destination with nested directories that don't exist yet
1489        let dest_file = dest_dir.join("nested").join("dirs").join("copied_file.txt");
1490        let dest_str = dest_file.to_str().unwrap();
1491        let to_path = object_store::path::Path::parse(dest_str).unwrap();
1492
1493        // Perform the copy operation - should create parent directories
1494        store.copy(&from_path, &to_path).await.unwrap();
1495
1496        // Verify the file was copied correctly and directories were created
1497        assert!(dest_file.exists());
1498        assert!(dest_file.parent().unwrap().exists());
1499        let copied_content = std::fs::read(&dest_file).unwrap();
1500        assert_eq!(copied_content, b"test content");
1501    }
1502
1503    #[test]
1504    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1505    fn test_client_options_extracts_headers() {
1506        let opts = StorageOptions(HashMap::from([
1507            ("headers.x-custom-foo".to_string(), "bar".to_string()),
1508            ("headers.x-ms-version".to_string(), "2023-11-03".to_string()),
1509            ("region".to_string(), "us-west-2".to_string()),
1510        ]));
1511        let client_options = opts.client_options().unwrap();
1512
1513        // Verify non-header keys are not consumed as headers by creating
1514        // another StorageOptions with no headers.* keys.
1515        let opts_no_headers = StorageOptions(HashMap::from([(
1516            "region".to_string(),
1517            "us-west-2".to_string(),
1518        )]));
1519        opts_no_headers.client_options().unwrap();
1520
1521        // Smoke test: the client_options with headers should be usable
1522        // in a builder (we can't inspect the headers directly, but building
1523        // should not fail).
1524        #[cfg(feature = "gcp")]
1525        {
1526            use object_store::gcp::GoogleCloudStorageBuilder;
1527            let _builder = GoogleCloudStorageBuilder::new()
1528                .with_client_options(client_options)
1529                .with_url("gs://test-bucket");
1530        }
1531    }
1532
1533    #[test]
1534    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1535    fn test_client_options_rejects_invalid_header_name() {
1536        let opts = StorageOptions(HashMap::from([(
1537            "headers.bad header".to_string(),
1538            "value".to_string(),
1539        )]));
1540        let err = opts.client_options().unwrap_err();
1541        assert!(err.to_string().contains("invalid header name"));
1542    }
1543
1544    #[test]
1545    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1546    fn test_client_options_rejects_invalid_header_value() {
1547        let opts = StorageOptions(HashMap::from([(
1548            "headers.x-good-name".to_string(),
1549            "bad\x01value".to_string(),
1550        )]));
1551        let err = opts.client_options().unwrap_err();
1552        assert!(err.to_string().contains("invalid header value"));
1553    }
1554
1555    #[test]
1556    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1557    fn test_client_options_empty_when_no_header_keys() {
1558        let opts = StorageOptions(HashMap::from([
1559            ("region".to_string(), "us-east-1".to_string()),
1560            ("access_key_id".to_string(), "AKID".to_string()),
1561        ]));
1562        opts.client_options().unwrap();
1563    }
1564}