lance_io/
object_store.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Extend [object_store::ObjectStore] functionalities
5
6use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use deepsize::DeepSizeOf;
17use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
18use futures::{FutureExt, Stream};
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22#[cfg(feature = "aws")]
23use object_store::aws::AwsCredentialProvider;
24use object_store::DynObjectStore;
25use object_store::Error as ObjectStoreError;
26use object_store::{path::Path, ObjectMeta, ObjectStore as OSObjectStore};
27use providers::local::FileStoreProvider;
28use providers::memory::MemoryStoreProvider;
29use shellexpand::tilde;
30use snafu::location;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35mod list_retry;
36pub mod providers;
37mod tracing;
38use crate::object_reader::SmallReader;
39use crate::object_writer::WriteResult;
40use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
41use lance_core::{Error, Result};
42
/// Default I/O parallelism for local storage.
///
/// Local disks tend to do fine with a few threads.
/// Note: the number of threads here also impacts the number of files
/// we need to read in some situations.  So keeping this at 8 keeps the
/// RAM on our scanner down.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
/// Default I/O parallelism for cloud storage.
///
/// Cloud disks often need many many threads to saturate the network.
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024; // 4KB block size
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024; // 64KB block size
54
/// Default upper bound on the size of a single I/O operation.
///
/// The value is read from the `LANCE_MAX_IOP_SIZE` environment variable the
/// first time this static is dereferenced and falls back to
/// `16 * 1024 * 1024` when the variable is not set.
///
/// # Panics
///
/// Panics on first access if `LANCE_MAX_IOP_SIZE` is set to something that
/// does not parse as a `u64`. The panic message names the variable and the
/// offending value so that the misconfiguration is easy to spot.
pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock<u64> = std::sync::LazyLock::new(|| {
    match std::env::var("LANCE_MAX_IOP_SIZE") {
        Ok(val) => val.parse::<u64>().unwrap_or_else(|err| {
            panic!("LANCE_MAX_IOP_SIZE must be an unsigned integer, got '{val}': {err}")
        }),
        // Unset (or non-unicode) values fall back to the built-in default.
        Err(_) => 16 * 1024 * 1024,
    }
});
60
/// Default number of times a failed download is retried.
pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
62
63pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
64
/// Extension methods layered on top of an object store.
#[async_trait]
pub trait ObjectStoreExt {
    /// Returns true if the file exists.
    async fn exists(&self, path: &Path) -> Result<bool>;

    /// Read all files (start from base directory) recursively.
    ///
    /// `unmodified_since` can be specified to only return files that have not
    /// been modified since the given time.
    fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>>;
}
79
80#[async_trait]
81impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
82    fn read_dir_all<'a, 'b>(
83        &'a self,
84        dir_path: impl Into<&'b Path> + Send,
85        unmodified_since: Option<DateTime<Utc>>,
86    ) -> BoxStream<'a, Result<ObjectMeta>> {
87        let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
88        if let Some(unmodified_since_val) = unmodified_since {
89            output
90                .try_filter(move |file| future::ready(file.last_modified < unmodified_since_val))
91                .boxed()
92        } else {
93            output.boxed()
94        }
95    }
96
97    async fn exists(&self, path: &Path) -> Result<bool> {
98        match self.head(path).await {
99            Ok(_) => Ok(true),
100            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
101            Err(e) => Err(e.into()),
102        }
103    }
104}
105
/// Wraps [ObjectStore](object_store::ObjectStore)
#[derive(Debug, Clone)]
pub struct ObjectStore {
    /// Inner object store that requests are delegated to.
    pub inner: Arc<dyn OSObjectStore>,
    /// URL scheme of the store; `"file"` and `"memory"` are treated as
    /// non-cloud schemes by `is_local` / `is_cloud`.
    scheme: String,
    /// Block size passed to the readers opened through this store.
    block_size: usize,
    /// Value reported by `max_iop_size()`.
    max_iop_size: u64,
    /// Whether to use constant size upload parts for multipart uploads. This
    /// is only necessary for Cloudflare R2.
    pub use_constant_size_upload_parts: bool,
    /// Whether we can assume that the list of files is lexically ordered. This
    /// is true for object stores, but not for local filesystems.
    pub list_is_lexically_ordered: bool,
    /// I/O parallelism reported by `io_parallelism()` unless the
    /// `LANCE_IO_THREADS` environment variable overrides it.
    io_parallelism: usize,
    /// Number of times to retry a failed download
    download_retry_count: usize,
}
124
125impl DeepSizeOf for ObjectStore {
126    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
127        // We aren't counting `inner` here which is problematic but an ObjectStore
128        // shouldn't be too big.  The only exception might be the write cache but, if
129        // the writer cache has data, it means we're using it somewhere else that isn't
130        // a cache and so that doesn't really count.
131        self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
132    }
133}
134
135impl std::fmt::Display for ObjectStore {
136    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137        write!(f, "ObjectStore({})", self.scheme)
138    }
139}
140
/// A hook that decorates an object store with another object store.
pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
    /// Takes the `original` store and returns the store that should be used
    /// in its place.
    fn wrap(&self, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
}
144
/// A [`WrappingObjectStore`] made of an ordered list of other wrappers.
#[derive(Debug, Clone)]
pub struct ChainedWrappingObjectStore {
    /// Wrappers in the order in which they are applied.
    wrappers: Vec<Arc<dyn WrappingObjectStore>>,
}

