1use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use deepsize::DeepSizeOf;
17use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
18use futures::{FutureExt, Stream};
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22#[cfg(feature = "aws")]
23use object_store::aws::AwsCredentialProvider;
24use object_store::DynObjectStore;
25use object_store::Error as ObjectStoreError;
26use object_store::{path::Path, ObjectMeta, ObjectStore as OSObjectStore};
27use providers::local::FileStoreProvider;
28use providers::memory::MemoryStoreProvider;
29use shellexpand::tilde;
30use snafu::location;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35mod list_retry;
36pub mod providers;
37mod tracing;
38use crate::object_writer::WriteResult;
39use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
40use lance_core::{Error, Result};
41
/// Default number of concurrent I/O operations for local-filesystem stores.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
/// Default number of concurrent I/O operations for cloud object stores.
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

/// Default read block size for local files (4 KiB).
const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024;
/// Default read block size for cloud object stores (64 KiB).
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024;

/// Default number of download retries used by readers.
pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
55
56pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
57
/// Convenience helpers available on every `object_store::ObjectStore`
/// implementation (see the blanket impl below).
#[async_trait]
pub trait ObjectStoreExt {
    /// Returns whether an object exists at `path`.
    ///
    /// A `NotFound` error from the backend is reported as `Ok(false)`; any
    /// other error is propagated.
    async fn exists(&self, path: &Path) -> Result<bool>;

