1use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use deepsize::DeepSizeOf;
17use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
18use futures::{FutureExt, Stream};
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22#[cfg(feature = "aws")]
23use object_store::aws::AwsCredentialProvider;
24use object_store::DynObjectStore;
25use object_store::Error as ObjectStoreError;
26use object_store::{path::Path, ObjectMeta, ObjectStore as OSObjectStore};
27use providers::local::FileStoreProvider;
28use providers::memory::MemoryStoreProvider;
29use shellexpand::tilde;
30use snafu::location;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35mod list_retry;
36pub mod providers;
37mod tracing;
38use crate::object_reader::SmallReader;
39use crate::object_writer::WriteResult;
40use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
41use lance_core::{Error, Result};
42
/// Default I/O parallelism for local storage.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
/// Default I/O parallelism for cloud storage.
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

/// Default block size for local storage (4 KiB).
const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024;
/// Default block size for cloud storage (64 KiB); only compiled when a cloud
/// backend feature is enabled.
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024;

/// Default maximum size, in bytes, of a single I/O operation.
///
/// Resolved lazily on first access: the value of the `LANCE_MAX_IOP_SIZE`
/// environment variable when it is set, otherwise 16 MiB.
///
/// # Panics
///
/// Panics on first access if `LANCE_MAX_IOP_SIZE` is set but is not a valid
/// unsigned integer. The panic message names the variable and the bad value so
/// the misconfiguration can be diagnosed.
pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock<u64> = std::sync::LazyLock::new(|| {
    match std::env::var("LANCE_MAX_IOP_SIZE") {
        Ok(raw) => raw.parse::<u64>().unwrap_or_else(|err| {
            panic!(
                "LANCE_MAX_IOP_SIZE must be an unsigned integer number of bytes, got {:?}: {}",
                raw, err
            )
        }),
        Err(_) => 16 * 1024 * 1024,
    }
});

