1use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use deepsize::DeepSizeOf;
17use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
18use futures::{FutureExt, Stream};
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22#[cfg(feature = "aws")]
23use object_store::aws::AwsCredentialProvider;
24use object_store::DynObjectStore;
25use object_store::Error as ObjectStoreError;
26use object_store::{path::Path, ObjectMeta, ObjectStore as OSObjectStore};
27use providers::local::FileStoreProvider;
28use providers::memory::MemoryStoreProvider;
29use shellexpand::tilde;
30use snafu::location;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35mod list_retry;
36pub mod providers;
37mod tracing;
38use crate::object_reader::SmallReader;
39use crate::object_writer::WriteResult;
40use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
41use lance_core::{Error, Result};
42
43pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
48pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;
50
51const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024; #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
53const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024; lazy_static::lazy_static! {
56 pub static ref DEFAULT_MAX_IOP_SIZE: u64 = std::env::var("LANCE_MAX_IOP_SIZE")
57 .map(|val| val.parse().unwrap()).unwrap_or(16 * 1024 * 1024);
58}
59
60pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
61
62pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
63
/// Convenience operations layered on top of any [`object_store::ObjectStore`].
#[async_trait]
pub trait ObjectStoreExt {
    /// Returns `Ok(true)` if an object exists at `path` and `Ok(false)` if the
    /// store reports it as not found. Any other failure is returned as an error.
    async fn exists(&self, path: &Path) -> Result<bool>;

