Skip to main content

lance_io/
object_store.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Extend [object_store::ObjectStore] functionalities
5
6use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use deepsize::DeepSizeOf;
17use futures::{FutureExt, Stream};
18use futures::{StreamExt, TryStreamExt, future, stream::BoxStream};
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22use object_store::DynObjectStore;
23use object_store::Error as ObjectStoreError;
24#[cfg(feature = "aws")]
25use object_store::aws::AwsCredentialProvider;
26#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
27use object_store::{ClientOptions, HeaderMap, HeaderValue};
28use object_store::{ObjectMeta, ObjectStore as OSObjectStore, path::Path};
29use providers::local::FileStoreProvider;
30use providers::memory::MemoryStoreProvider;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35mod list_retry;
36pub mod providers;
37pub mod storage_options;
38mod tracing;
39use crate::object_reader::SmallReader;
40use crate::object_writer::{LocalWriter, WriteResult};
41use crate::traits::Writer;
42use crate::utils::tracking_store::{IOTracker, IoStats};
43use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
44use lance_core::{Error, Result};
45
/// Default IO parallelism for local disks.
///
/// Local disks tend to do fine with a few threads.
/// Note: the number of threads here also impacts the number of files
/// we need to read in some situations.  So keeping this at 8 keeps the
/// RAM on our scanner down.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
/// Default IO parallelism for cloud stores.
///
/// Cloud disks often need many many threads to saturate the network.
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

// Default read block size for local filesystems.
const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024; // 4KB block size
// Default read block size for cloud stores (only compiled when a cloud feature is on).
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024; // 64KB block size
57
/// Maximum size (in bytes) of a single IO operation.
///
/// May be overridden with the `LANCE_MAX_IOP_SIZE` environment variable;
/// a value that is unset or cannot be parsed as an integer falls back to
/// the default of 16 MiB.  (Previously a malformed value would panic via
/// `unwrap`; we now degrade gracefully to the default.)
pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock<u64> = std::sync::LazyLock::new(|| {
    std::env::var("LANCE_MAX_IOP_SIZE")
        .ok()
        .and_then(|val| val.parse().ok())
        .unwrap_or(16 * 1024 * 1024)
});
63
/// Default number of times a failed download is retried.
pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
65
66pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
67pub use storage_options::{
68    EXPIRES_AT_MILLIS_KEY, LanceNamespaceStorageOptionsProvider, REFRESH_OFFSET_MILLIS_KEY,
69    StorageOptionsAccessor, StorageOptionsProvider,
70};
71
/// Extension methods available on any [`object_store::ObjectStore`]
/// (a blanket impl is provided below).
#[async_trait]
pub trait ObjectStoreExt {
    /// Returns true if the file exists.
    async fn exists(&self, path: &Path) -> Result<bool>;

