1use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use deepsize::DeepSizeOf;
17use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
18use futures::{FutureExt, Stream};
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22#[cfg(feature = "aws")]
23use object_store::aws::AwsCredentialProvider;
24use object_store::DynObjectStore;
25use object_store::Error as ObjectStoreError;
26use object_store::{path::Path, ObjectMeta, ObjectStore as OSObjectStore};
27use providers::local::FileStoreProvider;
28use providers::memory::MemoryStoreProvider;
29use shellexpand::tilde;
30use snafu::location;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35mod list_retry;
36pub mod providers;
37mod tracing;
38use crate::object_reader::SmallReader;
39use crate::object_writer::WriteResult;
40use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
41use lance_core::{Error, Result};
42
43pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
48pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;
50
51const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024; #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
53const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024; lazy_static::lazy_static! {
56 pub static ref DEFAULT_MAX_IOP_SIZE: u64 = std::env::var("LANCE_MAX_IOP_SIZE")
57 .map(|val| val.parse().unwrap()).unwrap_or(16 * 1024 * 1024);
58}
59
60pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
61
62pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
63
64#[async_trait]
65pub trait ObjectStoreExt {
66 async fn exists(&self, path: &Path) -> Result<bool>;
68
69 async fn read_dir_all<'a>(
73 &'a self,
74 dir_path: impl Into<&Path> + Send,
75 unmodified_since: Option<DateTime<Utc>>,
76 ) -> Result<BoxStream<'a, Result<ObjectMeta>>>;
77}
78
79#[async_trait]
80impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
81 async fn read_dir_all<'a>(
82 &'a self,
83 dir_path: impl Into<&Path> + Send,
84 unmodified_since: Option<DateTime<Utc>>,
85 ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
86 let mut output = self.list(Some(dir_path.into()));
87 if let Some(unmodified_since_val) = unmodified_since {
88 output = output
89 .try_filter(move |file| future::ready(file.last_modified < unmodified_since_val))
90 .boxed();
91 }
92 Ok(output.map_err(|e| e.into()).boxed())
93 }
94
95 async fn exists(&self, path: &Path) -> Result<bool> {
96 match self.head(path).await {
97 Ok(_) => Ok(true),
98 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
99 Err(e) => Err(e.into()),
100 }
101 }
102}
103
104#[derive(Debug, Clone)]
106pub struct ObjectStore {
107 pub inner: Arc<dyn OSObjectStore>,
109 scheme: String,
110 block_size: usize,
111 max_iop_size: u64,
112 pub use_constant_size_upload_parts: bool,
115 pub list_is_lexically_ordered: bool,
118 io_parallelism: usize,
119 download_retry_count: usize,
121}
122
123impl DeepSizeOf for ObjectStore {
124 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
125 self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
130 }
131}
132
133impl std::fmt::Display for ObjectStore {
134 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135 write!(f, "ObjectStore({})", self.scheme)
136 }
137}
138
139pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
140 fn wrap(&self, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
141}
142
143#[derive(Debug, Clone)]
146pub struct ObjectStoreParams {
147 pub block_size: Option<usize>,
148 #[deprecated(note = "Implement an ObjectStoreProvider instead")]
149 pub object_store: Option<(Arc<DynObjectStore>, Url)>,
150 pub s3_credentials_refresh_offset: Duration,
151 #[cfg(feature = "aws")]
152 pub aws_credentials: Option<AwsCredentialProvider>,
153 pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
154 pub storage_options: Option<HashMap<String, String>>,
155 pub use_constant_size_upload_parts: bool,
160 pub list_is_lexically_ordered: Option<bool>,
161}
162
163impl Default for ObjectStoreParams {
164 fn default() -> Self {
165 #[allow(deprecated)]
166 Self {
167 object_store: None,
168 block_size: None,
169 s3_credentials_refresh_offset: Duration::from_secs(60),
170 #[cfg(feature = "aws")]
171 aws_credentials: None,
172 object_store_wrapper: None,
173 storage_options: None,
174 use_constant_size_upload_parts: false,
175 list_is_lexically_ordered: None,
176 }
177 }
178}
179
180impl std::hash::Hash for ObjectStoreParams {
182 #[allow(deprecated)]
183 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
184 self.block_size.hash(state);
186 if let Some((store, url)) = &self.object_store {
187 Arc::as_ptr(store).hash(state);
188 url.hash(state);
189 }
190 self.s3_credentials_refresh_offset.hash(state);
191 #[cfg(feature = "aws")]
192 if let Some(aws_credentials) = &self.aws_credentials {
193 Arc::as_ptr(aws_credentials).hash(state);
194 }
195 if let Some(wrapper) = &self.object_store_wrapper {
196 Arc::as_ptr(wrapper).hash(state);
197 }
198 if let Some(storage_options) = &self.storage_options {
199 for (key, value) in storage_options {
200 key.hash(state);
201 value.hash(state);
202 }
203 }
204 self.use_constant_size_upload_parts.hash(state);
205 self.list_is_lexically_ordered.hash(state);
206 }
207}
208
209impl Eq for ObjectStoreParams {}
211impl PartialEq for ObjectStoreParams {
212 #[allow(deprecated)]
213 fn eq(&self, other: &Self) -> bool {
214 self.block_size == other.block_size
216 && self
217 .object_store
218 .as_ref()
219 .map(|(store, url)| (Arc::as_ptr(store), url))
220 == other
221 .object_store
222 .as_ref()
223 .map(|(store, url)| (Arc::as_ptr(store), url))
224 && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
225 && self.aws_credentials.as_ref().map(Arc::as_ptr)
226 == other.aws_credentials.as_ref().map(Arc::as_ptr)
227 && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
228 == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
229 && self.storage_options == other.storage_options
230 && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
231 && self.list_is_lexically_ordered == other.list_is_lexically_ordered
232 }
233}
234
235fn uri_to_url(uri: &str) -> Result<Url> {
236 match Url::parse(uri) {
237 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
238 local_path_to_url(uri)
240 }
241 Ok(url) => Ok(url),
242 Err(_) => local_path_to_url(uri),
243 }
244}
245
246fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
247 let expanded = tilde(str_path.as_ref()).to_string();
248
249 let mut expanded_path = path_abs::PathAbs::new(expanded)
250 .unwrap()
251 .as_path()
252 .to_path_buf();
253 if let Some(s) = expanded_path.as_path().to_str() {
255 if s.is_empty() {
256 expanded_path = std::env::current_dir()?;
257 }
258 }
259
260 Ok(expanded_path)
261}
262
263fn local_path_to_url(str_path: &str) -> Result<Url> {
264 let expanded_path = expand_path(str_path)?;
265
266 Url::from_directory_path(expanded_path).map_err(|_| Error::InvalidInput {
267 source: format!("Invalid table location: '{}'", str_path).into(),
268 location: location!(),
269 })
270}
271
272impl ObjectStore {
273 pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
281 let registry = Arc::new(ObjectStoreRegistry::default());
282
283 Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
284 }
285
286 pub async fn from_uri_and_params(
290 registry: Arc<ObjectStoreRegistry>,
291 uri: &str,
292 params: &ObjectStoreParams,
293 ) -> Result<(Arc<Self>, Path)> {
294 #[allow(deprecated)]
295 if let Some((store, path)) = params.object_store.as_ref() {
296 let mut inner = store.clone();
297 if let Some(wrapper) = params.object_store_wrapper.as_ref() {
298 inner = wrapper.wrap(inner);
299 }
300 let store = Self {
301 inner,
302 scheme: path.scheme().to_string(),
303 block_size: params.block_size.unwrap_or(64 * 1024),
304 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
305 use_constant_size_upload_parts: params.use_constant_size_upload_parts,
306 list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
307 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
308 download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
309 };
310 let path = Path::from(path.path());
311 return Ok((Arc::new(store), path));
312 }
313 let url = uri_to_url(uri)?;
314 let store = registry.get_store(url.clone(), params).await?;
315 let provider = registry.get_provider(url.scheme()).expect_ok()?;
317 let path = provider.extract_path(&url);
318
319 Ok((store, path))
320 }
321
322 #[deprecated(note = "Use `from_uri` instead")]
323 pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
324 Self::from_uri_and_params(
325 Arc::new(ObjectStoreRegistry::default()),
326 str_path,
327 &Default::default(),
328 )
329 .now_or_never()
330 .unwrap()
331 }
332
333 pub fn local() -> Self {
335 let provider = FileStoreProvider;
336 provider
337 .new_store(Url::parse("file:///").unwrap(), &Default::default())
338 .now_or_never()
339 .unwrap()
340 .unwrap()
341 }
342
343 pub fn memory() -> Self {
345 let provider = MemoryStoreProvider;
346 provider
347 .new_store(Url::parse("memory:///").unwrap(), &Default::default())
348 .now_or_never()
349 .unwrap()
350 .unwrap()
351 }
352
353 pub fn is_local(&self) -> bool {
355 self.scheme == "file"
356 }
357
358 pub fn is_cloud(&self) -> bool {
359 self.scheme != "file" && self.scheme != "memory"
360 }
361
362 pub fn scheme(&self) -> &str {
363 &self.scheme
364 }
365
366 pub fn block_size(&self) -> usize {
367 self.block_size
368 }
369
370 pub fn max_iop_size(&self) -> u64 {
371 self.max_iop_size
372 }
373
374 pub fn io_parallelism(&self) -> usize {
375 std::env::var("LANCE_IO_THREADS")
376 .map(|val| val.parse::<usize>().unwrap())
377 .unwrap_or(self.io_parallelism)
378 }
379
380 pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
385 match self.scheme.as_str() {
386 "file" => LocalObjectReader::open(path, self.block_size, None).await,
387 _ => Ok(Box::new(CloudObjectReader::new(
388 self.inner.clone(),
389 path.clone(),
390 self.block_size,
391 None,
392 self.download_retry_count,
393 )?)),
394 }
395 }
396
397 pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
403 if known_size <= self.block_size {
406 return Ok(Box::new(SmallReader::new(
407 self.inner.clone(),
408 path.clone(),
409 self.download_retry_count,
410 known_size,
411 )));
412 }
413
414 match self.scheme.as_str() {
415 "file" => LocalObjectReader::open(path, self.block_size, Some(known_size)).await,
416 _ => Ok(Box::new(CloudObjectReader::new(
417 self.inner.clone(),
418 path.clone(),
419 self.block_size,
420 Some(known_size),
421 self.download_retry_count,
422 )?)),
423 }
424 }
425
426 pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
428 let object_store = Self::local();
429 let absolute_path = expand_path(path.to_string_lossy())?;
430 let os_path = Path::from_absolute_path(absolute_path)?;
431 object_store.create(&os_path).await
432 }
433
434 pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
436 let object_store = Self::local();
437 let absolute_path = expand_path(path.to_string_lossy())?;
438 let os_path = Path::from_absolute_path(absolute_path)?;
439 object_store.open(&os_path).await
440 }
441
442 pub async fn create(&self, path: &Path) -> Result<ObjectWriter> {
444 ObjectWriter::new(self, path).await
445 }
446
447 pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
449 let mut writer = self.create(path).await?;
450 writer.write_all(content).await?;
451 writer.shutdown().await
452 }
453
454 pub async fn delete(&self, path: &Path) -> Result<()> {
455 self.inner.delete(path).await?;
456 Ok(())
457 }
458
459 pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
460 Ok(self.inner.copy(from, to).await?)
461 }
462
463 pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
465 let path = dir_path.into();
466 let path = Path::parse(&path)?;
467 let output = self.inner.list_with_delimiter(Some(&path)).await?;
468 Ok(output
469 .common_prefixes
470 .iter()
471 .chain(output.objects.iter().map(|o| &o.location))
472 .map(|s| s.filename().unwrap().to_string())
473 .collect())
474 }
475
476 pub fn list(
477 &self,
478 path: Option<Path>,
479 ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
480 Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
481 }
482
483 pub async fn read_dir_all(
487 &self,
488 dir_path: impl Into<&Path> + Send,
489 unmodified_since: Option<DateTime<Utc>>,
490 ) -> Result<BoxStream<Result<ObjectMeta>>> {
491 self.inner.read_dir_all(dir_path, unmodified_since).await
492 }
493
494 pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
496 let path = dir_path.into();
497 let path = Path::parse(&path)?;
498
499 if self.is_local() {
500 return super::local::remove_dir_all(&path);
502 }
503 let sub_entries = self
504 .inner
505 .list(Some(&path))
506 .map(|m| m.map(|meta| meta.location))
507 .boxed();
508 self.inner
509 .delete_stream(sub_entries)
510 .try_collect::<Vec<_>>()
511 .await?;
512 Ok(())
513 }
514
515 pub fn remove_stream<'a>(
516 &'a self,
517 locations: BoxStream<'a, Result<Path>>,
518 ) -> BoxStream<'a, Result<Path>> {
519 self.inner
520 .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
521 .err_into::<Error>()
522 .boxed()
523 }
524
525 pub async fn exists(&self, path: &Path) -> Result<bool> {
527 match self.inner.head(path).await {
528 Ok(_) => Ok(true),
529 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
530 Err(e) => Err(e.into()),
531 }
532 }
533
534 pub async fn size(&self, path: &Path) -> Result<usize> {
536 Ok(self.inner.head(path).await?.size)
537 }
538
539 pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
541 let reader = self.open(path).await?;
542 Ok(reader.get_all().await?)
543 }
544
545 pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
550 let reader = self.open(path).await?;
551 Ok(reader.get_range(range).await?)
552 }
553}
554
/// Configuration keys understood by Lance itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LanceConfigKey {
    /// Parsed from the string `download_retry_count` (ASCII case-insensitive).
    DownloadRetryCount,
}
561
562impl FromStr for LanceConfigKey {
563 type Err = Error;
564
565 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
566 match s.to_ascii_lowercase().as_str() {
567 "download_retry_count" => Ok(Self::DownloadRetryCount),
568 _ => Err(Error::InvalidInput {
569 source: format!("Invalid LanceConfigKey: {}", s).into(),
570 location: location!(),
571 }),
572 }
573 }
574}
575
576#[derive(Clone, Debug, Default)]
577pub struct StorageOptions(pub HashMap<String, String>);
578
579impl StorageOptions {
580 pub fn new(options: HashMap<String, String>) -> Self {
582 let mut options = options;
583 if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
584 options.insert("allow_http".into(), value);
585 }
586 if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
587 options.insert("allow_http".into(), value);
588 }
589 if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
590 options.insert("allow_http".into(), value);
591 }
592 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
593 options.insert("client_max_retries".into(), value);
594 }
595 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
596 options.insert("client_retry_timeout".into(), value);
597 }
598 Self(options)
599 }
600
601 pub fn allow_http(&self) -> bool {
603 self.0.iter().any(|(key, value)| {
604 key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
605 })
606 }
607
608 pub fn download_retry_count(&self) -> usize {
610 self.0
611 .iter()
612 .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
613 .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
614 .unwrap_or(3)
615 }
616
617 pub fn client_max_retries(&self) -> usize {
619 self.0
620 .iter()
621 .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
622 .and_then(|(_, value)| value.parse::<usize>().ok())
623 .unwrap_or(10)
624 }
625
626 pub fn client_retry_timeout(&self) -> u64 {
628 self.0
629 .iter()
630 .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
631 .and_then(|(_, value)| value.parse::<u64>().ok())
632 .unwrap_or(180)
633 }
634
635 pub fn get(&self, key: &str) -> Option<&String> {
636 self.0.get(key)
637 }
638}
639
640impl From<HashMap<String, String>> for StorageOptions {
641 fn from(value: HashMap<String, String>) -> Self {
642 Self::new(value)
643 }
644}
645
646impl ObjectStore {
647 #[allow(clippy::too_many_arguments)]
648 pub fn new(
649 store: Arc<DynObjectStore>,
650 location: Url,
651 block_size: Option<usize>,
652 wrapper: Option<Arc<dyn WrappingObjectStore>>,
653 use_constant_size_upload_parts: bool,
654 list_is_lexically_ordered: bool,
655 io_parallelism: usize,
656 download_retry_count: usize,
657 ) -> Self {
658 let scheme = location.scheme();
659 let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
660
661 let store = match wrapper {
662 Some(wrapper) => wrapper.wrap(store),
663 None => store,
664 };
665
666 Self {
667 inner: store,
668 scheme: scheme.into(),
669 block_size,
670 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
671 use_constant_size_upload_parts,
672 list_is_lexically_ordered,
673 io_parallelism,
674 download_retry_count,
675 }
676 }
677}
678
/// Picks a default block size from the URL scheme: 4 KiB for `file`,
/// 64 KiB for everything else.
fn infer_block_size(scheme: &str) -> usize {
    const KIB: usize = 1024;

    if scheme == "file" {
        4 * KIB
    } else {
        64 * KIB
    }
}
688
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;
    use parquet::data_type::AsBytes;
    use rstest::rstest;
    use std::env::set_current_dir;
    use std::fs::{create_dir_all, write};
    use std::path::Path as StdPath;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Writes `contents` to `path_str` (after tilde expansion), creating the
    /// parent directories first.
    fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
        let expanded = tilde(path_str).to_string();
        let path = StdPath::new(&expanded);
        std::fs::create_dir_all(path.parent().unwrap())?;
        write(path, contents)
    }

    /// Reads the whole object at `path` and returns it as a UTF-8 string.
    async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
        let test_file_store = store.open(path).await.unwrap();
        let size = test_file_store.size().await.unwrap();
        let bytes = test_file_store.get_range(0..size).await.unwrap();
        let contents = String::from_utf8(bytes.to_vec()).unwrap();
        Ok(contents)
    }

    #[tokio::test]
    async fn test_absolute_paths() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "TEST_CONTENT",
        )
        .unwrap();

        // Plain, `.`-containing and `..`-containing spellings of the same path.
        for uri in &[
            format!("{tmp_path}/bar/foo.lance"),
            format!("{tmp_path}/./bar/foo.lance"),
            format!("{tmp_path}/bar/foo.lance/../foo.lance"),
        ] {
            let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &path.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "TEST_CONTENT");
        }
    }

    #[tokio::test]
    async fn test_cloud_paths() {
        let uri = "s3://bucket/foo.lance";
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "gs");
        assert_eq!(path.to_string(), "foo.lance");
    }

    /// Checks that the store for `uri` uses `default_expected_block_size`
    /// when no block size is given, and the explicit value when one is.
    async fn test_block_size_used_test_helper(
        uri: &str,
        storage_options: Option<HashMap<String, String>>,
        default_expected_block_size: usize,
    ) {
        // No explicit block size: the default applies.
        let registry = Arc::new(ObjectStoreRegistry::default());
        let params = ObjectStoreParams {
            storage_options: storage_options.clone(),
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, default_expected_block_size);

        // Explicit block size: the parameter wins.
        let registry = Arc::new(ObjectStoreRegistry::default());
        let params = ObjectStoreParams {
            block_size: Some(1024),
            storage_options: storage_options.clone(),
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, 1024);
    }

    #[rstest]
    #[case("s3://bucket/foo.lance", None)]
    #[case("gs://bucket/foo.lance", None)]
    #[case("az://account/bucket/foo.lance",
        Some(HashMap::from([
            (String::from("account_name"), String::from("account")),
            (String::from("container_name"), String::from("container"))
        ])))]
    #[tokio::test]
    async fn test_block_size_used_cloud(
        #[case] uri: &str,
        #[case] storage_options: Option<HashMap<String, String>>,
    ) {
        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
    }

    #[rstest]
    #[case("file")]
    #[case("file-object-store")]
    #[case("memory:///bucket/foo.lance")]
    #[tokio::test]
    async fn test_block_size_used_file(#[case] prefix: &str) {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
        let path = format!("{tmp_path}/bar/foo.lance/test_file");
        write_to_file(&path, "URL").unwrap();
        let uri = format!("{prefix}:///{path}");
        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
    }

    #[tokio::test]
    async fn test_relative_paths() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "RELATIVE_URL",
        )
        .unwrap();

        // NOTE: changes the working directory of the whole test process.
        set_current_dir(StdPath::new(&tmp_path)).expect("Error changing current dir");
        let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();

        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "RELATIVE_URL");
    }

    #[tokio::test]
    async fn test_tilde_expansion() {
        let uri = "~/foo.lance";
        write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "TILDE");
    }

    #[tokio::test]
    async fn test_read_directory() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let path = tmp_dir.path();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo").join("test_file").to_str().unwrap(),
            "read_dir",
        )
        .unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();

        let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
        assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
    }

    #[tokio::test]
    async fn test_delete_directory() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let path = tmp_dir.path();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo")
                .join("bar")
                .join("test_file")
                .to_str()
                .unwrap(),
            "delete",
        )
        .unwrap();
        write_to_file(path.join("foo").join("top").to_str().unwrap(), "delete_top").unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
        store.remove_dir_all(base.child("foo")).await.unwrap();

        assert!(!path.join("foo").exists());
    }

    /// Wrapper that records whether `wrap` was called and always returns a
    /// fixed store.
    #[derive(Debug)]
    struct TestWrapper {
        called: AtomicBool,

        return_value: Arc<dyn OSObjectStore>,
    }

    impl WrappingObjectStore for TestWrapper {
        fn wrap(&self, _original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
            self.called.store(true, Ordering::Relaxed);

            // Ignore the original store and hand back the preset one.
            self.return_value.clone()
        }
    }

    impl TestWrapper {
        fn called(&self) -> bool {
            self.called.load(Ordering::Relaxed)
        }
    }

    #[tokio::test]
    async fn test_wrapping_object_store_option_is_used() {
        let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
        let registry = Arc::new(ObjectStoreRegistry::default());

        assert_eq!(Arc::strong_count(&mock_inner_store), 1);

        let wrapper = Arc::new(TestWrapper {
            called: AtomicBool::new(false),
            return_value: mock_inner_store.clone(),
        });

        let params = ObjectStoreParams {
            object_store_wrapper: Some(wrapper.clone()),
            ..ObjectStoreParams::default()
        };

        // Nothing has used the wrapper yet.
        assert!(!wrapper.called());

        let _ = ObjectStore::from_uri_and_params(registry, "memory:///", &params)
            .await
            .unwrap();

        // Building the store must have gone through the wrapper.
        assert!(wrapper.called());

        // Only the test and the wrapper hold the mock store at this point.
        assert_eq!(Arc::strong_count(&mock_inner_store), 2);
    }

    #[tokio::test]
    async fn test_local_paths() {
        let temp_dir = tempfile::tempdir().unwrap();

        let file_path = temp_dir.path().join("test_file");
        let mut writer = ObjectStore::create_local_writer(file_path.as_path())
            .await
            .unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        writer.shutdown().await.unwrap();

        let reader = ObjectStore::open_local(file_path.as_path()).await.unwrap();
        let buf = reader.get_range(0..5).await.unwrap();
        assert_eq!(buf.as_bytes(), b"LOCAL");
    }

    #[tokio::test]
    async fn test_read_one() {
        let temp_dir = tempfile::tempdir().unwrap();

        let file_path = temp_dir.path().join("test_file");
        let mut writer = ObjectStore::create_local_writer(file_path.as_path())
            .await
            .unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        writer.shutdown().await.unwrap();

        let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
        let obj_store = ObjectStore::local();
        let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
        assert_eq!(buf.as_bytes(), b"LOCAL");

        let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
        assert_eq!(buf.as_bytes(), b"LOCAL");
    }

    #[tokio::test]
    #[cfg(windows)]
    async fn test_windows_paths() {
        use std::path::Component;
        use std::path::Prefix;
        use std::path::Prefix::*;

        /// Returns the prefix component (e.g. the drive) of `path`.
        fn get_path_prefix(path: &StdPath) -> Prefix {
            match path.components().next().unwrap() {
                Component::Prefix(prefix_component) => prefix_component.kind(),
                _ => panic!(),
            }
        }

        /// Extracts the drive letter from a `Disk` prefix.
        fn get_drive_letter(prefix: Prefix) -> String {
            match prefix {
                Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
                _ => panic!(),
            }
        }

        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = tmp_dir.path();
        let prefix = get_path_prefix(tmp_path);
        let drive_letter = get_drive_letter(prefix);

        write_to_file(
            &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
            "WINDOWS",
        )
        .unwrap();

        // Both forward-slash and backslash spellings must resolve.
        for uri in &[
            format!("{drive_letter}:/test_folder/test.lance"),
            format!("{drive_letter}:\\test_folder\\test.lance"),
        ] {
            let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &base.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "WINDOWS");
        }
    }
}