    /// Lists the objects under `dir_path` as a stream of their metadata.
    ///
    /// When `unmodified_since` is `Some`, only objects whose `last_modified`
    /// timestamp is strictly earlier than the given time are yielded.
    async fn read_dir_all<'a>(
        &'a self,
        dir_path: impl Into<&Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> Result<BoxStream<'a, Result<ObjectMeta>>>;
}
78
79#[async_trait]
80impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
81 async fn read_dir_all<'a>(
82 &'a self,
83 dir_path: impl Into<&Path> + Send,
84 unmodified_since: Option<DateTime<Utc>>,
85 ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
86 let mut output = self.list(Some(dir_path.into()));
87 if let Some(unmodified_since_val) = unmodified_since {
88 output = output
89 .try_filter(move |file| future::ready(file.last_modified < unmodified_since_val))
90 .boxed();
91 }
92 Ok(output.map_err(|e| e.into()).boxed())
93 }
94
95 async fn exists(&self, path: &Path) -> Result<bool> {
96 match self.head(path).await {
97 Ok(_) => Ok(true),
98 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
99 Err(e) => Err(e.into()),
100 }
101 }
102}
103
/// Pairs an [`object_store::ObjectStore`] implementation with the I/O settings
/// used when reading from and writing to it.
#[derive(Debug, Clone)]
pub struct ObjectStore {
    /// The underlying store that operations are delegated to.
    pub inner: Arc<dyn OSObjectStore>,
    /// URL scheme this store was created for; `"file"` selects the local
    /// reader and local directory removal.
    scheme: String,
    /// Block size handed to the readers opened from this store.
    block_size: usize,
    /// Value reported by `max_iop_size()`; initialised from `DEFAULT_MAX_IOP_SIZE`.
    max_iop_size: u64,
    /// Copied from `ObjectStoreParams::use_constant_size_upload_parts`.
    /// NOTE(review): not read in this file; presumably consumed by the writer — confirm.
    pub use_constant_size_upload_parts: bool,
    /// Whether listings from this store can be assumed to be lexically ordered.
    /// NOTE(review): not read in this file — confirm where it is consumed.
    pub list_is_lexically_ordered: bool,
    /// Value returned by `io_parallelism()` when `LANCE_IO_THREADS` is not set.
    io_parallelism: usize,
    /// Retry count passed to the readers created by `open` / `open_with_size`.
    download_retry_count: usize,
}
122
123impl DeepSizeOf for ObjectStore {
124 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
125 self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
130 }
131}
132
133impl std::fmt::Display for ObjectStore {
134 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135 write!(f, "ObjectStore({})", self.scheme)
136 }
137}
138
/// Hook for substituting or decorating the underlying object store.
///
/// When a wrapper is configured, the store that ends up in `ObjectStore::inner`
/// is whatever `wrap` returns for the originally constructed store.
pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
    /// Receives the original store and returns the store to use in its place.
    fn wrap(&self, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
}
142
/// Parameters used to create an [`ObjectStore`].
#[derive(Debug, Clone)]
pub struct ObjectStoreParams {
    /// Explicit block size. When `None`, a default is chosen by the code that
    /// builds the store.
    pub block_size: Option<usize>,
    /// A pre-built store and its location. When set, `from_uri_and_params`
    /// uses it directly and ignores the `uri` argument and the registry.
    #[deprecated(note = "Implement an ObjectStoreProvider instead")]
    pub object_store: Option<(Arc<DynObjectStore>, Url)>,
    /// Defaults to 60 seconds.
    /// NOTE(review): not read in this file; presumably used by the AWS provider — confirm.
    pub s3_credentials_refresh_offset: Duration,
    /// Custom AWS credential provider; compared and hashed by pointer identity.
    #[cfg(feature = "aws")]
    pub aws_credentials: Option<AwsCredentialProvider>,
    /// Optional wrapper applied to the underlying store; compared and hashed
    /// by pointer identity.
    pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
    /// Free-form key/value options.
    /// NOTE(review): presumably interpreted by the store providers — confirm.
    pub storage_options: Option<HashMap<String, String>>,
    /// Copied onto the resulting [`ObjectStore`]; defaults to `false`.
    pub use_constant_size_upload_parts: bool,
    /// Override for `ObjectStore::list_is_lexically_ordered`. For the
    /// deprecated `object_store` path, `None` is treated as `false`.
    pub list_is_lexically_ordered: Option<bool>,
}
162
163impl Default for ObjectStoreParams {
164 fn default() -> Self {
165 #[allow(deprecated)]
166 Self {
167 object_store: None,
168 block_size: None,
169 s3_credentials_refresh_offset: Duration::from_secs(60),
170 #[cfg(feature = "aws")]
171 aws_credentials: None,
172 object_store_wrapper: None,
173 storage_options: None,
174 use_constant_size_upload_parts: false,
175 list_is_lexically_ordered: None,
176 }
177 }
178}
179
180impl std::hash::Hash for ObjectStoreParams {
182 #[allow(deprecated)]
183 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
184 self.block_size.hash(state);
186 if let Some((store, url)) = &self.object_store {
187 Arc::as_ptr(store).hash(state);
188 url.hash(state);
189 }
190 self.s3_credentials_refresh_offset.hash(state);
191 #[cfg(feature = "aws")]
192 if let Some(aws_credentials) = &self.aws_credentials {
193 Arc::as_ptr(aws_credentials).hash(state);
194 }
195 if let Some(wrapper) = &self.object_store_wrapper {
196 Arc::as_ptr(wrapper).hash(state);
197 }
198 if let Some(storage_options) = &self.storage_options {
199 for (key, value) in storage_options {
200 key.hash(state);
201 value.hash(state);
202 }
203 }
204 self.use_constant_size_upload_parts.hash(state);
205 self.list_is_lexically_ordered.hash(state);
206 }
207}
208
209impl Eq for ObjectStoreParams {}
211impl PartialEq for ObjectStoreParams {
212 #[allow(deprecated)]
213 fn eq(&self, other: &Self) -> bool {
214 self.block_size == other.block_size
216 && self
217 .object_store
218 .as_ref()
219 .map(|(store, url)| (Arc::as_ptr(store), url))
220 == other
221 .object_store
222 .as_ref()
223 .map(|(store, url)| (Arc::as_ptr(store), url))
224 && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
225 && self.aws_credentials.as_ref().map(Arc::as_ptr)
226 == other.aws_credentials.as_ref().map(Arc::as_ptr)
227 && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
228 == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
229 && self.storage_options == other.storage_options
230 && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
231 && self.list_is_lexically_ordered == other.list_is_lexically_ordered
232 }
233}
234
235fn uri_to_url(uri: &str) -> Result<Url> {
236 match Url::parse(uri) {
237 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
238 local_path_to_url(uri)
240 }
241 Ok(url) => Ok(url),
242 Err(_) => local_path_to_url(uri),
243 }
244}
245
246fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
247 let expanded = tilde(str_path.as_ref()).to_string();
248
249 let mut expanded_path = path_abs::PathAbs::new(expanded)
250 .unwrap()
251 .as_path()
252 .to_path_buf();
253 if let Some(s) = expanded_path.as_path().to_str() {
255 if s.is_empty() {
256 expanded_path = std::env::current_dir()?;
257 }
258 }
259
260 Ok(expanded_path)
261}
262
263fn local_path_to_url(str_path: &str) -> Result<Url> {
264 let expanded_path = expand_path(str_path)?;
265
266 Url::from_directory_path(expanded_path).map_err(|_| Error::InvalidInput {
267 source: format!("Invalid table location: '{}'", str_path).into(),
268 location: location!(),
269 })
270}
271
272impl ObjectStore {
273 pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
281 let registry = Arc::new(ObjectStoreRegistry::default());
282
283 Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
284 }
285
286 pub async fn from_uri_and_params(
290 registry: Arc<ObjectStoreRegistry>,
291 uri: &str,
292 params: &ObjectStoreParams,
293 ) -> Result<(Arc<Self>, Path)> {
294 #[allow(deprecated)]
295 if let Some((store, path)) = params.object_store.as_ref() {
296 let mut inner = store.clone();
297 if let Some(wrapper) = params.object_store_wrapper.as_ref() {
298 inner = wrapper.wrap(inner);
299 }
300 let store = Self {
301 inner,
302 scheme: path.scheme().to_string(),
303 block_size: params.block_size.unwrap_or(64 * 1024),
304 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
305 use_constant_size_upload_parts: params.use_constant_size_upload_parts,
306 list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
307 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
308 download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
309 };
310 let path = Path::from(path.path());
311 return Ok((Arc::new(store), path));
312 }
313 let url = uri_to_url(uri)?;
314 let store = registry.get_store(url.clone(), params).await?;
315 let provider = registry.get_provider(url.scheme()).expect_ok()?;
317 let path = provider.extract_path(&url);
318
319 Ok((store, path))
320 }
321
322 #[deprecated(note = "Use `from_uri` instead")]
323 pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
324 Self::from_uri_and_params(
325 Arc::new(ObjectStoreRegistry::default()),
326 str_path,
327 &Default::default(),
328 )
329 .now_or_never()
330 .unwrap()
331 }
332
333 pub fn local() -> Self {
335 let provider = FileStoreProvider;
336 provider
337 .new_store(Url::parse("file:///").unwrap(), &Default::default())
338 .now_or_never()
339 .unwrap()
340 .unwrap()
341 }
342
343 pub fn memory() -> Self {
345 let provider = MemoryStoreProvider;
346 provider
347 .new_store(Url::parse("memory:///").unwrap(), &Default::default())
348 .now_or_never()
349 .unwrap()
350 .unwrap()
351 }
352
353 pub fn is_local(&self) -> bool {
355 self.scheme == "file"
356 }
357
358 pub fn is_cloud(&self) -> bool {
359 self.scheme != "file" && self.scheme != "memory"
360 }
361
362 pub fn block_size(&self) -> usize {
363 self.block_size
364 }
365
366 pub fn max_iop_size(&self) -> u64 {
367 self.max_iop_size
368 }
369
370 pub fn io_parallelism(&self) -> usize {
371 std::env::var("LANCE_IO_THREADS")
372 .map(|val| val.parse::<usize>().unwrap())
373 .unwrap_or(self.io_parallelism)
374 }
375
376 pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
381 match self.scheme.as_str() {
382 "file" => LocalObjectReader::open(path, self.block_size, None).await,
383 _ => Ok(Box::new(CloudObjectReader::new(
384 self.inner.clone(),
385 path.clone(),
386 self.block_size,
387 None,
388 self.download_retry_count,
389 )?)),
390 }
391 }
392
393 pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
399 if known_size <= self.block_size {
402 return Ok(Box::new(SmallReader::new(
403 self.inner.clone(),
404 path.clone(),
405 self.download_retry_count,
406 known_size,
407 )));
408 }
409
410 match self.scheme.as_str() {
411 "file" => LocalObjectReader::open(path, self.block_size, Some(known_size)).await,
412 _ => Ok(Box::new(CloudObjectReader::new(
413 self.inner.clone(),
414 path.clone(),
415 self.block_size,
416 Some(known_size),
417 self.download_retry_count,
418 )?)),
419 }
420 }
421
422 pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
424 let object_store = Self::local();
425 let absolute_path = expand_path(path.to_string_lossy())?;
426 let os_path = Path::from_absolute_path(absolute_path)?;
427 object_store.create(&os_path).await
428 }
429
430 pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
432 let object_store = Self::local();
433 let absolute_path = expand_path(path.to_string_lossy())?;
434 let os_path = Path::from_absolute_path(absolute_path)?;
435 object_store.open(&os_path).await
436 }
437
438 pub async fn create(&self, path: &Path) -> Result<ObjectWriter> {
440 ObjectWriter::new(self, path).await
441 }
442
443 pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
445 let mut writer = self.create(path).await?;
446 writer.write_all(content).await?;
447 writer.shutdown().await
448 }
449
450 pub async fn delete(&self, path: &Path) -> Result<()> {
451 self.inner.delete(path).await?;
452 Ok(())
453 }
454
455 pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
456 Ok(self.inner.copy(from, to).await?)
457 }
458
459 pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
461 let path = dir_path.into();
462 let path = Path::parse(&path)?;
463 let output = self.inner.list_with_delimiter(Some(&path)).await?;
464 Ok(output
465 .common_prefixes
466 .iter()
467 .chain(output.objects.iter().map(|o| &o.location))
468 .map(|s| s.filename().unwrap().to_string())
469 .collect())
470 }
471
472 pub fn list(
473 &self,
474 path: Option<Path>,
475 ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
476 Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
477 }
478
479 pub async fn read_dir_all(
483 &self,
484 dir_path: impl Into<&Path> + Send,
485 unmodified_since: Option<DateTime<Utc>>,
486 ) -> Result<BoxStream<Result<ObjectMeta>>> {
487 self.inner.read_dir_all(dir_path, unmodified_since).await
488 }
489
490 pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
492 let path = dir_path.into();
493 let path = Path::parse(&path)?;
494
495 if self.is_local() {
496 return super::local::remove_dir_all(&path);
498 }
499 let sub_entries = self
500 .inner
501 .list(Some(&path))
502 .map(|m| m.map(|meta| meta.location))
503 .boxed();
504 self.inner
505 .delete_stream(sub_entries)
506 .try_collect::<Vec<_>>()
507 .await?;
508 Ok(())
509 }
510
511 pub fn remove_stream<'a>(
512 &'a self,
513 locations: BoxStream<'a, Result<Path>>,
514 ) -> BoxStream<'a, Result<Path>> {
515 self.inner
516 .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
517 .err_into::<Error>()
518 .boxed()
519 }
520
521 pub async fn exists(&self, path: &Path) -> Result<bool> {
523 match self.inner.head(path).await {
524 Ok(_) => Ok(true),
525 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
526 Err(e) => Err(e.into()),
527 }
528 }
529
530 pub async fn size(&self, path: &Path) -> Result<usize> {
532 Ok(self.inner.head(path).await?.size)
533 }
534
535 pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
537 let reader = self.open(path).await?;
538 Ok(reader.get_all().await?)
539 }
540
541 pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
546 let reader = self.open(path).await?;
547 Ok(reader.get_range(range).await?)
548 }
549}
550
/// Configuration keys recognised by Lance itself.
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub enum LanceConfigKey {
    /// Parsed from the string `download_retry_count` (case-insensitive).
    DownloadRetryCount,
}
557
558impl FromStr for LanceConfigKey {
559 type Err = Error;
560
561 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
562 match s.to_ascii_lowercase().as_str() {
563 "download_retry_count" => Ok(Self::DownloadRetryCount),
564 _ => Err(Error::InvalidInput {
565 source: format!("Invalid LanceConfigKey: {}", s).into(),
566 location: location!(),
567 }),
568 }
569 }
570}
571
/// Key/value storage options with typed accessors for the settings Lance reads.
#[derive(Clone, Debug, Default)]
pub struct StorageOptions(pub HashMap<String, String>);
574
575impl StorageOptions {
576 pub fn new(options: HashMap<String, String>) -> Self {
578 let mut options = options;
579 if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
580 options.insert("allow_http".into(), value);
581 }
582 if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
583 options.insert("allow_http".into(), value);
584 }
585 if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
586 options.insert("allow_http".into(), value);
587 }
588 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
589 options.insert("client_max_retries".into(), value);
590 }
591 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
592 options.insert("client_retry_timeout".into(), value);
593 }
594 Self(options)
595 }
596
597 pub fn allow_http(&self) -> bool {
599 self.0.iter().any(|(key, value)| {
600 key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
601 })
602 }
603
604 pub fn download_retry_count(&self) -> usize {
606 self.0
607 .iter()
608 .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
609 .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
610 .unwrap_or(3)
611 }
612
613 pub fn client_max_retries(&self) -> usize {
615 self.0
616 .iter()
617 .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
618 .and_then(|(_, value)| value.parse::<usize>().ok())
619 .unwrap_or(10)
620 }
621
622 pub fn client_retry_timeout(&self) -> u64 {
624 self.0
625 .iter()
626 .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
627 .and_then(|(_, value)| value.parse::<u64>().ok())
628 .unwrap_or(180)
629 }
630
631 pub fn get(&self, key: &str) -> Option<&String> {
632 self.0.get(key)
633 }
634}
635
636impl From<HashMap<String, String>> for StorageOptions {
637 fn from(value: HashMap<String, String>) -> Self {
638 Self::new(value)
639 }
640}
641
642impl ObjectStore {
643 #[allow(clippy::too_many_arguments)]
644 pub fn new(
645 store: Arc<DynObjectStore>,
646 location: Url,
647 block_size: Option<usize>,
648 wrapper: Option<Arc<dyn WrappingObjectStore>>,
649 use_constant_size_upload_parts: bool,
650 list_is_lexically_ordered: bool,
651 io_parallelism: usize,
652 download_retry_count: usize,
653 ) -> Self {
654 let scheme = location.scheme();
655 let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
656
657 let store = match wrapper {
658 Some(wrapper) => wrapper.wrap(store),
659 None => store,
660 };
661
662 Self {
663 inner: store,
664 scheme: scheme.into(),
665 block_size,
666 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
667 use_constant_size_upload_parts,
668 list_is_lexically_ordered,
669 io_parallelism,
670 download_retry_count,
671 }
672 }
673}
674
/// Picks the default block size for a URL scheme: 4 KiB for `file`, 64 KiB
/// for every other scheme.
fn infer_block_size(scheme: &str) -> usize {
    if scheme == "file" {
        4 * 1024
    } else {
        64 * 1024
    }
}
684
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;
    use parquet::data_type::AsBytes;
    use rstest::rstest;
    use std::env::set_current_dir;
    use std::fs::{create_dir_all, write};
    use std::path::Path as StdPath;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Writes `contents` to `path_str` (with `~` expansion), creating parent
    /// directories as needed.
    fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
        let expanded = tilde(path_str).to_string();
        let path = StdPath::new(&expanded);
        std::fs::create_dir_all(path.parent().unwrap())?;
        write(path, contents)
    }

    /// Reads the whole object at `path` from `store` as a UTF-8 string.
    async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
        let test_file_store = store.open(path).await.unwrap();
        let size = test_file_store.size().await.unwrap();
        let bytes = test_file_store.get_range(0..size).await.unwrap();
        let contents = String::from_utf8(bytes.to_vec()).unwrap();
        Ok(contents)
    }

    #[tokio::test]
    async fn test_absolute_paths() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "TEST_CONTENT",
        )
        .unwrap();

        // Each of these spellings must resolve to the same directory.
        for uri in &[
            format!("{tmp_path}/bar/foo.lance"),
            format!("{tmp_path}/./bar/foo.lance"),
            format!("{tmp_path}/bar/foo.lance/../foo.lance"),
        ] {
            let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &path.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "TEST_CONTENT");
        }
    }

    #[tokio::test]
    async fn test_cloud_paths() {
        let uri = "s3://bucket/foo.lance";
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "gs");
        assert_eq!(path.to_string(), "foo.lance");
    }

    /// Checks that `uri` gets `default_expected_block_size` when no block size
    /// is given, and that an explicit block size takes precedence.
    async fn test_block_size_used_test_helper(
        uri: &str,
        storage_options: Option<HashMap<String, String>>,
        default_expected_block_size: usize,
    ) {
        // Default block size.
        let registry = Arc::new(ObjectStoreRegistry::default());
        let params = ObjectStoreParams {
            storage_options: storage_options.clone(),
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, default_expected_block_size);

        // Explicit block size.
        let registry = Arc::new(ObjectStoreRegistry::default());
        let params = ObjectStoreParams {
            block_size: Some(1024),
            storage_options: storage_options.clone(),
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, 1024);
    }

    #[rstest]
    #[case("s3://bucket/foo.lance", None)]
    #[case("gs://bucket/foo.lance", None)]
    #[case("az://account/bucket/foo.lance",
      Some(HashMap::from([
            (String::from("account_name"), String::from("account")),
            (String::from("container_name"), String::from("container"))
           ])))]
    #[tokio::test]
    async fn test_block_size_used_cloud(
        #[case] uri: &str,
        #[case] storage_options: Option<HashMap<String, String>>,
    ) {
        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
    }

    #[rstest]
    #[case("file")]
    #[case("file-object-store")]
    #[case("memory:///bucket/foo.lance")]
    #[tokio::test]
    async fn test_block_size_used_file(#[case] prefix: &str) {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
        let path = format!("{tmp_path}/bar/foo.lance/test_file");
        write_to_file(&path, "URL").unwrap();
        let uri = format!("{prefix}:///{path}");
        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
    }

    #[tokio::test]
    async fn test_relative_paths() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "RELATIVE_URL",
        )
        .unwrap();

        set_current_dir(StdPath::new(&tmp_path)).expect("Error changing current dir");
        let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();

        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "RELATIVE_URL");
    }

    #[tokio::test]
    async fn test_tilde_expansion() {
        let uri = "~/foo.lance";
        write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "TILDE");
    }

    #[tokio::test]
    async fn test_read_directory() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let path = tmp_dir.path();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo").join("test_file").to_str().unwrap(),
            "read_dir",
        )
        .unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();

        let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
        assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
    }

    #[tokio::test]
    async fn test_delete_directory() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let path = tmp_dir.path();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo")
                .join("bar")
                .join("test_file")
                .to_str()
                .unwrap(),
            "delete",
        )
        .unwrap();
        write_to_file(path.join("foo").join("top").to_str().unwrap(), "delete_top").unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
        store.remove_dir_all(base.child("foo")).await.unwrap();

        assert!(!path.join("foo").exists());
    }

    /// Wrapper that records whether `wrap` was called and always returns a
    /// fixed store.
    #[derive(Debug)]
    struct TestWrapper {
        called: AtomicBool,

        return_value: Arc<dyn OSObjectStore>,
    }

    impl WrappingObjectStore for TestWrapper {
        fn wrap(&self, _original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
            self.called.store(true, Ordering::Relaxed);

            // Hand back the mock store regardless of the original.
            self.return_value.clone()
        }
    }

    impl TestWrapper {
        fn called(&self) -> bool {
            self.called.load(Ordering::Relaxed)
        }
    }

    #[tokio::test]
    async fn test_wrapping_object_store_option_is_used() {
        // Store that the wrapper will substitute for the real one.
        let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
        let registry = Arc::new(ObjectStoreRegistry::default());

        assert_eq!(Arc::strong_count(&mock_inner_store), 1);

        let wrapper = Arc::new(TestWrapper {
            called: AtomicBool::new(false),
            return_value: mock_inner_store.clone(),
        });

        let params = ObjectStoreParams {
            object_store_wrapper: Some(wrapper.clone()),
            ..ObjectStoreParams::default()
        };

        // Not called yet.
        assert!(!wrapper.called());

        let _ = ObjectStore::from_uri_and_params(registry, "memory:///", &params)
            .await
            .unwrap();

        // Called during construction.
        assert!(wrapper.called());

        // The only references left are the local binding and the copy held by
        // the wrapper, so the count is used as a proxy for store identity.
        assert_eq!(Arc::strong_count(&mock_inner_store), 2);
    }

    #[tokio::test]
    async fn test_local_paths() {
        let temp_dir = tempfile::tempdir().unwrap();

        let file_path = temp_dir.path().join("test_file");
        let mut writer = ObjectStore::create_local_writer(file_path.as_path())
            .await
            .unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        writer.shutdown().await.unwrap();

        let reader = ObjectStore::open_local(file_path.as_path()).await.unwrap();
        let buf = reader.get_range(0..5).await.unwrap();
        assert_eq!(buf.as_bytes(), b"LOCAL");
    }

    #[tokio::test]
    async fn test_read_one() {
        let temp_dir = tempfile::tempdir().unwrap();

        let file_path = temp_dir.path().join("test_file");
        let mut writer = ObjectStore::create_local_writer(file_path.as_path())
            .await
            .unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        writer.shutdown().await.unwrap();

        let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
        let obj_store = ObjectStore::local();
        let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
        assert_eq!(buf.as_bytes(), b"LOCAL");

        let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
        assert_eq!(buf.as_bytes(), b"LOCAL");
    }

    #[tokio::test]
    #[cfg(windows)]
    async fn test_windows_paths() {
        use std::path::Component;
        use std::path::Prefix;
        use std::path::Prefix::*;

        fn get_path_prefix(path: &StdPath) -> Prefix {
            match path.components().next().unwrap() {
                Component::Prefix(prefix_component) => prefix_component.kind(),
                _ => panic!(),
            }
        }

        fn get_drive_letter(prefix: Prefix) -> String {
            match prefix {
                Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
                _ => panic!(),
            }
        }

        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = tmp_dir.path();
        let prefix = get_path_prefix(tmp_path);
        let drive_letter = get_drive_letter(prefix);

        write_to_file(
            &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
            "WINDOWS",
        )
        .unwrap();

        // Both forward-slash and backslash spellings must work.
        for uri in &[
            format!("{drive_letter}:/test_folder/test.lance"),
            format!("{drive_letter}:\\test_folder\\test.lance"),
        ] {
            let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &base.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "WINDOWS");
        }
    }
}