    /// Read all files (start from base directory) recursively
    ///
    /// unmodified_since can be specified to only return files that have not been modified since the given time.
    fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>>;
}
86
87#[async_trait]
88impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
89    fn read_dir_all<'a, 'b>(
90        &'a self,
91        dir_path: impl Into<&'b Path> + Send,
92        unmodified_since: Option<DateTime<Utc>>,
93    ) -> BoxStream<'a, Result<ObjectMeta>> {
94        let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
95        if let Some(unmodified_since_val) = unmodified_since {
96            output
97                .try_filter(move |file| future::ready(file.last_modified <= unmodified_since_val))
98                .boxed()
99        } else {
100            output.boxed()
101        }
102    }
103
104    async fn exists(&self, path: &Path) -> Result<bool> {
105        match self.head(path).await {
106            Ok(_) => Ok(true),
107            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
108            Err(e) => Err(e.into()),
109        }
110    }
111}
112
/// Wraps [ObjectStore](object_store::ObjectStore)
#[derive(Debug, Clone)]
pub struct ObjectStore {
    // Inner object store
    pub inner: Arc<dyn OSObjectStore>,
    // URI scheme this store was created from (e.g. "file", "memory", "s3").
    scheme: String,
    // Preferred block size (bytes) for reads; files no larger than this are
    // read in a single request (see `open_with_size`).
    block_size: usize,
    // Upper bound (bytes) for a single IO operation.
    max_iop_size: u64,
    /// Whether to use constant size upload parts for multipart uploads. This
    /// is only necessary for Cloudflare R2.
    pub use_constant_size_upload_parts: bool,
    /// Whether we can assume that the list of files is lexically ordered. This
    /// is true for object stores, but not for local filesystems.
    pub list_is_lexically_ordered: bool,
    // Target IO parallelism; may be overridden at runtime via LANCE_IO_THREADS.
    io_parallelism: usize,
    /// Number of times to retry a failed download
    download_retry_count: usize,
    /// IO tracker for monitoring read/write operations
    io_tracker: IOTracker,
    /// The datastore prefix that uniquely identifies this object store. It encodes information
    /// which usually cannot be found in the URL such as Azure account name. The prefix plus the
    /// path uniquely identifies any object inside the store.
    pub store_prefix: String,
}
137
138impl DeepSizeOf for ObjectStore {
139    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
140        // We aren't counting `inner` here which is problematic but an ObjectStore
141        // shouldn't be too big.  The only exception might be the write cache but, if
142        // the writer cache has data, it means we're using it somewhere else that isn't
143        // a cache and so that doesn't really count.
144        self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
145    }
146}
147
148impl std::fmt::Display for ObjectStore {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        write!(f, "ObjectStore({})", self.scheme)
151    }
152}
153
/// A hook for layering extra behavior (tracing, caching, …) on top of an
/// inner [`object_store::ObjectStore`].
pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
    /// Wrap an object store with additional functionality
    ///
    /// The store_prefix is a string which uniquely identifies the object
    /// store being wrapped.
    fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
}
161
162#[derive(Debug, Clone)]
163pub struct ChainedWrappingObjectStore {
164    wrappers: Vec<Arc<dyn WrappingObjectStore>>,
165}
166
167impl ChainedWrappingObjectStore {
168    pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
169        Self { wrappers }
170    }
171
172    pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
173        self.wrappers.push(wrapper);
174    }
175}
176
177impl WrappingObjectStore for ChainedWrappingObjectStore {
178    fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
179        self.wrappers
180            .iter()
181            .fold(original, |acc, wrapper| wrapper.wrap(store_prefix, acc))
182    }
183}
184
/// Parameters to create an [ObjectStore]
///
#[derive(Debug, Clone)]
pub struct ObjectStoreParams {
    // Read block size override; when None a scheme-dependent default is used.
    pub block_size: Option<usize>,
    // Pre-built store plus its base URL; bypasses the provider registry.
    #[deprecated(note = "Implement an ObjectStoreProvider instead")]
    pub object_store: Option<(Arc<DynObjectStore>, Url)>,
    /// Refresh offset for AWS credentials when using the legacy AWS credentials path.
    /// For StorageOptionsAccessor, use `refresh_offset_millis` storage option instead.
    pub s3_credentials_refresh_offset: Duration,
    // Explicit AWS credential provider (legacy path, aws feature only).
    #[cfg(feature = "aws")]
    pub aws_credentials: Option<AwsCredentialProvider>,
    // Optional hook that wraps the constructed store (tracing, caching, …).
    pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
    /// Unified storage options accessor with caching and automatic refresh
    ///
    /// Provides storage options and optionally a dynamic provider for automatic
    /// credential refresh. Use `StorageOptionsAccessor::with_static_options()` for static
    /// options or `StorageOptionsAccessor::with_initial_and_provider()` for dynamic refresh.
    pub storage_options_accessor: Option<Arc<StorageOptionsAccessor>>,
    /// Use constant size upload parts for multipart uploads. Only necessary
    /// for Cloudflare R2, which doesn't support variable size parts. When this
    /// is false, max upload size is 2.5TB. When this is true, the max size is
    /// 50GB.
    pub use_constant_size_upload_parts: bool,
    // Override for list ordering assumption; when None the provider decides.
    pub list_is_lexically_ordered: Option<bool>,
}
211
212impl Default for ObjectStoreParams {
213    fn default() -> Self {
214        #[allow(deprecated)]
215        Self {
216            object_store: None,
217            block_size: None,
218            s3_credentials_refresh_offset: Duration::from_secs(60),
219            #[cfg(feature = "aws")]
220            aws_credentials: None,
221            object_store_wrapper: None,
222            storage_options_accessor: None,
223            use_constant_size_upload_parts: false,
224            list_is_lexically_ordered: None,
225        }
226    }
227}
228
229impl ObjectStoreParams {
230    /// Get the StorageOptionsAccessor from the params
231    pub fn get_accessor(&self) -> Option<Arc<StorageOptionsAccessor>> {
232        self.storage_options_accessor.clone()
233    }
234
235    /// Get storage options from the accessor, if any
236    ///
237    /// Returns the initial storage options from the accessor without triggering refresh.
238    pub fn storage_options(&self) -> Option<&HashMap<String, String>> {
239        self.storage_options_accessor
240            .as_ref()
241            .and_then(|a| a.initial_storage_options())
242    }
243}
244
// We implement hash for caching
impl std::hash::Hash for ObjectStoreParams {
    #[allow(deprecated)]
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        // For hashing, we use pointer values for ObjectStore, S3 credentials, wrapper.
        // NOTE: the field order here must stay in sync with PartialEq below so
        // that equal values hash equally.
        self.block_size.hash(state);
        if let Some((store, url)) = &self.object_store {
            // Identity (pointer) hash: two params are "equal" only if they share
            // the same Arc instance.
            Arc::as_ptr(store).hash(state);
            url.hash(state);
        }
        self.s3_credentials_refresh_offset.hash(state);
        #[cfg(feature = "aws")]
        if let Some(aws_credentials) = &self.aws_credentials {
            Arc::as_ptr(aws_credentials).hash(state);
        }
        if let Some(wrapper) = &self.object_store_wrapper {
            Arc::as_ptr(wrapper).hash(state);
        }
        if let Some(accessor) = &self.storage_options_accessor {
            // Accessors hash by their semantic id, not by pointer.
            accessor.accessor_id().hash(state);
        }
        self.use_constant_size_upload_parts.hash(state);
        self.list_is_lexically_ordered.hash(state);
    }
}
270
271// We implement eq for caching
272impl Eq for ObjectStoreParams {}
273impl PartialEq for ObjectStoreParams {
274    #[allow(deprecated)]
275    fn eq(&self, other: &Self) -> bool {
276        #[cfg(feature = "aws")]
277        if self.aws_credentials.is_some() != other.aws_credentials.is_some() {
278            return false;
279        }
280
281        // For equality, we use pointer comparison for ObjectStore, S3 credentials, wrapper
282        // For accessor, we use accessor_id() for semantic equality
283        self.block_size == other.block_size
284            && self
285                .object_store
286                .as_ref()
287                .map(|(store, url)| (Arc::as_ptr(store), url))
288                == other
289                    .object_store
290                    .as_ref()
291                    .map(|(store, url)| (Arc::as_ptr(store), url))
292            && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
293            && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
294                == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
295            && self
296                .storage_options_accessor
297                .as_ref()
298                .map(|a| a.accessor_id())
299                == other
300                    .storage_options_accessor
301                    .as_ref()
302                    .map(|a| a.accessor_id())
303            && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
304            && self.list_is_lexically_ordered == other.list_is_lexically_ordered
305    }
306}
307
308/// Convert a URI string or local path to a URL
309///
310/// This function handles both proper URIs (with schemes like `file://`, `s3://`, etc.)
311/// and plain local filesystem paths. On Windows, it correctly handles drive letters
312/// that might be parsed as URL schemes.
313///
314/// # Examples
315///
316/// ```
317/// # use lance_io::object_store::uri_to_url;
318/// // URIs are preserved
319/// let url = uri_to_url("s3://bucket/path").unwrap();
320/// assert_eq!(url.scheme(), "s3");
321///
322/// // Local paths are converted to file:// URIs
323/// # #[cfg(unix)]
324/// let url = uri_to_url("/tmp/data").unwrap();
325/// # #[cfg(unix)]
326/// assert_eq!(url.scheme(), "file");
327/// ```
328pub fn uri_to_url(uri: &str) -> Result<Url> {
329    match Url::parse(uri) {
330        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
331            // On Windows, the drive is parsed as a scheme
332            local_path_to_url(uri)
333        }
334        Ok(url) => Ok(url),
335        Err(_) => local_path_to_url(uri),
336    }
337}
338
339fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
340    let str_path = str_path.as_ref();
341    let expanded = expand_tilde_path(str_path).unwrap_or_else(|| str_path.into());
342
343    let mut expanded_path = path_abs::PathAbs::new(expanded)
344        .unwrap()
345        .as_path()
346        .to_path_buf();
347    // path_abs::PathAbs::new(".") returns an empty string.
348    if let Some(s) = expanded_path.as_path().to_str()
349        && s.is_empty()
350    {
351        expanded_path = std::env::current_dir()?;
352    }
353
354    Ok(expanded_path)
355}
356
/// Expand a leading `~` (the user's home directory) in `path`.
///
/// Returns `None` when the path does not start with a tilde form we recognize
/// or when the home directory cannot be determined.
fn expand_tilde_path(path: &str) -> Option<std::path::PathBuf> {
    let home = std::env::home_dir()?;
    if path == "~" {
        return Some(home);
    }
    if let Some(rest) = path.strip_prefix("~/") {
        return Some(home.join(rest));
    }
    // Windows also accepts a backslash separator after the tilde.
    #[cfg(windows)]
    if let Some(rest) = path.strip_prefix("~\\") {
        return Some(home.join(rest));
    }

    None
}
372
373fn local_path_to_url(str_path: &str) -> Result<Url> {
374    let expanded_path = expand_path(str_path)?;
375
376    Url::from_directory_path(expanded_path).map_err(|_| {
377        Error::invalid_input_source(format!("Invalid table location: '{}'", str_path).into())
378    })
379}
380
#[cfg(feature = "huggingface")]
/// Extract the `owner/repo` id from a Huggingface URL.
///
/// Accepts forms with a repo type prefix (models/datasets/spaces) or legacy
/// forms without one.  An optional `@revision` suffix on the repo name is
/// stripped.
///
/// # Errors
///
/// Returns an invalid-input error when the URL does not contain enough
/// segments to identify an owner and repo.
fn parse_hf_repo_id(url: &Url) -> Result<String> {
    let mut segments: Vec<String> = Vec::new();
    if let Some(host) = url.host_str() {
        segments.push(host.to_string());
    }
    // Discard empty segments (trailing slash, or no path at all) so they can't
    // masquerade as an owner or repo name.  Previously "hf://owner" produced a
    // bogus "owner/" id instead of an error.
    segments.extend(
        url.path()
            .trim_start_matches('/')
            .split('/')
            .filter(|s| !s.is_empty())
            .map(|s| s.to_string()),
    );

    if segments.len() < 2 {
        return Err(Error::invalid_input(
            "Huggingface URL must contain at least owner and repo",
        ));
    }

    let repo_type_candidates = ["models", "datasets", "spaces"];
    let (owner, repo_with_rev) = if repo_type_candidates.contains(&segments[0].as_str()) {
        if segments.len() < 3 {
            return Err(Error::invalid_input(
                "Huggingface URL missing owner/repo after repo type",
            ));
        }
        (segments[1].as_str(), segments[2].as_str())
    } else {
        (segments[0].as_str(), segments[1].as_str())
    };

    // Strip an optional "@revision" suffix from the repo component.
    let repo = repo_with_rev
        .split_once('@')
        .map(|(r, _)| r)
        .unwrap_or(repo_with_rev);
    Ok(format!("{owner}/{repo}"))
}
419
420impl ObjectStore {
421    /// Parse from a string URI.
422    ///
423    /// Returns the ObjectStore instance and the absolute path to the object.
424    ///
425    /// This uses the default [ObjectStoreRegistry] to find the object store. To
426    /// allow for potential re-use of object store instances, it's recommended to
427    /// create a shared [ObjectStoreRegistry] and pass that to [Self::from_uri_and_params].
428    pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
429        let registry = Arc::new(ObjectStoreRegistry::default());
430
431        Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
432    }
433
    /// Parse from a string URI.
    ///
    /// Returns the ObjectStore instance and the absolute path to the object.
    ///
    /// If `params.object_store` (deprecated) is set, that pre-built store is
    /// used directly; otherwise a provider is resolved from the registry based
    /// on the URI's scheme.
    pub async fn from_uri_and_params(
        registry: Arc<ObjectStoreRegistry>,
        uri: &str,
        params: &ObjectStoreParams,
    ) -> Result<(Arc<Self>, Path)> {
        #[allow(deprecated)]
        if let Some((store, path)) = params.object_store.as_ref() {
            // Deprecated escape hatch: caller supplied a ready-made store plus
            // its base URL (note: `path` here is a `Url`).
            let mut inner = store.clone();
            let store_prefix =
                registry.calculate_object_store_prefix(uri, params.storage_options())?;
            // Apply the caller's wrapper (if any) before IO tracking so the
            // tracker observes the fully wrapped store.
            if let Some(wrapper) = params.object_store_wrapper.as_ref() {
                inner = wrapper.wrap(&store_prefix, inner);
            }

            // Always wrap with IO tracking
            let io_tracker = IOTracker::default();
            let tracked_store = io_tracker.wrap("", inner);

            let store = Self {
                inner: tracked_store,
                scheme: path.scheme().to_string(),
                // 64KB default block size when the caller does not specify one.
                block_size: params.block_size.unwrap_or(64 * 1024),
                max_iop_size: *DEFAULT_MAX_IOP_SIZE,
                use_constant_size_upload_parts: params.use_constant_size_upload_parts,
                list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
                io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
                download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
                io_tracker,
                store_prefix,
            };
            let path = Path::parse(path.path())?;
            return Ok((Arc::new(store), path));
        }
        // Normal path: resolve a provider from the registry by URI scheme.
        let url = uri_to_url(uri)?;

        let store = registry.get_store(url.clone(), params).await?;
        // We know the scheme is valid if we got a store back.
        let provider = registry.get_provider(url.scheme()).expect_ok()?;
        let path = provider.extract_path(&url)?;

        Ok((store, path))
    }
