1use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use deepsize::DeepSizeOf;
17use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
18use futures::{FutureExt, Stream};
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22#[cfg(feature = "aws")]
23use object_store::aws::AwsCredentialProvider;
24use object_store::DynObjectStore;
25use object_store::Error as ObjectStoreError;
26use object_store::{path::Path, ObjectMeta, ObjectStore as OSObjectStore};
27use providers::local::FileStoreProvider;
28use providers::memory::MemoryStoreProvider;
29use shellexpand::tilde;
30use snafu::location;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35mod list_retry;
36pub mod providers;
37pub mod storage_options;
38mod tracing;
39use crate::object_reader::SmallReader;
40use crate::object_writer::WriteResult;
41use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
42use lance_core::{Error, Result};
43
/// Default number of concurrent IO operations used for local filesystems.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
/// Default number of concurrent IO operations used for cloud object stores.
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

/// Default block size (bytes) for local filesystems.
const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024;
/// Default block size (bytes) for cloud object stores.
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024;

/// Maximum IO request size, read once from `LANCE_MAX_IOP_SIZE`.
///
/// Falls back to 16 MiB when the variable is unset (or not valid Unicode).
///
/// # Panics
/// Panics on first access if `LANCE_MAX_IOP_SIZE` is set but is not a valid
/// `u64`. The message names the variable and the offending value.
pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock<u64> = std::sync::LazyLock::new(|| {
    std::env::var("LANCE_MAX_IOP_SIZE")
        .map(|val| {
            val.parse::<u64>().unwrap_or_else(|err| {
                panic!("LANCE_MAX_IOP_SIZE must be an unsigned integer, got {val:?}: {err}")
            })
        })
        .unwrap_or(16 * 1024 * 1024)
});