impl ChainedWrappingObjectStore {
    /// Creates a chain from the given wrappers, keeping their order.
    pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
        Self { wrappers }
    }

    /// Appends `wrapper` to the end of the chain.
    pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
        self.wrappers.push(wrapper);
    }
}
159
160impl WrappingObjectStore for ChainedWrappingObjectStore {
161    fn wrap(&self, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
162        self.wrappers
163            .iter()
164            .fold(original, |acc, wrapper| wrapper.wrap(acc))
165    }
166}
167
/// Parameters to create an [ObjectStore]
///
#[derive(Debug, Clone)]
pub struct ObjectStoreParams {
    /// Block size to use for the created store; `None` leaves the choice to
    /// the code that builds the store.
    pub block_size: Option<usize>,
    /// A pre-built object store together with its URL. When set,
    /// `ObjectStore::from_uri_and_params` uses it directly instead of
    /// resolving the URI through the registry.
    #[deprecated(note = "Implement an ObjectStoreProvider instead")]
    pub object_store: Option<(Arc<DynObjectStore>, Url)>,
    /// Defaults to 60 seconds.
    // NOTE(review): consumed outside this file; presumably how long before
    // expiry S3 credentials get refreshed — confirm against the AWS provider.
    pub s3_credentials_refresh_offset: Duration,
    /// Optional AWS credential provider (only with the `aws` feature).
    #[cfg(feature = "aws")]
    pub aws_credentials: Option<AwsCredentialProvider>,
    /// Optional wrapper applied around the underlying object store.
    pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
    /// Free-form string key/value options.
    // NOTE(review): interpretation happens outside this struct — presumably
    // via `StorageOptions`; confirm against the providers.
    pub storage_options: Option<HashMap<String, String>>,
    /// Use constant size upload parts for multipart uploads. Only necessary
    /// for Cloudflare R2, which doesn't support variable size parts. When this
    /// is false, max upload size is 2.5TB. When this is true, the max size is
    /// 50GB.
    pub use_constant_size_upload_parts: bool,
    /// Optional override for whether listings may be assumed to be lexically
    /// ordered; `None` leaves the decision to the code that builds the store.
    pub list_is_lexically_ordered: Option<bool>,
}
187
188impl Default for ObjectStoreParams {
189    fn default() -> Self {
190        #[allow(deprecated)]
191        Self {
192            object_store: None,
193            block_size: None,
194            s3_credentials_refresh_offset: Duration::from_secs(60),
195            #[cfg(feature = "aws")]
196            aws_credentials: None,
197            object_store_wrapper: None,
198            storage_options: None,
199            use_constant_size_upload_parts: false,
200            list_is_lexically_ordered: None,
201        }
202    }
203}
204
205// We implement hash for caching
206impl std::hash::Hash for ObjectStoreParams {
207    #[allow(deprecated)]
208    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
209        // For hashing, we use pointer values for ObjectStore, S3 credentials, and wrapper
210        self.block_size.hash(state);
211        if let Some((store, url)) = &self.object_store {
212            Arc::as_ptr(store).hash(state);
213            url.hash(state);
214        }
215        self.s3_credentials_refresh_offset.hash(state);
216        #[cfg(feature = "aws")]
217        if let Some(aws_credentials) = &self.aws_credentials {
218            Arc::as_ptr(aws_credentials).hash(state);
219        }
220        if let Some(wrapper) = &self.object_store_wrapper {
221            Arc::as_ptr(wrapper).hash(state);
222        }
223        if let Some(storage_options) = &self.storage_options {
224            for (key, value) in storage_options {
225                key.hash(state);
226                value.hash(state);
227            }
228        }
229        self.use_constant_size_upload_parts.hash(state);
230        self.list_is_lexically_ordered.hash(state);
231    }
232}
233
234// We implement eq for caching
235impl Eq for ObjectStoreParams {}
236impl PartialEq for ObjectStoreParams {
237    #[allow(deprecated)]
238    fn eq(&self, other: &Self) -> bool {
239        #[cfg(feature = "aws")]
240        if self.aws_credentials.is_some() != other.aws_credentials.is_some() {
241            return false;
242        }
243
244        // For equality, we use pointer comparison for ObjectStore, S3 credentials, and wrapper
245        self.block_size == other.block_size
246            && self
247                .object_store
248                .as_ref()
249                .map(|(store, url)| (Arc::as_ptr(store), url))
250                == other
251                    .object_store
252                    .as_ref()
253                    .map(|(store, url)| (Arc::as_ptr(store), url))
254            && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
255            && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
256                == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
257            && self.storage_options == other.storage_options
258            && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
259            && self.list_is_lexically_ordered == other.list_is_lexically_ordered
260    }
261}
262
263fn uri_to_url(uri: &str) -> Result<Url> {
264    match Url::parse(uri) {
265        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
266            // On Windows, the drive is parsed as a scheme
267            local_path_to_url(uri)
268        }
269        Ok(url) => Ok(url),
270        Err(_) => local_path_to_url(uri),
271    }
272}
273
274fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
275    let expanded = tilde(str_path.as_ref()).to_string();
276
277    let mut expanded_path = path_abs::PathAbs::new(expanded)
278        .unwrap()
279        .as_path()
280        .to_path_buf();
281    // path_abs::PathAbs::new(".") returns an empty string.
282    if let Some(s) = expanded_path.as_path().to_str() {
283        if s.is_empty() {
284            expanded_path = std::env::current_dir()?;
285        }
286    }
287
288    Ok(expanded_path)
289}
290
291fn local_path_to_url(str_path: &str) -> Result<Url> {
292    let expanded_path = expand_path(str_path)?;
293
294    Url::from_directory_path(expanded_path).map_err(|_| Error::InvalidInput {
295        source: format!("Invalid table location: '{}'", str_path).into(),
296        location: location!(),
297    })
298}
299
300impl ObjectStore {
301    /// Parse from a string URI.
302    ///
303    /// Returns the ObjectStore instance and the absolute path to the object.
304    ///
305    /// This uses the default [ObjectStoreRegistry] to find the object store. To
306    /// allow for potential re-use of object store instances, it's recommended to
307    /// create a shared [ObjectStoreRegistry] and pass that to [Self::from_uri_and_params].
308    pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
309        let registry = Arc::new(ObjectStoreRegistry::default());
310
311        Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
312    }
313
314    /// Parse from a string URI.
315    ///
316    /// Returns the ObjectStore instance and the absolute path to the object.
317    pub async fn from_uri_and_params(
318        registry: Arc<ObjectStoreRegistry>,
319        uri: &str,
320        params: &ObjectStoreParams,
321    ) -> Result<(Arc<Self>, Path)> {
322        #[allow(deprecated)]
323        if let Some((store, path)) = params.object_store.as_ref() {
324            let mut inner = store.clone();
325            if let Some(wrapper) = params.object_store_wrapper.as_ref() {
326                inner = wrapper.wrap(inner);
327            }
328            let store = Self {
329                inner,
330                scheme: path.scheme().to_string(),
331                block_size: params.block_size.unwrap_or(64 * 1024),
332                max_iop_size: *DEFAULT_MAX_IOP_SIZE,
333                use_constant_size_upload_parts: params.use_constant_size_upload_parts,
334                list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
335                io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
336                download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
337            };
338            let path = Path::parse(path.path())?;
339            return Ok((Arc::new(store), path));
340        }
341        let url = uri_to_url(uri)?;
342        let store = registry.get_store(url.clone(), params).await?;
343        // We know the scheme is valid if we got a store back.
344        let provider = registry.get_provider(url.scheme()).expect_ok()?;
345        let path = provider.extract_path(&url)?;
346
347        Ok((store, path))
348    }
349
    /// Synchronous variant of [`Self::from_uri`] that uses a default registry
    /// and default parameters.
    ///
    /// # Panics
    ///
    /// The underlying future is polled exactly once via `now_or_never`; this
    /// panics if the future does not complete on that first poll.
    #[deprecated(note = "Use `from_uri` instead")]
    pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
        Self::from_uri_and_params(
            Arc::new(ObjectStoreRegistry::default()),
            str_path,
            &Default::default(),
        )
        .now_or_never()
        .unwrap()
    }