479
480    /// Extract the path component from a URI without initializing the object store.
481    ///
482    /// This is a synchronous operation that only parses the URI and extracts the path,
483    /// without creating or initializing any object store instance.
484    ///
485    /// # Arguments
486    ///
487    /// * `registry` - The object store registry to get the provider
488    /// * `uri` - The URI to extract the path from
489    ///
490    /// # Returns
491    ///
492    /// The extracted path component
493    pub fn extract_path_from_uri(registry: Arc<ObjectStoreRegistry>, uri: &str) -> Result<Path> {
494        let url = uri_to_url(uri)?;
495        let provider = registry
496            .get_provider(url.scheme())
497            .ok_or_else(|| Error::invalid_input(format!("Unknown scheme: {}", url.scheme())))?;
498        provider.extract_path(&url)
499    }
500
501    #[deprecated(note = "Use `from_uri` instead")]
502    pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
503        Self::from_uri_and_params(
504            Arc::new(ObjectStoreRegistry::default()),
505            str_path,
506            &Default::default(),
507        )
508        .now_or_never()
509        .unwrap()
510    }
511
512    /// Local object store.
513    pub fn local() -> Self {
514        let provider = FileStoreProvider;
515        provider
516            .new_store(Url::parse("file:///").unwrap(), &Default::default())
517            .now_or_never()
518            .unwrap()
519            .unwrap()
520    }
521
522    /// Create a in-memory object store directly for testing.
523    pub fn memory() -> Self {
524        let provider = MemoryStoreProvider;
525        provider
526            .new_store(Url::parse("memory:///").unwrap(), &Default::default())
527            .now_or_never()
528            .unwrap()
529            .unwrap()
530    }
531
532    /// Returns true if the object store pointed to a local file system.
533    pub fn is_local(&self) -> bool {
534        self.scheme == "file"
535    }
536
537    pub fn is_cloud(&self) -> bool {
538        self.scheme != "file" && self.scheme != "memory"
539    }
540
541    pub fn scheme(&self) -> &str {
542        &self.scheme
543    }
544
545    pub fn block_size(&self) -> usize {
546        self.block_size
547    }
548
549    pub fn max_iop_size(&self) -> u64 {
550        self.max_iop_size
551    }
552
553    pub fn io_parallelism(&self) -> usize {
554        std::env::var("LANCE_IO_THREADS")
555            .map(|val| val.parse::<usize>().unwrap())
556            .unwrap_or(self.io_parallelism)
557    }
558
    /// Get the IO tracker for this object store
    ///
    /// The IO tracker can be used to get statistics about read/write operations
    /// performed on this object store.
    pub fn io_tracker(&self) -> &IOTracker {
        &self.io_tracker
    }

    /// Get a snapshot of current IO statistics without resetting counters
    ///
    /// Returns the current IO statistics without modifying the internal state.
    /// Use this when you need to check stats without resetting them.
    pub fn io_stats_snapshot(&self) -> IoStats {
        self.io_tracker.stats()
    }

    /// Get incremental IO statistics since the last call to this method
    ///
    /// Returns the accumulated statistics since the last call and resets the
    /// counters to zero. This is useful for tracking IO operations between
    /// different stages of processing.
    pub fn io_stats_incremental(&self) -> IoStats {
        // Note: unlike `io_stats_snapshot`, this mutates tracker state (resets).
        self.io_tracker.incremental_stats()
    }
