lance_io/
object_store.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Extend [object_store::ObjectStore] functionalities
5
6use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use deepsize::DeepSizeOf;
17use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
18use futures::{FutureExt, Stream};
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22#[cfg(feature = "aws")]
23use object_store::aws::AwsCredentialProvider;
24use object_store::DynObjectStore;
25use object_store::Error as ObjectStoreError;
26use object_store::{path::Path, ObjectMeta, ObjectStore as OSObjectStore};
27use providers::local::FileStoreProvider;
28use providers::memory::MemoryStoreProvider;
29use shellexpand::tilde;
30use snafu::location;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35mod list_retry;
36pub mod providers;
37mod tracing;
38use crate::object_writer::WriteResult;
39use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
40use lance_core::{Error, Result};
41
/// Default I/O parallelism for local storage.
///
/// Local disks tend to do fine with a few threads. The number of threads also
/// impacts the number of files that need to be read in some situations, so
/// keeping this at 8 keeps the scanner's RAM usage down.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
/// Default I/O parallelism for cloud storage, which often needs a great many
/// threads to saturate the network.
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

/// Block size used for local storage: 4KB.
const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024;
/// Block size used for cloud storage: 64KB.
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024;

/// Default number of times a failed download is retried.
pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
55
56pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
57
58#[async_trait]
59pub trait ObjectStoreExt {
60    /// Returns true if the file exists.
61    async fn exists(&self, path: &Path) -> Result<bool>;
62
63    /// Read all files (start from base directory) recursively
64    ///
65    /// unmodified_since can be specified to only return files that have not been modified since the given time.
66    async fn read_dir_all<'a>(
67        &'a self,
68        dir_path: impl Into<&Path> + Send,
69        unmodified_since: Option<DateTime<Utc>>,
70    ) -> Result<BoxStream<'a, Result<ObjectMeta>>>;
71}
72
73#[async_trait]
74impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
75    async fn read_dir_all<'a>(
76        &'a self,
77        dir_path: impl Into<&Path> + Send,
78        unmodified_since: Option<DateTime<Utc>>,
79    ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
80        let mut output = self.list(Some(dir_path.into()));
81        if let Some(unmodified_since_val) = unmodified_since {
82            output = output
83                .try_filter(move |file| future::ready(file.last_modified < unmodified_since_val))
84                .boxed();
85        }
86        Ok(output.map_err(|e| e.into()).boxed())
87    }
88
89    async fn exists(&self, path: &Path) -> Result<bool> {
90        match self.head(path).await {
91            Ok(_) => Ok(true),
92            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
93            Err(e) => Err(e.into()),
94        }
95    }
96}
97
98/// Wraps [ObjectStore](object_store::ObjectStore)
99#[derive(Debug, Clone)]
100pub struct ObjectStore {
101    // Inner object store
102    pub inner: Arc<dyn OSObjectStore>,
103    scheme: String,
104    block_size: usize,
105    /// Whether to use constant size upload parts for multipart uploads. This
106    /// is only necessary for Cloudflare R2.
107    pub use_constant_size_upload_parts: bool,
108    /// Whether we can assume that the list of files is lexically ordered. This
109    /// is true for object stores, but not for local filesystems.
110    pub list_is_lexically_ordered: bool,
111    io_parallelism: usize,
112    /// Number of times to retry a failed download
113    download_retry_count: usize,
114}
115
116impl DeepSizeOf for ObjectStore {
117    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
118        // We aren't counting `inner` here which is problematic but an ObjectStore
119        // shouldn't be too big.  The only exception might be the write cache but, if
120        // the writer cache has data, it means we're using it somewhere else that isn't
121        // a cache and so that doesn't really count.
122        self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
123    }
124}
125
126impl std::fmt::Display for ObjectStore {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        write!(f, "ObjectStore({})", self.scheme)
129    }
130}
131
132pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
133    fn wrap(&self, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
134}
135
136/// Parameters to create an [ObjectStore]
137///
138#[derive(Debug, Clone)]
139pub struct ObjectStoreParams {
140    pub block_size: Option<usize>,
141    #[deprecated(note = "Implement an ObjectStoreProvider instead")]
142    pub object_store: Option<(Arc<DynObjectStore>, Url)>,
143    pub s3_credentials_refresh_offset: Duration,
144    #[cfg(feature = "aws")]
145    pub aws_credentials: Option<AwsCredentialProvider>,
146    pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
147    pub storage_options: Option<HashMap<String, String>>,
148    /// Use constant size upload parts for multipart uploads. Only necessary
149    /// for Cloudflare R2, which doesn't support variable size parts. When this
150    /// is false, max upload size is 2.5TB. When this is true, the max size is
151    /// 50GB.
152    pub use_constant_size_upload_parts: bool,
153    pub list_is_lexically_ordered: Option<bool>,
154}
155
156impl Default for ObjectStoreParams {
157    fn default() -> Self {
158        #[allow(deprecated)]
159        Self {
160            object_store: None,
161            block_size: None,
162            s3_credentials_refresh_offset: Duration::from_secs(60),
163            #[cfg(feature = "aws")]
164            aws_credentials: None,
165            object_store_wrapper: None,
166            storage_options: None,
167            use_constant_size_upload_parts: false,
168            list_is_lexically_ordered: None,
169        }
170    }
171}
172
173// We implement hash for caching
174impl std::hash::Hash for ObjectStoreParams {
175    #[allow(deprecated)]
176    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
177        // For hashing, we use pointer values for ObjectStore, S3 credentials, and wrapper
178        self.block_size.hash(state);
179        if let Some((store, url)) = &self.object_store {
180            Arc::as_ptr(store).hash(state);
181            url.hash(state);
182        }
183        self.s3_credentials_refresh_offset.hash(state);
184        #[cfg(feature = "aws")]
185        if let Some(aws_credentials) = &self.aws_credentials {
186            Arc::as_ptr(aws_credentials).hash(state);
187        }
188        if let Some(wrapper) = &self.object_store_wrapper {
189            Arc::as_ptr(wrapper).hash(state);
190        }
191        if let Some(storage_options) = &self.storage_options {
192            for (key, value) in storage_options {
193                key.hash(state);
194                value.hash(state);
195            }
196        }
197        self.use_constant_size_upload_parts.hash(state);
198        self.list_is_lexically_ordered.hash(state);
199    }
200}
201
202// We implement eq for caching
203impl Eq for ObjectStoreParams {}
204impl PartialEq for ObjectStoreParams {
205    #[allow(deprecated)]
206    fn eq(&self, other: &Self) -> bool {
207        // For equality, we use pointer comparison for ObjectStore, S3 credentials, and wrapper
208        self.block_size == other.block_size
209            && self
210                .object_store
211                .as_ref()
212                .map(|(store, url)| (Arc::as_ptr(store), url))
213                == other
214                    .object_store
215                    .as_ref()
216                    .map(|(store, url)| (Arc::as_ptr(store), url))
217            && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
218            && self.aws_credentials.as_ref().map(Arc::as_ptr)
219                == other.aws_credentials.as_ref().map(Arc::as_ptr)
220            && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
221                == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
222            && self.storage_options == other.storage_options
223            && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
224            && self.list_is_lexically_ordered == other.list_is_lexically_ordered
225    }
226}
227
228fn uri_to_url(uri: &str) -> Result<Url> {
229    match Url::parse(uri) {
230        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
231            // On Windows, the drive is parsed as a scheme
232            local_path_to_url(uri)
233        }
234        Ok(url) => Ok(url),
235        Err(_) => local_path_to_url(uri),
236    }
237}
238
239fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
240    let expanded = tilde(str_path.as_ref()).to_string();
241
242    let mut expanded_path = path_abs::PathAbs::new(expanded)
243        .unwrap()
244        .as_path()
245        .to_path_buf();
246    // path_abs::PathAbs::new(".") returns an empty string.
247    if let Some(s) = expanded_path.as_path().to_str() {
248        if s.is_empty() {
249            expanded_path = std::env::current_dir()?;
250        }
251    }
252
253    Ok(expanded_path)
254}
255
256fn local_path_to_url(str_path: &str) -> Result<Url> {
257    let expanded_path = expand_path(str_path)?;
258
259    Url::from_directory_path(expanded_path).map_err(|_| Error::InvalidInput {
260        source: format!("Invalid table location: '{}'", str_path).into(),
261        location: location!(),
262    })
263}
264
265impl ObjectStore {
266    /// Parse from a string URI.
267    ///
268    /// Returns the ObjectStore instance and the absolute path to the object.
269    ///
270    /// This uses the default [ObjectStoreRegistry] to find the object store. To
271    /// allow for potential re-use of object store instances, it's recommended to
272    /// create a shared [ObjectStoreRegistry] and pass that to [Self::from_uri_and_params].
273    pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
274        let registry = Arc::new(ObjectStoreRegistry::default());
275
276        Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
277    }
278
279    /// Parse from a string URI.
280    ///
281    /// Returns the ObjectStore instance and the absolute path to the object.
282    pub async fn from_uri_and_params(
283        registry: Arc<ObjectStoreRegistry>,
284        uri: &str,
285        params: &ObjectStoreParams,
286    ) -> Result<(Arc<Self>, Path)> {
287        #[allow(deprecated)]
288        if let Some((store, path)) = params.object_store.as_ref() {
289            let mut inner = store.clone();
290            if let Some(wrapper) = params.object_store_wrapper.as_ref() {
291                inner = wrapper.wrap(inner);
292            }
293            let store = Self {
294                inner,
295                scheme: path.scheme().to_string(),
296                block_size: params.block_size.unwrap_or(64 * 1024),
297                use_constant_size_upload_parts: params.use_constant_size_upload_parts,
298                list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
299                io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
300                download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
301            };
302            let path = Path::from(path.path());
303            return Ok((Arc::new(store), path));
304        }
305        let url = uri_to_url(uri)?;
306        let store = registry.get_store(url.clone(), params).await?;
307        // We know the scheme is valid if we got a store back.
308        let provider = registry.get_provider(url.scheme()).expect_ok()?;
309        let path = provider.extract_path(&url);
310
311        Ok((store, path))
312    }
313
314    #[deprecated(note = "Use `from_uri` instead")]
315    pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
316        Self::from_uri_and_params(
317            Arc::new(ObjectStoreRegistry::default()),
318            str_path,
319            &Default::default(),
320        )
321        .now_or_never()
322        .unwrap()
323    }
324
325    /// Local object store.
326    pub fn local() -> Self {
327        let provider = FileStoreProvider;
328        provider
329            .new_store(Url::parse("file:///").unwrap(), &Default::default())
330            .now_or_never()
331            .unwrap()
332            .unwrap()
333    }
334
335    /// Create a in-memory object store directly for testing.
336    pub fn memory() -> Self {
337        let provider = MemoryStoreProvider;
338        provider
339            .new_store(Url::parse("memory:///").unwrap(), &Default::default())
340            .now_or_never()
341            .unwrap()
342            .unwrap()
343    }
344
345    /// Returns true if the object store pointed to a local file system.
346    pub fn is_local(&self) -> bool {
347        self.scheme == "file"
348    }
349
350    pub fn is_cloud(&self) -> bool {
351        self.scheme != "file" && self.scheme != "memory"
352    }
353
354    pub fn block_size(&self) -> usize {
355        self.block_size
356    }
357
358    pub fn io_parallelism(&self) -> usize {
359        std::env::var("LANCE_IO_THREADS")
360            .map(|val| val.parse::<usize>().unwrap())
361            .unwrap_or(self.io_parallelism)
362    }
363
364    /// Open a file for path.
365    ///
366    /// Parameters
367    /// - ``path``: Absolute path to the file.
368    pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
369        match self.scheme.as_str() {
370            "file" => LocalObjectReader::open(path, self.block_size, None).await,
371            _ => Ok(Box::new(CloudObjectReader::new(
372                self.inner.clone(),
373                path.clone(),
374                self.block_size,
375                None,
376                self.download_retry_count,
377            )?)),
378        }
379    }
380
381    /// Open a reader for a file with known size.
382    ///
383    /// This size may either have been retrieved from a list operation or
384    /// cached metadata. By passing in the known size, we can skip a HEAD / metadata
385    /// call.
386    pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
387        match self.scheme.as_str() {
388            "file" => LocalObjectReader::open(path, self.block_size, Some(known_size)).await,
389            _ => Ok(Box::new(CloudObjectReader::new(
390                self.inner.clone(),
391                path.clone(),
392                self.block_size,
393                Some(known_size),
394                self.download_retry_count,
395            )?)),
396        }
397    }
398
399    /// Create an [ObjectWriter] from local [std::path::Path]
400    pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
401        let object_store = Self::local();
402        let absolute_path = expand_path(path.to_string_lossy())?;
403        let os_path = Path::from_absolute_path(absolute_path)?;
404        object_store.create(&os_path).await
405    }
406
407    /// Open an [Reader] from local [std::path::Path]
408    pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
409        let object_store = Self::local();
410        let absolute_path = expand_path(path.to_string_lossy())?;
411        let os_path = Path::from_absolute_path(absolute_path)?;
412        object_store.open(&os_path).await
413    }
414
415    /// Create a new file.
416    pub async fn create(&self, path: &Path) -> Result<ObjectWriter> {
417        ObjectWriter::new(self, path).await
418    }
419
420    /// A helper function to create a file and write content to it.
421    pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
422        let mut writer = self.create(path).await?;
423        writer.write_all(content).await?;
424        writer.shutdown().await
425    }
426
427    pub async fn delete(&self, path: &Path) -> Result<()> {
428        self.inner.delete(path).await?;
429        Ok(())
430    }
431
432    pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
433        Ok(self.inner.copy(from, to).await?)
434    }
435
436    /// Read a directory (start from base directory) and returns all sub-paths in the directory.
437    pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
438        let path = dir_path.into();
439        let path = Path::parse(&path)?;
440        let output = self.inner.list_with_delimiter(Some(&path)).await?;
441        Ok(output
442            .common_prefixes
443            .iter()
444            .chain(output.objects.iter().map(|o| &o.location))
445            .map(|s| s.filename().unwrap().to_string())
446            .collect())
447    }
448
449    pub fn list(
450        &self,
451        path: Option<Path>,
452    ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
453        Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
454    }
455
456    /// Read all files (start from base directory) recursively
457    ///
458    /// unmodified_since can be specified to only return files that have not been modified since the given time.
459    pub async fn read_dir_all(
460        &self,
461        dir_path: impl Into<&Path> + Send,
462        unmodified_since: Option<DateTime<Utc>>,
463    ) -> Result<BoxStream<Result<ObjectMeta>>> {
464        self.inner.read_dir_all(dir_path, unmodified_since).await
465    }
466
467    /// Remove a directory recursively.
468    pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
469        let path = dir_path.into();
470        let path = Path::parse(&path)?;
471
472        if self.is_local() {
473            // Local file system needs to delete directories as well.
474            return super::local::remove_dir_all(&path);
475        }
476        let sub_entries = self
477            .inner
478            .list(Some(&path))
479            .map(|m| m.map(|meta| meta.location))
480            .boxed();
481        self.inner
482            .delete_stream(sub_entries)
483            .try_collect::<Vec<_>>()
484            .await?;
485        Ok(())
486    }
487
488    pub fn remove_stream<'a>(
489        &'a self,
490        locations: BoxStream<'a, Result<Path>>,
491    ) -> BoxStream<'a, Result<Path>> {
492        self.inner
493            .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
494            .err_into::<Error>()
495            .boxed()
496    }
497
498    /// Check a file exists.
499    pub async fn exists(&self, path: &Path) -> Result<bool> {
500        match self.inner.head(path).await {
501            Ok(_) => Ok(true),
502            Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
503            Err(e) => Err(e.into()),
504        }
505    }
506
507    /// Get file size.
508    pub async fn size(&self, path: &Path) -> Result<usize> {
509        Ok(self.inner.head(path).await?.size)
510    }
511
512    /// Convenience function to open a reader and read all the bytes
513    pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
514        let reader = self.open(path).await?;
515        Ok(reader.get_all().await?)
516    }
517
518    /// Convenience function open a reader and make a single request
519    ///
520    /// If you will be making multiple requests to the path it is more efficient to call [`Self::open`]
521    /// and then call [`Reader::get_range`] multiple times.
522    pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
523        let reader = self.open(path).await?;
524        Ok(reader.get_range(range).await?)
525    }
526}
527
/// Configuration keys that apply across object store implementations.
///
/// Keys are parsed from strings case-insensitively (see the `FromStr` impl).
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub enum LanceConfigKey {
    /// Number of times a failed download is retried (`download_retry_count`).
    DownloadRetryCount,
}
534
535impl FromStr for LanceConfigKey {
536    type Err = Error;
537
538    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
539        match s.to_ascii_lowercase().as_str() {
540            "download_retry_count" => Ok(Self::DownloadRetryCount),
541            _ => Err(Error::InvalidInput {
542                source: format!("Invalid LanceConfigKey: {}", s).into(),
543                location: location!(),
544            }),
545        }
546    }
547}
548
549#[derive(Clone, Debug, Default)]
550pub struct StorageOptions(pub HashMap<String, String>);
551
552impl StorageOptions {
553    /// Create a new instance of [`StorageOptions`]
554    pub fn new(options: HashMap<String, String>) -> Self {
555        let mut options = options;
556        if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
557            options.insert("allow_http".into(), value);
558        }
559        if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
560            options.insert("allow_http".into(), value);
561        }
562        if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
563            options.insert("allow_http".into(), value);
564        }
565        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
566            options.insert("client_max_retries".into(), value);
567        }
568        if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
569            options.insert("client_retry_timeout".into(), value);
570        }
571        Self(options)
572    }
573
574    /// Denotes if unsecure connections via http are allowed
575    pub fn allow_http(&self) -> bool {
576        self.0.iter().any(|(key, value)| {
577            key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
578        })
579    }
580
581    /// Number of times to retry a download that fails
582    pub fn download_retry_count(&self) -> usize {
583        self.0
584            .iter()
585            .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
586            .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
587            .unwrap_or(3)
588    }
589
590    /// Max retry times to set in RetryConfig for object store client
591    pub fn client_max_retries(&self) -> usize {
592        self.0
593            .iter()
594            .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
595            .and_then(|(_, value)| value.parse::<usize>().ok())
596            .unwrap_or(10)
597    }
598
599    /// Seconds of timeout to set in RetryConfig for object store client
600    pub fn client_retry_timeout(&self) -> u64 {
601        self.0
602            .iter()
603            .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
604            .and_then(|(_, value)| value.parse::<u64>().ok())
605            .unwrap_or(180)
606    }
607
608    pub fn get(&self, key: &str) -> Option<&String> {
609        self.0.get(key)
610    }
611}
612
613impl From<HashMap<String, String>> for StorageOptions {
614    fn from(value: HashMap<String, String>) -> Self {
615        Self::new(value)
616    }
617}
618
619impl ObjectStore {
620    #[allow(clippy::too_many_arguments)]
621    pub fn new(
622        store: Arc<DynObjectStore>,
623        location: Url,
624        block_size: Option<usize>,
625        wrapper: Option<Arc<dyn WrappingObjectStore>>,
626        use_constant_size_upload_parts: bool,
627        list_is_lexically_ordered: bool,
628        io_parallelism: usize,
629        download_retry_count: usize,
630    ) -> Self {
631        let scheme = location.scheme();
632        let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
633
634        let store = match wrapper {
635            Some(wrapper) => wrapper.wrap(store),
636            None => store,
637        };
638
639        Self {
640            inner: store,
641            scheme: scheme.into(),
642            block_size,
643            use_constant_size_upload_parts,
644            list_is_lexically_ordered,
645            io_parallelism,
646            download_retry_count,
647        }
648    }
649}
650
/// Picks the default block size for a URL scheme.
///
/// Local file systems (`file`) use 4KB blocks; every other scheme uses 64KB,
/// which is generally the largest block size that shows no latency penalty on
/// cloud object stores.
fn infer_block_size(scheme: &str) -> usize {
    const KIB: usize = 1024;
    if scheme == "file" {
        4 * KIB
    } else {
        64 * KIB
    }
}
660
661#[cfg(test)]
662mod tests {
663    use super::*;
664    use object_store::memory::InMemory;
665    use parquet::data_type::AsBytes;
666    use rstest::rstest;
667    use std::env::set_current_dir;
668    use std::fs::{create_dir_all, write};
669    use std::path::Path as StdPath;
670    use std::sync::atomic::{AtomicBool, Ordering};
671
672    /// Write test content to file.
673    fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
674        let expanded = tilde(path_str).to_string();
675        let path = StdPath::new(&expanded);
676        std::fs::create_dir_all(path.parent().unwrap())?;
677        write(path, contents)
678    }
679
680    async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
681        let test_file_store = store.open(path).await.unwrap();
682        let size = test_file_store.size().await.unwrap();
683        let bytes = test_file_store.get_range(0..size).await.unwrap();
684        let contents = String::from_utf8(bytes.to_vec()).unwrap();
685        Ok(contents)
686    }
687
688    #[tokio::test]
689    async fn test_absolute_paths() {
690        let tmp_dir = tempfile::tempdir().unwrap();
691        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
692        write_to_file(
693            &format!("{tmp_path}/bar/foo.lance/test_file"),
694            "TEST_CONTENT",
695        )
696        .unwrap();
697
698        // test a few variations of the same path
699        for uri in &[
700            format!("{tmp_path}/bar/foo.lance"),
701            format!("{tmp_path}/./bar/foo.lance"),
702            format!("{tmp_path}/bar/foo.lance/../foo.lance"),
703        ] {
704            let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
705            let contents = read_from_store(store.as_ref(), &path.child("test_file"))
706                .await
707                .unwrap();
708            assert_eq!(contents, "TEST_CONTENT");
709        }
710    }
711
712    #[tokio::test]
713    async fn test_cloud_paths() {
714        let uri = "s3://bucket/foo.lance";
715        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
716        assert_eq!(store.scheme, "s3");
717        assert_eq!(path.to_string(), "foo.lance");
718
719        let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
720            .await
721            .unwrap();
722        assert_eq!(store.scheme, "s3");
723        assert_eq!(path.to_string(), "foo.lance");
724
725        let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
726            .await
727            .unwrap();
728        assert_eq!(store.scheme, "gs");
729        assert_eq!(path.to_string(), "foo.lance");
730    }
731
732    async fn test_block_size_used_test_helper(
733        uri: &str,
734        storage_options: Option<HashMap<String, String>>,
735        default_expected_block_size: usize,
736    ) {
737        // Test the default
738        let registry = Arc::new(ObjectStoreRegistry::default());
739        let params = ObjectStoreParams {
740            storage_options: storage_options.clone(),
741            ..ObjectStoreParams::default()
742        };
743        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
744            .await
745            .unwrap();
746        assert_eq!(store.block_size, default_expected_block_size);
747
748        // Ensure param is used
749        let registry = Arc::new(ObjectStoreRegistry::default());
750        let params = ObjectStoreParams {
751            block_size: Some(1024),
752            storage_options: storage_options.clone(),
753            ..ObjectStoreParams::default()
754        };
755        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
756            .await
757            .unwrap();
758        assert_eq!(store.block_size, 1024);
759    }
760
761    #[rstest]
762    #[case("s3://bucket/foo.lance", None)]
763    #[case("gs://bucket/foo.lance", None)]
764    #[case("az://account/bucket/foo.lance",
765      Some(HashMap::from([
766            (String::from("account_name"), String::from("account")),
767            (String::from("container_name"), String::from("container"))
768           ])))]
769    #[tokio::test]
770    async fn test_block_size_used_cloud(
771        #[case] uri: &str,
772        #[case] storage_options: Option<HashMap<String, String>>,
773    ) {
774        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
775    }
776
777    #[rstest]
778    #[case("file")]
779    #[case("file-object-store")]
780    #[case("memory:///bucket/foo.lance")]
781    #[tokio::test]
782    async fn test_block_size_used_file(#[case] prefix: &str) {
783        let tmp_dir = tempfile::tempdir().unwrap();
784        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
785        let path = format!("{tmp_path}/bar/foo.lance/test_file");
786        write_to_file(&path, "URL").unwrap();
787        let uri = format!("{prefix}:///{path}");
788        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
789    }
790
791    #[tokio::test]
792    async fn test_relative_paths() {
793        let tmp_dir = tempfile::tempdir().unwrap();
794        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
795        write_to_file(
796            &format!("{tmp_path}/bar/foo.lance/test_file"),
797            "RELATIVE_URL",
798        )
799        .unwrap();
800
801        set_current_dir(StdPath::new(&tmp_path)).expect("Error changing current dir");
802        let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();
803
804        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
805            .await
806            .unwrap();
807        assert_eq!(contents, "RELATIVE_URL");
808    }
809
810    #[tokio::test]
811    async fn test_tilde_expansion() {
812        let uri = "~/foo.lance";
813        write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
814        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
815        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
816            .await
817            .unwrap();
818        assert_eq!(contents, "TILDE");
819    }
820
821    #[tokio::test]
822    async fn test_read_directory() {
823        let tmp_dir = tempfile::tempdir().unwrap();
824        let path = tmp_dir.path();
825        create_dir_all(path.join("foo").join("bar")).unwrap();
826        create_dir_all(path.join("foo").join("zoo")).unwrap();
827        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
828        write_to_file(
829            path.join("foo").join("test_file").to_str().unwrap(),
830            "read_dir",
831        )
832        .unwrap();
833        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
834
835        let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
836        assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
837    }
838
839    #[tokio::test]
840    async fn test_delete_directory() {
841        let tmp_dir = tempfile::tempdir().unwrap();
842        let path = tmp_dir.path();
843        create_dir_all(path.join("foo").join("bar")).unwrap();
844        create_dir_all(path.join("foo").join("zoo")).unwrap();
845        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
846        write_to_file(
847            path.join("foo")
848                .join("bar")
849                .join("test_file")
850                .to_str()
851                .unwrap(),
852            "delete",
853        )
854        .unwrap();
855        write_to_file(path.join("foo").join("top").to_str().unwrap(), "delete_top").unwrap();
856        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
857        store.remove_dir_all(base.child("foo")).await.unwrap();
858
859        assert!(!path.join("foo").exists());
860    }
861
862    #[derive(Debug)]
863    struct TestWrapper {
864        called: AtomicBool,
865
866        return_value: Arc<dyn OSObjectStore>,
867    }
868
869    impl WrappingObjectStore for TestWrapper {
870        fn wrap(&self, _original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
871            self.called.store(true, Ordering::Relaxed);
872
873            // return a mocked value so we can check if the final store is the one we expect
874            self.return_value.clone()
875        }
876    }
877
878    impl TestWrapper {
879        fn called(&self) -> bool {
880            self.called.load(Ordering::Relaxed)
881        }
882    }
883
884    #[tokio::test]
885    async fn test_wrapping_object_store_option_is_used() {
886        // Make a store for the inner store first
887        let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
888        let registry = Arc::new(ObjectStoreRegistry::default());
889
890        assert_eq!(Arc::strong_count(&mock_inner_store), 1);
891
892        let wrapper = Arc::new(TestWrapper {
893            called: AtomicBool::new(false),
894            return_value: mock_inner_store.clone(),
895        });
896
897        let params = ObjectStoreParams {
898            object_store_wrapper: Some(wrapper.clone()),
899            ..ObjectStoreParams::default()
900        };
901
902        // not called yet
903        assert!(!wrapper.called());
904
905        let _ = ObjectStore::from_uri_and_params(registry, "memory:///", &params)
906            .await
907            .unwrap();
908
909        // called after construction
910        assert!(wrapper.called());
911
912        // hard to compare two trait pointers as the point to vtables
913        // using the ref count as a proxy to make sure that the store is correctly kept
914        assert_eq!(Arc::strong_count(&mock_inner_store), 2);
915    }
916
917    #[tokio::test]
918    async fn test_local_paths() {
919        let temp_dir = tempfile::tempdir().unwrap();
920
921        let file_path = temp_dir.path().join("test_file");
922        let mut writer = ObjectStore::create_local_writer(file_path.as_path())
923            .await
924            .unwrap();
925        writer.write_all(b"LOCAL").await.unwrap();
926        writer.shutdown().await.unwrap();
927
928        let reader = ObjectStore::open_local(file_path.as_path()).await.unwrap();
929        let buf = reader.get_range(0..5).await.unwrap();
930        assert_eq!(buf.as_bytes(), b"LOCAL");
931    }
932
933    #[tokio::test]
934    async fn test_read_one() {
935        let temp_dir = tempfile::tempdir().unwrap();
936
937        let file_path = temp_dir.path().join("test_file");
938        let mut writer = ObjectStore::create_local_writer(file_path.as_path())
939            .await
940            .unwrap();
941        writer.write_all(b"LOCAL").await.unwrap();
942        writer.shutdown().await.unwrap();
943
944        let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
945        let obj_store = ObjectStore::local();
946        let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
947        assert_eq!(buf.as_bytes(), b"LOCAL");
948
949        let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
950        assert_eq!(buf.as_bytes(), b"LOCAL");
951    }
952
953    #[tokio::test]
954    #[cfg(windows)]
955    async fn test_windows_paths() {
956        use std::path::Component;
957        use std::path::Prefix;
958        use std::path::Prefix::*;
959
960        fn get_path_prefix(path: &StdPath) -> Prefix {
961            match path.components().next().unwrap() {
962                Component::Prefix(prefix_component) => prefix_component.kind(),
963                _ => panic!(),
964            }
965        }
966
967        fn get_drive_letter(prefix: Prefix) -> String {
968            match prefix {
969                Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
970                _ => panic!(),
971            }
972        }
973
974        let tmp_dir = tempfile::tempdir().unwrap();
975        let tmp_path = tmp_dir.path();
976        let prefix = get_path_prefix(tmp_path);
977        let drive_letter = get_drive_letter(prefix);
978
979        write_to_file(
980            &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
981            "WINDOWS",
982        )
983        .unwrap();
984
985        for uri in &[
986            format!("{drive_letter}:/test_folder/test.lance"),
987            format!("{drive_letter}:\\test_folder\\test.lance"),
988        ] {
989            let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
990            let contents = read_from_store(store.as_ref(), &base.child("test_file"))
991                .await
992                .unwrap();
993            assert_eq!(contents, "WINDOWS");
994        }
995    }
996}