/// Default number of download retries.
pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
62
63pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
64
65#[async_trait]
66pub trait ObjectStoreExt {
67 async fn exists(&self, path: &Path) -> Result<bool>;
69
70 fn read_dir_all<'a, 'b>(
74 &'a self,
75 dir_path: impl Into<&'b Path> + Send,
76 unmodified_since: Option<DateTime<Utc>>,
77 ) -> BoxStream<'a, Result<ObjectMeta>>;
78}
79
80#[async_trait]
81impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
82 fn read_dir_all<'a, 'b>(
83 &'a self,
84 dir_path: impl Into<&'b Path> + Send,
85 unmodified_since: Option<DateTime<Utc>>,
86 ) -> BoxStream<'a, Result<ObjectMeta>> {
87 let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
88 if let Some(unmodified_since_val) = unmodified_since {
89 output
90 .try_filter(move |file| future::ready(file.last_modified <= unmodified_since_val))
91 .boxed()
92 } else {
93 output.boxed()
94 }
95 }
96
97 async fn exists(&self, path: &Path) -> Result<bool> {
98 match self.head(path).await {
99 Ok(_) => Ok(true),
100 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
101 Err(e) => Err(e.into()),
102 }
103 }
104}
105
106#[derive(Debug, Clone)]
108pub struct ObjectStore {
109 pub inner: Arc<dyn OSObjectStore>,
111 scheme: String,
112 block_size: usize,
113 max_iop_size: u64,
114 pub use_constant_size_upload_parts: bool,
117 pub list_is_lexically_ordered: bool,
120 io_parallelism: usize,
121 download_retry_count: usize,
123}
124
125impl DeepSizeOf for ObjectStore {
126 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
127 self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
132 }
133}
134
135impl std::fmt::Display for ObjectStore {
136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137 write!(f, "ObjectStore({})", self.scheme)
138 }
139}
140
141pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
142 fn wrap(
147 &self,
148 original: Arc<dyn OSObjectStore>,
149 storage_options: Option<&HashMap<String, String>>,
150 ) -> Arc<dyn OSObjectStore>;
151}
152
153#[derive(Debug, Clone)]
154pub struct ChainedWrappingObjectStore {
155 wrappers: Vec<Arc<dyn WrappingObjectStore>>,
156}
157
158impl ChainedWrappingObjectStore {
159 pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
160 Self { wrappers }
161 }
162
163 pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
164 self.wrappers.push(wrapper);
165 }
166}
167
168impl WrappingObjectStore for ChainedWrappingObjectStore {
169 fn wrap(
170 &self,
171 original: Arc<dyn OSObjectStore>,
172 storage_options: Option<&HashMap<String, String>>,
173 ) -> Arc<dyn OSObjectStore> {
174 self.wrappers
175 .iter()
176 .fold(original, |acc, wrapper| wrapper.wrap(acc, storage_options))
177 }
178}
179
180#[derive(Debug, Clone)]
183pub struct ObjectStoreParams {
184 pub block_size: Option<usize>,
185 #[deprecated(note = "Implement an ObjectStoreProvider instead")]
186 pub object_store: Option<(Arc<DynObjectStore>, Url)>,
187 pub s3_credentials_refresh_offset: Duration,
188 #[cfg(feature = "aws")]
189 pub aws_credentials: Option<AwsCredentialProvider>,
190 pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
191 pub storage_options: Option<HashMap<String, String>>,
192 pub use_constant_size_upload_parts: bool,
197 pub list_is_lexically_ordered: Option<bool>,
198}
199
200impl Default for ObjectStoreParams {
201 fn default() -> Self {
202 #[allow(deprecated)]
203 Self {
204 object_store: None,
205 block_size: None,
206 s3_credentials_refresh_offset: Duration::from_secs(60),
207 #[cfg(feature = "aws")]
208 aws_credentials: None,
209 object_store_wrapper: None,
210 storage_options: None,
211 use_constant_size_upload_parts: false,
212 list_is_lexically_ordered: None,
213 }
214 }
215}
216
217impl std::hash::Hash for ObjectStoreParams {
219 #[allow(deprecated)]
220 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
221 self.block_size.hash(state);
223 if let Some((store, url)) = &self.object_store {
224 Arc::as_ptr(store).hash(state);
225 url.hash(state);
226 }
227 self.s3_credentials_refresh_offset.hash(state);
228 #[cfg(feature = "aws")]
229 if let Some(aws_credentials) = &self.aws_credentials {
230 Arc::as_ptr(aws_credentials).hash(state);
231 }
232 if let Some(wrapper) = &self.object_store_wrapper {
233 Arc::as_ptr(wrapper).hash(state);
234 }
235 if let Some(storage_options) = &self.storage_options {
236 for (key, value) in storage_options {
237 key.hash(state);
238 value.hash(state);
239 }
240 }
241 self.use_constant_size_upload_parts.hash(state);
242 self.list_is_lexically_ordered.hash(state);
243 }
244}
245
246impl Eq for ObjectStoreParams {}
248impl PartialEq for ObjectStoreParams {
249 #[allow(deprecated)]
250 fn eq(&self, other: &Self) -> bool {
251 #[cfg(feature = "aws")]
252 if self.aws_credentials.is_some() != other.aws_credentials.is_some() {
253 return false;
254 }
255
256 self.block_size == other.block_size
258 && self
259 .object_store
260 .as_ref()
261 .map(|(store, url)| (Arc::as_ptr(store), url))
262 == other
263 .object_store
264 .as_ref()
265 .map(|(store, url)| (Arc::as_ptr(store), url))
266 && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
267 && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
268 == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
269 && self.storage_options == other.storage_options
270 && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
271 && self.list_is_lexically_ordered == other.list_is_lexically_ordered
272 }
273}
274
275fn uri_to_url(uri: &str) -> Result<Url> {
276 match Url::parse(uri) {
277 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
278 local_path_to_url(uri)
280 }
281 Ok(url) => Ok(url),
282 Err(_) => local_path_to_url(uri),
283 }
284}
285
286fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
287 let expanded = tilde(str_path.as_ref()).to_string();
288
289 let mut expanded_path = path_abs::PathAbs::new(expanded)
290 .unwrap()
291 .as_path()
292 .to_path_buf();
293 if let Some(s) = expanded_path.as_path().to_str() {
295 if s.is_empty() {
296 expanded_path = std::env::current_dir()?;
297 }
298 }
299
300 Ok(expanded_path)
301}
302
303fn local_path_to_url(str_path: &str) -> Result<Url> {
304 let expanded_path = expand_path(str_path)?;
305
306 Url::from_directory_path(expanded_path).map_err(|_| Error::InvalidInput {
307 source: format!("Invalid table location: '{}'", str_path).into(),
308 location: location!(),
309 })
310}
311
312impl ObjectStore {
313 pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
321 let registry = Arc::new(ObjectStoreRegistry::default());
322
323 Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
324 }
325
326 pub async fn from_uri_and_params(
330 registry: Arc<ObjectStoreRegistry>,
331 uri: &str,
332 params: &ObjectStoreParams,
333 ) -> Result<(Arc<Self>, Path)> {
334 #[allow(deprecated)]
335 if let Some((store, path)) = params.object_store.as_ref() {
336 let mut inner = store.clone();
337 if let Some(wrapper) = params.object_store_wrapper.as_ref() {
338 inner = wrapper.wrap(inner, params.storage_options.as_ref());
339 }
340 let store = Self {
341 inner,
342 scheme: path.scheme().to_string(),
343 block_size: params.block_size.unwrap_or(64 * 1024),
344 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
345 use_constant_size_upload_parts: params.use_constant_size_upload_parts,
346 list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
347 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
348 download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
349 };
350 let path = Path::parse(path.path())?;
351 return Ok((Arc::new(store), path));
352 }
353 let url = uri_to_url(uri)?;
354 let store = registry.get_store(url.clone(), params).await?;
355 let provider = registry.get_provider(url.scheme()).expect_ok()?;
357 let path = provider.extract_path(&url)?;
358
359 Ok((store, path))
360 }
361
362 #[deprecated(note = "Use `from_uri` instead")]
363 pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
364 Self::from_uri_and_params(
365 Arc::new(ObjectStoreRegistry::default()),
366 str_path,
367 &Default::default(),
368 )
369 .now_or_never()
370 .unwrap()
371 }
372
373 pub fn local() -> Self {
375 let provider = FileStoreProvider;
376 provider
377 .new_store(Url::parse("file:///").unwrap(), &Default::default())
378 .now_or_never()
379 .unwrap()
380 .unwrap()
381 }
382
383 pub fn memory() -> Self {
385 let provider = MemoryStoreProvider;
386 provider
387 .new_store(Url::parse("memory:///").unwrap(), &Default::default())
388 .now_or_never()
389 .unwrap()
390 .unwrap()
391 }
392
393 pub fn is_local(&self) -> bool {
395 self.scheme == "file"
396 }
397
398 pub fn is_cloud(&self) -> bool {
399 self.scheme != "file" && self.scheme != "memory"
400 }
401
402 pub fn scheme(&self) -> &str {
403 &self.scheme
404 }
405
406 pub fn block_size(&self) -> usize {
407 self.block_size
408 }
409
410 pub fn max_iop_size(&self) -> u64 {
411 self.max_iop_size
412 }
413
414 pub fn io_parallelism(&self) -> usize {
415 std::env::var("LANCE_IO_THREADS")
416 .map(|val| val.parse::<usize>().unwrap())
417 .unwrap_or(self.io_parallelism)
418 }
419
420 pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
425 match self.scheme.as_str() {
426 "file" => LocalObjectReader::open(path, self.block_size, None).await,
427 _ => Ok(Box::new(CloudObjectReader::new(
428 self.inner.clone(),
429 path.clone(),
430 self.block_size,
431 None,
432 self.download_retry_count,
433 )?)),
434 }
435 }
436
437 pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
443 if known_size <= self.block_size {
446 return Ok(Box::new(SmallReader::new(
447 self.inner.clone(),
448 path.clone(),
449 self.download_retry_count,
450 known_size,
451 )));
452 }
453
454 match self.scheme.as_str() {
455 "file" => LocalObjectReader::open(path, self.block_size, Some(known_size)).await,
456 _ => Ok(Box::new(CloudObjectReader::new(
457 self.inner.clone(),
458 path.clone(),
459 self.block_size,
460 Some(known_size),
461 self.download_retry_count,
462 )?)),
463 }
464 }
465
466 pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
468 let object_store = Self::local();
469 let absolute_path = expand_path(path.to_string_lossy())?;
470 let os_path = Path::from_absolute_path(absolute_path)?;
471 object_store.create(&os_path).await
472 }
473
474 pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
476 let object_store = Self::local();
477 let absolute_path = expand_path(path.to_string_lossy())?;
478 let os_path = Path::from_absolute_path(absolute_path)?;
479 object_store.open(&os_path).await
480 }
481
482 pub async fn create(&self, path: &Path) -> Result<ObjectWriter> {
484 ObjectWriter::new(self, path).await
485 }
486
487 pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
489 let mut writer = self.create(path).await?;
490 writer.write_all(content).await?;
491 writer.shutdown().await
492 }
493
494 pub async fn delete(&self, path: &Path) -> Result<()> {
495 self.inner.delete(path).await?;
496 Ok(())
497 }
498
499 pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
500 if self.is_local() {
501 return super::local::copy_file(from, to);
503 }
504 Ok(self.inner.copy(from, to).await?)
505 }
506
507 pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
509 let path = dir_path.into();
510 let path = Path::parse(&path)?;
511 let output = self.inner.list_with_delimiter(Some(&path)).await?;
512 Ok(output
513 .common_prefixes
514 .iter()
515 .chain(output.objects.iter().map(|o| &o.location))
516 .map(|s| s.filename().unwrap().to_string())
517 .collect())
518 }
519
520 pub fn list(
521 &self,
522 path: Option<Path>,
523 ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
524 Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
525 }
526
527 pub fn read_dir_all<'a, 'b>(
531 &'a self,
532 dir_path: impl Into<&'b Path> + Send,
533 unmodified_since: Option<DateTime<Utc>>,
534 ) -> BoxStream<'a, Result<ObjectMeta>> {
535 self.inner.read_dir_all(dir_path, unmodified_since)
536 }
537
538 pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
540 let path = dir_path.into();
541 let path = Path::parse(&path)?;
542
543 if self.is_local() {
544 return super::local::remove_dir_all(&path);
546 }
547 let sub_entries = self
548 .inner
549 .list(Some(&path))
550 .map(|m| m.map(|meta| meta.location))
551 .boxed();
552 self.inner
553 .delete_stream(sub_entries)
554 .try_collect::<Vec<_>>()
555 .await?;
556 Ok(())
557 }
558
559 pub fn remove_stream<'a>(
560 &'a self,
561 locations: BoxStream<'a, Result<Path>>,
562 ) -> BoxStream<'a, Result<Path>> {
563 self.inner
564 .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
565 .err_into::<Error>()
566 .boxed()
567 }
568
569 pub async fn exists(&self, path: &Path) -> Result<bool> {
571 match self.inner.head(path).await {
572 Ok(_) => Ok(true),
573 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
574 Err(e) => Err(e.into()),
575 }
576 }
577
578 pub async fn size(&self, path: &Path) -> Result<u64> {
580 Ok(self.inner.head(path).await?.size)
581 }
582
583 pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
585 let reader = self.open(path).await?;
586 Ok(reader.get_all().await?)
587 }
588
589 pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
594 let reader = self.open(path).await?;
595 Ok(reader.get_range(range).await?)
596 }
597}
598
/// Lance-specific configuration keys; parsed from strings via `FromStr`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum LanceConfigKey {
    /// Parsed from `download_retry_count` (case-insensitive).
    DownloadRetryCount,
}
605
606impl FromStr for LanceConfigKey {
607 type Err = Error;
608
609 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
610 match s.to_ascii_lowercase().as_str() {
611 "download_retry_count" => Ok(Self::DownloadRetryCount),
612 _ => Err(Error::InvalidInput {
613 source: format!("Invalid LanceConfigKey: {}", s).into(),
614 location: location!(),
615 }),
616 }
617 }
618}
619
/// Newtype around the raw key/value storage options map.
#[derive(Debug, Clone, Default)]
pub struct StorageOptions(pub HashMap<String, String>);
622
623impl StorageOptions {
624 pub fn new(options: HashMap<String, String>) -> Self {
626 let mut options = options;
627 if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
628 options.insert("allow_http".into(), value);
629 }
630 if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
631 options.insert("allow_http".into(), value);
632 }
633 if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
634 options.insert("allow_http".into(), value);
635 }
636 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
637 options.insert("client_max_retries".into(), value);
638 }
639 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
640 options.insert("client_retry_timeout".into(), value);
641 }
642 Self(options)
643 }
644
645 pub fn allow_http(&self) -> bool {
647 self.0.iter().any(|(key, value)| {
648 key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
649 })
650 }
651
652 pub fn download_retry_count(&self) -> usize {
654 self.0
655 .iter()
656 .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
657 .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
658 .unwrap_or(3)
659 }
660
661 pub fn client_max_retries(&self) -> usize {
663 self.0
664 .iter()
665 .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
666 .and_then(|(_, value)| value.parse::<usize>().ok())
667 .unwrap_or(10)
668 }
669
670 pub fn client_retry_timeout(&self) -> u64 {
672 self.0
673 .iter()
674 .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
675 .and_then(|(_, value)| value.parse::<u64>().ok())
676 .unwrap_or(180)
677 }
678
679 pub fn get(&self, key: &str) -> Option<&String> {
680 self.0.get(key)
681 }
682}
683
684impl From<HashMap<String, String>> for StorageOptions {
685 fn from(value: HashMap<String, String>) -> Self {
686 Self::new(value)
687 }
688}
689
690impl ObjectStore {
691 #[allow(clippy::too_many_arguments)]
692 pub fn new(
693 store: Arc<DynObjectStore>,
694 location: Url,
695 block_size: Option<usize>,
696 wrapper: Option<Arc<dyn WrappingObjectStore>>,
697 use_constant_size_upload_parts: bool,
698 list_is_lexically_ordered: bool,
699 io_parallelism: usize,
700 download_retry_count: usize,
701 storage_options: Option<&HashMap<String, String>>,
702 ) -> Self {
703 let scheme = location.scheme();
704 let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
705
706 let store = match wrapper {
707 Some(wrapper) => wrapper.wrap(store, storage_options),
708 None => store,
709 };
710
711 Self {
712 inner: store,
713 scheme: scheme.into(),
714 block_size,
715 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
716 use_constant_size_upload_parts,
717 list_is_lexically_ordered,
718 io_parallelism,
719 download_retry_count,
720 }
721 }
722}
723
/// Picks a default block size from the URL scheme: 4 KiB for `"file"`,
/// 64 KiB for everything else. The comparison is case-sensitive.
fn infer_block_size(scheme: &str) -> usize {
    const KIB: usize = 1024;
    if scheme == "file" {
        4 * KIB
    } else {
        64 * KIB
    }
}
733
734#[cfg(test)]
735mod tests {
736 use super::*;
737 use object_store::memory::InMemory;
738 use rstest::rstest;
739 use std::env::set_current_dir;
740 use std::fs::{create_dir_all, write};
741 use std::path::Path as StdPath;
742 use std::sync::atomic::{AtomicBool, Ordering};
743
744 fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
746 let expanded = tilde(path_str).to_string();
747 let path = StdPath::new(&expanded);
748 std::fs::create_dir_all(path.parent().unwrap())?;
749 write(path, contents)
750 }
751
752 async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
753 let test_file_store = store.open(path).await.unwrap();
754 let size = test_file_store.size().await.unwrap();
755 let bytes = test_file_store.get_range(0..size).await.unwrap();
756 let contents = String::from_utf8(bytes.to_vec()).unwrap();
757 Ok(contents)
758 }
759
760 #[tokio::test]
761 async fn test_absolute_paths() {
762 let tmp_dir = tempfile::tempdir().unwrap();
763 let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
764 write_to_file(
765 &format!("{tmp_path}/bar/foo.lance/test_file"),
766 "TEST_CONTENT",
767 )
768 .unwrap();
769
770 for uri in &[
772 format!("{tmp_path}/bar/foo.lance"),
773 format!("{tmp_path}/./bar/foo.lance"),
774 format!("{tmp_path}/bar/foo.lance/../foo.lance"),
775 ] {
776 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
777 let contents = read_from_store(store.as_ref(), &path.child("test_file"))
778 .await
779 .unwrap();
780 assert_eq!(contents, "TEST_CONTENT");
781 }
782 }
783
784 #[tokio::test]
785 async fn test_cloud_paths() {
786 let uri = "s3://bucket/foo.lance";
787 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
788 assert_eq!(store.scheme, "s3");
789 assert_eq!(path.to_string(), "foo.lance");
790
791 let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
792 .await
793 .unwrap();
794 assert_eq!(store.scheme, "s3");
795 assert_eq!(path.to_string(), "foo.lance");
796
797 let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
798 .await
799 .unwrap();
800 assert_eq!(store.scheme, "gs");
801 assert_eq!(path.to_string(), "foo.lance");
802 }
803
804 async fn test_block_size_used_test_helper(
805 uri: &str,
806 storage_options: Option<HashMap<String, String>>,
807 default_expected_block_size: usize,
808 ) {
809 let registry = Arc::new(ObjectStoreRegistry::default());
811 let params = ObjectStoreParams {
812 storage_options: storage_options.clone(),
813 ..ObjectStoreParams::default()
814 };
815 let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
816 .await
817 .unwrap();
818 assert_eq!(store.block_size, default_expected_block_size);
819
820 let registry = Arc::new(ObjectStoreRegistry::default());
822 let params = ObjectStoreParams {
823 block_size: Some(1024),
824 storage_options: storage_options.clone(),
825 ..ObjectStoreParams::default()
826 };
827 let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
828 .await
829 .unwrap();
830 assert_eq!(store.block_size, 1024);
831 }
832
833 #[rstest]
834 #[case("s3://bucket/foo.lance", None)]
835 #[case("gs://bucket/foo.lance", None)]
836 #[case("az://account/bucket/foo.lance",
837 Some(HashMap::from([
838 (String::from("account_name"), String::from("account")),
839 (String::from("container_name"), String::from("container"))
840 ])))]
841 #[tokio::test]
842 async fn test_block_size_used_cloud(
843 #[case] uri: &str,
844 #[case] storage_options: Option<HashMap<String, String>>,
845 ) {
846 test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
847 }
848
849 #[rstest]
850 #[case("file")]
851 #[case("file-object-store")]
852 #[case("memory:///bucket/foo.lance")]
853 #[tokio::test]
854 async fn test_block_size_used_file(#[case] prefix: &str) {
855 let tmp_dir = tempfile::tempdir().unwrap();
856 let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
857 let path = format!("{tmp_path}/bar/foo.lance/test_file");
858 write_to_file(&path, "URL").unwrap();
859 let uri = format!("{prefix}:///{path}");
860 test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
861 }
862
863 #[tokio::test]
864 async fn test_relative_paths() {
865 let tmp_dir = tempfile::tempdir().unwrap();
866 let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
867 write_to_file(
868 &format!("{tmp_path}/bar/foo.lance/test_file"),
869 "RELATIVE_URL",
870 )
871 .unwrap();
872
873 set_current_dir(StdPath::new(&tmp_path)).expect("Error changing current dir");
874 let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();
875
876 let contents = read_from_store(store.as_ref(), &path.child("test_file"))
877 .await
878 .unwrap();
879 assert_eq!(contents, "RELATIVE_URL");
880 }
881
882 #[tokio::test]
883 async fn test_tilde_expansion() {
884 let uri = "~/foo.lance";
885 write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
886 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
887 let contents = read_from_store(store.as_ref(), &path.child("test_file"))
888 .await
889 .unwrap();
890 assert_eq!(contents, "TILDE");
891 }
892
893 #[tokio::test]
894 async fn test_read_directory() {
895 let tmp_dir = tempfile::tempdir().unwrap();
896 let path = tmp_dir.path();
897 create_dir_all(path.join("foo").join("bar")).unwrap();
898 create_dir_all(path.join("foo").join("zoo")).unwrap();
899 create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
900 write_to_file(
901 path.join("foo").join("test_file").to_str().unwrap(),
902 "read_dir",
903 )
904 .unwrap();
905 let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
906
907 let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
908 assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
909 }
910
911 #[tokio::test]
912 async fn test_delete_directory() {
913 let tmp_dir = tempfile::tempdir().unwrap();
914 let path = tmp_dir.path();
915 create_dir_all(path.join("foo").join("bar")).unwrap();
916 create_dir_all(path.join("foo").join("zoo")).unwrap();
917 create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
918 write_to_file(
919 path.join("foo")
920 .join("bar")
921 .join("test_file")
922 .to_str()
923 .unwrap(),
924 "delete",
925 )
926 .unwrap();
927 write_to_file(path.join("foo").join("top").to_str().unwrap(), "delete_top").unwrap();
928 let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
929 store.remove_dir_all(base.child("foo")).await.unwrap();
930
931 assert!(!path.join("foo").exists());
932 }
933
934 #[derive(Debug)]
935 struct TestWrapper {
936 called: AtomicBool,
937
938 return_value: Arc<dyn OSObjectStore>,
939 }
940
941 impl WrappingObjectStore for TestWrapper {
942 fn wrap(
943 &self,
944 _original: Arc<dyn OSObjectStore>,
945 _storage_options: Option<&HashMap<String, String>>,
946 ) -> Arc<dyn OSObjectStore> {
947 self.called.store(true, Ordering::Relaxed);
948
949 self.return_value.clone()
951 }
952 }
953
954 impl TestWrapper {
955 fn called(&self) -> bool {
956 self.called.load(Ordering::Relaxed)
957 }
958 }
959
960 #[tokio::test]
961 async fn test_wrapping_object_store_option_is_used() {
962 let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
964 let registry = Arc::new(ObjectStoreRegistry::default());
965
966 assert_eq!(Arc::strong_count(&mock_inner_store), 1);
967
968 let wrapper = Arc::new(TestWrapper {
969 called: AtomicBool::new(false),
970 return_value: mock_inner_store.clone(),
971 });
972
973 let params = ObjectStoreParams {
974 object_store_wrapper: Some(wrapper.clone()),
975 ..ObjectStoreParams::default()
976 };
977
978 assert!(!wrapper.called());
980
981 let _ = ObjectStore::from_uri_and_params(registry, "memory:///", ¶ms)
982 .await
983 .unwrap();
984
985 assert!(wrapper.called());
987
988 assert_eq!(Arc::strong_count(&mock_inner_store), 2);
991 }
992
993 #[tokio::test]
994 async fn test_local_paths() {
995 let temp_dir = tempfile::tempdir().unwrap();
996
997 let file_path = temp_dir.path().join("test_file");
998 let mut writer = ObjectStore::create_local_writer(file_path.as_path())
999 .await
1000 .unwrap();
1001 writer.write_all(b"LOCAL").await.unwrap();
1002 writer.shutdown().await.unwrap();
1003
1004 let reader = ObjectStore::open_local(file_path.as_path()).await.unwrap();
1005 let buf = reader.get_range(0..5).await.unwrap();
1006 assert_eq!(buf.as_ref(), b"LOCAL");
1007 }
1008
1009 #[tokio::test]
1010 async fn test_read_one() {
1011 let temp_dir = tempfile::tempdir().unwrap();
1012
1013 let file_path = temp_dir.path().join("test_file");
1014 let mut writer = ObjectStore::create_local_writer(file_path.as_path())
1015 .await
1016 .unwrap();
1017 writer.write_all(b"LOCAL").await.unwrap();
1018 writer.shutdown().await.unwrap();
1019
1020 let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
1021 let obj_store = ObjectStore::local();
1022 let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
1023 assert_eq!(buf.as_ref(), b"LOCAL");
1024
1025 let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
1026 assert_eq!(buf.as_ref(), b"LOCAL");
1027 }
1028
#[tokio::test]
#[cfg(windows)]
async fn test_windows_paths() {
    use std::path::Component;
    use std::path::Prefix;
    use std::path::Prefix::*;

    /// Extracts the prefix component (e.g. the drive) of an absolute path.
    fn get_path_prefix(path: &StdPath) -> Prefix {
        let first = path.components().next().unwrap();
        if let Component::Prefix(prefix_component) = first {
            prefix_component.kind()
        } else {
            panic!()
        }
    }

    /// Returns the drive letter of a `Disk` prefix as a one-character string.
    fn get_drive_letter(prefix: Prefix) -> String {
        if let Disk(letter) = prefix {
            String::from_utf8(vec![letter]).unwrap()
        } else {
            panic!()
        }
    }

    let tmp_dir = tempfile::tempdir().unwrap();
    let drive_letter = get_drive_letter(get_path_prefix(tmp_dir.path()));

    let file = format!("{drive_letter}:/test_folder/test.lance") + "/test_file";
    write_to_file(&file, "WINDOWS").unwrap();

    // Both forward-slash and backslash spellings must resolve.
    let spellings = [
        format!("{drive_letter}:/test_folder/test.lance"),
        format!("{drive_letter}:\\test_folder\\test.lance"),
    ];
    for uri in &spellings {
        let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
        let contents = read_from_store(store.as_ref(), &base.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "WINDOWS");
    }
}
1072
1073 #[tokio::test]
1074 async fn test_cross_filesystem_copy() {
1075 let source_dir = tempfile::tempdir().unwrap();
1077 let dest_dir = tempfile::tempdir().unwrap();
1078
1079 let source_file_name = "test_file.txt";
1081 let source_file = source_dir.path().join(source_file_name);
1082 std::fs::write(&source_file, b"test content").unwrap();
1083
1084 let (store, base_path) = ObjectStore::from_uri(source_dir.path().to_str().unwrap())
1086 .await
1087 .unwrap();
1088
1089 let from_path = base_path.child(source_file_name);
1091
1092 let dest_file = dest_dir.path().join("copied_file.txt");
1094 let dest_str = dest_file.to_str().unwrap();
1095 let to_path = object_store::path::Path::parse(dest_str).unwrap();
1096
1097 store.copy(&from_path, &to_path).await.unwrap();
1099
1100 assert!(dest_file.exists());
1102 let copied_content = std::fs::read(&dest_file).unwrap();
1103 assert_eq!(copied_content, b"test content");
1104 }
1105
1106 #[tokio::test]
1107 async fn test_copy_creates_parent_directories() {
1108 let source_dir = tempfile::tempdir().unwrap();
1109 let dest_dir = tempfile::tempdir().unwrap();
1110
1111 let source_file_name = "test_file.txt";
1113 let source_file = source_dir.path().join(source_file_name);
1114 std::fs::write(&source_file, b"test content").unwrap();
1115
1116 let (store, base_path) = ObjectStore::from_uri(source_dir.path().to_str().unwrap())
1118 .await
1119 .unwrap();
1120
1121 let from_path = base_path.child(source_file_name);
1123
1124 let dest_file = dest_dir
1126 .path()
1127 .join("nested")
1128 .join("dirs")
1129 .join("copied_file.txt");
1130 let dest_str = dest_file.to_str().unwrap();
1131 let to_path = object_store::path::Path::parse(dest_str).unwrap();
1132
1133 store.copy(&from_path, &to_path).await.unwrap();
1135
1136 assert!(dest_file.exists());
1138 assert!(dest_file.parent().unwrap().exists());
1139 let copied_content = std::fs::read(&dest_file).unwrap();
1140 assert_eq!(copied_content, b"test content");
1141 }
1142}