583
584    /// Open a file for path.
585    ///
586    /// Parameters
587    /// - ``path``: Absolute path to the file.
588    pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
589        match self.scheme.as_str() {
590            "file" => {
591                LocalObjectReader::open_with_tracker(
592                    path,
593                    self.block_size,
594                    None,
595                    Arc::new(self.io_tracker.clone()),
596                )
597                .await
598            }
599            _ => Ok(Box::new(CloudObjectReader::new(
600                self.inner.clone(),
601                path.clone(),
602                self.block_size,
603                None,
604                self.download_retry_count,
605            )?)),
606        }
607    }
608
609    /// Open a reader for a file with known size.
610    ///
611    /// This size may either have been retrieved from a list operation or
612    /// cached metadata. By passing in the known size, we can skip a HEAD / metadata
613    /// call.
614    pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
615        // If we know the file is really small, we can read the whole thing
616        // as a single request.
617        if known_size <= self.block_size {
618            return Ok(Box::new(SmallReader::new(
619                self.inner.clone(),
620                path.clone(),
621                self.download_retry_count,
622                known_size,
623            )));
624        }
625
626        match self.scheme.as_str() {
627            "file" => {
628                LocalObjectReader::open_with_tracker(
629                    path,
630                    self.block_size,
631                    Some(known_size),
632                    Arc::new(self.io_tracker.clone()),
633                )
634                .await
635            }
636            _ => Ok(Box::new(CloudObjectReader::new(
637                self.inner.clone(),
638                path.clone(),
639                self.block_size,
640                Some(known_size),
641                self.download_retry_count,
642            )?)),
643        }
644    }
645
646    /// Create an [ObjectWriter] from local [std::path::Path]
647    pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
648        let object_store = Self::local();
649        let absolute_path = expand_path(path.to_string_lossy())?;
650        let os_path = Path::from_absolute_path(absolute_path)?;
651        ObjectWriter::new(&object_store, &os_path).await
652    }
653
654    /// Open an [Reader] from local [std::path::Path]
655    pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
656        let object_store = Self::local();
657        let absolute_path = expand_path(path.to_string_lossy())?;
658        let os_path = Path::from_absolute_path(absolute_path)?;
659        object_store.open(&os_path).await
660    }
661
    /// Create a new file.
    ///
    /// For local stores this writes through a named temp file in the target
    /// directory (atomically renamed on shutdown by `LocalWriter`); for remote
    /// stores it returns an `ObjectWriter` doing multipart uploads.
    pub async fn create(&self, path: &Path) -> Result<Box<dyn Writer>> {
        match self.scheme.as_str() {
            "file" => {
                let local_path = super::local::to_local_path(path);
                let local_path = std::path::PathBuf::from(&local_path);
                // Ensure the parent directory exists before creating the temp file.
                if let Some(parent) = local_path.parent() {
                    tokio::fs::create_dir_all(parent).await?;
                }
                let parent = local_path
                    .parent()
                    .expect("file path must have parent")
                    .to_owned();
                // NamedTempFile::new_in is blocking filesystem work, so run it
                // off the async runtime's worker threads.
                let named_temp =
                    tokio::task::spawn_blocking(move || tempfile::NamedTempFile::new_in(parent))
                        .await
                        .map_err(|e| Error::io(format!("spawn_blocking failed: {}", e)))??;
                // Split the temp file into its handle and its path guard; the
                // guard keeps the temp file alive until the writer finishes.
                let (std_file, temp_path) = named_temp.into_parts();
                let file = tokio::fs::File::from_std(std_file);
                Ok(Box::new(LocalWriter::new(
                    file,
                    path.clone(),
                    temp_path,
                    Arc::new(self.io_tracker.clone()),
                )))
            }
            _ => Ok(Box::new(ObjectWriter::new(self, path).await?)),
        }
    }