360
361    /// Local object store.
362    pub fn local() -> Self {
363        let provider = FileStoreProvider;
364        provider
365            .new_store(Url::parse("file:///").unwrap(), &Default::default())
366            .now_or_never()
367            .unwrap()
368            .unwrap()
369    }
370
371    /// Create a in-memory object store directly for testing.
372    pub fn memory() -> Self {
373        let provider = MemoryStoreProvider;
374        provider
375            .new_store(Url::parse("memory:///").unwrap(), &Default::default())
376            .now_or_never()
377            .unwrap()
378            .unwrap()
379    }
380
381    /// Returns true if the object store pointed to a local file system.
382    pub fn is_local(&self) -> bool {
383        self.scheme == "file"
384    }
385
386    pub fn is_cloud(&self) -> bool {
387        self.scheme != "file" && self.scheme != "memory"
388    }
389
    /// URL scheme this store was created with.
    pub fn scheme(&self) -> &str {
        &self.scheme
    }

    /// Block size configured for this store.
    pub fn block_size(&self) -> usize {
        self.block_size
    }

    /// Maximum I/O operation size configured for this store.
    pub fn max_iop_size(&self) -> u64 {
        self.max_iop_size
    }
401
402    pub fn io_parallelism(&self) -> usize {
403        std::env::var("LANCE_IO_THREADS")
404            .map(|val| val.parse::<usize>().unwrap())
405            .unwrap_or(self.io_parallelism)
406    }
407
408    /// Open a file for path.
409    ///
410    /// Parameters
411    /// - ``path``: Absolute path to the file.
412    pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
413        match self.scheme.as_str() {
414            "file" => LocalObjectReader::open(path, self.block_size, None).await,
415            _ => Ok(Box::new(CloudObjectReader::new(
416                self.inner.clone(),
417                path.clone(),
418                self.block_size,
419                None,
420                self.download_retry_count,
421            )?)),
422        }
423    }
424
425    /// Open a reader for a file with known size.
426    ///
427    /// This size may either have been retrieved from a list operation or
428    /// cached metadata. By passing in the known size, we can skip a HEAD / metadata
429    /// call.
430    pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
431        // If we know the file is really small, we can read the whole thing
432        // as a single request.
433        if known_size <= self.block_size {
434            return Ok(Box::new(SmallReader::new(
435                self.inner.clone(),
436                path.clone(),
437                self.download_retry_count,
438                known_size,
439            )));
440        }
441
442        match self.scheme.as_str() {
443            "file" => LocalObjectReader::open(path, self.block_size, Some(known_size)).await,
444            _ => Ok(Box::new(CloudObjectReader::new(
445                self.inner.clone(),
446                path.clone(),
447                self.block_size,
448                Some(known_size),
449                self.download_retry_count,
450            )?)),
451        }
452    }
453
454    /// Create an [ObjectWriter] from local [std::path::Path]
455    pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
456        let object_store = Self::local();
457        let absolute_path = expand_path(path.to_string_lossy())?;
458        let os_path = Path::from_absolute_path(absolute_path)?;
459        object_store.create(&os_path).await
460    }
461
462    /// Open an [Reader] from local [std::path::Path]
463    pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
464        let object_store = Self::local();
465        let absolute_path = expand_path(path.to_string_lossy())?;
466        let os_path = Path::from_absolute_path(absolute_path)?;
467        object_store.open(&os_path).await
468    }
469
    /// Create a new file.
    ///
    /// Returns an [`ObjectWriter`] bound to this store and `path`.
    pub async fn create(&self, path: &Path) -> Result<ObjectWriter> {
        ObjectWriter::new(self, path).await
    }
