1use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use deepsize::DeepSizeOf;
17use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
18use futures::{FutureExt, Stream};
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22#[cfg(feature = "aws")]
23use object_store::aws::AwsCredentialProvider;
24use object_store::DynObjectStore;
25use object_store::Error as ObjectStoreError;
26use object_store::{path::Path, ObjectMeta, ObjectStore as OSObjectStore};
27use providers::local::FileStoreProvider;
28use providers::memory::MemoryStoreProvider;
29use shellexpand::tilde;
30use snafu::location;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35mod list_retry;
36pub mod providers;
37mod tracing;
38use crate::object_reader::SmallReader;
39use crate::object_writer::WriteResult;
40use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
41use lance_core::{Error, Result};
42
43pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
48pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;
50
51const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024; #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
53const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024; lazy_static::lazy_static! {
56 pub static ref DEFAULT_MAX_IOP_SIZE: u64 = std::env::var("LANCE_MAX_IOP_SIZE")
57 .map(|val| val.parse().unwrap()).unwrap_or(16 * 1024 * 1024);
58}
59
60pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
61
62pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
63
/// Convenience helpers layered on top of the `object_store` trait.
///
/// A blanket implementation in this file provides these methods for every
/// `OSObjectStore` implementation.
#[async_trait]
pub trait ObjectStoreExt {
    /// Returns `true` if an object exists at `path`.
    ///
    /// Implementations in this file treat a "not found" response as `Ok(false)`
    /// and propagate any other error.
    async fn exists(&self, path: &Path) -> Result<bool>;