691
692    /// A helper function to create a file and write content to it.
693    pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
694        let mut writer = self.create(path).await?;
695        writer.write_all(content).await?;
696        Writer::shutdown(writer.as_mut()).await
697    }
698
699    pub async fn delete(&self, path: &Path) -> Result<()> {
700        self.inner.delete(path).await?;
701        Ok(())
702    }
703
704    pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
705        if self.is_local() {
706            // Use std::fs::copy for local filesystem to support cross-filesystem copies
707            return super::local::copy_file(from, to);
708        }
709        Ok(self.inner.copy(from, to).await?)
710    }
711
712    /// Read a directory (start from base directory) and returns all sub-paths in the directory.
713    pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
714        let path = dir_path.into();
715        let path = Path::parse(&path)?;
716        let output = self.inner.list_with_delimiter(Some(&path)).await?;
717        Ok(output
718            .common_prefixes
719            .iter()
720            .chain(output.objects.iter().map(|o| &o.location))
721            .map(|s| s.filename().unwrap().to_string())
722            .collect())
723    }
724
725    pub fn list(
726        &self,
727        path: Option<Path>,
728    ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
729        Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
730    }
731
    /// Read all files (start from base directory) recursively
    ///
    /// unmodified_since can be specified to only return files that have not been modified since the given time.
    ///
    /// Delegates to the blanket [`ObjectStoreExt`] impl on the inner store.
    pub fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        self.inner.read_dir_all(dir_path, unmodified_since)
    }
742
    /// Remove a directory recursively.
    ///
    /// For local filesystems this is a plain recursive delete.  For object
    /// stores it lists every object under the prefix and deletes them in a
    /// stream; the "file-object-store" scheme additionally needs the leftover
    /// directory entries cleaned up afterwards.
    pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
        let path = dir_path.into();
        let path = Path::parse(&path)?;

        if self.is_local() {
            // The local file system provider needs to delete both files and directories.
            return super::local::remove_dir_all(&path);
        }
        // Collect the locations of everything under the prefix...
        let sub_entries = self
            .inner
            .list(Some(&path))
            .map(|m| m.map(|meta| meta.location))
            .boxed();
        // ...and delete them as a stream (drives deletion to completion).
        self.inner
            .delete_stream(sub_entries)
            .try_collect::<Vec<_>>()
            .await?;
        if self.scheme == "file-object-store" {
            // file-object-store tries to do everything as similarly as possible to the remote
            // object stores. But we still have to delete the directory entries afterwards.
            return super::local::remove_dir_all(&path);
        }
        Ok(())
    }
768
769    pub fn remove_stream<'a>(
770        &'a self,
771        locations: BoxStream<'a, Result<Path>>,
772    ) -> BoxStream<'a, Result<Path>> {
773        self.inner
774            .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
775            .err_into::<Error>()
776            .boxed()
777    }
778
779    /// Check a file exists.
780    pub async fn exists(&self, path: &Path) -> Result<bool> {
781        match self.inner.head(path).await {
782            Ok(_) => Ok(true),
783            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
784            Err(e) => Err(e.into()),
785        }
786    }
787
788    /// Get file size.
789    pub async fn size(&self, path: &Path) -> Result<u64> {
790        Ok(self.inner.head(path).await?.size)
791    }
792
793    /// Convenience function to open a reader and read all the bytes
794    pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
795        let reader = self.open(path).await?;
796        Ok(reader.get_all().await?)
797    }
798
799    /// Convenience function open a reader and make a single request
800    ///
801    /// If you will be making multiple requests to the path it is more efficient to call [`Self::open`]
802    /// and then call [`Reader::get_range`] multiple times.
803    pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
804        let reader = self.open(path).await?;
805        Ok(reader.get_range(range).await?)
806    }
807}
808
/// Options that can be set for multiple object stores
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub enum LanceConfigKey {
    /// Number of times to retry a download that fails
    DownloadRetryCount,
}
815
816impl FromStr for LanceConfigKey {
817    type Err = Error;
818
819    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
820        match s.to_ascii_lowercase().as_str() {
821            "download_retry_count" => Ok(Self::DownloadRetryCount),
822            _ => Err(Error::invalid_input_source(
823                format!("Invalid LanceConfigKey: {}", s).into(),
824            )),
825        }
826    }
827}
828
/// A bag of string key/value options used to configure object stores.
///
/// The inner map is public; several accessor methods on this type look up
/// keys case-insensitively, while `get` is an exact-key lookup.
#[derive(Clone, Debug, Default)]
pub struct StorageOptions(pub HashMap<String, String>);
831
832impl StorageOptions {
833    /// Create a new instance of [`StorageOptions`]
834    pub fn new(options: HashMap<String, String>) -> Self {
835        let mut options = options;
836        if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
837            options.insert("allow_http".into(), value);
838        }
839        if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
840            options.insert("allow_http".into(), value);
841        }
842        if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
843            options.insert("allow_http".into(), value);
844        }
845        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
846            options.insert("client_max_retries".into(), value);
847        }
848        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
849            options.insert("client_retry_timeout".into(), value);
850        }
851        Self(options)
852    }
853
854    /// Denotes if unsecure connections via http are allowed
855    pub fn allow_http(&self) -> bool {
856        self.0.iter().any(|(key, value)| {
857            key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
858        })
859    }
860
861    /// Number of times to retry a download that fails
862    pub fn download_retry_count(&self) -> usize {
863        self.0
864            .iter()
865            .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
866            .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
867            .unwrap_or(3)
868    }
869
870    /// Max retry times to set in RetryConfig for object store client
871    pub fn client_max_retries(&self) -> usize {
872        self.0
873            .iter()
874            .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
875            .and_then(|(_, value)| value.parse::<usize>().ok())
876            .unwrap_or(10)
877    }
878
879    /// Seconds of timeout to set in RetryConfig for object store client
880    pub fn client_retry_timeout(&self) -> u64 {
881        self.0
882            .iter()
883            .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
884            .and_then(|(_, value)| value.parse::<u64>().ok())
885            .unwrap_or(180)
886    }
887
888    pub fn get(&self, key: &str) -> Option<&String> {
889        self.0.get(key)
890    }
891
892    /// Build [`ClientOptions`] with default headers extracted from `headers.*` keys.
893    ///
894    /// Keys prefixed with `headers.` are parsed into HTTP headers. For example,
895    /// `headers.x-ms-version = 2023-11-03` results in a default header
896    /// `x-ms-version: 2023-11-03`.
897    ///
898    /// Returns an error if any `headers.*` key has an invalid header name or value.
899    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
900    pub fn client_options(&self) -> Result<ClientOptions> {
901        let mut headers = HeaderMap::new();
902        for (key, value) in &self.0 {
903            if let Some(header_name) = key.strip_prefix("headers.") {
904                let name = header_name
905                    .parse::<http::header::HeaderName>()
906                    .map_err(|e| {
907                        Error::invalid_input(format!("invalid header name '{header_name}': {e}"))
908                    })?;
909                let val = HeaderValue::from_str(value).map_err(|e| {
910                    Error::invalid_input(format!("invalid header value for '{header_name}': {e}"))
911                })?;
912                headers.insert(name, val);
913            }
914        }
915        let mut client_options = ClientOptions::default();
916        if !headers.is_empty() {
917            client_options = client_options.with_default_headers(headers);
918        }
919        Ok(client_options)
920    }
921
922    /// Get the expiration time in milliseconds since epoch, if present
923    pub fn expires_at_millis(&self) -> Option<u64> {
924        self.0
925            .get(EXPIRES_AT_MILLIS_KEY)
926            .and_then(|s| s.parse::<u64>().ok())
927    }
928}
929
/// Conversion that routes through [`StorageOptions::new`], so the
/// environment-variable overrides applied there also apply here.
impl From<HashMap<String, String>> for StorageOptions {
    fn from(value: HashMap<String, String>) -> Self {
        Self::new(value)
    }
}
935
// Process-wide registry consulted by `ObjectStore::new` to find a provider
// for a URL scheme when computing the store prefix.
static DEFAULT_OBJECT_STORE_REGISTRY: std::sync::LazyLock<ObjectStoreRegistry> =
    std::sync::LazyLock::new(ObjectStoreRegistry::default);