474
475    /// A helper function to create a file and write content to it.
476    pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
477        let mut writer = self.create(path).await?;
478        writer.write_all(content).await?;
479        writer.shutdown().await
480    }
481
482    pub async fn delete(&self, path: &Path) -> Result<()> {
483        self.inner.delete(path).await?;
484        Ok(())
485    }
486
487    pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
488        if self.is_local() {
489            // Use std::fs::copy for local filesystem to support cross-filesystem copies
490            return super::local::copy_file(from, to);
491        }
492        Ok(self.inner.copy(from, to).await?)
493    }
494
495    /// Read a directory (start from base directory) and returns all sub-paths in the directory.
496    pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
497        let path = dir_path.into();
498        let path = Path::parse(&path)?;
499        let output = self.inner.list_with_delimiter(Some(&path)).await?;
500        Ok(output
501            .common_prefixes
502            .iter()
503            .chain(output.objects.iter().map(|o| &o.location))
504            .map(|s| s.filename().unwrap().to_string())
505            .collect())
506    }
507
508    pub fn list(
509        &self,
510        path: Option<Path>,
511    ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
512        Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
513    }
514
    /// Read all files (start from base directory) recursively.
    ///
    /// `unmodified_since` can be specified to only return files that have not
    /// been modified since the given time.
    ///
    /// This simply forwards to the `read_dir_all` method of the inner store.
    pub fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        self.inner.read_dir_all(dir_path, unmodified_since)
    }
