
lance_io/object_store.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Extend [object_store::ObjectStore] functionalities

use std::collections::HashMap;
use std::ops::Range;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use deepsize::DeepSizeOf;
use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
use futures::{FutureExt, Stream};
use lance_core::error::LanceOptionExt;
use lance_core::utils::parse::str_is_truthy;
use list_retry::ListRetryStream;
#[cfg(feature = "aws")]
use object_store::aws::AwsCredentialProvider;
use object_store::DynObjectStore;
use object_store::Error as ObjectStoreError;
use object_store::{path::Path, ObjectMeta, ObjectStore as OSObjectStore};
use providers::local::FileStoreProvider;
use providers::memory::MemoryStoreProvider;
use shellexpand::tilde;
use snafu::location;
use tokio::io::AsyncWriteExt;
use url::Url;

use super::local::LocalObjectReader;
mod list_retry;
pub mod providers;
pub mod storage_options;
mod tracing;
use crate::object_reader::SmallReader;
use crate::object_writer::WriteResult;
use crate::utils::tracking_store::{IOTracker, IoStats};
use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
use lance_core::{Error, Result};

// Local disks tend to do fine with a few threads
// Note: the number of threads here also impacts the number of files
// we need to read in some situations.  So keeping this at 8 keeps the
// RAM on our scanner down.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
// Cloud disks often need many many threads to saturate the network
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024; // 4KB block size
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024; // 64KB block size

pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock<u64> = std::sync::LazyLock::new(|| {
    std::env::var("LANCE_MAX_IOP_SIZE")
        .map(|val| val.parse().unwrap())
        .unwrap_or(16 * 1024 * 1024)
});

pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;

pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
pub use storage_options::{
    LanceNamespaceStorageOptionsProvider, StorageOptionsAccessor, StorageOptionsProvider,
    EXPIRES_AT_MILLIS_KEY, REFRESH_OFFSET_MILLIS_KEY,
};

#[async_trait]
pub trait ObjectStoreExt {
    /// Returns true if the file exists.
    async fn exists(&self, path: &Path) -> Result<bool>;

    /// Read all files (start from base directory) recursively
    ///
    /// unmodified_since can be specified to only return files that have not been modified since the given time.
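    ///
    /// A minimal usage sketch (assumed, not part of the original docs); the
    /// prefix used here is arbitrary:
    ///
    /// ```no_run
    /// # use futures::TryStreamExt;
    /// # use object_store::{memory::InMemory, path::Path};
    /// # use lance_io::object_store::ObjectStoreExt;
    /// # async fn example() -> lance_core::Result<()> {
    /// let store = InMemory::new();
    /// let base = Path::from("dataset");
    /// let mut files = store.read_dir_all(&base, None);
    /// while let Some(meta) = files.try_next().await? {
    ///     println!("{} ({} bytes)", meta.location, meta.size);
    /// }
    /// # Ok(())
    /// # }
    /// ```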
    fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>>;
}

#[async_trait]
impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
    fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
        if let Some(unmodified_since_val) = unmodified_since {
            output
                .try_filter(move |file| future::ready(file.last_modified <= unmodified_since_val))
                .boxed()
        } else {
            output.boxed()
        }
    }

    async fn exists(&self, path: &Path) -> Result<bool> {
        match self.head(path).await {
            Ok(_) => Ok(true),
            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
            Err(e) => Err(e.into()),
        }
    }
}

/// Wraps [ObjectStore](object_store::ObjectStore)
#[derive(Debug, Clone)]
pub struct ObjectStore {
    // Inner object store
    pub inner: Arc<dyn OSObjectStore>,
    scheme: String,
    block_size: usize,
    max_iop_size: u64,
    /// Whether to use constant size upload parts for multipart uploads. This
    /// is only necessary for Cloudflare R2.
    pub use_constant_size_upload_parts: bool,
    /// Whether we can assume that the list of files is lexically ordered. This
    /// is true for object stores, but not for local filesystems.
    pub list_is_lexically_ordered: bool,
    io_parallelism: usize,
    /// Number of times to retry a failed download
    download_retry_count: usize,
    /// IO tracker for monitoring read/write operations
    io_tracker: IOTracker,
    /// The datastore prefix that uniquely identifies this object store. It encodes information
    /// which usually cannot be found in the URL such as Azure account name. The prefix plus the
    /// path uniquely identifies any object inside the store.
    pub store_prefix: String,
}

impl DeepSizeOf for ObjectStore {
    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
        // We aren't counting `inner` here which is problematic but an ObjectStore
        // shouldn't be too big.  The only exception might be the write cache but, if
        // the writer cache has data, it means we're using it somewhere else that isn't
        // a cache and so that doesn't really count.
        self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
    }
}

impl std::fmt::Display for ObjectStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "ObjectStore({})", self.scheme)
    }
}

pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
    /// Wrap an object store with additional functionality
    ///
    /// The store_prefix is a string which uniquely identifies the object
    /// store being wrapped.
    fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
}

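/// Applies a sequence of [WrappingObjectStore] wrappers in order.
///
/// A hedged sketch of how a chain might be assembled (`tracing_wrapper` and
/// `caching_wrapper` are hypothetical [WrappingObjectStore] implementations,
/// and the prefix string is only an example):
///
/// ```ignore
/// let chain = ChainedWrappingObjectStore::new(vec![tracing_wrapper, caching_wrapper]);
/// let wrapped = chain.wrap("s3$bucket", base_store);
/// ```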
#[derive(Debug, Clone)]
pub struct ChainedWrappingObjectStore {
    wrappers: Vec<Arc<dyn WrappingObjectStore>>,
}

impl ChainedWrappingObjectStore {
    pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
        Self { wrappers }
    }

    pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
        self.wrappers.push(wrapper);
    }
}

impl WrappingObjectStore for ChainedWrappingObjectStore {
    fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
        self.wrappers
            .iter()
            .fold(original, |acc, wrapper| wrapper.wrap(store_prefix, acc))
    }
}

/// Parameters to create an [ObjectStore]
///
#[derive(Debug, Clone)]
pub struct ObjectStoreParams {
    pub block_size: Option<usize>,
    #[deprecated(note = "Implement an ObjectStoreProvider instead")]
    pub object_store: Option<(Arc<DynObjectStore>, Url)>,
    /// Refresh offset for AWS credentials when using the legacy AWS credentials path.
    /// For StorageOptionsAccessor, use `refresh_offset_millis` storage option instead.
    pub s3_credentials_refresh_offset: Duration,
    #[cfg(feature = "aws")]
    pub aws_credentials: Option<AwsCredentialProvider>,
    pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
    /// Unified storage options accessor with caching and automatic refresh
    ///
    /// Provides storage options and optionally a dynamic provider for automatic
    /// credential refresh. Use `StorageOptionsAccessor::with_static_options()` for static
    /// options or `StorageOptionsAccessor::with_initial_and_provider()` for dynamic refresh.
    pub storage_options_accessor: Option<Arc<StorageOptionsAccessor>>,
    /// Use constant size upload parts for multipart uploads. Only necessary
    /// for Cloudflare R2, which doesn't support variable size parts. When this
    /// is false, max upload size is 2.5TB. When this is true, the max size is
    /// 50GB.
    pub use_constant_size_upload_parts: bool,
    pub list_is_lexically_ordered: Option<bool>,
}

impl Default for ObjectStoreParams {
    fn default() -> Self {
        #[allow(deprecated)]
        Self {
            object_store: None,
            block_size: None,
            s3_credentials_refresh_offset: Duration::from_secs(60),
            #[cfg(feature = "aws")]
            aws_credentials: None,
            object_store_wrapper: None,
            storage_options_accessor: None,
            use_constant_size_upload_parts: false,
            list_is_lexically_ordered: None,
        }
    }
}

impl ObjectStoreParams {
    /// Get the StorageOptionsAccessor from the params
    pub fn get_accessor(&self) -> Option<Arc<StorageOptionsAccessor>> {
        self.storage_options_accessor.clone()
    }

    /// Get storage options from the accessor, if any
    ///
    /// Returns the initial storage options from the accessor without triggering refresh.
    pub fn storage_options(&self) -> Option<&HashMap<String, String>> {
        self.storage_options_accessor
            .as_ref()
            .and_then(|a| a.initial_storage_options())
    }
}

// We implement hash for caching
impl std::hash::Hash for ObjectStoreParams {
    #[allow(deprecated)]
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        // For hashing, we use pointer values for ObjectStore, S3 credentials, wrapper
        self.block_size.hash(state);
        if let Some((store, url)) = &self.object_store {
            Arc::as_ptr(store).hash(state);
            url.hash(state);
        }
        self.s3_credentials_refresh_offset.hash(state);
        #[cfg(feature = "aws")]
        if let Some(aws_credentials) = &self.aws_credentials {
            Arc::as_ptr(aws_credentials).hash(state);
        }
        if let Some(wrapper) = &self.object_store_wrapper {
            Arc::as_ptr(wrapper).hash(state);
        }
        if let Some(accessor) = &self.storage_options_accessor {
            accessor.accessor_id().hash(state);
        }
        self.use_constant_size_upload_parts.hash(state);
        self.list_is_lexically_ordered.hash(state);
    }
}

// We implement eq for caching
impl Eq for ObjectStoreParams {}
impl PartialEq for ObjectStoreParams {
    #[allow(deprecated)]
    fn eq(&self, other: &Self) -> bool {
        #[cfg(feature = "aws")]
        if self.aws_credentials.is_some() != other.aws_credentials.is_some() {
            return false;
        }

        // For equality, we use pointer comparison for ObjectStore, S3 credentials, wrapper
        // For accessor, we use accessor_id() for semantic equality
        self.block_size == other.block_size
            && self
                .object_store
                .as_ref()
                .map(|(store, url)| (Arc::as_ptr(store), url))
                == other
                    .object_store
                    .as_ref()
                    .map(|(store, url)| (Arc::as_ptr(store), url))
            && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
            && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
                == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
            && self
                .storage_options_accessor
                .as_ref()
                .map(|a| a.accessor_id())
                == other
                    .storage_options_accessor
                    .as_ref()
                    .map(|a| a.accessor_id())
            && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
            && self.list_is_lexically_ordered == other.list_is_lexically_ordered
    }
}

/// Convert a URI string or local path to a URL
///
/// This function handles both proper URIs (with schemes like `file://`, `s3://`, etc.)
/// and plain local filesystem paths. On Windows, it correctly handles drive letters
/// that might be parsed as URL schemes.
///
/// # Examples
///
/// ```
/// # use lance_io::object_store::uri_to_url;
/// // URIs are preserved
/// let url = uri_to_url("s3://bucket/path").unwrap();
/// assert_eq!(url.scheme(), "s3");
///
/// // Local paths are converted to file:// URIs
/// # #[cfg(unix)]
/// let url = uri_to_url("/tmp/data").unwrap();
/// # #[cfg(unix)]
/// assert_eq!(url.scheme(), "file");
/// ```
pub fn uri_to_url(uri: &str) -> Result<Url> {
    match Url::parse(uri) {
        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
            // On Windows, the drive is parsed as a scheme
            local_path_to_url(uri)
        }
        Ok(url) => Ok(url),
        Err(_) => local_path_to_url(uri),
    }
}

fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
    let expanded = tilde(str_path.as_ref()).to_string();

    let mut expanded_path = path_abs::PathAbs::new(expanded)
        .unwrap()
        .as_path()
        .to_path_buf();
    // path_abs::PathAbs::new(".") returns an empty string.
    if let Some(s) = expanded_path.as_path().to_str() {
        if s.is_empty() {
            expanded_path = std::env::current_dir()?;
        }
    }

    Ok(expanded_path)
}

fn local_path_to_url(str_path: &str) -> Result<Url> {
    let expanded_path = expand_path(str_path)?;

    Url::from_directory_path(expanded_path).map_err(|_| Error::InvalidInput {
        source: format!("Invalid table location: '{}'", str_path).into(),
        location: location!(),
    })
}

#[cfg(feature = "huggingface")]
fn parse_hf_repo_id(url: &Url) -> Result<String> {
    // Accept URLs with a repo type prefix (models/datasets/spaces) or the legacy form without one.
    let mut segments: Vec<String> = Vec::new();
    if let Some(host) = url.host_str() {
        segments.push(host.to_string());
    }
    segments.extend(
        url.path()
            .trim_start_matches('/')
            .split('/')
            .map(|s| s.to_string()),
    );

    if segments.len() < 2 {
        return Err(Error::invalid_input(
            "Huggingface URL must contain at least owner and repo",
            location!(),
        ));
    }

    let repo_type_candidates = ["models", "datasets", "spaces"];
    let (owner, repo_with_rev) = if repo_type_candidates.contains(&segments[0].as_str()) {
        if segments.len() < 3 {
            return Err(Error::invalid_input(
                "Huggingface URL missing owner/repo after repo type",
                location!(),
            ));
        }
        (segments[1].as_str(), segments[2].as_str())
    } else {
        (segments[0].as_str(), segments[1].as_str())
    };

    let repo = repo_with_rev
        .split_once('@')
        .map(|(r, _)| r)
        .unwrap_or(repo_with_rev);
    Ok(format!("{owner}/{repo}"))
}

impl ObjectStore {
    /// Parse from a string URI.
    ///
    /// Returns the ObjectStore instance and the absolute path to the object.
    ///
    /// This uses the default [ObjectStoreRegistry] to find the object store. To
    /// allow for potential re-use of object store instances, it's recommended to
    /// create a shared [ObjectStoreRegistry] and pass that to [Self::from_uri_and_params].
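    ///
    /// A minimal usage sketch (the URI is a placeholder):
    ///
    /// ```no_run
    /// # use lance_io::object_store::ObjectStore;
    /// # async fn example() -> lance_core::Result<()> {
    /// let (store, base_path) = ObjectStore::from_uri("s3://bucket/dataset.lance").await?;
    /// println!("scheme={} path={}", store.scheme(), base_path);
    /// # Ok(())
    /// # }
    /// ```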
    pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
        let registry = Arc::new(ObjectStoreRegistry::default());

        Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
    }

    /// Parse from a string URI.
    ///
    /// Returns the ObjectStore instance and the absolute path to the object.
    pub async fn from_uri_and_params(
        registry: Arc<ObjectStoreRegistry>,
        uri: &str,
        params: &ObjectStoreParams,
    ) -> Result<(Arc<Self>, Path)> {
        #[allow(deprecated)]
        if let Some((store, path)) = params.object_store.as_ref() {
            let mut inner = store.clone();
            let store_prefix =
                registry.calculate_object_store_prefix(uri, params.storage_options())?;
            if let Some(wrapper) = params.object_store_wrapper.as_ref() {
                inner = wrapper.wrap(&store_prefix, inner);
            }

            // Always wrap with IO tracking
            let io_tracker = IOTracker::default();
            let tracked_store = io_tracker.wrap("", inner);

            let store = Self {
                inner: tracked_store,
                scheme: path.scheme().to_string(),
                block_size: params.block_size.unwrap_or(64 * 1024),
                max_iop_size: *DEFAULT_MAX_IOP_SIZE,
                use_constant_size_upload_parts: params.use_constant_size_upload_parts,
                list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
                io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
                download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
                io_tracker,
                store_prefix,
            };
            let path = Path::parse(path.path())?;
            return Ok((Arc::new(store), path));
        }
        let url = uri_to_url(uri)?;

        let store = registry.get_store(url.clone(), params).await?;
        // We know the scheme is valid if we got a store back.
        let provider = registry.get_provider(url.scheme()).expect_ok()?;
        let path = provider.extract_path(&url)?;

        Ok((store, path))
    }

    /// Extract the path component from a URI without initializing the object store.
    ///
    /// This is a synchronous operation that only parses the URI and extracts the path,
    /// without creating or initializing any object store instance.
    ///
    /// # Arguments
    ///
    /// * `registry` - The object store registry to get the provider
    /// * `uri` - The URI to extract the path from
    ///
    /// # Returns
    ///
    /// The extracted path component
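    ///
    /// # Example
    ///
    /// A small sketch mirroring the URI forms used in this module's tests:
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use lance_io::object_store::{ObjectStore, ObjectStoreRegistry};
    /// let registry = Arc::new(ObjectStoreRegistry::default());
    /// let path = ObjectStore::extract_path_from_uri(registry, "s3://bucket/foo.lance").unwrap();
    /// assert_eq!(path.to_string(), "foo.lance");
    /// ```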
    pub fn extract_path_from_uri(registry: Arc<ObjectStoreRegistry>, uri: &str) -> Result<Path> {
        let url = uri_to_url(uri)?;
        let provider = registry.get_provider(url.scheme()).ok_or_else(|| {
            Error::invalid_input(format!("Unknown scheme: {}", url.scheme()), location!())
        })?;
        provider.extract_path(&url)
    }

    #[deprecated(note = "Use `from_uri` instead")]
    pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
        Self::from_uri_and_params(
            Arc::new(ObjectStoreRegistry::default()),
            str_path,
            &Default::default(),
        )
        .now_or_never()
        .unwrap()
    }

    /// Local object store.
    pub fn local() -> Self {
        let provider = FileStoreProvider;
        provider
            .new_store(Url::parse("file:///").unwrap(), &Default::default())
            .now_or_never()
            .unwrap()
            .unwrap()
    }

    /// Create an in-memory object store directly for testing.
    pub fn memory() -> Self {
        let provider = MemoryStoreProvider;
        provider
            .new_store(Url::parse("memory:///").unwrap(), &Default::default())
            .now_or_never()
            .unwrap()
            .unwrap()
    }

    /// Returns true if the object store points to a local file system.
    pub fn is_local(&self) -> bool {
        self.scheme == "file"
    }

    pub fn is_cloud(&self) -> bool {
        self.scheme != "file" && self.scheme != "memory"
    }

    pub fn scheme(&self) -> &str {
        &self.scheme
    }

    pub fn block_size(&self) -> usize {
        self.block_size
    }

    pub fn max_iop_size(&self) -> u64 {
        self.max_iop_size
    }

    pub fn io_parallelism(&self) -> usize {
        std::env::var("LANCE_IO_THREADS")
            .map(|val| val.parse::<usize>().unwrap())
            .unwrap_or(self.io_parallelism)
    }

    /// Get the IO tracker for this object store
    ///
    /// The IO tracker can be used to get statistics about read/write operations
    /// performed on this object store.
    pub fn io_tracker(&self) -> &IOTracker {
        &self.io_tracker
    }

    /// Get a snapshot of current IO statistics without resetting counters
    ///
    /// Returns the current IO statistics without modifying the internal state.
    /// Use this when you need to check stats without resetting them.
    pub fn io_stats_snapshot(&self) -> IoStats {
        self.io_tracker.stats()
    }

    /// Get incremental IO statistics since the last call to this method
    ///
    /// Returns the accumulated statistics since the last call and resets the
    /// counters to zero. This is useful for tracking IO operations between
    /// different stages of processing.
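    ///
    /// A hedged usage sketch (the path and contents are arbitrary):
    ///
    /// ```no_run
    /// # use object_store::path::Path;
    /// # use lance_io::object_store::ObjectStore;
    /// # async fn example() -> lance_core::Result<()> {
    /// let store = ObjectStore::memory();
    /// let path = Path::from("stage1/data");
    /// store.put(&path, b"hello").await?;
    /// let write_stats = store.io_stats_incremental();
    /// store.read_one_all(&path).await?;
    /// // Only reflects IO performed since the previous call above.
    /// let read_stats = store.io_stats_incremental();
    /// # let _ = (write_stats, read_stats);
    /// # Ok(())
    /// # }
    /// ```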
    pub fn io_stats_incremental(&self) -> IoStats {
        self.io_tracker.incremental_stats()
    }

    /// Open a reader for the given path.
    ///
    /// Parameters
    /// - ``path``: Absolute path to the file.
    pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
        match self.scheme.as_str() {
            "file" => {
                LocalObjectReader::open_with_tracker(
                    path,
                    self.block_size,
                    None,
                    Arc::new(self.io_tracker.clone()),
                )
                .await
            }
            _ => Ok(Box::new(CloudObjectReader::new(
                self.inner.clone(),
                path.clone(),
                self.block_size,
                None,
                self.download_retry_count,
            )?)),
        }
    }

    /// Open a reader for a file with known size.
    ///
    /// The size may have been retrieved either from a list operation or from
    /// cached metadata. By passing in the known size, we can skip a HEAD /
    /// metadata call.
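    ///
    /// A small sketch (assumes the size was obtained earlier, e.g. from a list result):
    ///
    /// ```no_run
    /// # use object_store::path::Path;
    /// # use lance_io::object_store::ObjectStore;
    /// # async fn example() -> lance_core::Result<()> {
    /// let store = ObjectStore::memory();
    /// let path = Path::from("data.lance");
    /// let known_size = store.size(&path).await? as usize;
    /// let reader = store.open_with_size(&path, known_size).await?;
    /// let header = reader.get_range(0..known_size.min(1024)).await?;
    /// # let _ = header;
    /// # Ok(())
    /// # }
    /// ```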
    pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
        // If we know the file is really small, we can read the whole thing
        // as a single request.
        if known_size <= self.block_size {
            return Ok(Box::new(SmallReader::new(
                self.inner.clone(),
                path.clone(),
                self.download_retry_count,
                known_size,
            )));
        }

        match self.scheme.as_str() {
            "file" => {
                LocalObjectReader::open_with_tracker(
                    path,
                    self.block_size,
                    Some(known_size),
                    Arc::new(self.io_tracker.clone()),
                )
                .await
            }
            _ => Ok(Box::new(CloudObjectReader::new(
                self.inner.clone(),
                path.clone(),
                self.block_size,
                Some(known_size),
                self.download_retry_count,
            )?)),
        }
    }

    /// Create an [ObjectWriter] from local [std::path::Path]
    pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
        let object_store = Self::local();
        let absolute_path = expand_path(path.to_string_lossy())?;
        let os_path = Path::from_absolute_path(absolute_path)?;
        object_store.create(&os_path).await
    }

    /// Open a [Reader] from a local [std::path::Path]
    pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
        let object_store = Self::local();
        let absolute_path = expand_path(path.to_string_lossy())?;
        let os_path = Path::from_absolute_path(absolute_path)?;
        object_store.open(&os_path).await
    }

    /// Create a new file.
    pub async fn create(&self, path: &Path) -> Result<ObjectWriter> {
        ObjectWriter::new(self, path).await
    }

    /// A helper function to create a file and write content to it.
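    ///
    /// A minimal round-trip sketch (path and contents are arbitrary):
    ///
    /// ```no_run
    /// # use object_store::path::Path;
    /// # use lance_io::object_store::ObjectStore;
    /// # async fn example() -> lance_core::Result<()> {
    /// let store = ObjectStore::memory();
    /// let path = Path::from("hello.txt");
    /// store.put(&path, b"hello world").await?;
    /// assert_eq!(store.read_one_all(&path).await?.as_ref(), b"hello world");
    /// # Ok(())
    /// # }
    /// ```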
    pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
        let mut writer = self.create(path).await?;
        writer.write_all(content).await?;
        writer.shutdown().await
    }

    pub async fn delete(&self, path: &Path) -> Result<()> {
        self.inner.delete(path).await?;
        Ok(())
    }

    pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
        if self.is_local() {
            // Use std::fs::copy for local filesystem to support cross-filesystem copies
            return super::local::copy_file(from, to);
        }
        Ok(self.inner.copy(from, to).await?)
    }

    /// Read a directory (starting from the base directory) and return all sub-paths in the directory.
    pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
        let path = dir_path.into();
        let path = Path::parse(&path)?;
        let output = self.inner.list_with_delimiter(Some(&path)).await?;
        Ok(output
            .common_prefixes
            .iter()
            .chain(output.objects.iter().map(|o| &o.location))
            .map(|s| s.filename().unwrap().to_string())
            .collect())
    }

    pub fn list(
        &self,
        path: Option<Path>,
    ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
        Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
    }

    /// Read all files (start from base directory) recursively
    ///
    /// unmodified_since can be specified to only return files that have not been modified since the given time.
    pub fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        self.inner.read_dir_all(dir_path, unmodified_since)
    }

    /// Remove a directory recursively.
    pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
        let path = dir_path.into();
        let path = Path::parse(&path)?;

        if self.is_local() {
            // Local file system needs to delete directories as well.
            return super::local::remove_dir_all(&path);
        }
        let sub_entries = self
            .inner
            .list(Some(&path))
            .map(|m| m.map(|meta| meta.location))
            .boxed();
        self.inner
            .delete_stream(sub_entries)
            .try_collect::<Vec<_>>()
            .await?;
        Ok(())
    }

    pub fn remove_stream<'a>(
        &'a self,
        locations: BoxStream<'a, Result<Path>>,
    ) -> BoxStream<'a, Result<Path>> {
        self.inner
            .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
            .err_into::<Error>()
            .boxed()
    }

    /// Check whether a file exists.
    pub async fn exists(&self, path: &Path) -> Result<bool> {
        match self.inner.head(path).await {
            Ok(_) => Ok(true),
            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
            Err(e) => Err(e.into()),
        }
    }

    /// Get file size.
    pub async fn size(&self, path: &Path) -> Result<u64> {
        Ok(self.inner.head(path).await?.size)
    }

    /// Convenience function to open a reader and read all the bytes
    pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
        let reader = self.open(path).await?;
        Ok(reader.get_all().await?)
    }

    /// Convenience function to open a reader and make a single request
    ///
    /// If you will be making multiple requests to the path, it is more efficient to call [`Self::open`]
    /// and then call [`Reader::get_range`] multiple times.
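    ///
    /// A small sketch of a single ranged read (offsets are arbitrary):
    ///
    /// ```no_run
    /// # use object_store::path::Path;
    /// # use lance_io::object_store::ObjectStore;
    /// # async fn example() -> lance_core::Result<()> {
    /// let store = ObjectStore::memory();
    /// let path = Path::from("hello.txt");
    /// store.put(&path, b"hello world").await?;
    /// let bytes = store.read_one_range(&path, 0..5).await?;
    /// assert_eq!(bytes.as_ref(), b"hello");
    /// # Ok(())
    /// # }
    /// ```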
    pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
        let reader = self.open(path).await?;
        Ok(reader.get_range(range).await?)
    }
}

/// Options that can be set for multiple object stores
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub enum LanceConfigKey {
    /// Number of times to retry a download that fails
    DownloadRetryCount,
}

impl FromStr for LanceConfigKey {
    type Err = Error;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "download_retry_count" => Ok(Self::DownloadRetryCount),
            _ => Err(Error::InvalidInput {
                source: format!("Invalid LanceConfigKey: {}", s).into(),
                location: location!(),
            }),
        }
    }
}

#[derive(Clone, Debug, Default)]
pub struct StorageOptions(pub HashMap<String, String>);

impl StorageOptions {
    /// Create a new instance of [`StorageOptions`]
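    ///
    /// A minimal sketch (keys follow the conventions used by the accessors below;
    /// the values are arbitrary):
    ///
    /// ```no_run
    /// # use std::collections::HashMap;
    /// # use lance_io::object_store::StorageOptions;
    /// let options = StorageOptions::new(HashMap::from([
    ///     ("allow_http".to_string(), "true".to_string()),
    ///     ("client_max_retries".to_string(), "5".to_string()),
    /// ]));
    /// assert!(options.allow_http());
    /// assert_eq!(options.client_max_retries(), 5);
    /// ```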
    pub fn new(options: HashMap<String, String>) -> Self {
        let mut options = options;
        if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
            options.insert("allow_http".into(), value);
        }
        if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
            options.insert("allow_http".into(), value);
        }
        if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
            options.insert("allow_http".into(), value);
        }
        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
            options.insert("client_max_retries".into(), value);
        }
        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
            options.insert("client_retry_timeout".into(), value);
        }
        Self(options)
    }

    /// Denotes whether insecure connections over HTTP are allowed
    pub fn allow_http(&self) -> bool {
        self.0.iter().any(|(key, value)| {
            key.to_ascii_lowercase().contains("allow_http") && str_is_truthy(value)
        })
    }

    /// Number of times to retry a download that fails
    pub fn download_retry_count(&self) -> usize {
        self.0
            .iter()
            .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
            .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
            .unwrap_or(3)
    }

    /// Maximum number of retries to set in the RetryConfig for the object store client
    pub fn client_max_retries(&self) -> usize {
        self.0
            .iter()
            .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
            .and_then(|(_, value)| value.parse::<usize>().ok())
            .unwrap_or(10)
    }

    /// Retry timeout in seconds to set in the RetryConfig for the object store client
    pub fn client_retry_timeout(&self) -> u64 {
        self.0
            .iter()
            .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
            .and_then(|(_, value)| value.parse::<u64>().ok())
            .unwrap_or(180)
    }

    pub fn get(&self, key: &str) -> Option<&String> {
        self.0.get(key)
    }

    /// Get the expiration time in milliseconds since epoch, if present
    pub fn expires_at_millis(&self) -> Option<u64> {
        self.0
            .get(EXPIRES_AT_MILLIS_KEY)
            .and_then(|s| s.parse::<u64>().ok())
    }
}

impl From<HashMap<String, String>> for StorageOptions {
    fn from(value: HashMap<String, String>) -> Self {
        Self::new(value)
    }
}

static DEFAULT_OBJECT_STORE_REGISTRY: std::sync::LazyLock<ObjectStoreRegistry> =
    std::sync::LazyLock::new(ObjectStoreRegistry::default);

impl ObjectStore {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        store: Arc<DynObjectStore>,
        location: Url,
        block_size: Option<usize>,
        wrapper: Option<Arc<dyn WrappingObjectStore>>,
        use_constant_size_upload_parts: bool,
        list_is_lexically_ordered: bool,
        io_parallelism: usize,
        download_retry_count: usize,
        storage_options: Option<&HashMap<String, String>>,
    ) -> Self {
        let scheme = location.scheme();
        let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
        let store_prefix = match DEFAULT_OBJECT_STORE_REGISTRY.get_provider(scheme) {
            Some(provider) => provider
                .calculate_object_store_prefix(&location, storage_options)
                .unwrap(),
            None => {
                let store_prefix = format!("{}${}", location.scheme(), location.authority());
                log::warn!("Guessing that object store prefix is {}, since object store scheme is not found in registry.", store_prefix);
                store_prefix
            }
        };
        let store = match wrapper {
            Some(wrapper) => wrapper.wrap(&store_prefix, store),
            None => store,
        };

        // Always wrap with IO tracking
        let io_tracker = IOTracker::default();
        let tracked_store = io_tracker.wrap("", store);

        Self {
            inner: tracked_store,
            scheme: scheme.into(),
            block_size,
            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
            use_constant_size_upload_parts,
            list_is_lexically_ordered,
            io_parallelism,
            download_retry_count,
            io_tracker,
            store_prefix,
        }
    }
}

fn infer_block_size(scheme: &str) -> usize {
    // Block size: On local file systems, we use 4KB block size. On cloud
    // object stores, we use 64KB block size. This is generally the largest
    // block size where we don't see a latency penalty.
    match scheme {
        "file" => 4 * 1024,
        _ => 64 * 1024,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use lance_core::utils::tempfile::{TempStdDir, TempStdFile, TempStrDir};
    use object_store::memory::InMemory;
    use rstest::rstest;
    use std::env::set_current_dir;
    use std::fs::{create_dir_all, write};
    use std::path::Path as StdPath;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Write test content to file.
    fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
        let expanded = tilde(path_str).to_string();
        let path = StdPath::new(&expanded);
        std::fs::create_dir_all(path.parent().unwrap())?;
        write(path, contents)
    }

    async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
        let test_file_store = store.open(path).await.unwrap();
        let size = test_file_store.size().await.unwrap();
        let bytes = test_file_store.get_range(0..size).await.unwrap();
        let contents = String::from_utf8(bytes.to_vec()).unwrap();
        Ok(contents)
    }

    #[tokio::test]
    async fn test_absolute_paths() {
        let tmp_path = TempStrDir::default();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "TEST_CONTENT",
        )
        .unwrap();

        // test a few variations of the same path
        for uri in &[
            format!("{tmp_path}/bar/foo.lance"),
            format!("{tmp_path}/./bar/foo.lance"),
            format!("{tmp_path}/bar/foo.lance/../foo.lance"),
        ] {
            let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &path.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "TEST_CONTENT");
        }
    }

    #[tokio::test]
    async fn test_cloud_paths() {
        let uri = "s3://bucket/foo.lance";
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "gs");
        assert_eq!(path.to_string(), "foo.lance");
    }

    async fn test_block_size_used_test_helper(
        uri: &str,
        storage_options: Option<HashMap<String, String>>,
        default_expected_block_size: usize,
    ) {
        // Test the default
        let registry = Arc::new(ObjectStoreRegistry::default());
        let accessor = storage_options
            .clone()
            .map(|opts| Arc::new(StorageOptionsAccessor::with_static_options(opts)));
        let params = ObjectStoreParams {
            storage_options_accessor: accessor.clone(),
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, default_expected_block_size);

        // Ensure param is used
        let registry = Arc::new(ObjectStoreRegistry::default());
        let params = ObjectStoreParams {
            block_size: Some(1024),
            storage_options_accessor: accessor,
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, 1024);
    }

    #[rstest]
    #[case("s3://bucket/foo.lance", None)]
    #[case("gs://bucket/foo.lance", None)]
    #[case("az://account/bucket/foo.lance",
      Some(HashMap::from([
            (String::from("account_name"), String::from("account")),
            (String::from("container_name"), String::from("container"))
           ])))]
    #[tokio::test]
    async fn test_block_size_used_cloud(
        #[case] uri: &str,
        #[case] storage_options: Option<HashMap<String, String>>,
    ) {
        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
    }

    #[rstest]
    #[case("file")]
    #[case("file-object-store")]
    #[case("memory:///bucket/foo.lance")]
    #[tokio::test]
    async fn test_block_size_used_file(#[case] prefix: &str) {
        let tmp_path = TempStrDir::default();
        let path = format!("{tmp_path}/bar/foo.lance/test_file");
        write_to_file(&path, "URL").unwrap();
        let uri = format!("{prefix}:///{path}");
        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
    }

    #[tokio::test]
    async fn test_relative_paths() {
        let tmp_path = TempStrDir::default();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "RELATIVE_URL",
        )
        .unwrap();

        set_current_dir(StdPath::new(tmp_path.as_ref())).expect("Error changing current dir");
        let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();

        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "RELATIVE_URL");
    }

    #[tokio::test]
    async fn test_tilde_expansion() {
        let uri = "~/foo.lance";
        write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "TILDE");
    }

    #[tokio::test]
    async fn test_read_directory() {
        let path = TempStdDir::default();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo").join("test_file").to_str().unwrap(),
            "read_dir",
        )
        .unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();

        let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
        assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
    }

    #[tokio::test]
    async fn test_delete_directory() {
        let path = TempStdDir::default();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo")
                .join("bar")
                .join("test_file")
                .to_str()
                .unwrap(),
            "delete",
        )
        .unwrap();
        write_to_file(path.join("foo").join("top").to_str().unwrap(), "delete_top").unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
        store.remove_dir_all(base.child("foo")).await.unwrap();

        assert!(!path.join("foo").exists());
    }

    #[derive(Debug)]
    struct TestWrapper {
        called: AtomicBool,

        return_value: Arc<dyn OSObjectStore>,
    }

    impl WrappingObjectStore for TestWrapper {
        fn wrap(
            &self,
            _store_prefix: &str,
            _original: Arc<dyn OSObjectStore>,
        ) -> Arc<dyn OSObjectStore> {
            self.called.store(true, Ordering::Relaxed);

            // return a mocked value so we can check if the final store is the one we expect
            self.return_value.clone()
        }
    }

    impl TestWrapper {
        fn called(&self) -> bool {
            self.called.load(Ordering::Relaxed)
        }
    }

    #[tokio::test]
    async fn test_wrapping_object_store_option_is_used() {
        // Make a store for the inner store first
        let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
        let registry = Arc::new(ObjectStoreRegistry::default());

        assert_eq!(Arc::strong_count(&mock_inner_store), 1);

        let wrapper = Arc::new(TestWrapper {
            called: AtomicBool::new(false),
            return_value: mock_inner_store.clone(),
        });

        let params = ObjectStoreParams {
            object_store_wrapper: Some(wrapper.clone()),
            ..ObjectStoreParams::default()
        };

        // not called yet
        assert!(!wrapper.called());

        let _ = ObjectStore::from_uri_and_params(registry, "memory:///", &params)
            .await
            .unwrap();

        // called after construction
        assert!(wrapper.called());

        // It is hard to compare two trait object pointers since they point to vtables,
        // so use the ref count as a proxy to make sure that the store is correctly kept
        assert_eq!(Arc::strong_count(&mock_inner_store), 2);
    }

    #[tokio::test]
    async fn test_local_paths() {
        let file_path = TempStdFile::default();
        let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        writer.shutdown().await.unwrap();

        let reader = ObjectStore::open_local(&file_path).await.unwrap();
        let buf = reader.get_range(0..5).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");
    }

    #[tokio::test]
    async fn test_read_one() {
        let file_path = TempStdFile::default();
        let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        writer.shutdown().await.unwrap();

        let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
        let obj_store = ObjectStore::local();
        let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");

        let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");
    }

    #[tokio::test]
    #[cfg(windows)]
    async fn test_windows_paths() {
        use std::path::Component;
        use std::path::Prefix;
        use std::path::Prefix::*;

        fn get_path_prefix(path: &StdPath) -> Prefix {
            match path.components().next().unwrap() {
                Component::Prefix(prefix_component) => prefix_component.kind(),
                _ => panic!(),
            }
        }

        fn get_drive_letter(prefix: Prefix) -> String {
            match prefix {
                Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
                _ => panic!(),
            }
        }

        let tmp_path = TempStdFile::default();
        let prefix = get_path_prefix(&tmp_path);
        let drive_letter = get_drive_letter(prefix);

        write_to_file(
            &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
            "WINDOWS",
        )
        .unwrap();

        for uri in &[
            format!("{drive_letter}:/test_folder/test.lance"),
            format!("{drive_letter}:\\test_folder\\test.lance"),
        ] {
            let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &base.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "WINDOWS");
        }
    }

    #[tokio::test]
    async fn test_cross_filesystem_copy() {
        // Create two temporary directories that simulate different filesystems
        let source_dir = TempStdDir::default();
        let dest_dir = TempStdDir::default();

        // Create a test file in the source directory
        let source_file_name = "test_file.txt";
        let source_file = source_dir.join(source_file_name);
        std::fs::write(&source_file, b"test content").unwrap();

        // Create ObjectStore for local filesystem
        let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
            .await
            .unwrap();

        // Create paths relative to the ObjectStore base
        let from_path = base_path.child(source_file_name);

        // Use object_store::Path::parse for the destination
        let dest_file = dest_dir.join("copied_file.txt");
        let dest_str = dest_file.to_str().unwrap();
        let to_path = object_store::path::Path::parse(dest_str).unwrap();

        // Perform the copy operation
        store.copy(&from_path, &to_path).await.unwrap();

        // Verify the file was copied correctly
        assert!(dest_file.exists());
        let copied_content = std::fs::read(&dest_file).unwrap();
        assert_eq!(copied_content, b"test content");
    }

    #[tokio::test]
    async fn test_copy_creates_parent_directories() {
        let source_dir = TempStdDir::default();
        let dest_dir = TempStdDir::default();

        // Create a test file in the source directory
        let source_file_name = "test_file.txt";
        let source_file = source_dir.join(source_file_name);
        std::fs::write(&source_file, b"test content").unwrap();

        // Create ObjectStore for local filesystem
        let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
            .await
            .unwrap();

        // Create paths
        let from_path = base_path.child(source_file_name);

        // Create destination with nested directories that don't exist yet
        let dest_file = dest_dir.join("nested").join("dirs").join("copied_file.txt");
        let dest_str = dest_file.to_str().unwrap();
        let to_path = object_store::path::Path::parse(dest_str).unwrap();

        // Perform the copy operation - should create parent directories
        store.copy(&from_path, &to_path).await.unwrap();

        // Verify the file was copied correctly and directories were created
        assert!(dest_file.exists());
        assert!(dest_file.parent().unwrap().exists());
        let copied_content = std::fs::read(&dest_file).unwrap();
        assert_eq!(copied_content, b"test content");
    }
}