938
939impl ObjectStore {
940    #[allow(clippy::too_many_arguments)]
941    pub fn new(
942        store: Arc<DynObjectStore>,
943        location: Url,
944        block_size: Option<usize>,
945        wrapper: Option<Arc<dyn WrappingObjectStore>>,
946        use_constant_size_upload_parts: bool,
947        list_is_lexically_ordered: bool,
948        io_parallelism: usize,
949        download_retry_count: usize,
950        storage_options: Option<&HashMap<String, String>>,
951    ) -> Self {
952        let scheme = location.scheme();
953        let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
954        let store_prefix = match DEFAULT_OBJECT_STORE_REGISTRY.get_provider(scheme) {
955            Some(provider) => provider
956                .calculate_object_store_prefix(&location, storage_options)
957                .unwrap(),
958            None => {
959                let store_prefix = format!("{}${}", location.scheme(), location.authority());
960                log::warn!(
961                    "Guessing that object store prefix is {}, since object store scheme is not found in registry.",
962                    store_prefix
963                );
964                store_prefix
965            }
966        };
967        let store = match wrapper {
968            Some(wrapper) => wrapper.wrap(&store_prefix, store),
969            None => store,
970        };
971
972        // Always wrap with IO tracking
973        let io_tracker = IOTracker::default();
974        let tracked_store = io_tracker.wrap("", store);
975
976        Self {
977            inner: tracked_store,
978            scheme: scheme.into(),
979            block_size,
980            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
981            use_constant_size_upload_parts,
982            list_is_lexically_ordered,
983            io_parallelism,
984            download_retry_count,
985            io_tracker,
986            store_prefix,
987        }
988    }
989}
990
/// Default read block size for a given URL scheme.
///
/// Local file systems get a 4 KiB block size; everything else is assumed to
/// be a cloud object store and gets 64 KiB — generally the largest block
/// size that does not incur a latency penalty.
fn infer_block_size(scheme: &str) -> usize {
    if scheme == "file" {
        4 * 1024
    } else {
        64 * 1024
    }
}
1000
1001#[cfg(test)]
1002mod tests {
1003    use super::*;
1004    use lance_core::utils::tempfile::{TempStdDir, TempStdFile, TempStrDir};
1005    use object_store::memory::InMemory;
1006    use rstest::rstest;
1007    use std::env::set_current_dir;
1008    use std::fs::{create_dir_all, write};
1009    use std::path::Path as StdPath;
1010    use std::sync::atomic::{AtomicBool, Ordering};
1011
1012    /// Write test content to file.
1013    fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
1014        let path = expand_path(path_str).map_err(std::io::Error::other)?;
1015        std::fs::create_dir_all(path.parent().unwrap())?;
1016        write(path, contents)
1017    }
1018
1019    async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
1020        let test_file_store = store.open(path).await.unwrap();
1021        let size = test_file_store.size().await.unwrap();
1022        let bytes = test_file_store.get_range(0..size).await.unwrap();
1023        let contents = String::from_utf8(bytes.to_vec()).unwrap();
1024        Ok(contents)
1025    }
1026
    #[tokio::test]
    async fn test_absolute_paths() {
        // Absolute paths, `.` segments, and `..` segments should all resolve
        // to the same store location and read back the same content.
        let tmp_path = TempStrDir::default();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "TEST_CONTENT",
        )
        .unwrap();

        // test a few variations of the same path
        for uri in &[
            format!("{tmp_path}/bar/foo.lance"),
            format!("{tmp_path}/./bar/foo.lance"),
            format!("{tmp_path}/bar/foo.lance/../foo.lance"),
        ] {
            let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &path.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "TEST_CONTENT");
        }
    }
1049
    #[tokio::test]
    async fn test_cloud_paths() {
        // Cloud URIs should drop the bucket/authority from the returned
        // relative path; the `s3+ddb` scheme normalizes to plain `s3`.
        let uri = "s3://bucket/foo.lance";
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "gs");
        assert_eq!(path.to_string(), "foo.lance");
    }