525
526    /// Remove a directory recursively.
527    pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
528        let path = dir_path.into();
529        let path = Path::parse(&path)?;
530
531        if self.is_local() {
532            // Local file system needs to delete directories as well.
533            return super::local::remove_dir_all(&path);
534        }
535        let sub_entries = self
536            .inner
537            .list(Some(&path))
538            .map(|m| m.map(|meta| meta.location))
539            .boxed();
540        self.inner
541            .delete_stream(sub_entries)
542            .try_collect::<Vec<_>>()
543            .await?;
544        Ok(())
545    }
546
547    pub fn remove_stream<'a>(
548        &'a self,
549        locations: BoxStream<'a, Result<Path>>,
550    ) -> BoxStream<'a, Result<Path>> {
551        self.inner
552            .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
553            .err_into::<Error>()
554            .boxed()
555    }
556
557    /// Check a file exists.
558    pub async fn exists(&self, path: &Path) -> Result<bool> {
559        match self.inner.head(path).await {
560            Ok(_) => Ok(true),
561            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
562            Err(e) => Err(e.into()),
563        }
564    }
565
566    /// Get file size.
567    pub async fn size(&self, path: &Path) -> Result<u64> {
568        Ok(self.inner.head(path).await?.size)
569    }
570
571    /// Convenience function to open a reader and read all the bytes
572    pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
573        let reader = self.open(path).await?;
574        Ok(reader.get_all().await?)
575    }
576
577    /// Convenience function open a reader and make a single request
578    ///
579    /// If you will be making multiple requests to the path it is more efficient to call [`Self::open`]
580    /// and then call [`Reader::get_range`] multiple times.
581    pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
582        let reader = self.open(path).await?;
583        Ok(reader.get_range(range).await?)
584    }
585}
586
/// Options that can be set for multiple object stores.
///
/// Keys are parsed from strings, ignoring ASCII case, through the
/// [`FromStr`] implementation.
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub enum LanceConfigKey {
    /// Number of times to retry a download that fails
    DownloadRetryCount,
}
593
594impl FromStr for LanceConfigKey {
595    type Err = Error;
596
597    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
598        match s.to_ascii_lowercase().as_str() {
599            "download_retry_count" => Ok(Self::DownloadRetryCount),
600            _ => Err(Error::InvalidInput {
601                source: format!("Invalid LanceConfigKey: {}", s).into(),
602                location: location!(),
603            }),
604        }
605    }
606}
607
/// String key/value storage options with typed accessors for a few
/// well-known keys.
#[derive(Clone, Debug, Default)]
pub struct StorageOptions(pub HashMap<String, String>);
610
611impl StorageOptions {
612    /// Create a new instance of [`StorageOptions`]
613    pub fn new(options: HashMap<String, String>) -> Self {
614        let mut options = options;
615        if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
616            options.insert("allow_http".into(), value);
617        }
618        if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
619            options.insert("allow_http".into(), value);
620        }
621        if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
622            options.insert("allow_http".into(), value);
623        }
624        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
625            options.insert("client_max_retries".into(), value);
626        }
627        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
628            options.insert("client_retry_timeout".into(), value);
629        }
630        Self(options)
631    }
632
633    /// Denotes if unsecure connections via http are allowed
634    pub fn allow_http(&self) -> bool {
635        self.0.iter().any(|(key, value)| {
636            key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
637        })
638    }
639
640    /// Number of times to retry a download that fails
641    pub fn download_retry_count(&self) -> usize {
642        self.0
643            .iter()
644            .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
645            .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
646            .unwrap_or(3)
647    }
648
649    /// Max retry times to set in RetryConfig for object store client
650    pub fn client_max_retries(&self) -> usize {
651        self.0
652            .iter()
653            .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
654            .and_then(|(_, value)| value.parse::<usize>().ok())
655            .unwrap_or(10)
656    }
657
658    /// Seconds of timeout to set in RetryConfig for object store client
659    pub fn client_retry_timeout(&self) -> u64 {
660        self.0
661            .iter()
662            .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
663            .and_then(|(_, value)| value.parse::<u64>().ok())
664            .unwrap_or(180)
665    }
666
    /// Look up the raw value stored under `key`.
    ///
    /// Unlike the typed accessors, this is an exact (case-sensitive) lookup.
    pub fn get(&self, key: &str) -> Option<&String> {
        self.0.get(key)
    }