/// Default number of download retries used by readers.
pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
63
64pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
65pub use storage_options::{
66 LanceNamespaceStorageOptionsProvider, StorageOptionsProvider, EXPIRES_AT_MILLIS_KEY,
67};
68
69#[async_trait]
70pub trait ObjectStoreExt {
71 async fn exists(&self, path: &Path) -> Result<bool>;
73
74 fn read_dir_all<'a, 'b>(
78 &'a self,
79 dir_path: impl Into<&'b Path> + Send,
80 unmodified_since: Option<DateTime<Utc>>,
81 ) -> BoxStream<'a, Result<ObjectMeta>>;
82}
83
84#[async_trait]
85impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
86 fn read_dir_all<'a, 'b>(
87 &'a self,
88 dir_path: impl Into<&'b Path> + Send,
89 unmodified_since: Option<DateTime<Utc>>,
90 ) -> BoxStream<'a, Result<ObjectMeta>> {
91 let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
92 if let Some(unmodified_since_val) = unmodified_since {
93 output
94 .try_filter(move |file| future::ready(file.last_modified <= unmodified_since_val))
95 .boxed()
96 } else {
97 output.boxed()
98 }
99 }
100
101 async fn exists(&self, path: &Path) -> Result<bool> {
102 match self.head(path).await {
103 Ok(_) => Ok(true),
104 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
105 Err(e) => Err(e.into()),
106 }
107 }
108}
109
/// Lance's handle to an underlying [`object_store`] implementation, together
/// with the IO tuning parameters used when reading and writing through it.
#[derive(Debug, Clone)]
pub struct ObjectStore {
    /// The underlying store. It may already have been wrapped by a
    /// [`WrappingObjectStore`] at construction time.
    pub inner: Arc<dyn OSObjectStore>,
    /// URL scheme of the store, e.g. `"file"`, `"memory"`, `"s3"`, `"gs"`.
    scheme: String,
    /// Block size passed to readers when opening objects. Objects no larger
    /// than this are read with `SmallReader` in `open_with_size`.
    block_size: usize,
    /// Upper bound for IO request size; initialized from `DEFAULT_MAX_IOP_SIZE`.
    max_iop_size: u64,
    /// NOTE(review): presumably makes multipart uploads use a constant part
    /// size; confirm against `ObjectWriter`.
    pub use_constant_size_upload_parts: bool,
    /// Whether listing this store returns entries in lexical order.
    pub list_is_lexically_ordered: bool,
    /// Configured IO parallelism. `io_parallelism()` lets the `LANCE_IO_THREADS`
    /// environment variable override it.
    io_parallelism: usize,
    /// Retry count handed to readers created by `open` / `open_with_size`.
    download_retry_count: usize,
}
128
129impl DeepSizeOf for ObjectStore {
130 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
131 self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
136 }
137}
138
139impl std::fmt::Display for ObjectStore {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 write!(f, "ObjectStore({})", self.scheme)
142 }
143}
144
/// Wraps an underlying object store in another `ObjectStore` implementation.
///
/// Implementations are applied when an [`ObjectStore`] is constructed, via
/// `ObjectStoreParams::object_store_wrapper` or `ObjectStore::new`.
pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
    /// Return a store that wraps `original`.
    ///
    /// `storage_options` are the options the store is being configured with,
    /// if any.
    fn wrap(
        &self,
        original: Arc<dyn OSObjectStore>,
        storage_options: Option<&HashMap<String, String>>,
    ) -> Arc<dyn OSObjectStore>;
}
156
/// A [`WrappingObjectStore`] that applies a sequence of wrappers.
#[derive(Debug, Clone)]
pub struct ChainedWrappingObjectStore {
    /// Wrappers, in the order they are applied.
    wrappers: Vec<Arc<dyn WrappingObjectStore>>,
}
161
162impl ChainedWrappingObjectStore {
163 pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
164 Self { wrappers }
165 }
166
167 pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
168 self.wrappers.push(wrapper);
169 }
170}
171
172impl WrappingObjectStore for ChainedWrappingObjectStore {
173 fn wrap(
174 &self,
175 original: Arc<dyn OSObjectStore>,
176 storage_options: Option<&HashMap<String, String>>,
177 ) -> Arc<dyn OSObjectStore> {
178 self.wrappers
179 .iter()
180 .fold(original, |acc, wrapper| wrapper.wrap(acc, storage_options))
181 }
182}
183
/// Parameters controlling how an [`ObjectStore`] is created.
#[derive(Debug, Clone)]
pub struct ObjectStoreParams {
    /// Block size for readers. When `None`, a default is chosen (the
    /// deprecated `object_store` path below uses 64 KiB).
    pub block_size: Option<usize>,
    /// Deprecated: a pre-built store plus the URL it corresponds to. When set,
    /// `ObjectStore::from_uri_and_params` uses it directly instead of the
    /// registry.
    #[deprecated(note = "Implement an ObjectStoreProvider instead")]
    pub object_store: Option<(Arc<DynObjectStore>, Url)>,
    /// Defaults to 60 seconds. NOTE(review): presumably how long before
    /// expiry S3 credentials are refreshed; confirm in the AWS provider.
    pub s3_credentials_refresh_offset: Duration,
    /// Optional AWS credential provider (only with the `aws` feature).
    #[cfg(feature = "aws")]
    pub aws_credentials: Option<AwsCredentialProvider>,
    /// Optional wrapper applied around the store when it is constructed.
    pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
    /// Raw key/value storage options; these are also passed to the wrapper.
    pub storage_options: Option<HashMap<String, String>>,
    /// Optional dynamic source of storage options. Params are compared and
    /// hashed by its `provider_id()`.
    pub storage_options_provider: Option<Arc<dyn StorageOptionsProvider>>,
    /// Copied into `ObjectStore::use_constant_size_upload_parts`.
    pub use_constant_size_upload_parts: bool,
    /// Whether listing is lexically ordered. `None` means unspecified (the
    /// deprecated `object_store` path treats it as `false`).
    pub list_is_lexically_ordered: Option<bool>,
}
205
206impl Default for ObjectStoreParams {
207 fn default() -> Self {
208 #[allow(deprecated)]
209 Self {
210 object_store: None,
211 block_size: None,
212 s3_credentials_refresh_offset: Duration::from_secs(60),
213 #[cfg(feature = "aws")]
214 aws_credentials: None,
215 object_store_wrapper: None,
216 storage_options: None,
217 storage_options_provider: None,
218 use_constant_size_upload_parts: false,
219 list_is_lexically_ordered: None,
220 }
221 }
222}
223
224impl std::hash::Hash for ObjectStoreParams {
226 #[allow(deprecated)]
227 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
228 self.block_size.hash(state);
230 if let Some((store, url)) = &self.object_store {
231 Arc::as_ptr(store).hash(state);
232 url.hash(state);
233 }
234 self.s3_credentials_refresh_offset.hash(state);
235 #[cfg(feature = "aws")]
236 if let Some(aws_credentials) = &self.aws_credentials {
237 Arc::as_ptr(aws_credentials).hash(state);
238 }
239 if let Some(wrapper) = &self.object_store_wrapper {
240 Arc::as_ptr(wrapper).hash(state);
241 }
242 if let Some(storage_options) = &self.storage_options {
243 for (key, value) in storage_options {
244 key.hash(state);
245 value.hash(state);
246 }
247 }
248 if let Some(provider) = &self.storage_options_provider {
249 provider.provider_id().hash(state);
250 }
251 self.use_constant_size_upload_parts.hash(state);
252 self.list_is_lexically_ordered.hash(state);
253 }
254}
255
256impl Eq for ObjectStoreParams {}
258impl PartialEq for ObjectStoreParams {
259 #[allow(deprecated)]
260 fn eq(&self, other: &Self) -> bool {
261 #[cfg(feature = "aws")]
262 if self.aws_credentials.is_some() != other.aws_credentials.is_some() {
263 return false;
264 }
265
266 self.block_size == other.block_size
269 && self
270 .object_store
271 .as_ref()
272 .map(|(store, url)| (Arc::as_ptr(store), url))
273 == other
274 .object_store
275 .as_ref()
276 .map(|(store, url)| (Arc::as_ptr(store), url))
277 && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
278 && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
279 == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
280 && self.storage_options == other.storage_options
281 && self
282 .storage_options_provider
283 .as_ref()
284 .map(|p| p.provider_id())
285 == other
286 .storage_options_provider
287 .as_ref()
288 .map(|p| p.provider_id())
289 && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
290 && self.list_is_lexically_ordered == other.list_is_lexically_ordered
291 }
292}
293
294fn uri_to_url(uri: &str) -> Result<Url> {
295 match Url::parse(uri) {
296 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
297 local_path_to_url(uri)
299 }
300 Ok(url) => Ok(url),
301 Err(_) => local_path_to_url(uri),
302 }
303}
304
305fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
306 let expanded = tilde(str_path.as_ref()).to_string();
307
308 let mut expanded_path = path_abs::PathAbs::new(expanded)
309 .unwrap()
310 .as_path()
311 .to_path_buf();
312 if let Some(s) = expanded_path.as_path().to_str() {
314 if s.is_empty() {
315 expanded_path = std::env::current_dir()?;
316 }
317 }
318
319 Ok(expanded_path)
320}
321
322fn local_path_to_url(str_path: &str) -> Result<Url> {
323 let expanded_path = expand_path(str_path)?;
324
325 Url::from_directory_path(expanded_path).map_err(|_| Error::InvalidInput {
326 source: format!("Invalid table location: '{}'", str_path).into(),
327 location: location!(),
328 })
329}
330
331impl ObjectStore {
332 pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
340 let registry = Arc::new(ObjectStoreRegistry::default());
341
342 Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
343 }
344
345 pub async fn from_uri_and_params(
349 registry: Arc<ObjectStoreRegistry>,
350 uri: &str,
351 params: &ObjectStoreParams,
352 ) -> Result<(Arc<Self>, Path)> {
353 #[allow(deprecated)]
354 if let Some((store, path)) = params.object_store.as_ref() {
355 let mut inner = store.clone();
356 if let Some(wrapper) = params.object_store_wrapper.as_ref() {
357 inner = wrapper.wrap(inner, params.storage_options.as_ref());
358 }
359 let store = Self {
360 inner,
361 scheme: path.scheme().to_string(),
362 block_size: params.block_size.unwrap_or(64 * 1024),
363 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
364 use_constant_size_upload_parts: params.use_constant_size_upload_parts,
365 list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
366 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
367 download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
368 };
369 let path = Path::parse(path.path())?;
370 return Ok((Arc::new(store), path));
371 }
372 let url = uri_to_url(uri)?;
373 let store = registry.get_store(url.clone(), params).await?;
374 let provider = registry.get_provider(url.scheme()).expect_ok()?;
376 let path = provider.extract_path(&url)?;
377
378 Ok((store, path))
379 }
380
381 pub fn extract_path_from_uri(registry: Arc<ObjectStoreRegistry>, uri: &str) -> Result<Path> {
395 let url = uri_to_url(uri)?;
396 let provider = registry.get_provider(url.scheme()).ok_or_else(|| {
397 Error::invalid_input(format!("Unknown scheme: {}", url.scheme()), location!())
398 })?;
399 provider.extract_path(&url)
400 }
401
402 #[deprecated(note = "Use `from_uri` instead")]
403 pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
404 Self::from_uri_and_params(
405 Arc::new(ObjectStoreRegistry::default()),
406 str_path,
407 &Default::default(),
408 )
409 .now_or_never()
410 .unwrap()
411 }
412
413 pub fn local() -> Self {
415 let provider = FileStoreProvider;
416 provider
417 .new_store(Url::parse("file:///").unwrap(), &Default::default())
418 .now_or_never()
419 .unwrap()
420 .unwrap()
421 }
422
423 pub fn memory() -> Self {
425 let provider = MemoryStoreProvider;
426 provider
427 .new_store(Url::parse("memory:///").unwrap(), &Default::default())
428 .now_or_never()
429 .unwrap()
430 .unwrap()
431 }
432
433 pub fn is_local(&self) -> bool {
435 self.scheme == "file"
436 }
437
438 pub fn is_cloud(&self) -> bool {
439 self.scheme != "file" && self.scheme != "memory"
440 }
441
442 pub fn scheme(&self) -> &str {
443 &self.scheme
444 }
445
446 pub fn block_size(&self) -> usize {
447 self.block_size
448 }
449
450 pub fn max_iop_size(&self) -> u64 {
451 self.max_iop_size
452 }
453
454 pub fn io_parallelism(&self) -> usize {
455 std::env::var("LANCE_IO_THREADS")
456 .map(|val| val.parse::<usize>().unwrap())
457 .unwrap_or(self.io_parallelism)
458 }
459
460 pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
465 match self.scheme.as_str() {
466 "file" => LocalObjectReader::open(path, self.block_size, None).await,
467 _ => Ok(Box::new(CloudObjectReader::new(
468 self.inner.clone(),
469 path.clone(),
470 self.block_size,
471 None,
472 self.download_retry_count,
473 )?)),
474 }
475 }
476
477 pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
483 if known_size <= self.block_size {
486 return Ok(Box::new(SmallReader::new(
487 self.inner.clone(),
488 path.clone(),
489 self.download_retry_count,
490 known_size,
491 )));
492 }
493
494 match self.scheme.as_str() {
495 "file" => LocalObjectReader::open(path, self.block_size, Some(known_size)).await,
496 _ => Ok(Box::new(CloudObjectReader::new(
497 self.inner.clone(),
498 path.clone(),
499 self.block_size,
500 Some(known_size),
501 self.download_retry_count,
502 )?)),
503 }
504 }
505
506 pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
508 let object_store = Self::local();
509 let absolute_path = expand_path(path.to_string_lossy())?;
510 let os_path = Path::from_absolute_path(absolute_path)?;
511 object_store.create(&os_path).await
512 }
513
514 pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
516 let object_store = Self::local();
517 let absolute_path = expand_path(path.to_string_lossy())?;
518 let os_path = Path::from_absolute_path(absolute_path)?;
519 object_store.open(&os_path).await
520 }
521
522 pub async fn create(&self, path: &Path) -> Result<ObjectWriter> {
524 ObjectWriter::new(self, path).await
525 }
526
527 pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
529 let mut writer = self.create(path).await?;
530 writer.write_all(content).await?;
531 writer.shutdown().await
532 }
533
534 pub async fn delete(&self, path: &Path) -> Result<()> {
535 self.inner.delete(path).await?;
536 Ok(())
537 }
538
539 pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
540 if self.is_local() {
541 return super::local::copy_file(from, to);
543 }
544 Ok(self.inner.copy(from, to).await?)
545 }
546
547 pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
549 let path = dir_path.into();
550 let path = Path::parse(&path)?;
551 let output = self.inner.list_with_delimiter(Some(&path)).await?;
552 Ok(output
553 .common_prefixes
554 .iter()
555 .chain(output.objects.iter().map(|o| &o.location))
556 .map(|s| s.filename().unwrap().to_string())
557 .collect())
558 }
559
560 pub fn list(
561 &self,
562 path: Option<Path>,
563 ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
564 Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
565 }
566
567 pub fn read_dir_all<'a, 'b>(
571 &'a self,
572 dir_path: impl Into<&'b Path> + Send,
573 unmodified_since: Option<DateTime<Utc>>,
574 ) -> BoxStream<'a, Result<ObjectMeta>> {
575 self.inner.read_dir_all(dir_path, unmodified_since)
576 }
577
578 pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
580 let path = dir_path.into();
581 let path = Path::parse(&path)?;
582
583 if self.is_local() {
584 return super::local::remove_dir_all(&path);
586 }
587 let sub_entries = self
588 .inner
589 .list(Some(&path))
590 .map(|m| m.map(|meta| meta.location))
591 .boxed();
592 self.inner
593 .delete_stream(sub_entries)
594 .try_collect::<Vec<_>>()
595 .await?;
596 Ok(())
597 }
598
599 pub fn remove_stream<'a>(
600 &'a self,
601 locations: BoxStream<'a, Result<Path>>,
602 ) -> BoxStream<'a, Result<Path>> {
603 self.inner
604 .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
605 .err_into::<Error>()
606 .boxed()
607 }
608
609 pub async fn exists(&self, path: &Path) -> Result<bool> {
611 match self.inner.head(path).await {
612 Ok(_) => Ok(true),
613 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
614 Err(e) => Err(e.into()),
615 }
616 }
617
618 pub async fn size(&self, path: &Path) -> Result<u64> {
620 Ok(self.inner.head(path).await?.size)
621 }
622
623 pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
625 let reader = self.open(path).await?;
626 Ok(reader.get_all().await?)
627 }
628
629 pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
634 let reader = self.open(path).await?;
635 Ok(reader.get_range(range).await?)
636 }
637}
638
/// Lance-specific configuration keys, parsed case-insensitively via `FromStr`.
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub enum LanceConfigKey {
    /// The `download_retry_count` key.
    DownloadRetryCount,
}
645
646impl FromStr for LanceConfigKey {
647 type Err = Error;
648
649 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
650 match s.to_ascii_lowercase().as_str() {
651 "download_retry_count" => Ok(Self::DownloadRetryCount),
652 _ => Err(Error::InvalidInput {
653 source: format!("Invalid LanceConfigKey: {}", s).into(),
654 location: location!(),
655 }),
656 }
657 }
658}
659
/// Storage option key/value pairs, with helpers for reading the settings Lance
/// understands.
#[derive(Clone, Debug, Default)]
pub struct StorageOptions(pub HashMap<String, String>);
662
663impl StorageOptions {
664 pub fn new(options: HashMap<String, String>) -> Self {
666 let mut options = options;
667 if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
668 options.insert("allow_http".into(), value);
669 }
670 if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
671 options.insert("allow_http".into(), value);
672 }
673 if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
674 options.insert("allow_http".into(), value);
675 }
676 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
677 options.insert("client_max_retries".into(), value);
678 }
679 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
680 options.insert("client_retry_timeout".into(), value);
681 }
682 Self(options)
683 }
684
685 pub fn allow_http(&self) -> bool {
687 self.0.iter().any(|(key, value)| {
688 key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
689 })
690 }
691
692 pub fn download_retry_count(&self) -> usize {
694 self.0
695 .iter()
696 .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
697 .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
698 .unwrap_or(3)
699 }
700
701 pub fn client_max_retries(&self) -> usize {
703 self.0
704 .iter()
705 .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
706 .and_then(|(_, value)| value.parse::<usize>().ok())
707 .unwrap_or(10)
708 }
709
710 pub fn client_retry_timeout(&self) -> u64 {
712 self.0
713 .iter()
714 .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
715 .and_then(|(_, value)| value.parse::<u64>().ok())
716 .unwrap_or(180)
717 }
718
719 pub fn get(&self, key: &str) -> Option<&String> {
720 self.0.get(key)
721 }
722
723 pub fn expires_at_millis(&self) -> Option<u64> {
725 self.0
726 .get(EXPIRES_AT_MILLIS_KEY)
727 .and_then(|s| s.parse::<u64>().ok())
728 }
729}
730
731impl From<HashMap<String, String>> for StorageOptions {
732 fn from(value: HashMap<String, String>) -> Self {
733 Self::new(value)
734 }
735}
736
737impl ObjectStore {
738 #[allow(clippy::too_many_arguments)]
739 pub fn new(
740 store: Arc<DynObjectStore>,
741 location: Url,
742 block_size: Option<usize>,
743 wrapper: Option<Arc<dyn WrappingObjectStore>>,
744 use_constant_size_upload_parts: bool,
745 list_is_lexically_ordered: bool,
746 io_parallelism: usize,
747 download_retry_count: usize,
748 storage_options: Option<&HashMap<String, String>>,
749 ) -> Self {
750 let scheme = location.scheme();
751 let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
752
753 let store = match wrapper {
754 Some(wrapper) => wrapper.wrap(store, storage_options),
755 None => store,
756 };
757
758 Self {
759 inner: store,
760 scheme: scheme.into(),
761 block_size,
762 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
763 use_constant_size_upload_parts,
764 list_is_lexically_ordered,
765 io_parallelism,
766 download_retry_count,
767 }
768 }
769}
770
/// Default block size for a URL scheme: 4 KiB for `file`, 64 KiB otherwise.
fn infer_block_size(scheme: &str) -> usize {
    if scheme == "file" {
        4 * 1024
    } else {
        64 * 1024
    }
}
780
#[cfg(test)]
mod tests {
    use super::*;
    use lance_core::utils::tempfile::{TempStdDir, TempStdFile, TempStrDir};
    use object_store::memory::InMemory;
    use rstest::rstest;
    use std::env::set_current_dir;
    use std::fs::{create_dir_all, write};
    use std::path::Path as StdPath;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Write `contents` to `path_str` (after `~` expansion), creating parent
    /// directories as needed.
    fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
        let expanded = tilde(path_str).to_string();
        let path = StdPath::new(&expanded);
        std::fs::create_dir_all(path.parent().unwrap())?;
        write(path, contents)
    }

    /// Read the entire object at `path` as a UTF-8 string.
    async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
        let test_file_store = store.open(path).await.unwrap();
        let size = test_file_store.size().await.unwrap();
        let bytes = test_file_store.get_range(0..size).await.unwrap();
        let contents = String::from_utf8(bytes.to_vec()).unwrap();
        Ok(contents)
    }

    /// Removes a directory tree when dropped, so a test cleans up even if it panics.
    struct RemoveDirOnDrop(std::path::PathBuf);

    impl Drop for RemoveDirOnDrop {
        fn drop(&mut self) {
            // Best effort: the directory may never have been created.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Restores the process working directory when dropped.
    struct RestoreCurrentDir(std::path::PathBuf);

    impl Drop for RestoreCurrentDir {
        fn drop(&mut self) {
            let _ = set_current_dir(&self.0);
        }
    }

    #[tokio::test]
    async fn test_absolute_paths() {
        let tmp_path = TempStrDir::default();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "TEST_CONTENT",
        )
        .unwrap();

        // Equivalent spellings of the same absolute directory must all resolve.
        for uri in &[
            format!("{tmp_path}/bar/foo.lance"),
            format!("{tmp_path}/./bar/foo.lance"),
            format!("{tmp_path}/bar/foo.lance/../foo.lance"),
        ] {
            let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &path.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "TEST_CONTENT");
        }
    }

    #[tokio::test]
    async fn test_cloud_paths() {
        let uri = "s3://bucket/foo.lance";
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "gs");
        assert_eq!(path.to_string(), "foo.lance");
    }

    /// Check the default block size for `uri`, then check that an explicit
    /// `block_size` param overrides it.
    async fn test_block_size_used_test_helper(
        uri: &str,
        storage_options: Option<HashMap<String, String>>,
        default_expected_block_size: usize,
    ) {
        // A fresh registry each time so no cached store is reused.
        let registry = Arc::new(ObjectStoreRegistry::default());
        let params = ObjectStoreParams {
            storage_options: storage_options.clone(),
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, default_expected_block_size);

        let registry = Arc::new(ObjectStoreRegistry::default());
        let params = ObjectStoreParams {
            block_size: Some(1024),
            storage_options: storage_options.clone(),
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, 1024);
    }

    #[rstest]
    #[case("s3://bucket/foo.lance", None)]
    #[case("gs://bucket/foo.lance", None)]
    #[case("az://account/bucket/foo.lance",
      Some(HashMap::from([
            (String::from("account_name"), String::from("account")),
            (String::from("container_name"), String::from("container"))
          ])))]
    #[tokio::test]
    async fn test_block_size_used_cloud(
        #[case] uri: &str,
        #[case] storage_options: Option<HashMap<String, String>>,
    ) {
        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
    }

    #[rstest]
    #[case("file")]
    #[case("file-object-store")]
    #[case("memory:///bucket/foo.lance")]
    #[tokio::test]
    async fn test_block_size_used_file(#[case] prefix: &str) {
        let tmp_path = TempStrDir::default();
        let path = format!("{tmp_path}/bar/foo.lance/test_file");
        write_to_file(&path, "URL").unwrap();
        let uri = format!("{prefix}:///{path}");
        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
    }

    #[tokio::test]
    async fn test_relative_paths() {
        let tmp_path = TempStrDir::default();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "RELATIVE_URL",
        )
        .unwrap();

        // The working directory is process-global. Restore it when this test
        // ends (it is dropped before `tmp_path`), so later tests do not run
        // from a deleted temporary directory.
        let _restore_cwd = std::env::current_dir().ok().map(RestoreCurrentDir);
        set_current_dir(StdPath::new(tmp_path.as_ref())).expect("Error changing current dir");
        let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();

        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "RELATIVE_URL");
    }

    #[tokio::test]
    async fn test_tilde_expansion() {
        // Use a uniquely named directory under the home dir and remove it
        // afterwards. The old fixed `~/foo.lance` polluted (or overwrote) the
        // user's real home directory.
        let root_uri = format!("~/lance_test_tilde_{}", std::process::id());
        let _cleanup = RemoveDirOnDrop(std::path::PathBuf::from(
            tilde(root_uri.as_str()).to_string(),
        ));
        let uri = format!("{root_uri}/foo.lance");
        write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
        let (store, path) = ObjectStore::from_uri(&uri).await.unwrap();
        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "TILDE");
    }

    #[tokio::test]
    async fn test_read_directory() {
        let path = TempStdDir::default();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo").join("test_file").to_str().unwrap(),
            "read_dir",
        )
        .unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();

        let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
        assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
    }

    #[tokio::test]
    async fn test_delete_directory() {
        let path = TempStdDir::default();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo")
                .join("bar")
                .join("test_file")
                .to_str()
                .unwrap(),
            "delete",
        )
        .unwrap();
        write_to_file(path.join("foo").join("top").to_str().unwrap(), "delete_top").unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
        store.remove_dir_all(base.child("foo")).await.unwrap();

        assert!(!path.join("foo").exists());
    }

    /// Wrapper that records whether it was invoked and returns a fixed store.
    #[derive(Debug)]
    struct TestWrapper {
        called: AtomicBool,

        return_value: Arc<dyn OSObjectStore>,
    }

    impl WrappingObjectStore for TestWrapper {
        fn wrap(
            &self,
            _original: Arc<dyn OSObjectStore>,
            _storage_options: Option<&HashMap<String, String>>,
        ) -> Arc<dyn OSObjectStore> {
            self.called.store(true, Ordering::Relaxed);

            self.return_value.clone()
        }
    }

    impl TestWrapper {
        fn called(&self) -> bool {
            self.called.load(Ordering::Relaxed)
        }
    }

    #[tokio::test]
    async fn test_wrapping_object_store_option_is_used() {
        let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
        let registry = Arc::new(ObjectStoreRegistry::default());

        assert_eq!(Arc::strong_count(&mock_inner_store), 1);

        let wrapper = Arc::new(TestWrapper {
            called: AtomicBool::new(false),
            return_value: mock_inner_store.clone(),
        });

        let params = ObjectStoreParams {
            object_store_wrapper: Some(wrapper.clone()),
            ..ObjectStoreParams::default()
        };

        assert!(!wrapper.called());

        let _ = ObjectStore::from_uri_and_params(registry, "memory:///", &params)
            .await
            .unwrap();

        assert!(wrapper.called());

        // One reference held here, one held by the wrapper's `return_value`.
        assert_eq!(Arc::strong_count(&mock_inner_store), 2);
    }

    #[tokio::test]
    async fn test_local_paths() {
        let file_path = TempStdFile::default();
        let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        writer.shutdown().await.unwrap();

        let reader = ObjectStore::open_local(&file_path).await.unwrap();
        let buf = reader.get_range(0..5).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");
    }

    #[tokio::test]
    async fn test_read_one() {
        let file_path = TempStdFile::default();
        let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        writer.shutdown().await.unwrap();

        let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
        let obj_store = ObjectStore::local();
        let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");

        let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");
    }

    #[tokio::test]
    #[cfg(windows)]
    async fn test_windows_paths() {
        use std::path::Component;
        use std::path::Prefix;
        use std::path::Prefix::*;

        fn get_path_prefix(path: &StdPath) -> Prefix {
            match path.components().next().unwrap() {
                Component::Prefix(prefix_component) => prefix_component.kind(),
                _ => panic!(),
            }
        }

        fn get_drive_letter(prefix: Prefix) -> String {
            match prefix {
                Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
                _ => panic!(),
            }
        }

        let tmp_path = TempStdFile::default();
        let prefix = get_path_prefix(&tmp_path);
        let drive_letter = get_drive_letter(prefix);

        // NOTE(review): this writes under `<drive>:/test_folder` and does not
        // clean it up.
        write_to_file(
            &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
            "WINDOWS",
        )
        .unwrap();

        for uri in &[
            format!("{drive_letter}:/test_folder/test.lance"),
            format!("{drive_letter}:\\test_folder\\test.lance"),
        ] {
            let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &base.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "WINDOWS");
        }
    }

    #[tokio::test]
    async fn test_cross_filesystem_copy() {
        let source_dir = TempStdDir::default();
        let dest_dir = TempStdDir::default();

        let source_file_name = "test_file.txt";
        let source_file = source_dir.join(source_file_name);
        std::fs::write(&source_file, b"test content").unwrap();

        let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
            .await
            .unwrap();

        let from_path = base_path.child(source_file_name);

        let dest_file = dest_dir.join("copied_file.txt");
        let dest_str = dest_file.to_str().unwrap();
        let to_path = object_store::path::Path::parse(dest_str).unwrap();

        store.copy(&from_path, &to_path).await.unwrap();

        assert!(dest_file.exists());
        let copied_content = std::fs::read(&dest_file).unwrap();
        assert_eq!(copied_content, b"test content");
    }

    #[tokio::test]
    async fn test_copy_creates_parent_directories() {
        let source_dir = TempStdDir::default();
        let dest_dir = TempStdDir::default();

        let source_file_name = "test_file.txt";
        let source_file = source_dir.join(source_file_name);
        std::fs::write(&source_file, b"test content").unwrap();

        let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
            .await
            .unwrap();

        let from_path = base_path.child(source_file_name);

        // The destination's parent directories do not exist yet.
        let dest_file = dest_dir.join("nested").join("dirs").join("copied_file.txt");
        let dest_str = dest_file.to_str().unwrap();
        let to_path = object_store::path::Path::parse(dest_str).unwrap();

        store.copy(&from_path, &to_path).await.unwrap();

        assert!(dest_file.exists());
        assert!(dest_file.parent().unwrap().exists());
        let copied_content = std::fs::read(&dest_file).unwrap();
        assert_eq!(copied_content, b"test content");
    }
}