    /// Streams the metadata of every object listed under `dir_path`.
    ///
    /// When `unmodified_since` is `Some`, only objects whose `last_modified`
    /// timestamp is strictly earlier than that instant are yielded.
    fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>>;
}
78
79#[async_trait]
80impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
81 fn read_dir_all<'a, 'b>(
82 &'a self,
83 dir_path: impl Into<&'b Path> + Send,
84 unmodified_since: Option<DateTime<Utc>>,
85 ) -> BoxStream<'a, Result<ObjectMeta>> {
86 let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
87 if let Some(unmodified_since_val) = unmodified_since {
88 output
89 .try_filter(move |file| future::ready(file.last_modified < unmodified_since_val))
90 .boxed()
91 } else {
92 output.boxed()
93 }
94 }
95
96 async fn exists(&self, path: &Path) -> Result<bool> {
97 match self.head(path).await {
98 Ok(_) => Ok(true),
99 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
100 Err(e) => Err(e.into()),
101 }
102 }
103}
104
/// Wrapper around an `object_store` implementation plus the I/O settings
/// (block size, parallelism, retry count) used when reading and writing through it.
#[derive(Debug, Clone)]
pub struct ObjectStore {
    /// The underlying store that all operations are delegated to.
    pub inner: Arc<dyn OSObjectStore>,
    /// URL scheme of the store; `"file"` and `"memory"` are treated specially
    /// by `is_local` / `is_cloud` and when choosing a reader.
    scheme: String,
    /// Block size passed to the readers created by `open` / `open_with_size`.
    block_size: usize,
    /// Initialised from `DEFAULT_MAX_IOP_SIZE`; exposed through `max_iop_size()`.
    max_iop_size: u64,
    /// Copied from the store parameters.
    // NOTE(review): presumably tells the writer to use fixed-size multipart parts — confirm.
    pub use_constant_size_upload_parts: bool,
    /// Copied from the store parameters (defaults to `false` when unspecified).
    // NOTE(review): presumably signals that `list` results arrive in lexical order — confirm.
    pub list_is_lexically_ordered: bool,
    /// Default returned by `io_parallelism()` when `LANCE_IO_THREADS` is not set.
    io_parallelism: usize,
    /// Retry count handed to the cloud and small-object readers.
    download_retry_count: usize,
}
123
124impl DeepSizeOf for ObjectStore {
125 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
126 self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
131 }
132}
133
134impl std::fmt::Display for ObjectStore {
135 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136 write!(f, "ObjectStore({})", self.scheme)
137 }
138}
139
/// Hook for decorating an object store with another object store.
pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
    /// Receives the `original` store and returns the store that should be used in its place.
    fn wrap(&self, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
}
143
/// A [`WrappingObjectStore`] that applies several wrappers one after another.
#[derive(Debug, Clone)]
pub struct ChainedWrappingObjectStore {
    /// Wrappers in application order: the first entry wraps the original store,
    /// each later entry wraps the result of the previous one.
    wrappers: Vec<Arc<dyn WrappingObjectStore>>,
}
148
149impl ChainedWrappingObjectStore {
150 pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
151 Self { wrappers }
152 }
153
154 pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
155 self.wrappers.push(wrapper);
156 }
157}
158
159impl WrappingObjectStore for ChainedWrappingObjectStore {
160 fn wrap(&self, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
161 self.wrappers
162 .iter()
163 .fold(original, |acc, wrapper| wrapper.wrap(acc))
164 }
165}
166
/// Parameters controlling how an [`ObjectStore`] is created.
#[derive(Debug, Clone)]
pub struct ObjectStoreParams {
    /// Explicit block size; `None` lets the store pick a default.
    pub block_size: Option<usize>,
    /// A pre-built store and its URL. When set, `ObjectStore::from_uri_and_params`
    /// uses it directly instead of consulting the registry.
    #[deprecated(note = "Implement an ObjectStoreProvider instead")]
    pub object_store: Option<(Arc<DynObjectStore>, Url)>,
    /// Defaults to 60 seconds.
    // NOTE(review): presumably how long before expiry S3 credentials are refreshed — confirm.
    pub s3_credentials_refresh_offset: Duration,
    /// Optional AWS credential provider (only present with the `aws` feature).
    #[cfg(feature = "aws")]
    pub aws_credentials: Option<AwsCredentialProvider>,
    /// Optional wrapper applied to the underlying store.
    pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
    /// Free-form key/value options.
    // NOTE(review): presumably interpreted by the individual store providers — confirm.
    pub storage_options: Option<HashMap<String, String>>,
    /// Copied onto the resulting [`ObjectStore`].
    pub use_constant_size_upload_parts: bool,
    /// Copied onto the resulting [`ObjectStore`]; `None` is treated as `false`
    /// when a caller-supplied store is used.
    pub list_is_lexically_ordered: Option<bool>,
}
186
187impl Default for ObjectStoreParams {
188 fn default() -> Self {
189 #[allow(deprecated)]
190 Self {
191 object_store: None,
192 block_size: None,
193 s3_credentials_refresh_offset: Duration::from_secs(60),
194 #[cfg(feature = "aws")]
195 aws_credentials: None,
196 object_store_wrapper: None,
197 storage_options: None,
198 use_constant_size_upload_parts: false,
199 list_is_lexically_ordered: None,
200 }
201 }
202}
203
204impl std::hash::Hash for ObjectStoreParams {
206 #[allow(deprecated)]
207 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
208 self.block_size.hash(state);
210 if let Some((store, url)) = &self.object_store {
211 Arc::as_ptr(store).hash(state);
212 url.hash(state);
213 }
214 self.s3_credentials_refresh_offset.hash(state);
215 #[cfg(feature = "aws")]
216 if let Some(aws_credentials) = &self.aws_credentials {
217 Arc::as_ptr(aws_credentials).hash(state);
218 }
219 if let Some(wrapper) = &self.object_store_wrapper {
220 Arc::as_ptr(wrapper).hash(state);
221 }
222 if let Some(storage_options) = &self.storage_options {
223 for (key, value) in storage_options {
224 key.hash(state);
225 value.hash(state);
226 }
227 }
228 self.use_constant_size_upload_parts.hash(state);
229 self.list_is_lexically_ordered.hash(state);
230 }
231}
232
233impl Eq for ObjectStoreParams {}
235impl PartialEq for ObjectStoreParams {
236 #[allow(deprecated)]
237 fn eq(&self, other: &Self) -> bool {
238 self.block_size == other.block_size
240 && self
241 .object_store
242 .as_ref()
243 .map(|(store, url)| (Arc::as_ptr(store), url))
244 == other
245 .object_store
246 .as_ref()
247 .map(|(store, url)| (Arc::as_ptr(store), url))
248 && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
249 && self.aws_credentials.as_ref().map(Arc::as_ptr)
250 == other.aws_credentials.as_ref().map(Arc::as_ptr)
251 && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
252 == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
253 && self.storage_options == other.storage_options
254 && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
255 && self.list_is_lexically_ordered == other.list_is_lexically_ordered
256 }
257}
258
259fn uri_to_url(uri: &str) -> Result<Url> {
260 match Url::parse(uri) {
261 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
262 local_path_to_url(uri)
264 }
265 Ok(url) => Ok(url),
266 Err(_) => local_path_to_url(uri),
267 }
268}
269
270fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
271 let expanded = tilde(str_path.as_ref()).to_string();
272
273 let mut expanded_path = path_abs::PathAbs::new(expanded)
274 .unwrap()
275 .as_path()
276 .to_path_buf();
277 if let Some(s) = expanded_path.as_path().to_str() {
279 if s.is_empty() {
280 expanded_path = std::env::current_dir()?;
281 }
282 }
283
284 Ok(expanded_path)
285}
286
287fn local_path_to_url(str_path: &str) -> Result<Url> {
288 let expanded_path = expand_path(str_path)?;
289
290 Url::from_directory_path(expanded_path).map_err(|_| Error::InvalidInput {
291 source: format!("Invalid table location: '{}'", str_path).into(),
292 location: location!(),
293 })
294}
295
296impl ObjectStore {
297 pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
305 let registry = Arc::new(ObjectStoreRegistry::default());
306
307 Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
308 }
309
310 pub async fn from_uri_and_params(
314 registry: Arc<ObjectStoreRegistry>,
315 uri: &str,
316 params: &ObjectStoreParams,
317 ) -> Result<(Arc<Self>, Path)> {
318 #[allow(deprecated)]
319 if let Some((store, path)) = params.object_store.as_ref() {
320 let mut inner = store.clone();
321 if let Some(wrapper) = params.object_store_wrapper.as_ref() {
322 inner = wrapper.wrap(inner);
323 }
324 let store = Self {
325 inner,
326 scheme: path.scheme().to_string(),
327 block_size: params.block_size.unwrap_or(64 * 1024),
328 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
329 use_constant_size_upload_parts: params.use_constant_size_upload_parts,
330 list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
331 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
332 download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
333 };
334 let path = Path::from(path.path());
335 return Ok((Arc::new(store), path));
336 }
337 let url = uri_to_url(uri)?;
338 let store = registry.get_store(url.clone(), params).await?;
339 let provider = registry.get_provider(url.scheme()).expect_ok()?;
341 let path = provider.extract_path(&url);
342
343 Ok((store, path))
344 }
345
346 #[deprecated(note = "Use `from_uri` instead")]
347 pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
348 Self::from_uri_and_params(
349 Arc::new(ObjectStoreRegistry::default()),
350 str_path,
351 &Default::default(),
352 )
353 .now_or_never()
354 .unwrap()
355 }
356
357 pub fn local() -> Self {
359 let provider = FileStoreProvider;
360 provider
361 .new_store(Url::parse("file:///").unwrap(), &Default::default())
362 .now_or_never()
363 .unwrap()
364 .unwrap()
365 }
366
367 pub fn memory() -> Self {
369 let provider = MemoryStoreProvider;
370 provider
371 .new_store(Url::parse("memory:///").unwrap(), &Default::default())
372 .now_or_never()
373 .unwrap()
374 .unwrap()
375 }
376
377 pub fn is_local(&self) -> bool {
379 self.scheme == "file"
380 }
381
382 pub fn is_cloud(&self) -> bool {
383 self.scheme != "file" && self.scheme != "memory"
384 }
385
386 pub fn scheme(&self) -> &str {
387 &self.scheme
388 }
389
390 pub fn block_size(&self) -> usize {
391 self.block_size
392 }
393
394 pub fn max_iop_size(&self) -> u64 {
395 self.max_iop_size
396 }
397
398 pub fn io_parallelism(&self) -> usize {
399 std::env::var("LANCE_IO_THREADS")
400 .map(|val| val.parse::<usize>().unwrap())
401 .unwrap_or(self.io_parallelism)
402 }
403
404 pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
409 match self.scheme.as_str() {
410 "file" => LocalObjectReader::open(path, self.block_size, None).await,
411 _ => Ok(Box::new(CloudObjectReader::new(
412 self.inner.clone(),
413 path.clone(),
414 self.block_size,
415 None,
416 self.download_retry_count,
417 )?)),
418 }
419 }
420
421 pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
427 if known_size <= self.block_size {
430 return Ok(Box::new(SmallReader::new(
431 self.inner.clone(),
432 path.clone(),
433 self.download_retry_count,
434 known_size,
435 )));
436 }
437
438 match self.scheme.as_str() {
439 "file" => LocalObjectReader::open(path, self.block_size, Some(known_size)).await,
440 _ => Ok(Box::new(CloudObjectReader::new(
441 self.inner.clone(),
442 path.clone(),
443 self.block_size,
444 Some(known_size),
445 self.download_retry_count,
446 )?)),
447 }
448 }
449
450 pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
452 let object_store = Self::local();
453 let absolute_path = expand_path(path.to_string_lossy())?;
454 let os_path = Path::from_absolute_path(absolute_path)?;
455 object_store.create(&os_path).await
456 }
457
458 pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
460 let object_store = Self::local();
461 let absolute_path = expand_path(path.to_string_lossy())?;
462 let os_path = Path::from_absolute_path(absolute_path)?;
463 object_store.open(&os_path).await
464 }
465
466 pub async fn create(&self, path: &Path) -> Result<ObjectWriter> {
468 ObjectWriter::new(self, path).await
469 }
470
471 pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
473 let mut writer = self.create(path).await?;
474 writer.write_all(content).await?;
475 writer.shutdown().await
476 }
477
478 pub async fn delete(&self, path: &Path) -> Result<()> {
479 self.inner.delete(path).await?;
480 Ok(())
481 }
482
483 pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
484 Ok(self.inner.copy(from, to).await?)
485 }
486
487 pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
489 let path = dir_path.into();
490 let path = Path::parse(&path)?;
491 let output = self.inner.list_with_delimiter(Some(&path)).await?;
492 Ok(output
493 .common_prefixes
494 .iter()
495 .chain(output.objects.iter().map(|o| &o.location))
496 .map(|s| s.filename().unwrap().to_string())
497 .collect())
498 }
499
500 pub fn list(
501 &self,
502 path: Option<Path>,
503 ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
504 Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
505 }
506
    /// Streams the metadata of every object under `dir_path`.
    ///
    /// Delegates to the `read_dir_all` extension method of the inner store; when
    /// `unmodified_since` is `Some`, that implementation only yields objects last
    /// modified strictly before the given instant.
    pub fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        self.inner.read_dir_all(dir_path, unmodified_since)
    }