670}
671
impl From<HashMap<String, String>> for StorageOptions {
    /// Equivalent to [`StorageOptions::new`], including its environment
    /// variable handling.
    fn from(value: HashMap<String, String>) -> Self {
        Self::new(value)
    }
}
677
678impl ObjectStore {
679    #[allow(clippy::too_many_arguments)]
680    pub fn new(
681        store: Arc<DynObjectStore>,
682        location: Url,
683        block_size: Option<usize>,
684        wrapper: Option<Arc<dyn WrappingObjectStore>>,
685        use_constant_size_upload_parts: bool,
686        list_is_lexically_ordered: bool,
687        io_parallelism: usize,
688        download_retry_count: usize,
689    ) -> Self {
690        let scheme = location.scheme();
691        let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
692
693        let store = match wrapper {
694            Some(wrapper) => wrapper.wrap(store),
695            None => store,
696        };
697
698        Self {
699            inner: store,
700            scheme: scheme.into(),
701            block_size,
702            max_iop_size: *DEFAULT_MAX_IOP_SIZE,
703            use_constant_size_upload_parts,
704            list_is_lexically_ordered,
705            io_parallelism,
706            download_retry_count,
707        }
708    }
709}
710
/// Pick the default block size for a URL scheme.
///
/// The `file` scheme gets 4KB blocks and every other scheme gets 64KB
/// blocks. 64KB is generally the largest block size where we don't see a
/// latency penalty on cloud object stores.
fn infer_block_size(scheme: &str) -> usize {
    const KIB: usize = 1024;

    if scheme == "file" {
        4 * KIB
    } else {
        64 * KIB
    }
}
720
721#[cfg(test)]
722mod tests {
723    use super::*;
724    use object_store::memory::InMemory;
725    use rstest::rstest;
726    use std::env::set_current_dir;
727    use std::fs::{create_dir_all, write};
728    use std::path::Path as StdPath;
729    use std::sync::atomic::{AtomicBool, Ordering};
730
731    /// Write test content to file.
732    fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
733        let expanded = tilde(path_str).to_string();
734        let path = StdPath::new(&expanded);
735        std::fs::create_dir_all(path.parent().unwrap())?;
736        write(path, contents)
737    }
738
739    async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
740        let test_file_store = store.open(path).await.unwrap();
741        let size = test_file_store.size().await.unwrap();
742        let bytes = test_file_store.get_range(0..size).await.unwrap();
743        let contents = String::from_utf8(bytes.to_vec()).unwrap();
744        Ok(contents)
745    }
746
    /// Different spellings of the same absolute path (plain, with `./`, with
    /// `..`) must all resolve to the same file.
    #[tokio::test]
    async fn test_absolute_paths() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "TEST_CONTENT",
        )
        .unwrap();

        // test a few variations of the same path
        for uri in &[
            format!("{tmp_path}/bar/foo.lance"),
            format!("{tmp_path}/./bar/foo.lance"),
            format!("{tmp_path}/bar/foo.lance/../foo.lance"),
        ] {
            let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &path.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "TEST_CONTENT");
        }
    }
770
    /// Cloud URIs must yield the expected scheme and object path. Note that
    /// `s3+ddb://` is expected to report the plain `s3` scheme.
    #[tokio::test]
    async fn test_cloud_paths() {
        let uri = "s3://bucket/foo.lance";
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "gs");
        assert_eq!(path.to_string(), "foo.lance");
    }
790
    /// Shared body of the block size tests.
    ///
    /// Checks that a store created for `uri` uses
    /// `default_expected_block_size` when no block size is given, and that an
    /// explicit `block_size` parameter (1024) takes precedence.
    async fn test_block_size_used_test_helper(
        uri: &str,
        storage_options: Option<HashMap<String, String>>,
        default_expected_block_size: usize,
    ) {
        // Test the default
        let registry = Arc::new(ObjectStoreRegistry::default());
        let params = ObjectStoreParams {
            storage_options: storage_options.clone(),
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, default_expected_block_size);

        // Ensure param is used
        let registry = Arc::new(ObjectStoreRegistry::default());
        let params = ObjectStoreParams {
            block_size: Some(1024),
            storage_options: storage_options.clone(),
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, 1024);
    }
819
    /// Cloud schemes are expected to default to a 64KB block size.
    #[rstest]
    #[case("s3://bucket/foo.lance", None)]
    #[case("gs://bucket/foo.lance", None)]
    #[case("az://account/bucket/foo.lance",
      Some(HashMap::from([
            (String::from("account_name"), String::from("account")),
            (String::from("container_name"), String::from("container"))
           ])))]
    #[tokio::test]
    async fn test_block_size_used_cloud(
        #[case] uri: &str,
        #[case] storage_options: Option<HashMap<String, String>>,
    ) {
        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
    }
835
    /// The listed prefixes are expected to default to a 4KB block size.
    // NOTE(review): the `memory:///bucket/foo.lance` case already contains
    // `:///`, so the formatted URI ends up with a second `:///` followed by
    // the temp path — confirm this is intended.
    #[rstest]
    #[case("file")]
    #[case("file-object-store")]
    #[case("memory:///bucket/foo.lance")]
    #[tokio::test]
    async fn test_block_size_used_file(#[case] prefix: &str) {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
        let path = format!("{tmp_path}/bar/foo.lance/test_file");
        write_to_file(&path, "URL").unwrap();
        let uri = format!("{prefix}:///{path}");
        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
    }