    /// Lists the objects under `dir_path` (via `ObjectStore::list`, i.e.
    /// without a delimiter).
    ///
    /// When `unmodified_since` is `Some`, only objects whose `last_modified`
    /// is strictly earlier than it are yielded.
    async fn read_dir_all<'a>(
        &'a self,
        dir_path: impl Into<&Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> Result<BoxStream<'a, Result<ObjectMeta>>>;
}
72
73#[async_trait]
74impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
75 async fn read_dir_all<'a>(
76 &'a self,
77 dir_path: impl Into<&Path> + Send,
78 unmodified_since: Option<DateTime<Utc>>,
79 ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
80 let mut output = self.list(Some(dir_path.into()));
81 if let Some(unmodified_since_val) = unmodified_since {
82 output = output
83 .try_filter(move |file| future::ready(file.last_modified < unmodified_since_val))
84 .boxed();
85 }
86 Ok(output.map_err(|e| e.into()).boxed())
87 }
88
89 async fn exists(&self, path: &Path) -> Result<bool> {
90 match self.head(path).await {
91 Ok(_) => Ok(true),
92 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
93 Err(e) => Err(e.into()),
94 }
95 }
96}
97
/// A handle to an `object_store` backend together with the scheme and I/O
/// settings used when reading from and writing to it.
#[derive(Debug, Clone)]
pub struct ObjectStore {
    /// The backend actually used for I/O. When the store is built through
    /// `ObjectStore::new` or `from_uri_and_params`, this may already have been
    /// passed through a `WrappingObjectStore`.
    pub inner: Arc<dyn OSObjectStore>,
    /// URL scheme of the store (e.g. `file`, `memory`, `s3`). It selects the
    /// reader implementation in `open` and drives `is_local` / `is_cloud`.
    scheme: String,
    /// Read block size in bytes, handed to the readers created by `open`.
    block_size: usize,
    /// Whether uploads use constant-size parts.
    /// NOTE(review): not read in this file; presumably consumed by
    /// `ObjectWriter` — confirm.
    pub use_constant_size_upload_parts: bool,
    /// Whether listings come back in lexical order.
    /// NOTE(review): not read in this file — confirm semantics with callers.
    pub list_is_lexically_ordered: bool,
    /// Default I/O parallelism; `io_parallelism()` lets the
    /// `LANCE_IO_THREADS` environment variable override it.
    io_parallelism: usize,
    /// Retry count passed to `CloudObjectReader` for non-`file` stores.
    download_retry_count: usize,
}
115
116impl DeepSizeOf for ObjectStore {
117 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
118 self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
123 }
124}
125
126impl std::fmt::Display for ObjectStore {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 write!(f, "ObjectStore({})", self.scheme)
129 }
130}
131
/// Hook that can decorate or replace the underlying `object_store` backend
/// when an [`ObjectStore`] is constructed.
pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
    /// Receives the original backend and returns the backend that the
    /// resulting [`ObjectStore`] will use instead.
    fn wrap(&self, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
}
135
/// Options controlling how an [`ObjectStore`] is constructed.
///
/// `Hash` and `Eq` are implemented by hand for this type. The `Arc`-held
/// fields (`object_store`, `aws_credentials`, `object_store_wrapper`) are
/// compared by pointer identity, not by value.
#[derive(Debug, Clone)]
pub struct ObjectStoreParams {
    /// Read block size in bytes. When `None`, the legacy `object_store` path in
    /// `from_uri_and_params` uses 64 KiB.
    /// NOTE(review): the default chosen by registry providers is not visible
    /// here — confirm.
    pub block_size: Option<usize>,
    /// Pre-built backend and its URL. When set, `from_uri_and_params` uses it
    /// directly instead of going through the registry.
    #[deprecated(note = "Implement an ObjectStoreProvider instead")]
    pub object_store: Option<(Arc<DynObjectStore>, Url)>,
    /// Defaults to 60 seconds.
    /// NOTE(review): presumably how long before expiry S3 credentials are
    /// refreshed — confirm with the AWS provider.
    pub s3_credentials_refresh_offset: Duration,
    /// Explicit AWS credential provider; the field only exists with the `aws`
    /// feature.
    #[cfg(feature = "aws")]
    pub aws_credentials: Option<AwsCredentialProvider>,
    /// Optional hook applied to the backend before it is used.
    pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
    /// Backend-specific key/value options.
    pub storage_options: Option<HashMap<String, String>>,
    /// In the legacy `object_store` path this value is copied onto the
    /// resulting store.
    pub use_constant_size_upload_parts: bool,
    /// In the legacy `object_store` path, `None` is treated as `false`.
    /// NOTE(review): defaulting for registry providers is not visible here.
    pub list_is_lexically_ordered: Option<bool>,
}
155
156impl Default for ObjectStoreParams {
157 fn default() -> Self {
158 #[allow(deprecated)]
159 Self {
160 object_store: None,
161 block_size: None,
162 s3_credentials_refresh_offset: Duration::from_secs(60),
163 #[cfg(feature = "aws")]
164 aws_credentials: None,
165 object_store_wrapper: None,
166 storage_options: None,
167 use_constant_size_upload_parts: false,
168 list_is_lexically_ordered: None,
169 }
170 }
171}
172
173impl std::hash::Hash for ObjectStoreParams {
175 #[allow(deprecated)]
176 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
177 self.block_size.hash(state);
179 if let Some((store, url)) = &self.object_store {
180 Arc::as_ptr(store).hash(state);
181 url.hash(state);
182 }
183 self.s3_credentials_refresh_offset.hash(state);
184 #[cfg(feature = "aws")]
185 if let Some(aws_credentials) = &self.aws_credentials {
186 Arc::as_ptr(aws_credentials).hash(state);
187 }
188 if let Some(wrapper) = &self.object_store_wrapper {
189 Arc::as_ptr(wrapper).hash(state);
190 }
191 if let Some(storage_options) = &self.storage_options {
192 for (key, value) in storage_options {
193 key.hash(state);
194 value.hash(state);
195 }
196 }
197 self.use_constant_size_upload_parts.hash(state);
198 self.list_is_lexically_ordered.hash(state);
199 }
200}
201
202impl Eq for ObjectStoreParams {}
204impl PartialEq for ObjectStoreParams {
205 #[allow(deprecated)]
206 fn eq(&self, other: &Self) -> bool {
207 self.block_size == other.block_size
209 && self
210 .object_store
211 .as_ref()
212 .map(|(store, url)| (Arc::as_ptr(store), url))
213 == other
214 .object_store
215 .as_ref()
216 .map(|(store, url)| (Arc::as_ptr(store), url))
217 && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
218 && self.aws_credentials.as_ref().map(Arc::as_ptr)
219 == other.aws_credentials.as_ref().map(Arc::as_ptr)
220 && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
221 == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
222 && self.storage_options == other.storage_options
223 && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
224 && self.list_is_lexically_ordered == other.list_is_lexically_ordered
225 }
226}
227
228fn uri_to_url(uri: &str) -> Result<Url> {
229 match Url::parse(uri) {
230 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
231 local_path_to_url(uri)
233 }
234 Ok(url) => Ok(url),
235 Err(_) => local_path_to_url(uri),
236 }
237}
238
239fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
240 let expanded = tilde(str_path.as_ref()).to_string();
241
242 let mut expanded_path = path_abs::PathAbs::new(expanded)
243 .unwrap()
244 .as_path()
245 .to_path_buf();
246 if let Some(s) = expanded_path.as_path().to_str() {
248 if s.is_empty() {
249 expanded_path = std::env::current_dir()?;
250 }
251 }
252
253 Ok(expanded_path)
254}
255
256fn local_path_to_url(str_path: &str) -> Result<Url> {
257 let expanded_path = expand_path(str_path)?;
258
259 Url::from_directory_path(expanded_path).map_err(|_| Error::InvalidInput {
260 source: format!("Invalid table location: '{}'", str_path).into(),
261 location: location!(),
262 })
263}
264
265impl ObjectStore {
266 pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
274 let registry = Arc::new(ObjectStoreRegistry::default());
275
276 Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
277 }
278
279 pub async fn from_uri_and_params(
283 registry: Arc<ObjectStoreRegistry>,
284 uri: &str,
285 params: &ObjectStoreParams,
286 ) -> Result<(Arc<Self>, Path)> {
287 #[allow(deprecated)]
288 if let Some((store, path)) = params.object_store.as_ref() {
289 let mut inner = store.clone();
290 if let Some(wrapper) = params.object_store_wrapper.as_ref() {
291 inner = wrapper.wrap(inner);
292 }
293 let store = Self {
294 inner,
295 scheme: path.scheme().to_string(),
296 block_size: params.block_size.unwrap_or(64 * 1024),
297 use_constant_size_upload_parts: params.use_constant_size_upload_parts,
298 list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
299 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
300 download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
301 };
302 let path = Path::from(path.path());
303 return Ok((Arc::new(store), path));
304 }
305 let url = uri_to_url(uri)?;
306 let store = registry.get_store(url.clone(), params).await?;
307 let provider = registry.get_provider(url.scheme()).expect_ok()?;
309 let path = provider.extract_path(&url);
310
311 Ok((store, path))
312 }
313
314 #[deprecated(note = "Use `from_uri` instead")]
315 pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
316 Self::from_uri_and_params(
317 Arc::new(ObjectStoreRegistry::default()),
318 str_path,
319 &Default::default(),
320 )
321 .now_or_never()
322 .unwrap()
323 }
324
325 pub fn local() -> Self {
327 let provider = FileStoreProvider;
328 provider
329 .new_store(Url::parse("file:///").unwrap(), &Default::default())
330 .now_or_never()
331 .unwrap()
332 .unwrap()
333 }
334
335 pub fn memory() -> Self {
337 let provider = MemoryStoreProvider;
338 provider
339 .new_store(Url::parse("memory:///").unwrap(), &Default::default())
340 .now_or_never()
341 .unwrap()
342 .unwrap()
343 }
344
345 pub fn is_local(&self) -> bool {
347 self.scheme == "file"
348 }
349
350 pub fn is_cloud(&self) -> bool {
351 self.scheme != "file" && self.scheme != "memory"
352 }
353
354 pub fn block_size(&self) -> usize {
355 self.block_size
356 }
357
358 pub fn io_parallelism(&self) -> usize {
359 std::env::var("LANCE_IO_THREADS")
360 .map(|val| val.parse::<usize>().unwrap())
361 .unwrap_or(self.io_parallelism)
362 }
363
364 pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
369 match self.scheme.as_str() {
370 "file" => LocalObjectReader::open(path, self.block_size, None).await,
371 _ => Ok(Box::new(CloudObjectReader::new(
372 self.inner.clone(),
373 path.clone(),
374 self.block_size,
375 None,
376 self.download_retry_count,
377 )?)),
378 }
379 }
380
381 pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
387 match self.scheme.as_str() {
388 "file" => LocalObjectReader::open(path, self.block_size, Some(known_size)).await,
389 _ => Ok(Box::new(CloudObjectReader::new(
390 self.inner.clone(),
391 path.clone(),
392 self.block_size,
393 Some(known_size),
394 self.download_retry_count,
395 )?)),
396 }
397 }
398
399 pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
401 let object_store = Self::local();
402 let absolute_path = expand_path(path.to_string_lossy())?;
403 let os_path = Path::from_absolute_path(absolute_path)?;
404 object_store.create(&os_path).await
405 }
406
407 pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
409 let object_store = Self::local();
410 let absolute_path = expand_path(path.to_string_lossy())?;
411 let os_path = Path::from_absolute_path(absolute_path)?;
412 object_store.open(&os_path).await
413 }
414
415 pub async fn create(&self, path: &Path) -> Result<ObjectWriter> {
417 ObjectWriter::new(self, path).await
418 }
419
420 pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
422 let mut writer = self.create(path).await?;
423 writer.write_all(content).await?;
424 writer.shutdown().await
425 }
426
427 pub async fn delete(&self, path: &Path) -> Result<()> {
428 self.inner.delete(path).await?;
429 Ok(())
430 }
431
432 pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
433 Ok(self.inner.copy(from, to).await?)
434 }
435
436 pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
438 let path = dir_path.into();
439 let path = Path::parse(&path)?;
440 let output = self.inner.list_with_delimiter(Some(&path)).await?;
441 Ok(output
442 .common_prefixes
443 .iter()
444 .chain(output.objects.iter().map(|o| &o.location))
445 .map(|s| s.filename().unwrap().to_string())
446 .collect())
447 }
448
449 pub fn list(
450 &self,
451 path: Option<Path>,
452 ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
453 Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
454 }
455
456 pub async fn read_dir_all(
460 &self,
461 dir_path: impl Into<&Path> + Send,
462 unmodified_since: Option<DateTime<Utc>>,
463 ) -> Result<BoxStream<Result<ObjectMeta>>> {
464 self.inner.read_dir_all(dir_path, unmodified_since).await
465 }
466
467 pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
469 let path = dir_path.into();
470 let path = Path::parse(&path)?;
471
472 if self.is_local() {
473 return super::local::remove_dir_all(&path);
475 }
476 let sub_entries = self
477 .inner
478 .list(Some(&path))
479 .map(|m| m.map(|meta| meta.location))
480 .boxed();
481 self.inner
482 .delete_stream(sub_entries)
483 .try_collect::<Vec<_>>()
484 .await?;
485 Ok(())
486 }
487
488 pub fn remove_stream<'a>(
489 &'a self,
490 locations: BoxStream<'a, Result<Path>>,
491 ) -> BoxStream<'a, Result<Path>> {
492 self.inner
493 .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
494 .err_into::<Error>()
495 .boxed()
496 }
497
498 pub async fn exists(&self, path: &Path) -> Result<bool> {
500 match self.inner.head(path).await {
501 Ok(_) => Ok(true),
502 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
503 Err(e) => Err(e.into()),
504 }
505 }
506
507 pub async fn size(&self, path: &Path) -> Result<usize> {
509 Ok(self.inner.head(path).await?.size)
510 }
511
512 pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
514 let reader = self.open(path).await?;
515 Ok(reader.get_all().await?)
516 }
517
518 pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
523 let reader = self.open(path).await?;
524 Ok(reader.get_range(range).await?)
525 }
526}
527
/// Lance-specific configuration keys that can appear among storage options.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum LanceConfigKey {
    /// `download_retry_count`
    DownloadRetryCount,
}
534
535impl FromStr for LanceConfigKey {
536 type Err = Error;
537
538 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
539 match s.to_ascii_lowercase().as_str() {
540 "download_retry_count" => Ok(Self::DownloadRetryCount),
541 _ => Err(Error::InvalidInput {
542 source: format!("Invalid LanceConfigKey: {}", s).into(),
543 location: location!(),
544 }),
545 }
546 }
547}
548
/// Key/value options forwarded to object store backends.
#[derive(Debug, Default, Clone)]
pub struct StorageOptions(pub HashMap<String, String>);
551
552impl StorageOptions {
553 pub fn new(options: HashMap<String, String>) -> Self {
555 let mut options = options;
556 if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
557 options.insert("allow_http".into(), value);
558 }
559 if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
560 options.insert("allow_http".into(), value);
561 }
562 if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
563 options.insert("allow_http".into(), value);
564 }
565 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
566 options.insert("client_max_retries".into(), value);
567 }
568 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
569 options.insert("client_retry_timeout".into(), value);
570 }
571 Self(options)
572 }
573
574 pub fn allow_http(&self) -> bool {
576 self.0.iter().any(|(key, value)| {
577 key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
578 })
579 }
580
581 pub fn download_retry_count(&self) -> usize {
583 self.0
584 .iter()
585 .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
586 .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
587 .unwrap_or(3)
588 }
589
590 pub fn client_max_retries(&self) -> usize {
592 self.0
593 .iter()
594 .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
595 .and_then(|(_, value)| value.parse::<usize>().ok())
596 .unwrap_or(10)
597 }
598
599 pub fn client_retry_timeout(&self) -> u64 {
601 self.0
602 .iter()
603 .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
604 .and_then(|(_, value)| value.parse::<u64>().ok())
605 .unwrap_or(180)
606 }
607
608 pub fn get(&self, key: &str) -> Option<&String> {
609 self.0.get(key)
610 }
611}
612
613impl From<HashMap<String, String>> for StorageOptions {
614 fn from(value: HashMap<String, String>) -> Self {
615 Self::new(value)
616 }
617}
618
619impl ObjectStore {
620 #[allow(clippy::too_many_arguments)]
621 pub fn new(
622 store: Arc<DynObjectStore>,
623 location: Url,
624 block_size: Option<usize>,
625 wrapper: Option<Arc<dyn WrappingObjectStore>>,
626 use_constant_size_upload_parts: bool,
627 list_is_lexically_ordered: bool,
628 io_parallelism: usize,
629 download_retry_count: usize,
630 ) -> Self {
631 let scheme = location.scheme();
632 let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
633
634 let store = match wrapper {
635 Some(wrapper) => wrapper.wrap(store),
636 None => store,
637 };
638
639 Self {
640 inner: store,
641 scheme: scheme.into(),
642 block_size,
643 use_constant_size_upload_parts,
644 list_is_lexically_ordered,
645 io_parallelism,
646 download_retry_count,
647 }
648 }
649}
650
/// Default read block size for a URL scheme.
///
/// Returns 4 KiB for the local filesystem (`file`) and 64 KiB for every other
/// scheme.
fn infer_block_size(scheme: &str) -> usize {
    const KIB: usize = 1024;
    if scheme == "file" {
        4 * KIB
    } else {
        64 * KIB
    }
}
660
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::memory::InMemory;
    use parquet::data_type::AsBytes;
    use rstest::rstest;
    use std::env::set_current_dir;
    use std::fs::{create_dir_all, write};
    use std::path::Path as StdPath;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Writes `contents` to `path_str` (after `~` expansion), creating any
    /// missing parent directories.
    fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
        let expanded = tilde(path_str).to_string();
        let path = StdPath::new(&expanded);
        std::fs::create_dir_all(path.parent().unwrap())?;
        write(path, contents)
    }

    /// Reads the whole object at `path` from `store` as a UTF-8 string.
    async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
        let test_file_store = store.open(path).await.unwrap();
        let size = test_file_store.size().await.unwrap();
        let bytes = test_file_store.get_range(0..size).await.unwrap();
        let contents = String::from_utf8(bytes.to_vec()).unwrap();
        Ok(contents)
    }

    /// Removes the wrapped directory tree when dropped. Tests that must touch
    /// locations outside a temp dir therefore clean up even if they panic.
    struct RemoveDirOnDrop(std::path::PathBuf);

    impl Drop for RemoveDirOnDrop {
        fn drop(&mut self) {
            // Best effort: the directory may not exist if the test failed early.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Restores the process working directory when dropped.
    struct RestoreCurrentDir(std::path::PathBuf);

    impl Drop for RestoreCurrentDir {
        fn drop(&mut self) {
            let _ = set_current_dir(&self.0);
        }
    }

    #[tokio::test]
    async fn test_absolute_paths() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "TEST_CONTENT",
        )
        .unwrap();

        // Equivalent spellings of the same absolute directory.
        for uri in &[
            format!("{tmp_path}/bar/foo.lance"),
            format!("{tmp_path}/./bar/foo.lance"),
            format!("{tmp_path}/bar/foo.lance/../foo.lance"),
        ] {
            let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &path.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "TEST_CONTENT");
        }
    }

    #[tokio::test]
    async fn test_cloud_paths() {
        let uri = "s3://bucket/foo.lance";
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "gs");
        assert_eq!(path.to_string(), "foo.lance");
    }

    /// Checks that `uri` gets `default_expected_block_size` when no block size
    /// is configured, and that an explicit `block_size` overrides it.
    async fn test_block_size_used_test_helper(
        uri: &str,
        storage_options: Option<HashMap<String, String>>,
        default_expected_block_size: usize,
    ) {
        let registry = Arc::new(ObjectStoreRegistry::default());
        let params = ObjectStoreParams {
            storage_options: storage_options.clone(),
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, default_expected_block_size);

        let registry = Arc::new(ObjectStoreRegistry::default());
        let params = ObjectStoreParams {
            block_size: Some(1024),
            storage_options: storage_options.clone(),
            ..ObjectStoreParams::default()
        };
        let (store, _) = ObjectStore::from_uri_and_params(registry, uri, &params)
            .await
            .unwrap();
        assert_eq!(store.block_size, 1024);
    }

    #[rstest]
    #[case("s3://bucket/foo.lance", None)]
    #[case("gs://bucket/foo.lance", None)]
    #[case("az://account/bucket/foo.lance",
      Some(HashMap::from([
            (String::from("account_name"), String::from("account")),
            (String::from("container_name"), String::from("container"))
           ])))]
    #[tokio::test]
    async fn test_block_size_used_cloud(
        #[case] uri: &str,
        #[case] storage_options: Option<HashMap<String, String>>,
    ) {
        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
    }

    #[rstest]
    #[case("file")]
    #[case("file-object-store")]
    #[case("memory:///bucket/foo.lance")]
    #[tokio::test]
    async fn test_block_size_used_file(#[case] prefix: &str) {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
        let path = format!("{tmp_path}/bar/foo.lance/test_file");
        write_to_file(&path, "URL").unwrap();
        let uri = format!("{prefix}:///{path}");
        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
    }

    #[tokio::test]
    async fn test_relative_paths() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = tmp_dir.path().to_str().unwrap().to_owned();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "RELATIVE_URL",
        )
        .unwrap();

        // The working directory is process-global. Restore it afterwards so the
        // process is not left inside `tmp_dir` once that directory is deleted.
        // Declared after `tmp_dir`, so it is dropped (and restores) first.
        let _restore_cwd = RestoreCurrentDir(std::env::current_dir().unwrap());
        set_current_dir(StdPath::new(&tmp_path)).expect("Error changing current dir");
        let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();

        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "RELATIVE_URL");
    }

    #[tokio::test]
    async fn test_tilde_expansion() {
        // Use a unique directory under `~` so the test never clobbers a real
        // `~/foo.lance` dataset, and remove it when the test ends.
        let uri = format!("~/lance_test_tilde_{}.lance", std::process::id());
        let _cleanup = RemoveDirOnDrop(std::path::PathBuf::from(tilde(&uri).to_string()));

        write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
        let (store, path) = ObjectStore::from_uri(&uri).await.unwrap();
        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "TILDE");
    }

    #[tokio::test]
    async fn test_read_directory() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let path = tmp_dir.path();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo").join("test_file").to_str().unwrap(),
            "read_dir",
        )
        .unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();

        let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
        assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
    }

    #[tokio::test]
    async fn test_delete_directory() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let path = tmp_dir.path();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo")
                .join("bar")
                .join("test_file")
                .to_str()
                .unwrap(),
            "delete",
        )
        .unwrap();
        write_to_file(path.join("foo").join("top").to_str().unwrap(), "delete_top").unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
        store.remove_dir_all(base.child("foo")).await.unwrap();

        assert!(!path.join("foo").exists());
    }

    /// Wrapper that records that it was invoked and always returns
    /// `return_value` in place of the original store.
    #[derive(Debug)]
    struct TestWrapper {
        called: AtomicBool,

        return_value: Arc<dyn OSObjectStore>,
    }

    impl WrappingObjectStore for TestWrapper {
        fn wrap(&self, _original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
            self.called.store(true, Ordering::Relaxed);

            self.return_value.clone()
        }
    }

    impl TestWrapper {
        fn called(&self) -> bool {
            self.called.load(Ordering::Relaxed)
        }
    }

    #[tokio::test]
    async fn test_wrapping_object_store_option_is_used() {
        let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
        let registry = Arc::new(ObjectStoreRegistry::default());

        assert_eq!(Arc::strong_count(&mock_inner_store), 1);

        let wrapper = Arc::new(TestWrapper {
            called: AtomicBool::new(false),
            return_value: mock_inner_store.clone(),
        });

        let params = ObjectStoreParams {
            object_store_wrapper: Some(wrapper.clone()),
            ..ObjectStoreParams::default()
        };

        assert!(!wrapper.called());

        let _ = ObjectStore::from_uri_and_params(registry, "memory:///", &params)
            .await
            .unwrap();

        assert!(wrapper.called());

        // One reference held by this test, one by the wrapper; the store built
        // above has already been dropped.
        assert_eq!(Arc::strong_count(&mock_inner_store), 2);
    }

    #[tokio::test]
    async fn test_local_paths() {
        let temp_dir = tempfile::tempdir().unwrap();

        let file_path = temp_dir.path().join("test_file");
        let mut writer = ObjectStore::create_local_writer(file_path.as_path())
            .await
            .unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        writer.shutdown().await.unwrap();

        let reader = ObjectStore::open_local(file_path.as_path()).await.unwrap();
        let buf = reader.get_range(0..5).await.unwrap();
        assert_eq!(buf.as_bytes(), b"LOCAL");
    }

    #[tokio::test]
    async fn test_read_one() {
        let temp_dir = tempfile::tempdir().unwrap();

        let file_path = temp_dir.path().join("test_file");
        let mut writer = ObjectStore::create_local_writer(file_path.as_path())
            .await
            .unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        writer.shutdown().await.unwrap();

        let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
        let obj_store = ObjectStore::local();
        let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
        assert_eq!(buf.as_bytes(), b"LOCAL");

        let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
        assert_eq!(buf.as_bytes(), b"LOCAL");
    }

    #[tokio::test]
    #[cfg(windows)]
    async fn test_windows_paths() {
        use std::path::Component;
        use std::path::Prefix;
        use std::path::Prefix::*;

        fn get_path_prefix(path: &StdPath) -> Prefix {
            match path.components().next().unwrap() {
                Component::Prefix(prefix_component) => prefix_component.kind(),
                _ => panic!(),
            }
        }

        fn get_drive_letter(prefix: Prefix) -> String {
            match prefix {
                Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
                _ => panic!(),
            }
        }

        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = tmp_dir.path();
        let prefix = get_path_prefix(tmp_path);
        let drive_letter = get_drive_letter(prefix);

        write_to_file(
            &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
            "WINDOWS",
        )
        .unwrap();

        for uri in &[
            format!("{drive_letter}:/test_folder/test.lance"),
            format!("{drive_letter}:\\test_folder\\test.lance"),
        ] {
            let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &base.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "WINDOWS");
        }
    }
}