1069
    /// Assert that `uri` gets `default_expected_block_size` by default and
    /// that an explicit `block_size` param overrides it.
    async fn test_block_size_used_test_helper(
        uri: &str,
        storage_options: Option<HashMap<String, String>>,
        default_expected_block_size: usize,
    ) {
        // Test the default
        let registry = Arc::new(ObjectStoreRegistry::default());
        let accessor = storage_options
            .clone()
            .map(|opts| Arc::new(StorageOptionsAccessor::with_static_options(opts)));
        let params = ObjectStoreParams {
            storage_options_accessor: accessor.clone(),
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, default_expected_block_size);

        // Ensure param is used
        let registry = Arc::new(ObjectStoreRegistry::default());
        let params = ObjectStoreParams {
            block_size: Some(1024),
            storage_options_accessor: accessor,
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, 1024);
    }
1101
    #[rstest]
    #[case("s3://bucket/foo.lance", None)]
    #[case("gs://bucket/foo.lance", None)]
    #[case("az://account/bucket/foo.lance",
      Some(HashMap::from([
            (String::from("account_name"), String::from("account")),
            (String::from("container_name"), String::from("container"))
           ])))]
    #[tokio::test]
    async fn test_block_size_used_cloud(
        #[case] uri: &str,
        #[case] storage_options: Option<HashMap<String, String>>,
    ) {
        // Cloud schemes should default to the 64 KiB block size.
        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
    }
1117
    #[rstest]
    #[case("file")]
    #[case("file-object-store")]
    #[case("memory:///bucket/foo.lance")]
    #[tokio::test]
    async fn test_block_size_used_file(#[case] prefix: &str) {
        // Local-style schemes should default to the 4 KiB block size.
        let tmp_path = TempStrDir::default();
        let path = format!("{tmp_path}/bar/foo.lance/test_file");
        write_to_file(&path, "URL").unwrap();
        // NOTE(review): for the "memory:///bucket/foo.lance" case this yields
        // "memory:///bucket/foo.lance:///<tmp>/…" — looks unintended even if
        // the scheme still resolves; confirm this case tests what was meant.
        let uri = format!("{prefix}:///{path}");
        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
    }
1130
    #[tokio::test]
    async fn test_relative_paths() {
        // Relative URIs should resolve against the current working directory.
        let tmp_path = TempStrDir::default();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "RELATIVE_URL",
        )
        .unwrap();

        // NOTE(review): changing the process-wide cwd can race with other
        // tests in the same process that rely on relative paths.
        set_current_dir(StdPath::new(tmp_path.as_ref())).expect("Error changing current dir");
        let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();

        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "RELATIVE_URL");
    }
1148
    #[tokio::test]
    async fn test_tilde_expansion() {
        // `~` in a URI should expand to the user's home directory.
        // NOTE(review): this writes into the real home directory and never
        // cleans up `~/foo.lance` afterwards.
        let uri = "~/foo.lance";
        write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "TILDE");
    }
1159
    #[tokio::test]
    async fn test_read_directory() {
        let path = TempStdDir::default();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo").join("test_file").to_str().unwrap(),
            "read_dir",
        )
        .unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();

        // read_dir lists only the immediate children of "foo" (dirs and
        // files); the nested "zoo/abc" directory must not appear.
        let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
        assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
    }
1176
    // Exercise directory deletion through the default local store.
    #[tokio::test]
    async fn test_delete_directory_local_store() {
        test_delete_directory("").await;
    }
1181
    // Exercise directory deletion through the file-object-store scheme.
    #[tokio::test]
    async fn test_delete_directory_file_object_store() {
        test_delete_directory("file-object-store").await;
    }
1186
    /// `remove_dir_all` should delete the whole tree under "foo", including
    /// nested empty directories, for the given URL scheme ("" = plain file).
    async fn test_delete_directory(scheme: &str) {
        let path = TempStdDir::default();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo")
                .join("bar")
                .join("test_file")
                .to_str()
                .unwrap(),
            "delete",
        )
        .unwrap();
        let file_url = Url::from_directory_path(&path).unwrap();
        let url = if scheme.is_empty() {
            file_url
        } else {
            let mut url = Url::parse(&format!("{scheme}:///")).unwrap();
            // Use the file:// URL's normalized path so this works on Windows too.
            url.set_path(file_url.path());
            url
        };
        let (store, base) = ObjectStore::from_uri(url.as_ref()).await.unwrap();
        store.remove_dir_all(base.child("foo")).await.unwrap();

        assert!(!path.join("foo").exists());
    }
1215
    /// Test double that records whether `wrap` was invoked and substitutes a
    /// pre-made store for whatever it is given.
    #[derive(Debug)]
    struct TestWrapper {
        // Set to true once `wrap` has been called.
        called: AtomicBool,

        // The store `wrap` returns, regardless of its input.
        return_value: Arc<dyn OSObjectStore>,
    }
1222
    impl WrappingObjectStore for TestWrapper {
        // Record the call, ignore the inputs, and hand back the canned store.
        fn wrap(
            &self,
            _store_prefix: &str,
            _original: Arc<dyn OSObjectStore>,
        ) -> Arc<dyn OSObjectStore> {
            self.called.store(true, Ordering::Relaxed);

            // return a mocked value so we can check if the final store is the one we expect
            self.return_value.clone()
        }
    }
1235
    impl TestWrapper {
        /// Whether `wrap` has been invoked at least once.
        fn called(&self) -> bool {
            self.called.load(Ordering::Relaxed)
        }
    }
1241
    // Constructing a store with `object_store_wrapper` set must invoke the
    // wrapper and keep the store it returns alive.
    #[tokio::test]
    async fn test_wrapping_object_store_option_is_used() {
        // Make a store for the inner store first
        let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
        let registry = Arc::new(ObjectStoreRegistry::default());

        assert_eq!(Arc::strong_count(&mock_inner_store), 1);

        let wrapper = Arc::new(TestWrapper {
            called: AtomicBool::new(false),
            return_value: mock_inner_store.clone(),
        });

        let params = ObjectStoreParams {
            object_store_wrapper: Some(wrapper.clone()),
            ..ObjectStoreParams::default()
        };

        // not called yet
        assert!(!wrapper.called());

        let _ = ObjectStore::from_uri_and_params(registry, "memory:///", &params)
            .await
            .unwrap();

        // hard to compare two trait pointers as they point to vtables;
        // using the ref count as a proxy to make sure that the store is correctly kept
        assert!(wrapper.called());

        // called after construction
        assert_eq!(Arc::strong_count(&mock_inner_store), 2);
    }