849
850    #[tokio::test]
851    async fn test_relative_paths() {
852        let tmp_dir = tempfile::tempdir().unwrap();
853        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
854        write_to_file(
855            &format!("{tmp_path}/bar/foo.lance/test_file"),
856            "RELATIVE_URL",
857        )
858        .unwrap();
859
860        set_current_dir(StdPath::new(&tmp_path)).expect("Error changing current dir");
861        let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();
862
863        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
864            .await
865            .unwrap();
866        assert_eq!(contents, "RELATIVE_URL");
867    }
868
869    #[tokio::test]
870    async fn test_tilde_expansion() {
871        let uri = "~/foo.lance";
872        write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
873        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
874        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
875            .await
876            .unwrap();
877        assert_eq!(contents, "TILDE");
878    }
879
880    #[tokio::test]
881    async fn test_read_directory() {
882        let tmp_dir = tempfile::tempdir().unwrap();
883        let path = tmp_dir.path();
884        create_dir_all(path.join("foo").join("bar")).unwrap();
885        create_dir_all(path.join("foo").join("zoo")).unwrap();
886        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
887        write_to_file(
888            path.join("foo").join("test_file").to_str().unwrap(),
889            "read_dir",
890        )
891        .unwrap();
892        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
893
894        let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
895        assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
896    }
897
898    #[tokio::test]
899    async fn test_delete_directory() {
900        let tmp_dir = tempfile::tempdir().unwrap();
901        let path = tmp_dir.path();
902        create_dir_all(path.join("foo").join("bar")).unwrap();
903        create_dir_all(path.join("foo").join("zoo")).unwrap();
904        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
905        write_to_file(
906            path.join("foo")
907                .join("bar")
908                .join("test_file")
909                .to_str()
910                .unwrap(),
911            "delete",
912        )
913        .unwrap();
914        write_to_file(path.join("foo").join("top").to_str().unwrap(), "delete_top").unwrap();
915        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
916        store.remove_dir_all(base.child("foo")).await.unwrap();
917
918        assert!(!path.join("foo").exists());
919    }
920
921    #[derive(Debug)]
922    struct TestWrapper {
923        called: AtomicBool,
924
925        return_value: Arc<dyn OSObjectStore>,
926    }
927
928    impl WrappingObjectStore for TestWrapper {
929        fn wrap(&self, _original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
930            self.called.store(true, Ordering::Relaxed);
931
932            // return a mocked value so we can check if the final store is the one we expect
933            self.return_value.clone()
934        }
935    }
936
937    impl TestWrapper {
938        fn called(&self) -> bool {
939            self.called.load(Ordering::Relaxed)
940        }
941    }
942
943    #[tokio::test]
944    async fn test_wrapping_object_store_option_is_used() {
945        // Make a store for the inner store first
946        let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
947        let registry = Arc::new(ObjectStoreRegistry::default());
948
949        assert_eq!(Arc::strong_count(&mock_inner_store), 1);
950
951        let wrapper = Arc::new(TestWrapper {
952            called: AtomicBool::new(false),
953            return_value: mock_inner_store.clone(),
954        });
955
956        let params = ObjectStoreParams {
957            object_store_wrapper: Some(wrapper.clone()),
958            ..ObjectStoreParams::default()
959        };
960
961        // not called yet
962        assert!(!wrapper.called());
963
964        let _ = ObjectStore::from_uri_and_params(registry, "memory:///", &params)
965            .await
966            .unwrap();
967
968        // called after construction
969        assert!(wrapper.called());
970
971        // hard to compare two trait pointers as the point to vtables
972        // using the ref count as a proxy to make sure that the store is correctly kept
973        assert_eq!(Arc::strong_count(&mock_inner_store), 2);
974    }
975
976    #[tokio::test]
977    async fn test_local_paths() {
978        let temp_dir = tempfile::tempdir().unwrap();
979
980        let file_path = temp_dir.path().join("test_file");
981        let mut writer = ObjectStore::create_local_writer(file_path.as_path())
982            .await
983            .unwrap();
984        writer.write_all(b"LOCAL").await.unwrap();
985        writer.shutdown().await.unwrap();
986
987        let reader = ObjectStore::open_local(file_path.as_path()).await.unwrap();
988        let buf = reader.get_range(0..5).await.unwrap();
989        assert_eq!(buf.as_ref(), b"LOCAL");
990    }
991
992    #[tokio::test]
993    async fn test_read_one() {
994        let temp_dir = tempfile::tempdir().unwrap();
995
996        let file_path = temp_dir.path().join("test_file");
997        let mut writer = ObjectStore::create_local_writer(file_path.as_path())
998            .await
999            .unwrap();
1000        writer.write_all(b"LOCAL").await.unwrap();
1001        writer.shutdown().await.unwrap();
1002
1003        let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
1004        let obj_store = ObjectStore::local();
1005        let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
1006        assert_eq!(buf.as_ref(), b"LOCAL");
1007
1008        let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
1009        assert_eq!(buf.as_ref(), b"LOCAL");
1010    }
1011
1012    #[tokio::test]
1013    #[cfg(windows)]
1014    async fn test_windows_paths() {
1015        use std::path::Component;
1016        use std::path::Prefix;
1017        use std::path::Prefix::*;
1018
1019        fn get_path_prefix(path: &StdPath) -> Prefix {
1020            match path.components().next().unwrap() {
1021                Component::Prefix(prefix_component) => prefix_component.kind(),
1022                _ => panic!(),
1023            }
1024        }
1025
1026        fn get_drive_letter(prefix: Prefix) -> String {
1027            match prefix {
1028                Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
1029                _ => panic!(),
1030            }
1031        }
1032
1033        let tmp_dir = tempfile::tempdir().unwrap();
1034        let tmp_path = tmp_dir.path();
1035        let prefix = get_path_prefix(tmp_path);
1036        let drive_letter = get_drive_letter(prefix);
1037
1038        write_to_file(
1039            &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
1040            "WINDOWS",
1041        )
1042        .unwrap();
1043
1044        for uri in &[
1045            format!("{drive_letter}:/test_folder/test.lance"),
1046            format!("{drive_letter}:\\test_folder\\test.lance"),
1047        ] {
1048            let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
1049            let contents = read_from_store(store.as_ref(), &base.child("test_file"))
1050                .await
1051                .unwrap();
1052            assert_eq!(contents, "WINDOWS");
1053        }
1054    }
1055
1056    #[tokio::test]
1057    async fn test_cross_filesystem_copy() {
1058        // Create two temporary directories that simulate different filesystems
1059        let source_dir = tempfile::tempdir().unwrap();
1060        let dest_dir = tempfile::tempdir().unwrap();
1061
1062        // Create a test file in the source directory
1063        let source_file_name = "test_file.txt";
1064        let source_file = source_dir.path().join(source_file_name);
1065        std::fs::write(&source_file, b"test content").unwrap();
1066
1067        // Create ObjectStore for local filesystem
1068        let (store, base_path) = ObjectStore::from_uri(source_dir.path().to_str().unwrap())
1069            .await
1070            .unwrap();
1071
1072        // Create paths relative to the ObjectStore base
1073        let from_path = base_path.child(source_file_name);
1074
1075        // Use object_store::Path::parse for the destination
1076        let dest_file = dest_dir.path().join("copied_file.txt");
1077        let dest_str = dest_file.to_str().unwrap();
1078        let to_path = object_store::path::Path::parse(dest_str).unwrap();
1079
1080        // Perform the copy operation
1081        store.copy(&from_path, &to_path).await.unwrap();
1082
1083        // Verify the file was copied correctly
1084        assert!(dest_file.exists());
1085        let copied_content = std::fs::read(&dest_file).unwrap();
1086        assert_eq!(copied_content, b"test content");
1087    }
1088
1089    #[tokio::test]
1090    async fn test_copy_creates_parent_directories() {
1091        let source_dir = tempfile::tempdir().unwrap();
1092        let dest_dir = tempfile::tempdir().unwrap();
1093
1094        // Create a test file in the source directory
1095        let source_file_name = "test_file.txt";
1096        let source_file = source_dir.path().join(source_file_name);
1097        std::fs::write(&source_file, b"test content").unwrap();
1098
1099        // Create ObjectStore for local filesystem
1100        let (store, base_path) = ObjectStore::from_uri(source_dir.path().to_str().unwrap())
1101            .await
1102            .unwrap();
1103
1104        // Create paths
1105        let from_path = base_path.child(source_file_name);
1106
1107        // Create destination with nested directories that don't exist yet
1108        let dest_file = dest_dir
1109            .path()
1110            .join("nested")
1111            .join("dirs")
1112            .join("copied_file.txt");
1113        let dest_str = dest_file.to_str().unwrap();
1114        let to_path = object_store::path::Path::parse(dest_str).unwrap();
1115
1116        // Perform the copy operation - should create parent directories
1117        store.copy(&from_path, &to_path).await.unwrap();
1118
1119        // Verify the file was copied correctly and directories were created
1120        assert!(dest_file.exists());
1121        assert!(dest_file.parent().unwrap().exists());
1122        let copied_content = std::fs::read(&dest_file).unwrap();
1123        assert_eq!(copied_content, b"test content");
1124    }
1125}