517
518 pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
520 let path = dir_path.into();
521 let path = Path::parse(&path)?;
522
523 if self.is_local() {
524 return super::local::remove_dir_all(&path);
526 }
527 let sub_entries = self
528 .inner
529 .list(Some(&path))
530 .map(|m| m.map(|meta| meta.location))
531 .boxed();
532 self.inner
533 .delete_stream(sub_entries)
534 .try_collect::<Vec<_>>()
535 .await?;
536 Ok(())
537 }
538
539 pub fn remove_stream<'a>(
540 &'a self,
541 locations: BoxStream<'a, Result<Path>>,
542 ) -> BoxStream<'a, Result<Path>> {
543 self.inner
544 .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
545 .err_into::<Error>()
546 .boxed()
547 }
548
549 pub async fn exists(&self, path: &Path) -> Result<bool> {
551 match self.inner.head(path).await {
552 Ok(_) => Ok(true),
553 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
554 Err(e) => Err(e.into()),
555 }
556 }
557
558 pub async fn size(&self, path: &Path) -> Result<usize> {
560 Ok(self.inner.head(path).await?.size)
561 }
562
563 pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
565 let reader = self.open(path).await?;
566 Ok(reader.get_all().await?)
567 }
568
569 pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
574 let reader = self.open(path).await?;
575 Ok(reader.get_range(range).await?)
576 }
577}
578
/// Configuration keys recognised by Lance, parseable from strings via `FromStr`.
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub enum LanceConfigKey {
    /// Parsed from `"download_retry_count"` (case-insensitive).
    DownloadRetryCount,
}
585
586impl FromStr for LanceConfigKey {
587 type Err = Error;
588
589 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
590 match s.to_ascii_lowercase().as_str() {
591 "download_retry_count" => Ok(Self::DownloadRetryCount),
592 _ => Err(Error::InvalidInput {
593 source: format!("Invalid LanceConfigKey: {}", s).into(),
594 location: location!(),
595 }),
596 }
597 }
598}
599
/// String key/value storage options with typed accessors for well-known keys.
#[derive(Clone, Debug, Default)]
pub struct StorageOptions(pub HashMap<String, String>);
602
603impl StorageOptions {
604 pub fn new(options: HashMap<String, String>) -> Self {
606 let mut options = options;
607 if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
608 options.insert("allow_http".into(), value);
609 }
610 if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
611 options.insert("allow_http".into(), value);
612 }
613 if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
614 options.insert("allow_http".into(), value);
615 }
616 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
617 options.insert("client_max_retries".into(), value);
618 }
619 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
620 options.insert("client_retry_timeout".into(), value);
621 }
622 Self(options)
623 }
624
625 pub fn allow_http(&self) -> bool {
627 self.0.iter().any(|(key, value)| {
628 key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
629 })
630 }
631
632 pub fn download_retry_count(&self) -> usize {
634 self.0
635 .iter()
636 .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
637 .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
638 .unwrap_or(3)
639 }
640
641 pub fn client_max_retries(&self) -> usize {
643 self.0
644 .iter()
645 .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
646 .and_then(|(_, value)| value.parse::<usize>().ok())
647 .unwrap_or(10)
648 }
649
650 pub fn client_retry_timeout(&self) -> u64 {
652 self.0
653 .iter()
654 .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
655 .and_then(|(_, value)| value.parse::<u64>().ok())
656 .unwrap_or(180)
657 }
658
659 pub fn get(&self, key: &str) -> Option<&String> {
660 self.0.get(key)
661 }
662}
663
664impl From<HashMap<String, String>> for StorageOptions {
665 fn from(value: HashMap<String, String>) -> Self {
666 Self::new(value)
667 }
668}
669
670impl ObjectStore {
671 #[allow(clippy::too_many_arguments)]
672 pub fn new(
673 store: Arc<DynObjectStore>,
674 location: Url,
675 block_size: Option<usize>,
676 wrapper: Option<Arc<dyn WrappingObjectStore>>,
677 use_constant_size_upload_parts: bool,
678 list_is_lexically_ordered: bool,
679 io_parallelism: usize,
680 download_retry_count: usize,
681 ) -> Self {
682 let scheme = location.scheme();
683 let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
684
685 let store = match wrapper {
686 Some(wrapper) => wrapper.wrap(store),
687 None => store,
688 };
689
690 Self {
691 inner: store,
692 scheme: scheme.into(),
693 block_size,
694 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
695 use_constant_size_upload_parts,
696 list_is_lexically_ordered,
697 io_parallelism,
698 download_retry_count,
699 }
700 }
701}
702
/// Picks the default block size for a URL scheme: 4 KiB for `"file"`, 64 KiB for
/// everything else.
fn infer_block_size(scheme: &str) -> usize {
    if scheme == "file" {
        4 * 1024
    } else {
        64 * 1024
    }
}
712
713#[cfg(test)]
714mod tests {
715 use super::*;
716 use object_store::memory::InMemory;
717 use rstest::rstest;
718 use std::env::set_current_dir;
719 use std::fs::{create_dir_all, write};
720 use std::path::Path as StdPath;
721 use std::sync::atomic::{AtomicBool, Ordering};
722
723 fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
725 let expanded = tilde(path_str).to_string();
726 let path = StdPath::new(&expanded);
727 std::fs::create_dir_all(path.parent().unwrap())?;
728 write(path, contents)
729 }
730
731 async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
732 let test_file_store = store.open(path).await.unwrap();
733 let size = test_file_store.size().await.unwrap();
734 let bytes = test_file_store.get_range(0..size).await.unwrap();
735 let contents = String::from_utf8(bytes.to_vec()).unwrap();
736 Ok(contents)
737 }
738
739 #[tokio::test]
740 async fn test_absolute_paths() {
741 let tmp_dir = tempfile::tempdir().unwrap();
742 let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
743 write_to_file(
744 &format!("{tmp_path}/bar/foo.lance/test_file"),
745 "TEST_CONTENT",
746 )
747 .unwrap();
748
749 for uri in &[
751 format!("{tmp_path}/bar/foo.lance"),
752 format!("{tmp_path}/./bar/foo.lance"),
753 format!("{tmp_path}/bar/foo.lance/../foo.lance"),
754 ] {
755 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
756 let contents = read_from_store(store.as_ref(), &path.child("test_file"))
757 .await
758 .unwrap();
759 assert_eq!(contents, "TEST_CONTENT");
760 }
761 }
762
763 #[tokio::test]
764 async fn test_cloud_paths() {
765 let uri = "s3://bucket/foo.lance";
766 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
767 assert_eq!(store.scheme, "s3");
768 assert_eq!(path.to_string(), "foo.lance");
769
770 let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
771 .await
772 .unwrap();
773 assert_eq!(store.scheme, "s3");
774 assert_eq!(path.to_string(), "foo.lance");
775
776 let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
777 .await
778 .unwrap();
779 assert_eq!(store.scheme, "gs");
780 assert_eq!(path.to_string(), "foo.lance");
781 }
782
783 async fn test_block_size_used_test_helper(
784 uri: &str,
785 storage_options: Option<HashMap<String, String>>,
786 default_expected_block_size: usize,
787 ) {
788 let registry = Arc::new(ObjectStoreRegistry::default());
790 let params = ObjectStoreParams {
791 storage_options: storage_options.clone(),
792 ..ObjectStoreParams::default()
793 };
794 let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
795 .await
796 .unwrap();
797 assert_eq!(store.block_size, default_expected_block_size);
798
799 let registry = Arc::new(ObjectStoreRegistry::default());
801 let params = ObjectStoreParams {
802 block_size: Some(1024),
803 storage_options: storage_options.clone(),
804 ..ObjectStoreParams::default()
805 };
806 let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
807 .await
808 .unwrap();
809 assert_eq!(store.block_size, 1024);
810 }
811
    // Cloud URIs default to a 64 KiB block size; the Azure case also supplies the
    // account and container names as storage options.
    #[rstest]
    #[case("s3://bucket/foo.lance", None)]
    #[case("gs://bucket/foo.lance", None)]
    #[case("az://account/bucket/foo.lance",
      Some(HashMap::from([
            (String::from("account_name"), String::from("account")),
            (String::from("container_name"), String::from("container"))
           ])))]
    #[tokio::test]
    async fn test_block_size_used_cloud(
        #[case] uri: &str,
        #[case] storage_options: Option<HashMap<String, String>>,
    ) {
        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
    }
