1use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use deepsize::DeepSizeOf;
17use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
18use futures::{FutureExt, Stream};
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22#[cfg(feature = "aws")]
23use object_store::aws::AwsCredentialProvider;
24use object_store::DynObjectStore;
25use object_store::Error as ObjectStoreError;
26use object_store::{path::Path, ObjectMeta, ObjectStore as OSObjectStore};
27use providers::local::FileStoreProvider;
28use providers::memory::MemoryStoreProvider;
29use shellexpand::tilde;
30use snafu::location;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35mod list_retry;
36pub mod providers;
37mod tracing;
38use crate::object_reader::SmallReader;
39use crate::object_writer::WriteResult;
40use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
41use lance_core::{Error, Result};
42
/// Default I/O parallelism for local storage.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
/// Default I/O parallelism for cloud storage.
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

/// Default block size for local storage (4 KiB).
const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024;
/// Default block size for cloud storage (64 KiB).
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024;

/// Default maximum size of a single I/O operation (presumably in bytes).
///
/// The value is read once, lazily, from the `LANCE_MAX_IOP_SIZE` environment
/// variable. When the variable is unset or cannot be parsed as a `u64`, the
/// default of `16 * 1024 * 1024` is used instead of panicking, so a malformed
/// environment cannot crash the first caller that touches this static.
pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock<u64> = std::sync::LazyLock::new(|| {
    std::env::var("LANCE_MAX_IOP_SIZE")
        .ok()
        .and_then(|val| val.parse::<u64>().ok())
        .unwrap_or(16 * 1024 * 1024)
});