1274
    #[tokio::test]
    async fn test_local_paths() {
        // Round-trip: write via a local writer, read back via a local reader.
        let file_path = TempStdFile::default();
        let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        Writer::shutdown(&mut writer).await.unwrap();

        let reader = ObjectStore::open_local(&file_path).await.unwrap();
        let buf = reader.get_range(0..5).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");
    }
1286
    #[tokio::test]
    async fn test_read_one() {
        // Both convenience readers (whole object and explicit range) should
        // return the written content.
        let file_path = TempStdFile::default();
        let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        Writer::shutdown(&mut writer).await.unwrap();

        let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
        let obj_store = ObjectStore::local();
        let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");

        let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");
    }
1302
    // Windows drive-letter paths should work with both forward and back
    // slashes.
    #[tokio::test]
    #[cfg(windows)]
    async fn test_windows_paths() {
        use std::path::Component;
        use std::path::Prefix;
        use std::path::Prefix::*;

        // Extract the path's prefix component (e.g. the `C:` disk prefix).
        fn get_path_prefix(path: &StdPath) -> Prefix {
            match path.components().next().unwrap() {
                Component::Prefix(prefix_component) => prefix_component.kind(),
                _ => panic!(),
            }
        }

        // Turn a `Disk` prefix into its single-letter string form.
        fn get_drive_letter(prefix: Prefix) -> String {
            match prefix {
                Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
                _ => panic!(),
            }
        }

        let tmp_path = TempStdFile::default();
        let prefix = get_path_prefix(&tmp_path);
        let drive_letter = get_drive_letter(prefix);

        write_to_file(
            &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
            "WINDOWS",
        )
        .unwrap();

        for uri in &[
            format!("{drive_letter}:/test_folder/test.lance"),
            format!("{drive_letter}:\\test_folder\\test.lance"),
        ] {
            let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &base.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "WINDOWS");
        }
    }
1345
    // `copy` should work when the destination is outside the store's base
    // directory (simulating a cross-filesystem copy).
    #[tokio::test]
    async fn test_cross_filesystem_copy() {
        // Create two temporary directories that simulate different filesystems
        let source_dir = TempStdDir::default();
        let dest_dir = TempStdDir::default();

        // Create a test file in the source directory
        let source_file_name = "test_file.txt";
        let source_file = source_dir.join(source_file_name);
        std::fs::write(&source_file, b"test content").unwrap();

        // Create ObjectStore for local filesystem
        let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
            .await
            .unwrap();

        // Create paths relative to the ObjectStore base
        let from_path = base_path.child(source_file_name);

        // Use object_store::Path::parse for the destination
        let dest_file = dest_dir.join("copied_file.txt");
        let dest_str = dest_file.to_str().unwrap();
        let to_path = object_store::path::Path::parse(dest_str).unwrap();

        // Perform the copy operation
        store.copy(&from_path, &to_path).await.unwrap();

        // Verify the file was copied correctly
        assert!(dest_file.exists());
        let copied_content = std::fs::read(&dest_file).unwrap();
        assert_eq!(copied_content, b"test content");
    }
1378
    // `copy` should create any missing parent directories at the destination.
    #[tokio::test]
    async fn test_copy_creates_parent_directories() {
        let source_dir = TempStdDir::default();
        let dest_dir = TempStdDir::default();

        // Create a test file in the source directory
        let source_file_name = "test_file.txt";
        let source_file = source_dir.join(source_file_name);
        std::fs::write(&source_file, b"test content").unwrap();

        // Create ObjectStore for local filesystem
        let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
            .await
            .unwrap();

        // Create paths
        let from_path = base_path.child(source_file_name);

        // Create destination with nested directories that don't exist yet
        let dest_file = dest_dir.join("nested").join("dirs").join("copied_file.txt");
        let dest_str = dest_file.to_str().unwrap();
        let to_path = object_store::path::Path::parse(dest_str).unwrap();

        // Perform the copy operation - should create parent directories
        store.copy(&from_path, &to_path).await.unwrap();

        // Verify the file was copied correctly and directories were created
        assert!(dest_file.exists());
        assert!(dest_file.parent().unwrap().exists());
        let copied_content = std::fs::read(&dest_file).unwrap();
        assert_eq!(copied_content, b"test content");
    }
1411
    // `headers.*` keys should be turned into default client headers while
    // other keys are left alone.
    #[test]
    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
    fn test_client_options_extracts_headers() {
        let opts = StorageOptions(HashMap::from([
            ("headers.x-custom-foo".to_string(), "bar".to_string()),
            ("headers.x-ms-version".to_string(), "2023-11-03".to_string()),
            ("region".to_string(), "us-west-2".to_string()),
        ]));
        let client_options = opts.client_options().unwrap();

        // Verify non-header keys are not consumed as headers by creating
        // another StorageOptions with no headers.* keys.
        let opts_no_headers = StorageOptions(HashMap::from([(
            "region".to_string(),
            "us-west-2".to_string(),
        )]));
        opts_no_headers.client_options().unwrap();

        // Smoke test: the client_options with headers should be usable
        // in a builder (we can't inspect the headers directly, but building
        // should not fail).
        #[cfg(feature = "gcp")]
        {
            use object_store::gcp::GoogleCloudStorageBuilder;
            let _builder = GoogleCloudStorageBuilder::new()
                .with_client_options(client_options)
                .with_url("gs://test-bucket");
        }
    }
1441
1442    #[test]
1443    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1444    fn test_client_options_rejects_invalid_header_name() {
1445        let opts = StorageOptions(HashMap::from([(
1446            "headers.bad header".to_string(),
1447            "value".to_string(),
1448        )]));
1449        let err = opts.client_options().unwrap_err();
1450        assert!(err.to_string().contains("invalid header name"));
1451    }
1452
1453    #[test]
1454    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1455    fn test_client_options_rejects_invalid_header_value() {
1456        let opts = StorageOptions(HashMap::from([(
1457            "headers.x-good-name".to_string(),
1458            "bad\x01value".to_string(),
1459        )]));
1460        let err = opts.client_options().unwrap_err();
1461        assert!(err.to_string().contains("invalid header value"));
1462    }
1463
    // With no `headers.*` keys present, building client options must still
    // succeed (default options, no headers injected).
    #[test]
    #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
    fn test_client_options_empty_when_no_header_keys() {
        let opts = StorageOptions(HashMap::from([
            ("region".to_string(), "us-east-1".to_string()),
            ("access_key_id".to_string(), "AKID".to_string()),
        ]));
        opts.client_options().unwrap();
    }
1473}