827
    // Each case is used as the prefix of a `{prefix}:///{path}` URI pointing at a
    // temporary file; the expected default block size is 4 KiB.
    // NOTE(review): the third case is itself a full `memory:///...` URI rather than a
    // bare scheme — confirm that is intended.
    #[rstest]
    #[case("file")]
    #[case("file-object-store")]
    #[case("memory:///bucket/foo.lance")]
    #[tokio::test]
    async fn test_block_size_used_file(#[case] prefix: &str) {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
        let path = format!("{tmp_path}/bar/foo.lance/test_file");
        write_to_file(&path, "URL").unwrap();
        let uri = format!("{prefix}:///{path}");
        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
    }
841
842 #[tokio::test]
843 async fn test_relative_paths() {
844 let tmp_dir = tempfile::tempdir().unwrap();
845 let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
846 write_to_file(
847 &format!("{tmp_path}/bar/foo.lance/test_file"),
848 "RELATIVE_URL",
849 )
850 .unwrap();
851
852 set_current_dir(StdPath::new(&tmp_path)).expect("Error changing current dir");
853 let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();
854
855 let contents = read_from_store(store.as_ref(), &path.child("test_file"))
856 .await
857 .unwrap();
858 assert_eq!(contents, "RELATIVE_URL");
859 }
860
861 #[tokio::test]
862 async fn test_tilde_expansion() {
863 let uri = "~/foo.lance";
864 write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
865 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
866 let contents = read_from_store(store.as_ref(), &path.child("test_file"))
867 .await
868 .unwrap();
869 assert_eq!(contents, "TILDE");
870 }
871
872 #[tokio::test]
873 async fn test_read_directory() {
874 let tmp_dir = tempfile::tempdir().unwrap();
875 let path = tmp_dir.path();
876 create_dir_all(path.join("foo").join("bar")).unwrap();
877 create_dir_all(path.join("foo").join("zoo")).unwrap();
878 create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
879 write_to_file(
880 path.join("foo").join("test_file").to_str().unwrap(),
881 "read_dir",
882 )
883 .unwrap();
884 let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
885
886 let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
887 assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
888 }
889
890 #[tokio::test]
891 async fn test_delete_directory() {
892 let tmp_dir = tempfile::tempdir().unwrap();
893 let path = tmp_dir.path();
894 create_dir_all(path.join("foo").join("bar")).unwrap();
895 create_dir_all(path.join("foo").join("zoo")).unwrap();
896 create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
897 write_to_file(
898 path.join("foo")
899 .join("bar")
900 .join("test_file")
901 .to_str()
902 .unwrap(),
903 "delete",
904 )
905 .unwrap();
906 write_to_file(path.join("foo").join("top").to_str().unwrap(), "delete_top").unwrap();
907 let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
908 store.remove_dir_all(base.child("foo")).await.unwrap();
909
910 assert!(!path.join("foo").exists());
911 }
912
913 #[derive(Debug)]
914 struct TestWrapper {
915 called: AtomicBool,
916
917 return_value: Arc<dyn OSObjectStore>,
918 }
919
920 impl WrappingObjectStore for TestWrapper {
921 fn wrap(&self, _original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
922 self.called.store(true, Ordering::Relaxed);
923
924 self.return_value.clone()
926 }
927 }
928
929 impl TestWrapper {
930 fn called(&self) -> bool {
931 self.called.load(Ordering::Relaxed)
932 }
933 }
934
935 #[tokio::test]
936 async fn test_wrapping_object_store_option_is_used() {
937 let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
939 let registry = Arc::new(ObjectStoreRegistry::default());
940
941 assert_eq!(Arc::strong_count(&mock_inner_store), 1);
942
943 let wrapper = Arc::new(TestWrapper {
944 called: AtomicBool::new(false),
945 return_value: mock_inner_store.clone(),
946 });
947
948 let params = ObjectStoreParams {
949 object_store_wrapper: Some(wrapper.clone()),
950 ..ObjectStoreParams::default()
951 };
952
953 assert!(!wrapper.called());
955
956 let _ = ObjectStore::from_uri_and_params(registry, "memory:///", ¶ms)
957 .await
958 .unwrap();
959
960 assert!(wrapper.called());
962
963 assert_eq!(Arc::strong_count(&mock_inner_store), 2);
966 }
967
968 #[tokio::test]
969 async fn test_local_paths() {
970 let temp_dir = tempfile::tempdir().unwrap();
971
972 let file_path = temp_dir.path().join("test_file");
973 let mut writer = ObjectStore::create_local_writer(file_path.as_path())
974 .await
975 .unwrap();
976 writer.write_all(b"LOCAL").await.unwrap();
977 writer.shutdown().await.unwrap();
978
979 let reader = ObjectStore::open_local(file_path.as_path()).await.unwrap();
980 let buf = reader.get_range(0..5).await.unwrap();
981 assert_eq!(buf.as_ref(), b"LOCAL");
982 }
983
984 #[tokio::test]
985 async fn test_read_one() {
986 let temp_dir = tempfile::tempdir().unwrap();
987
988 let file_path = temp_dir.path().join("test_file");
989 let mut writer = ObjectStore::create_local_writer(file_path.as_path())
990 .await
991 .unwrap();
992 writer.write_all(b"LOCAL").await.unwrap();
993 writer.shutdown().await.unwrap();
994
995 let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
996 let obj_store = ObjectStore::local();
997 let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
998 assert_eq!(buf.as_ref(), b"LOCAL");
999
1000 let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
1001 assert_eq!(buf.as_ref(), b"LOCAL");
1002 }
1003
    // Windows only: drive-letter paths with either forward or back slashes must be
    // treated as local paths rather than as URLs with a one-letter scheme.
    #[tokio::test]
    #[cfg(windows)]
    async fn test_windows_paths() {
        use std::path::Component;
        use std::path::Prefix;
        use std::path::Prefix::*;

        // Returns the prefix component (e.g. the drive) of `path`; panics if the
        // first component is not a prefix.
        fn get_path_prefix(path: &StdPath) -> Prefix {
            match path.components().next().unwrap() {
                Component::Prefix(prefix_component) => prefix_component.kind(),
                _ => panic!(),
            }
        }

        // Extracts the drive letter from a `Disk` prefix; panics for any other kind.
        fn get_drive_letter(prefix: Prefix) -> String {
            match prefix {
                Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
                _ => panic!(),
            }
        }

        // Use the temp directory only to discover which drive to write to.
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = tmp_dir.path();
        let prefix = get_path_prefix(tmp_path);
        let drive_letter = get_drive_letter(prefix);

        // The fixture is written under `<drive>:/test_folder`, not inside the temp directory.
        write_to_file(
            &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
            "WINDOWS",
        )
        .unwrap();

        for uri in &[
            format!("{drive_letter}:/test_folder/test.lance"),
            format!("{drive_letter}:\\test_folder\\test.lance"),
        ] {
            let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &base.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "WINDOWS");
        }
    }
1047}