/// Default number of download retries.
pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
62
63pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
64
65#[async_trait]
66pub trait ObjectStoreExt {
67 async fn exists(&self, path: &Path) -> Result<bool>;
69
70 fn read_dir_all<'a, 'b>(
74 &'a self,
75 dir_path: impl Into<&'b Path> + Send,
76 unmodified_since: Option<DateTime<Utc>>,
77 ) -> BoxStream<'a, Result<ObjectMeta>>;
78}
79
80#[async_trait]
81impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
82 fn read_dir_all<'a, 'b>(
83 &'a self,
84 dir_path: impl Into<&'b Path> + Send,
85 unmodified_since: Option<DateTime<Utc>>,
86 ) -> BoxStream<'a, Result<ObjectMeta>> {
87 let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
88 if let Some(unmodified_since_val) = unmodified_since {
89 output
90 .try_filter(move |file| future::ready(file.last_modified < unmodified_since_val))
91 .boxed()
92 } else {
93 output.boxed()
94 }
95 }
96
97 async fn exists(&self, path: &Path) -> Result<bool> {
98 match self.head(path).await {
99 Ok(_) => Ok(true),
100 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
101 Err(e) => Err(e.into()),
102 }
103 }
104}
105
106#[derive(Debug, Clone)]
108pub struct ObjectStore {
109 pub inner: Arc<dyn OSObjectStore>,
111 scheme: String,
112 block_size: usize,
113 max_iop_size: u64,
114 pub use_constant_size_upload_parts: bool,
117 pub list_is_lexically_ordered: bool,
120 io_parallelism: usize,
121 download_retry_count: usize,
123}
124
125impl DeepSizeOf for ObjectStore {
126 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
127 self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
132 }
133}
134
135impl std::fmt::Display for ObjectStore {
136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137 write!(f, "ObjectStore({})", self.scheme)
138 }
139}
140
141pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
142 fn wrap(&self, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
143}
144
145#[derive(Debug, Clone)]
146pub struct ChainedWrappingObjectStore {
147 wrappers: Vec<Arc<dyn WrappingObjectStore>>,
148}
149
150impl ChainedWrappingObjectStore {
151 pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
152 Self { wrappers }
153 }
154
155 pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
156 self.wrappers.push(wrapper);
157 }
158}
159
160impl WrappingObjectStore for ChainedWrappingObjectStore {
161 fn wrap(&self, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
162 self.wrappers
163 .iter()
164 .fold(original, |acc, wrapper| wrapper.wrap(acc))
165 }
166}
167
168#[derive(Debug, Clone)]
171pub struct ObjectStoreParams {
172 pub block_size: Option<usize>,
173 #[deprecated(note = "Implement an ObjectStoreProvider instead")]
174 pub object_store: Option<(Arc<DynObjectStore>, Url)>,
175 pub s3_credentials_refresh_offset: Duration,
176 #[cfg(feature = "aws")]
177 pub aws_credentials: Option<AwsCredentialProvider>,
178 pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
179 pub storage_options: Option<HashMap<String, String>>,
180 pub use_constant_size_upload_parts: bool,
185 pub list_is_lexically_ordered: Option<bool>,
186}
187
188impl Default for ObjectStoreParams {
189 fn default() -> Self {
190 #[allow(deprecated)]
191 Self {
192 object_store: None,
193 block_size: None,
194 s3_credentials_refresh_offset: Duration::from_secs(60),
195 #[cfg(feature = "aws")]
196 aws_credentials: None,
197 object_store_wrapper: None,
198 storage_options: None,
199 use_constant_size_upload_parts: false,
200 list_is_lexically_ordered: None,
201 }
202 }
203}
204
205impl std::hash::Hash for ObjectStoreParams {
207 #[allow(deprecated)]
208 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
209 self.block_size.hash(state);
211 if let Some((store, url)) = &self.object_store {
212 Arc::as_ptr(store).hash(state);
213 url.hash(state);
214 }
215 self.s3_credentials_refresh_offset.hash(state);
216 #[cfg(feature = "aws")]
217 if let Some(aws_credentials) = &self.aws_credentials {
218 Arc::as_ptr(aws_credentials).hash(state);
219 }
220 if let Some(wrapper) = &self.object_store_wrapper {
221 Arc::as_ptr(wrapper).hash(state);
222 }
223 if let Some(storage_options) = &self.storage_options {
224 for (key, value) in storage_options {
225 key.hash(state);
226 value.hash(state);
227 }
228 }
229 self.use_constant_size_upload_parts.hash(state);
230 self.list_is_lexically_ordered.hash(state);
231 }
232}
233
234impl Eq for ObjectStoreParams {}
236impl PartialEq for ObjectStoreParams {
237 #[allow(deprecated)]
238 fn eq(&self, other: &Self) -> bool {
239 #[cfg(feature = "aws")]
240 if self.aws_credentials.is_some() != other.aws_credentials.is_some() {
241 return false;
242 }
243
244 self.block_size == other.block_size
246 && self
247 .object_store
248 .as_ref()
249 .map(|(store, url)| (Arc::as_ptr(store), url))
250 == other
251 .object_store
252 .as_ref()
253 .map(|(store, url)| (Arc::as_ptr(store), url))
254 && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
255 && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
256 == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
257 && self.storage_options == other.storage_options
258 && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
259 && self.list_is_lexically_ordered == other.list_is_lexically_ordered
260 }
261}
262
263fn uri_to_url(uri: &str) -> Result<Url> {
264 match Url::parse(uri) {
265 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
266 local_path_to_url(uri)
268 }
269 Ok(url) => Ok(url),
270 Err(_) => local_path_to_url(uri),
271 }
272}
273
274fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
275 let expanded = tilde(str_path.as_ref()).to_string();
276
277 let mut expanded_path = path_abs::PathAbs::new(expanded)
278 .unwrap()
279 .as_path()
280 .to_path_buf();
281 if let Some(s) = expanded_path.as_path().to_str() {
283 if s.is_empty() {
284 expanded_path = std::env::current_dir()?;
285 }
286 }
287
288 Ok(expanded_path)
289}
290
291fn local_path_to_url(str_path: &str) -> Result<Url> {
292 let expanded_path = expand_path(str_path)?;
293
294 Url::from_directory_path(expanded_path).map_err(|_| Error::InvalidInput {
295 source: format!("Invalid table location: '{}'", str_path).into(),
296 location: location!(),
297 })
298}
299
300impl ObjectStore {
301 pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
309 let registry = Arc::new(ObjectStoreRegistry::default());
310
311 Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
312 }
313
314 pub async fn from_uri_and_params(
318 registry: Arc<ObjectStoreRegistry>,
319 uri: &str,
320 params: &ObjectStoreParams,
321 ) -> Result<(Arc<Self>, Path)> {
322 #[allow(deprecated)]
323 if let Some((store, path)) = params.object_store.as_ref() {
324 let mut inner = store.clone();
325 if let Some(wrapper) = params.object_store_wrapper.as_ref() {
326 inner = wrapper.wrap(inner);
327 }
328 let store = Self {
329 inner,
330 scheme: path.scheme().to_string(),
331 block_size: params.block_size.unwrap_or(64 * 1024),
332 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
333 use_constant_size_upload_parts: params.use_constant_size_upload_parts,
334 list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
335 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
336 download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
337 };
338 let path = Path::parse(path.path())?;
339 return Ok((Arc::new(store), path));
340 }
341 let url = uri_to_url(uri)?;
342 let store = registry.get_store(url.clone(), params).await?;
343 let provider = registry.get_provider(url.scheme()).expect_ok()?;
345 let path = provider.extract_path(&url)?;
346
347 Ok((store, path))
348 }
349
350 #[deprecated(note = "Use `from_uri` instead")]
351 pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
352 Self::from_uri_and_params(
353 Arc::new(ObjectStoreRegistry::default()),
354 str_path,
355 &Default::default(),
356 )
357 .now_or_never()
358 .unwrap()
359 }
360
361 pub fn local() -> Self {
363 let provider = FileStoreProvider;
364 provider
365 .new_store(Url::parse("file:///").unwrap(), &Default::default())
366 .now_or_never()
367 .unwrap()
368 .unwrap()
369 }
370
371 pub fn memory() -> Self {
373 let provider = MemoryStoreProvider;
374 provider
375 .new_store(Url::parse("memory:///").unwrap(), &Default::default())
376 .now_or_never()
377 .unwrap()
378 .unwrap()
379 }
380
381 pub fn is_local(&self) -> bool {
383 self.scheme == "file"
384 }
385
386 pub fn is_cloud(&self) -> bool {
387 self.scheme != "file" && self.scheme != "memory"
388 }
389
390 pub fn scheme(&self) -> &str {
391 &self.scheme
392 }
393
394 pub fn block_size(&self) -> usize {
395 self.block_size
396 }
397
398 pub fn max_iop_size(&self) -> u64 {
399 self.max_iop_size
400 }
401
402 pub fn io_parallelism(&self) -> usize {
403 std::env::var("LANCE_IO_THREADS")
404 .map(|val| val.parse::<usize>().unwrap())
405 .unwrap_or(self.io_parallelism)
406 }
407
408 pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
413 match self.scheme.as_str() {
414 "file" => LocalObjectReader::open(path, self.block_size, None).await,
415 _ => Ok(Box::new(CloudObjectReader::new(
416 self.inner.clone(),
417 path.clone(),
418 self.block_size,
419 None,
420 self.download_retry_count,
421 )?)),
422 }
423 }
424
425 pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
431 if known_size <= self.block_size {
434 return Ok(Box::new(SmallReader::new(
435 self.inner.clone(),
436 path.clone(),
437 self.download_retry_count,
438 known_size,
439 )));
440 }
441
442 match self.scheme.as_str() {
443 "file" => LocalObjectReader::open(path, self.block_size, Some(known_size)).await,
444 _ => Ok(Box::new(CloudObjectReader::new(
445 self.inner.clone(),
446 path.clone(),
447 self.block_size,
448 Some(known_size),
449 self.download_retry_count,
450 )?)),
451 }
452 }
453
454 pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
456 let object_store = Self::local();
457 let absolute_path = expand_path(path.to_string_lossy())?;
458 let os_path = Path::from_absolute_path(absolute_path)?;
459 object_store.create(&os_path).await
460 }
461
462 pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
464 let object_store = Self::local();
465 let absolute_path = expand_path(path.to_string_lossy())?;
466 let os_path = Path::from_absolute_path(absolute_path)?;
467 object_store.open(&os_path).await
468 }
469
470 pub async fn create(&self, path: &Path) -> Result<ObjectWriter> {
472 ObjectWriter::new(self, path).await
473 }
474
475 pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
477 let mut writer = self.create(path).await?;
478 writer.write_all(content).await?;
479 writer.shutdown().await
480 }
481
482 pub async fn delete(&self, path: &Path) -> Result<()> {
483 self.inner.delete(path).await?;
484 Ok(())
485 }
486
487 pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
488 if self.is_local() {
489 return super::local::copy_file(from, to);
491 }
492 Ok(self.inner.copy(from, to).await?)
493 }
494
495 pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
497 let path = dir_path.into();
498 let path = Path::parse(&path)?;
499 let output = self.inner.list_with_delimiter(Some(&path)).await?;
500 Ok(output
501 .common_prefixes
502 .iter()
503 .chain(output.objects.iter().map(|o| &o.location))
504 .map(|s| s.filename().unwrap().to_string())
505 .collect())
506 }
507
508 pub fn list(
509 &self,
510 path: Option<Path>,
511 ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
512 Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
513 }
514
515 pub fn read_dir_all<'a, 'b>(
519 &'a self,
520 dir_path: impl Into<&'b Path> + Send,
521 unmodified_since: Option<DateTime<Utc>>,
522 ) -> BoxStream<'a, Result<ObjectMeta>> {
523 self.inner.read_dir_all(dir_path, unmodified_since)
524 }
525
526 pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
528 let path = dir_path.into();
529 let path = Path::parse(&path)?;
530
531 if self.is_local() {
532 return super::local::remove_dir_all(&path);
534 }
535 let sub_entries = self
536 .inner
537 .list(Some(&path))
538 .map(|m| m.map(|meta| meta.location))
539 .boxed();
540 self.inner
541 .delete_stream(sub_entries)
542 .try_collect::<Vec<_>>()
543 .await?;
544 Ok(())
545 }
546
547 pub fn remove_stream<'a>(
548 &'a self,
549 locations: BoxStream<'a, Result<Path>>,
550 ) -> BoxStream<'a, Result<Path>> {
551 self.inner
552 .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
553 .err_into::<Error>()
554 .boxed()
555 }
556
557 pub async fn exists(&self, path: &Path) -> Result<bool> {
559 match self.inner.head(path).await {
560 Ok(_) => Ok(true),
561 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
562 Err(e) => Err(e.into()),
563 }
564 }
565
566 pub async fn size(&self, path: &Path) -> Result<u64> {
568 Ok(self.inner.head(path).await?.size)
569 }
570
571 pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
573 let reader = self.open(path).await?;
574 Ok(reader.get_all().await?)
575 }
576
577 pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
582 let reader = self.open(path).await?;
583 Ok(reader.get_range(range).await?)
584 }
585}
586
587#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
589pub enum LanceConfigKey {
590 DownloadRetryCount,
592}
593
594impl FromStr for LanceConfigKey {
595 type Err = Error;
596
597 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
598 match s.to_ascii_lowercase().as_str() {
599 "download_retry_count" => Ok(Self::DownloadRetryCount),
600 _ => Err(Error::InvalidInput {
601 source: format!("Invalid LanceConfigKey: {}", s).into(),
602 location: location!(),
603 }),
604 }
605 }
606}
607
/// Key/value storage options, wrapped to provide typed accessors.
#[derive(Clone, Debug, Default)]
pub struct StorageOptions(pub HashMap<String, String>);
610
611impl StorageOptions {
612 pub fn new(options: HashMap<String, String>) -> Self {
614 let mut options = options;
615 if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
616 options.insert("allow_http".into(), value);
617 }
618 if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
619 options.insert("allow_http".into(), value);
620 }
621 if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
622 options.insert("allow_http".into(), value);
623 }
624 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
625 options.insert("client_max_retries".into(), value);
626 }
627 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
628 options.insert("client_retry_timeout".into(), value);
629 }
630 Self(options)
631 }
632
633 pub fn allow_http(&self) -> bool {
635 self.0.iter().any(|(key, value)| {
636 key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
637 })
638 }
639
640 pub fn download_retry_count(&self) -> usize {
642 self.0
643 .iter()
644 .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
645 .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
646 .unwrap_or(3)
647 }
648
649 pub fn client_max_retries(&self) -> usize {
651 self.0
652 .iter()
653 .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
654 .and_then(|(_, value)| value.parse::<usize>().ok())
655 .unwrap_or(10)
656 }
657
658 pub fn client_retry_timeout(&self) -> u64 {
660 self.0
661 .iter()
662 .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
663 .and_then(|(_, value)| value.parse::<u64>().ok())
664 .unwrap_or(180)
665 }
666
667 pub fn get(&self, key: &str) -> Option<&String> {
668 self.0.get(key)
669 }
670}
671
672impl From<HashMap<String, String>> for StorageOptions {
673 fn from(value: HashMap<String, String>) -> Self {
674 Self::new(value)
675 }
676}
677
678impl ObjectStore {
679 #[allow(clippy::too_many_arguments)]
680 pub fn new(
681 store: Arc<DynObjectStore>,
682 location: Url,
683 block_size: Option<usize>,
684 wrapper: Option<Arc<dyn WrappingObjectStore>>,
685 use_constant_size_upload_parts: bool,
686 list_is_lexically_ordered: bool,
687 io_parallelism: usize,
688 download_retry_count: usize,
689 ) -> Self {
690 let scheme = location.scheme();
691 let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
692
693 let store = match wrapper {
694 Some(wrapper) => wrapper.wrap(store),
695 None => store,
696 };
697
698 Self {
699 inner: store,
700 scheme: scheme.into(),
701 block_size,
702 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
703 use_constant_size_upload_parts,
704 list_is_lexically_ordered,
705 io_parallelism,
706 download_retry_count,
707 }
708 }
709}
710
/// Picks a block size from the URL scheme: 4 KiB for `"file"`, 64 KiB for
/// everything else.
fn infer_block_size(scheme: &str) -> usize {
    if scheme == "file" {
        4 * 1024
    } else {
        64 * 1024
    }
}
720
721#[cfg(test)]
722mod tests {
723 use super::*;
724 use object_store::memory::InMemory;
725 use rstest::rstest;
726 use std::env::set_current_dir;
727 use std::fs::{create_dir_all, write};
728 use std::path::Path as StdPath;
729 use std::sync::atomic::{AtomicBool, Ordering};
730
731 fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
733 let expanded = tilde(path_str).to_string();
734 let path = StdPath::new(&expanded);
735 std::fs::create_dir_all(path.parent().unwrap())?;
736 write(path, contents)
737 }
738
739 async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
740 let test_file_store = store.open(path).await.unwrap();
741 let size = test_file_store.size().await.unwrap();
742 let bytes = test_file_store.get_range(0..size).await.unwrap();
743 let contents = String::from_utf8(bytes.to_vec()).unwrap();
744 Ok(contents)
745 }
746
747 #[tokio::test]
748 async fn test_absolute_paths() {
749 let tmp_dir = tempfile::tempdir().unwrap();
750 let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
751 write_to_file(
752 &format!("{tmp_path}/bar/foo.lance/test_file"),
753 "TEST_CONTENT",
754 )
755 .unwrap();
756
757 for uri in &[
759 format!("{tmp_path}/bar/foo.lance"),
760 format!("{tmp_path}/./bar/foo.lance"),
761 format!("{tmp_path}/bar/foo.lance/../foo.lance"),
762 ] {
763 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
764 let contents = read_from_store(store.as_ref(), &path.child("test_file"))
765 .await
766 .unwrap();
767 assert_eq!(contents, "TEST_CONTENT");
768 }
769 }
770
771 #[tokio::test]
772 async fn test_cloud_paths() {
773 let uri = "s3://bucket/foo.lance";
774 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
775 assert_eq!(store.scheme, "s3");
776 assert_eq!(path.to_string(), "foo.lance");
777
778 let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
779 .await
780 .unwrap();
781 assert_eq!(store.scheme, "s3");
782 assert_eq!(path.to_string(), "foo.lance");
783
784 let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
785 .await
786 .unwrap();
787 assert_eq!(store.scheme, "gs");
788 assert_eq!(path.to_string(), "foo.lance");
789 }
790
791 async fn test_block_size_used_test_helper(
792 uri: &str,
793 storage_options: Option<HashMap<String, String>>,
794 default_expected_block_size: usize,
795 ) {
796 let registry = Arc::new(ObjectStoreRegistry::default());
798 let params = ObjectStoreParams {
799 storage_options: storage_options.clone(),
800 ..ObjectStoreParams::default()
801 };
802 let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
803 .await
804 .unwrap();
805 assert_eq!(store.block_size, default_expected_block_size);
806
807 let registry = Arc::new(ObjectStoreRegistry::default());
809 let params = ObjectStoreParams {
810 block_size: Some(1024),
811 storage_options: storage_options.clone(),
812 ..ObjectStoreParams::default()
813 };
814 let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
815 .await
816 .unwrap();
817 assert_eq!(store.block_size, 1024);
818 }
819
820 #[rstest]
821 #[case("s3://bucket/foo.lance", None)]
822 #[case("gs://bucket/foo.lance", None)]
823 #[case("az://account/bucket/foo.lance",
824 Some(HashMap::from([
825 (String::from("account_name"), String::from("account")),
826 (String::from("container_name"), String::from("container"))
827 ])))]
828 #[tokio::test]
829 async fn test_block_size_used_cloud(
830 #[case] uri: &str,
831 #[case] storage_options: Option<HashMap<String, String>>,
832 ) {
833 test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
834 }
835
836 #[rstest]
837 #[case("file")]
838 #[case("file-object-store")]
839 #[case("memory:///bucket/foo.lance")]
840 #[tokio::test]
841 async fn test_block_size_used_file(#[case] prefix: &str) {
842 let tmp_dir = tempfile::tempdir().unwrap();
843 let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
844 let path = format!("{tmp_path}/bar/foo.lance/test_file");
845 write_to_file(&path, "URL").unwrap();
846 let uri = format!("{prefix}:///{path}");
847 test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
848 }
849
850 #[tokio::test]
851 async fn test_relative_paths() {
852 let tmp_dir = tempfile::tempdir().unwrap();
853 let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
854 write_to_file(
855 &format!("{tmp_path}/bar/foo.lance/test_file"),
856 "RELATIVE_URL",
857 )
858 .unwrap();
859
860 set_current_dir(StdPath::new(&tmp_path)).expect("Error changing current dir");
861 let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();
862
863 let contents = read_from_store(store.as_ref(), &path.child("test_file"))
864 .await
865 .unwrap();
866 assert_eq!(contents, "RELATIVE_URL");
867 }
868
869 #[tokio::test]
870 async fn test_tilde_expansion() {
871 let uri = "~/foo.lance";
872 write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
873 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
874 let contents = read_from_store(store.as_ref(), &path.child("test_file"))
875 .await
876 .unwrap();
877 assert_eq!(contents, "TILDE");
878 }
879
880 #[tokio::test]
881 async fn test_read_directory() {
882 let tmp_dir = tempfile::tempdir().unwrap();
883 let path = tmp_dir.path();
884 create_dir_all(path.join("foo").join("bar")).unwrap();
885 create_dir_all(path.join("foo").join("zoo")).unwrap();
886 create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
887 write_to_file(
888 path.join("foo").join("test_file").to_str().unwrap(),
889 "read_dir",
890 )
891 .unwrap();
892 let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
893
894 let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
895 assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
896 }
897
898 #[tokio::test]
899 async fn test_delete_directory() {
900 let tmp_dir = tempfile::tempdir().unwrap();
901 let path = tmp_dir.path();
902 create_dir_all(path.join("foo").join("bar")).unwrap();
903 create_dir_all(path.join("foo").join("zoo")).unwrap();
904 create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
905 write_to_file(
906 path.join("foo")
907 .join("bar")
908 .join("test_file")
909 .to_str()
910 .unwrap(),
911 "delete",
912 )
913 .unwrap();
914 write_to_file(path.join("foo").join("top").to_str().unwrap(), "delete_top").unwrap();
915 let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
916 store.remove_dir_all(base.child("foo")).await.unwrap();
917
918 assert!(!path.join("foo").exists());
919 }
920
921 #[derive(Debug)]
922 struct TestWrapper {
923 called: AtomicBool,
924
925 return_value: Arc<dyn OSObjectStore>,
926 }
927
928 impl WrappingObjectStore for TestWrapper {
929 fn wrap(&self, _original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
930 self.called.store(true, Ordering::Relaxed);
931
932 self.return_value.clone()
934 }
935 }
936
937 impl TestWrapper {
938 fn called(&self) -> bool {
939 self.called.load(Ordering::Relaxed)
940 }
941 }
942
943 #[tokio::test]
944 async fn test_wrapping_object_store_option_is_used() {
945 let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
947 let registry = Arc::new(ObjectStoreRegistry::default());
948
949 assert_eq!(Arc::strong_count(&mock_inner_store), 1);
950
951 let wrapper = Arc::new(TestWrapper {
952 called: AtomicBool::new(false),
953 return_value: mock_inner_store.clone(),
954 });
955
956 let params = ObjectStoreParams {
957 object_store_wrapper: Some(wrapper.clone()),
958 ..ObjectStoreParams::default()
959 };
960
961 assert!(!wrapper.called());
963
964 let _ = ObjectStore::from_uri_and_params(registry, "memory:///", ¶ms)
965 .await
966 .unwrap();
967
968 assert!(wrapper.called());
970
971 assert_eq!(Arc::strong_count(&mock_inner_store), 2);
974 }
975
976 #[tokio::test]
977 async fn test_local_paths() {
978 let temp_dir = tempfile::tempdir().unwrap();
979
980 let file_path = temp_dir.path().join("test_file");
981 let mut writer = ObjectStore::create_local_writer(file_path.as_path())
982 .await
983 .unwrap();
984 writer.write_all(b"LOCAL").await.unwrap();
985 writer.shutdown().await.unwrap();
986
987 let reader = ObjectStore::open_local(file_path.as_path()).await.unwrap();
988 let buf = reader.get_range(0..5).await.unwrap();
989 assert_eq!(buf.as_ref(), b"LOCAL");
990 }
991
992 #[tokio::test]
993 async fn test_read_one() {
994 let temp_dir = tempfile::tempdir().unwrap();
995
996 let file_path = temp_dir.path().join("test_file");
997 let mut writer = ObjectStore::create_local_writer(file_path.as_path())
998 .await
999 .unwrap();
1000 writer.write_all(b"LOCAL").await.unwrap();
1001 writer.shutdown().await.unwrap();
1002
1003 let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
1004 let obj_store = ObjectStore::local();
1005 let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
1006 assert_eq!(buf.as_ref(), b"LOCAL");
1007
1008 let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
1009 assert_eq!(buf.as_ref(), b"LOCAL");
1010 }
1011
1012 #[tokio::test]
1013 #[cfg(windows)]
1014 async fn test_windows_paths() {
1015 use std::path::Component;
1016 use std::path::Prefix;
1017 use std::path::Prefix::*;
1018
1019 fn get_path_prefix(path: &StdPath) -> Prefix {
1020 match path.components().next().unwrap() {
1021 Component::Prefix(prefix_component) => prefix_component.kind(),
1022 _ => panic!(),
1023 }
1024 }
1025
1026 fn get_drive_letter(prefix: Prefix) -> String {
1027 match prefix {
1028 Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
1029 _ => panic!(),
1030 }
1031 }
1032
1033 let tmp_dir = tempfile::tempdir().unwrap();
1034 let tmp_path = tmp_dir.path();
1035 let prefix = get_path_prefix(tmp_path);
1036 let drive_letter = get_drive_letter(prefix);
1037
1038 write_to_file(
1039 &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
1040 "WINDOWS",
1041 )
1042 .unwrap();
1043
1044 for uri in &[
1045 format!("{drive_letter}:/test_folder/test.lance"),
1046 format!("{drive_letter}:\\test_folder\\test.lance"),
1047 ] {
1048 let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
1049 let contents = read_from_store(store.as_ref(), &base.child("test_file"))
1050 .await
1051 .unwrap();
1052 assert_eq!(contents, "WINDOWS");
1053 }
1054 }
1055
1056 #[tokio::test]
1057 async fn test_cross_filesystem_copy() {
1058 let source_dir = tempfile::tempdir().unwrap();
1060 let dest_dir = tempfile::tempdir().unwrap();
1061
1062 let source_file_name = "test_file.txt";
1064 let source_file = source_dir.path().join(source_file_name);
1065 std::fs::write(&source_file, b"test content").unwrap();
1066
1067 let (store, base_path) = ObjectStore::from_uri(source_dir.path().to_str().unwrap())
1069 .await
1070 .unwrap();
1071
1072 let from_path = base_path.child(source_file_name);
1074
1075 let dest_file = dest_dir.path().join("copied_file.txt");
1077 let dest_str = dest_file.to_str().unwrap();
1078 let to_path = object_store::path::Path::parse(dest_str).unwrap();
1079
1080 store.copy(&from_path, &to_path).await.unwrap();
1082
1083 assert!(dest_file.exists());
1085 let copied_content = std::fs::read(&dest_file).unwrap();
1086 assert_eq!(copied_content, b"test content");
1087 }
1088
1089 #[tokio::test]
1090 async fn test_copy_creates_parent_directories() {
1091 let source_dir = tempfile::tempdir().unwrap();
1092 let dest_dir = tempfile::tempdir().unwrap();
1093
1094 let source_file_name = "test_file.txt";
1096 let source_file = source_dir.path().join(source_file_name);
1097 std::fs::write(&source_file, b"test content").unwrap();
1098
1099 let (store, base_path) = ObjectStore::from_uri(source_dir.path().to_str().unwrap())
1101 .await
1102 .unwrap();
1103
1104 let from_path = base_path.child(source_file_name);
1106
1107 let dest_file = dest_dir
1109 .path()
1110 .join("nested")
1111 .join("dirs")
1112 .join("copied_file.txt");
1113 let dest_str = dest_file.to_str().unwrap();
1114 let to_path = object_store::path::Path::parse(dest_str).unwrap();
1115
1116 store.copy(&from_path, &to_path).await.unwrap();
1118
1119 assert!(dest_file.exists());
1121 assert!(dest_file.parent().unwrap().exists());
1122 let copied_content = std::fs::read(&dest_file).unwrap();
1123 assert_eq!(copied_content, b"test content");
1124 }
1125}