1use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use deepsize::DeepSizeOf;
17use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
18use futures::{FutureExt, Stream};
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22#[cfg(feature = "aws")]
23use object_store::aws::AwsCredentialProvider;
24use object_store::DynObjectStore;
25use object_store::Error as ObjectStoreError;
26use object_store::{path::Path, ObjectMeta, ObjectStore as OSObjectStore};
27use providers::local::FileStoreProvider;
28use providers::memory::MemoryStoreProvider;
29use shellexpand::tilde;
30use snafu::location;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35mod list_retry;
36pub mod providers;
37pub mod storage_options;
38mod tracing;
39use crate::object_reader::SmallReader;
40use crate::object_writer::WriteResult;
41use crate::utils::tracking_store::{IOTracker, IoStats};
42use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
43use lance_core::{Error, Result};
44
/// Default I/O parallelism intended for local storage.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
/// Default I/O parallelism for cloud stores; also used for stores built from a
/// caller-supplied `object_store` in `ObjectStore::from_uri_and_params`.
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

/// Block size in bytes for local file systems (4 KiB).
const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024;
/// Block size in bytes for cloud object stores (64 KiB); only compiled when a
/// cloud feature is enabled.
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024;

/// Maximum size in bytes of a single I/O operation.
///
/// Evaluated lazily on first access: taken from the `LANCE_MAX_IOP_SIZE`
/// environment variable when it is set, otherwise 16 MiB.
///
/// # Panics
///
/// The first access panics if `LANCE_MAX_IOP_SIZE` is set but is not a valid `u64`.
pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock<u64> =
    std::sync::LazyLock::new(|| match std::env::var("LANCE_MAX_IOP_SIZE") {
        Ok(raw) => raw.parse::<u64>().unwrap(),
        Err(_) => 16 * 1024 * 1024,
    });

/// Default number of download retries handed to readers.
pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
64
65pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
66pub use storage_options::{
67 LanceNamespaceStorageOptionsProvider, StorageOptionsProvider, EXPIRES_AT_MILLIS_KEY,
68};
69
/// Convenience extensions available on every `object_store::ObjectStore`.
#[async_trait]
pub trait ObjectStoreExt {
    /// Returns `Ok(true)` if an object exists at `path`, `Ok(false)` if the
    /// store reports it as not found, and an error for any other failure.
    async fn exists(&self, path: &Path) -> Result<bool>;

    /// Streams the metadata of the objects listed under `dir_path`.
    ///
    /// When `unmodified_since` is `Some`, only objects whose `last_modified`
    /// is at or before that instant are yielded.
    fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>>;
}
84
85#[async_trait]
86impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
87 fn read_dir_all<'a, 'b>(
88 &'a self,
89 dir_path: impl Into<&'b Path> + Send,
90 unmodified_since: Option<DateTime<Utc>>,
91 ) -> BoxStream<'a, Result<ObjectMeta>> {
92 let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
93 if let Some(unmodified_since_val) = unmodified_since {
94 output
95 .try_filter(move |file| future::ready(file.last_modified <= unmodified_since_val))
96 .boxed()
97 } else {
98 output.boxed()
99 }
100 }
101
102 async fn exists(&self, path: &Path) -> Result<bool> {
103 match self.head(path).await {
104 Ok(_) => Ok(true),
105 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
106 Err(e) => Err(e.into()),
107 }
108 }
109}
110
/// Wrapper around an `object_store::ObjectStore` carrying Lance-specific
/// I/O settings (block size, parallelism, retry count) and an I/O tracker.
#[derive(Debug, Clone)]
pub struct ObjectStore {
    /// The underlying store. The constructors in this file store it already
    /// wrapped by the I/O tracker (and by the optional user wrapper).
    pub inner: Arc<dyn OSObjectStore>,
    /// URL scheme the store was created from (e.g. `file`, `memory`, `s3`).
    scheme: String,
    /// Block size in bytes handed to readers opened from this store.
    block_size: usize,
    /// Maximum size in bytes of a single I/O operation.
    max_iop_size: u64,
    /// Copied from `ObjectStoreParams::use_constant_size_upload_parts`.
    /// NOTE(review): presumably consumed by the writer; confirm there.
    pub use_constant_size_upload_parts: bool,
    /// Whether listings are assumed to be returned in lexical order.
    /// NOTE(review): inferred from the name; confirm against the providers.
    pub list_is_lexically_ordered: bool,
    /// Configured I/O parallelism; see `io_parallelism()` for the env override.
    io_parallelism: usize,
    /// Retry count passed to cloud and small-object readers.
    download_retry_count: usize,
    /// Tracker that wraps `inner` and records I/O statistics.
    io_tracker: IOTracker,
}
131
132impl DeepSizeOf for ObjectStore {
133 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
134 self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
139 }
140}
141
142impl std::fmt::Display for ObjectStore {
143 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144 write!(f, "ObjectStore({})", self.scheme)
145 }
146}
147
/// A hook that decorates an object store with another object store.
pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
    /// Returns a store that stands in for `original`.
    ///
    /// `store_prefix` is the prefix computed by the registry for the store
    /// being wrapped (see `calculate_object_store_prefix` call sites).
    fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
}
155
156#[derive(Debug, Clone)]
157pub struct ChainedWrappingObjectStore {
158 wrappers: Vec<Arc<dyn WrappingObjectStore>>,
159}
160
161impl ChainedWrappingObjectStore {
162 pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
163 Self { wrappers }
164 }
165
166 pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
167 self.wrappers.push(wrapper);
168 }
169}
170
171impl WrappingObjectStore for ChainedWrappingObjectStore {
172 fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
173 self.wrappers
174 .iter()
175 .fold(original, |acc, wrapper| wrapper.wrap(store_prefix, acc))
176 }
177}
178
/// Parameters controlling how an [`ObjectStore`] is created.
///
/// Hashing and equality are implemented by hand further down in this file;
/// `Arc`-held members are identified by pointer and the options provider by
/// its `provider_id()`.
#[derive(Debug, Clone)]
pub struct ObjectStoreParams {
    /// Explicit block size in bytes; `None` lets the store pick a default.
    pub block_size: Option<usize>,
    /// Pre-built store and its URL. When set, `from_uri_and_params` uses it
    /// directly instead of consulting the registry.
    #[deprecated(note = "Implement an ObjectStoreProvider instead")]
    pub object_store: Option<(Arc<DynObjectStore>, Url)>,
    /// Defaults to 60 seconds.
    /// NOTE(review): presumably how far ahead of expiry S3 credentials are
    /// refreshed; confirm in the AWS provider.
    pub s3_credentials_refresh_offset: Duration,
    /// Optional AWS credential provider (only with the `aws` feature).
    #[cfg(feature = "aws")]
    pub aws_credentials: Option<AwsCredentialProvider>,
    /// Optional wrapper applied to the created store.
    pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
    /// Free-form key/value storage options.
    pub storage_options: Option<HashMap<String, String>>,
    /// Optional provider of storage options, identified by `provider_id()`.
    pub storage_options_provider: Option<Arc<dyn StorageOptionsProvider>>,
    /// Copied onto the created store; defaults to `false`.
    pub use_constant_size_upload_parts: bool,
    /// Copied onto the created store; `None` falls back to `false` for
    /// caller-supplied stores in `from_uri_and_params`.
    pub list_is_lexically_ordered: Option<bool>,
}
200
201impl Default for ObjectStoreParams {
202 fn default() -> Self {
203 #[allow(deprecated)]
204 Self {
205 object_store: None,
206 block_size: None,
207 s3_credentials_refresh_offset: Duration::from_secs(60),
208 #[cfg(feature = "aws")]
209 aws_credentials: None,
210 object_store_wrapper: None,
211 storage_options: None,
212 storage_options_provider: None,
213 use_constant_size_upload_parts: false,
214 list_is_lexically_ordered: None,
215 }
216 }
217}
218
219impl std::hash::Hash for ObjectStoreParams {
221 #[allow(deprecated)]
222 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
223 self.block_size.hash(state);
225 if let Some((store, url)) = &self.object_store {
226 Arc::as_ptr(store).hash(state);
227 url.hash(state);
228 }
229 self.s3_credentials_refresh_offset.hash(state);
230 #[cfg(feature = "aws")]
231 if let Some(aws_credentials) = &self.aws_credentials {
232 Arc::as_ptr(aws_credentials).hash(state);
233 }
234 if let Some(wrapper) = &self.object_store_wrapper {
235 Arc::as_ptr(wrapper).hash(state);
236 }
237 if let Some(storage_options) = &self.storage_options {
238 for (key, value) in storage_options {
239 key.hash(state);
240 value.hash(state);
241 }
242 }
243 if let Some(provider) = &self.storage_options_provider {
244 provider.provider_id().hash(state);
245 }
246 self.use_constant_size_upload_parts.hash(state);
247 self.list_is_lexically_ordered.hash(state);
248 }
249}
250
251impl Eq for ObjectStoreParams {}
253impl PartialEq for ObjectStoreParams {
254 #[allow(deprecated)]
255 fn eq(&self, other: &Self) -> bool {
256 #[cfg(feature = "aws")]
257 if self.aws_credentials.is_some() != other.aws_credentials.is_some() {
258 return false;
259 }
260
261 self.block_size == other.block_size
264 && self
265 .object_store
266 .as_ref()
267 .map(|(store, url)| (Arc::as_ptr(store), url))
268 == other
269 .object_store
270 .as_ref()
271 .map(|(store, url)| (Arc::as_ptr(store), url))
272 && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
273 && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
274 == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
275 && self.storage_options == other.storage_options
276 && self
277 .storage_options_provider
278 .as_ref()
279 .map(|p| p.provider_id())
280 == other
281 .storage_options_provider
282 .as_ref()
283 .map(|p| p.provider_id())
284 && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
285 && self.list_is_lexically_ordered == other.list_is_lexically_ordered
286 }
287}
288
289pub fn uri_to_url(uri: &str) -> Result<Url> {
310 match Url::parse(uri) {
311 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
312 local_path_to_url(uri)
314 }
315 Ok(url) => Ok(url),
316 Err(_) => local_path_to_url(uri),
317 }
318}
319
320fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
321 let expanded = tilde(str_path.as_ref()).to_string();
322
323 let mut expanded_path = path_abs::PathAbs::new(expanded)
324 .unwrap()
325 .as_path()
326 .to_path_buf();
327 if let Some(s) = expanded_path.as_path().to_str() {
329 if s.is_empty() {
330 expanded_path = std::env::current_dir()?;
331 }
332 }
333
334 Ok(expanded_path)
335}
336
337fn local_path_to_url(str_path: &str) -> Result<Url> {
338 let expanded_path = expand_path(str_path)?;
339
340 Url::from_directory_path(expanded_path).map_err(|_| Error::InvalidInput {
341 source: format!("Invalid table location: '{}'", str_path).into(),
342 location: location!(),
343 })
344}
345
/// Extracts the `owner/repo` identifier from a Hugging Face URL.
///
/// The host and the path segments are read as one sequence. An optional
/// leading repo type (`models`, `datasets`, `spaces`) is skipped, and a
/// trailing `@revision` on the repo segment is dropped.
///
/// Empty segments (from a missing path, a trailing slash, or doubled slashes)
/// are ignored so they cannot stand in for the owner or repo.
///
/// # Errors
///
/// Returns an invalid-input error when the owner or the repo is missing.
#[cfg(feature = "huggingface")]
fn parse_hf_repo_id(url: &Url) -> Result<String> {
    // Accept hf://models/owner/repo/..., hf://owner/repo/..., etc.
    let segments: Vec<&str> = url
        .host_str()
        .into_iter()
        .chain(url.path().split('/'))
        .filter(|segment| !segment.is_empty())
        .collect();

    if segments.len() < 2 {
        return Err(Error::invalid_input(
            "Huggingface URL must contain at least owner and repo",
            location!(),
        ));
    }

    let repo_type_candidates = ["models", "datasets", "spaces"];
    let (owner, repo_with_rev) = if repo_type_candidates.contains(&segments[0]) {
        if segments.len() < 3 {
            return Err(Error::invalid_input(
                "Huggingface URL missing owner/repo after repo type",
                location!(),
            ));
        }
        (segments[1], segments[2])
    } else {
        (segments[0], segments[1])
    };

    let repo = repo_with_rev
        .split_once('@')
        .map_or(repo_with_rev, |(name, _revision)| name);
    if repo.is_empty() {
        // e.g. `hf://owner/@main`: a revision without a repo name.
        return Err(Error::invalid_input(
            "Huggingface URL must contain at least owner and repo",
            location!(),
        ));
    }
    Ok(format!("{owner}/{repo}"))
}
386
387impl ObjectStore {
388 pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
396 let registry = Arc::new(ObjectStoreRegistry::default());
397
398 Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
399 }
400
401 pub async fn from_uri_and_params(
405 registry: Arc<ObjectStoreRegistry>,
406 uri: &str,
407 params: &ObjectStoreParams,
408 ) -> Result<(Arc<Self>, Path)> {
409 #[allow(deprecated)]
410 if let Some((store, path)) = params.object_store.as_ref() {
411 let mut inner = store.clone();
412 let store_prefix =
413 registry.calculate_object_store_prefix(uri, params.storage_options.as_ref())?;
414 if let Some(wrapper) = params.object_store_wrapper.as_ref() {
415 inner = wrapper.wrap(&store_prefix, inner);
416 }
417
418 let io_tracker = IOTracker::default();
420 let tracked_store = io_tracker.wrap("", inner);
421
422 let store = Self {
423 inner: tracked_store,
424 scheme: path.scheme().to_string(),
425 block_size: params.block_size.unwrap_or(64 * 1024),
426 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
427 use_constant_size_upload_parts: params.use_constant_size_upload_parts,
428 list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
429 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
430 download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
431 io_tracker,
432 };
433 let path = Path::parse(path.path())?;
434 return Ok((Arc::new(store), path));
435 }
436 let url = uri_to_url(uri)?;
437
438 let store = registry.get_store(url.clone(), params).await?;
439 let provider = registry.get_provider(url.scheme()).expect_ok()?;
441 let path = provider.extract_path(&url)?;
442
443 Ok((store, path))
444 }
445
446 pub fn extract_path_from_uri(registry: Arc<ObjectStoreRegistry>, uri: &str) -> Result<Path> {
460 let url = uri_to_url(uri)?;
461 let provider = registry.get_provider(url.scheme()).ok_or_else(|| {
462 Error::invalid_input(format!("Unknown scheme: {}", url.scheme()), location!())
463 })?;
464 provider.extract_path(&url)
465 }
466
467 #[deprecated(note = "Use `from_uri` instead")]
468 pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
469 Self::from_uri_and_params(
470 Arc::new(ObjectStoreRegistry::default()),
471 str_path,
472 &Default::default(),
473 )
474 .now_or_never()
475 .unwrap()
476 }
477
478 pub fn local() -> Self {
480 let provider = FileStoreProvider;
481 provider
482 .new_store(Url::parse("file:///").unwrap(), &Default::default())
483 .now_or_never()
484 .unwrap()
485 .unwrap()
486 }
487
488 pub fn memory() -> Self {
490 let provider = MemoryStoreProvider;
491 provider
492 .new_store(Url::parse("memory:///").unwrap(), &Default::default())
493 .now_or_never()
494 .unwrap()
495 .unwrap()
496 }
497
498 pub fn is_local(&self) -> bool {
500 self.scheme == "file"
501 }
502
503 pub fn is_cloud(&self) -> bool {
504 self.scheme != "file" && self.scheme != "memory"
505 }
506
    /// Returns the URL scheme this store was created with.
    pub fn scheme(&self) -> &str {
        &self.scheme
    }
510
    /// Returns the block size, in bytes, handed to readers opened from this store.
    pub fn block_size(&self) -> usize {
        self.block_size
    }
514
    /// Returns the maximum size, in bytes, of a single I/O operation.
    pub fn max_iop_size(&self) -> u64 {
        self.max_iop_size
    }
518
519 pub fn io_parallelism(&self) -> usize {
520 std::env::var("LANCE_IO_THREADS")
521 .map(|val| val.parse::<usize>().unwrap())
522 .unwrap_or(self.io_parallelism)
523 }
524
    /// Returns the I/O tracker that wraps this store's inner object store.
    pub fn io_tracker(&self) -> &IOTracker {
        &self.io_tracker
    }
532
    /// Returns the tracker's current statistics, as reported by `IOTracker::stats`.
    pub fn io_stats_snapshot(&self) -> IoStats {
        self.io_tracker.stats()
    }
540
    /// Returns the tracker's statistics as reported by
    /// `IOTracker::incremental_stats`.
    /// NOTE(review): presumably the delta since the previous call; confirm
    /// against `IOTracker`.
    pub fn io_stats_incremental(&self) -> IoStats {
        self.io_tracker.incremental_stats()
    }
549
550 pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
555 match self.scheme.as_str() {
556 "file" => {
557 LocalObjectReader::open_with_tracker(
558 path,
559 self.block_size,
560 None,
561 Arc::new(self.io_tracker.clone()),
562 )
563 .await
564 }
565 _ => Ok(Box::new(CloudObjectReader::new(
566 self.inner.clone(),
567 path.clone(),
568 self.block_size,
569 None,
570 self.download_retry_count,
571 )?)),
572 }
573 }
574
575 pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
581 if known_size <= self.block_size {
584 return Ok(Box::new(SmallReader::new(
585 self.inner.clone(),
586 path.clone(),
587 self.download_retry_count,
588 known_size,
589 )));
590 }
591
592 match self.scheme.as_str() {
593 "file" => {
594 LocalObjectReader::open_with_tracker(
595 path,
596 self.block_size,
597 Some(known_size),
598 Arc::new(self.io_tracker.clone()),
599 )
600 .await
601 }
602 _ => Ok(Box::new(CloudObjectReader::new(
603 self.inner.clone(),
604 path.clone(),
605 self.block_size,
606 Some(known_size),
607 self.download_retry_count,
608 )?)),
609 }
610 }
611
612 pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
614 let object_store = Self::local();
615 let absolute_path = expand_path(path.to_string_lossy())?;
616 let os_path = Path::from_absolute_path(absolute_path)?;
617 object_store.create(&os_path).await
618 }
619
620 pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
622 let object_store = Self::local();
623 let absolute_path = expand_path(path.to_string_lossy())?;
624 let os_path = Path::from_absolute_path(absolute_path)?;
625 object_store.open(&os_path).await
626 }
627
    /// Creates an [`ObjectWriter`] that writes to `path` in this store.
    pub async fn create(&self, path: &Path) -> Result<ObjectWriter> {
        ObjectWriter::new(self, path).await
    }
632
    /// Writes `content` to `path` in one call and returns the writer's result.
    pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
        let mut writer = self.create(path).await?;
        writer.write_all(content).await?;
        // Shutting the writer down finishes the write and yields the result.
        writer.shutdown().await
    }
639
640 pub async fn delete(&self, path: &Path) -> Result<()> {
641 self.inner.delete(path).await?;
642 Ok(())
643 }
644
645 pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
646 if self.is_local() {
647 return super::local::copy_file(from, to);
649 }
650 Ok(self.inner.copy(from, to).await?)
651 }
652
653 pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
655 let path = dir_path.into();
656 let path = Path::parse(&path)?;
657 let output = self.inner.list_with_delimiter(Some(&path)).await?;
658 Ok(output
659 .common_prefixes
660 .iter()
661 .chain(output.objects.iter().map(|o| &o.location))
662 .map(|s| s.filename().unwrap().to_string())
663 .collect())
664 }
665
666 pub fn list(
667 &self,
668 path: Option<Path>,
669 ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
670 Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
671 }
672
    /// Streams the metadata of the objects listed under `dir_path`.
    ///
    /// Delegates to [`ObjectStoreExt::read_dir_all`] on the inner store: when
    /// `unmodified_since` is given, only objects last modified at or before
    /// that instant are yielded.
    pub fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        self.inner.read_dir_all(dir_path, unmodified_since)
    }
683
684 pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
686 let path = dir_path.into();
687 let path = Path::parse(&path)?;
688
689 if self.is_local() {
690 return super::local::remove_dir_all(&path);
692 }
693 let sub_entries = self
694 .inner
695 .list(Some(&path))
696 .map(|m| m.map(|meta| meta.location))
697 .boxed();
698 self.inner
699 .delete_stream(sub_entries)
700 .try_collect::<Vec<_>>()
701 .await?;
702 Ok(())
703 }
704
705 pub fn remove_stream<'a>(
706 &'a self,
707 locations: BoxStream<'a, Result<Path>>,
708 ) -> BoxStream<'a, Result<Path>> {
709 self.inner
710 .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
711 .err_into::<Error>()
712 .boxed()
713 }
714
715 pub async fn exists(&self, path: &Path) -> Result<bool> {
717 match self.inner.head(path).await {
718 Ok(_) => Ok(true),
719 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
720 Err(e) => Err(e.into()),
721 }
722 }
723
724 pub async fn size(&self, path: &Path) -> Result<u64> {
726 Ok(self.inner.head(path).await?.size)
727 }
728
729 pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
731 let reader = self.open(path).await?;
732 Ok(reader.get_all().await?)
733 }
734
735 pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
740 let reader = self.open(path).await?;
741 Ok(reader.get_range(range).await?)
742 }
743}
744
/// Lance-specific configuration keys that can be parsed from strings.
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub enum LanceConfigKey {
    /// Parsed from `download_retry_count` (case-insensitive).
    DownloadRetryCount,
}
751
752impl FromStr for LanceConfigKey {
753 type Err = Error;
754
755 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
756 match s.to_ascii_lowercase().as_str() {
757 "download_retry_count" => Ok(Self::DownloadRetryCount),
758 _ => Err(Error::InvalidInput {
759 source: format!("Invalid LanceConfigKey: {}", s).into(),
760 location: location!(),
761 }),
762 }
763 }
764}
765
/// Key/value storage options with typed accessors for the settings used here.
#[derive(Clone, Debug, Default)]
pub struct StorageOptions(pub HashMap<String, String>);
768
769impl StorageOptions {
770 pub fn new(options: HashMap<String, String>) -> Self {
772 let mut options = options;
773 if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
774 options.insert("allow_http".into(), value);
775 }
776 if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
777 options.insert("allow_http".into(), value);
778 }
779 if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
780 options.insert("allow_http".into(), value);
781 }
782 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
783 options.insert("client_max_retries".into(), value);
784 }
785 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
786 options.insert("client_retry_timeout".into(), value);
787 }
788 Self(options)
789 }
790
791 pub fn allow_http(&self) -> bool {
793 self.0.iter().any(|(key, value)| {
794 key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
795 })
796 }
797
798 pub fn download_retry_count(&self) -> usize {
800 self.0
801 .iter()
802 .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
803 .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
804 .unwrap_or(3)
805 }
806
807 pub fn client_max_retries(&self) -> usize {
809 self.0
810 .iter()
811 .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
812 .and_then(|(_, value)| value.parse::<usize>().ok())
813 .unwrap_or(10)
814 }
815
816 pub fn client_retry_timeout(&self) -> u64 {
818 self.0
819 .iter()
820 .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
821 .and_then(|(_, value)| value.parse::<u64>().ok())
822 .unwrap_or(180)
823 }
824
825 pub fn get(&self, key: &str) -> Option<&String> {
826 self.0.get(key)
827 }
828
829 pub fn expires_at_millis(&self) -> Option<u64> {
831 self.0
832 .get(EXPIRES_AT_MILLIS_KEY)
833 .and_then(|s| s.parse::<u64>().ok())
834 }
835}
836
837impl From<HashMap<String, String>> for StorageOptions {
838 fn from(value: HashMap<String, String>) -> Self {
839 Self::new(value)
840 }
841}
842
/// Lazily created default registry; `ObjectStore::new` uses it to compute the
/// store prefix handed to wrappers.
static DEFAULT_OBJECT_STORE_REGISTRY: std::sync::LazyLock<ObjectStoreRegistry> =
    std::sync::LazyLock::new(ObjectStoreRegistry::default);
845
846impl ObjectStore {
847 #[allow(clippy::too_many_arguments)]
848 pub fn new(
849 store: Arc<DynObjectStore>,
850 location: Url,
851 block_size: Option<usize>,
852 wrapper: Option<Arc<dyn WrappingObjectStore>>,
853 use_constant_size_upload_parts: bool,
854 list_is_lexically_ordered: bool,
855 io_parallelism: usize,
856 download_retry_count: usize,
857 storage_options: Option<&HashMap<String, String>>,
858 ) -> Self {
859 let scheme = location.scheme();
860 let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
861
862 let store = match wrapper {
863 Some(wrapper) => {
864 let store_prefix = DEFAULT_OBJECT_STORE_REGISTRY
865 .calculate_object_store_prefix(location.as_ref(), storage_options)
866 .unwrap();
867 wrapper.wrap(&store_prefix, store)
868 }
869 None => store,
870 };
871
872 let io_tracker = IOTracker::default();
874 let tracked_store = io_tracker.wrap("", store);
875
876 Self {
877 inner: tracked_store,
878 scheme: scheme.into(),
879 block_size,
880 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
881 use_constant_size_upload_parts,
882 list_is_lexically_ordered,
883 io_parallelism,
884 download_retry_count,
885 io_tracker,
886 }
887 }
888}
889
/// Picks the default block size for a URL scheme: 4 KiB for `file`,
/// 64 KiB for everything else.
fn infer_block_size(scheme: &str) -> usize {
    const KIB: usize = 1024;
    if scheme == "file" {
        4 * KIB
    } else {
        64 * KIB
    }
}
899
900#[cfg(test)]
901mod tests {
902 use super::*;
903 use lance_core::utils::tempfile::{TempStdDir, TempStdFile, TempStrDir};
904 use object_store::memory::InMemory;
905 use rstest::rstest;
906 use std::env::set_current_dir;
907 use std::fs::{create_dir_all, write};
908 use std::path::Path as StdPath;
909 use std::sync::atomic::{AtomicBool, Ordering};
910
911 fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
913 let expanded = tilde(path_str).to_string();
914 let path = StdPath::new(&expanded);
915 std::fs::create_dir_all(path.parent().unwrap())?;
916 write(path, contents)
917 }
918
919 async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
920 let test_file_store = store.open(path).await.unwrap();
921 let size = test_file_store.size().await.unwrap();
922 let bytes = test_file_store.get_range(0..size).await.unwrap();
923 let contents = String::from_utf8(bytes.to_vec()).unwrap();
924 Ok(contents)
925 }
926
927 #[tokio::test]
928 async fn test_absolute_paths() {
929 let tmp_path = TempStrDir::default();
930 write_to_file(
931 &format!("{tmp_path}/bar/foo.lance/test_file"),
932 "TEST_CONTENT",
933 )
934 .unwrap();
935
936 for uri in &[
938 format!("{tmp_path}/bar/foo.lance"),
939 format!("{tmp_path}/./bar/foo.lance"),
940 format!("{tmp_path}/bar/foo.lance/../foo.lance"),
941 ] {
942 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
943 let contents = read_from_store(store.as_ref(), &path.child("test_file"))
944 .await
945 .unwrap();
946 assert_eq!(contents, "TEST_CONTENT");
947 }
948 }
949
950 #[tokio::test]
951 async fn test_cloud_paths() {
952 let uri = "s3://bucket/foo.lance";
953 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
954 assert_eq!(store.scheme, "s3");
955 assert_eq!(path.to_string(), "foo.lance");
956
957 let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
958 .await
959 .unwrap();
960 assert_eq!(store.scheme, "s3");
961 assert_eq!(path.to_string(), "foo.lance");
962
963 let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
964 .await
965 .unwrap();
966 assert_eq!(store.scheme, "gs");
967 assert_eq!(path.to_string(), "foo.lance");
968 }
969
970 async fn test_block_size_used_test_helper(
971 uri: &str,
972 storage_options: Option<HashMap<String, String>>,
973 default_expected_block_size: usize,
974 ) {
975 let registry = Arc::new(ObjectStoreRegistry::default());
977 let params = ObjectStoreParams {
978 storage_options: storage_options.clone(),
979 ..ObjectStoreParams::default()
980 };
981 let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
982 .await
983 .unwrap();
984 assert_eq!(store.block_size, default_expected_block_size);
985
986 let registry = Arc::new(ObjectStoreRegistry::default());
988 let params = ObjectStoreParams {
989 block_size: Some(1024),
990 storage_options: storage_options.clone(),
991 ..ObjectStoreParams::default()
992 };
993 let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
994 .await
995 .unwrap();
996 assert_eq!(store.block_size, 1024);
997 }
998
    /// Cloud stores default to a 64 KiB block size.
    // The Azure case supplies the account and container through storage options.
    #[rstest]
    #[case("s3://bucket/foo.lance", None)]
    #[case("gs://bucket/foo.lance", None)]
    #[case("az://account/bucket/foo.lance",
      Some(HashMap::from([
            (String::from("account_name"), String::from("account")),
            (String::from("container_name"), String::from("container"))
           ])))]
    #[tokio::test]
    async fn test_block_size_used_cloud(
        #[case] uri: &str,
        #[case] storage_options: Option<HashMap<String, String>>,
    ) {
        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
    }
1014
    /// The listed prefixes are expected to default to a 4 KiB block size.
    // NOTE(review): the third case is a full URI rather than a scheme, so the
    // formatted `uri` below becomes `memory:///bucket/foo.lance:///...`;
    // confirm this is intentional.
    #[rstest]
    #[case("file")]
    #[case("file-object-store")]
    #[case("memory:///bucket/foo.lance")]
    #[tokio::test]
    async fn test_block_size_used_file(#[case] prefix: &str) {
        let tmp_path = TempStrDir::default();
        let path = format!("{tmp_path}/bar/foo.lance/test_file");
        write_to_file(&path, "URL").unwrap();
        let uri = format!("{prefix}:///{path}");
        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
    }
1027
1028 #[tokio::test]
1029 async fn test_relative_paths() {
1030 let tmp_path = TempStrDir::default();
1031 write_to_file(
1032 &format!("{tmp_path}/bar/foo.lance/test_file"),
1033 "RELATIVE_URL",
1034 )
1035 .unwrap();
1036
1037 set_current_dir(StdPath::new(tmp_path.as_ref())).expect("Error changing current dir");
1038 let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();
1039
1040 let contents = read_from_store(store.as_ref(), &path.child("test_file"))
1041 .await
1042 .unwrap();
1043 assert_eq!(contents, "RELATIVE_URL");
1044 }
1045
1046 #[tokio::test]
1047 async fn test_tilde_expansion() {
1048 let uri = "~/foo.lance";
1049 write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
1050 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
1051 let contents = read_from_store(store.as_ref(), &path.child("test_file"))
1052 .await
1053 .unwrap();
1054 assert_eq!(contents, "TILDE");
1055 }
1056
1057 #[tokio::test]
1058 async fn test_read_directory() {
1059 let path = TempStdDir::default();
1060 create_dir_all(path.join("foo").join("bar")).unwrap();
1061 create_dir_all(path.join("foo").join("zoo")).unwrap();
1062 create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
1063 write_to_file(
1064 path.join("foo").join("test_file").to_str().unwrap(),
1065 "read_dir",
1066 )
1067 .unwrap();
1068 let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
1069
1070 let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
1071 assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
1072 }
1073
1074 #[tokio::test]
1075 async fn test_delete_directory() {
1076 let path = TempStdDir::default();
1077 create_dir_all(path.join("foo").join("bar")).unwrap();
1078 create_dir_all(path.join("foo").join("zoo")).unwrap();
1079 create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
1080 write_to_file(
1081 path.join("foo")
1082 .join("bar")
1083 .join("test_file")
1084 .to_str()
1085 .unwrap(),
1086 "delete",
1087 )
1088 .unwrap();
1089 write_to_file(path.join("foo").join("top").to_str().unwrap(), "delete_top").unwrap();
1090 let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
1091 store.remove_dir_all(base.child("foo")).await.unwrap();
1092
1093 assert!(!path.join("foo").exists());
1094 }
1095
1096 #[derive(Debug)]
1097 struct TestWrapper {
1098 called: AtomicBool,
1099
1100 return_value: Arc<dyn OSObjectStore>,
1101 }
1102
1103 impl WrappingObjectStore for TestWrapper {
1104 fn wrap(
1105 &self,
1106 _store_prefix: &str,
1107 _original: Arc<dyn OSObjectStore>,
1108 ) -> Arc<dyn OSObjectStore> {
1109 self.called.store(true, Ordering::Relaxed);
1110
1111 self.return_value.clone()
1113 }
1114 }
1115
1116 impl TestWrapper {
1117 fn called(&self) -> bool {
1118 self.called.load(Ordering::Relaxed)
1119 }
1120 }
1121
1122 #[tokio::test]
1123 async fn test_wrapping_object_store_option_is_used() {
1124 let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
1126 let registry = Arc::new(ObjectStoreRegistry::default());
1127
1128 assert_eq!(Arc::strong_count(&mock_inner_store), 1);
1129
1130 let wrapper = Arc::new(TestWrapper {
1131 called: AtomicBool::new(false),
1132 return_value: mock_inner_store.clone(),
1133 });
1134
1135 let params = ObjectStoreParams {
1136 object_store_wrapper: Some(wrapper.clone()),
1137 ..ObjectStoreParams::default()
1138 };
1139
1140 assert!(!wrapper.called());
1142
1143 let _ = ObjectStore::from_uri_and_params(registry, "memory:///", ¶ms)
1144 .await
1145 .unwrap();
1146
1147 assert!(wrapper.called());
1149
1150 assert_eq!(Arc::strong_count(&mock_inner_store), 2);
1153 }
1154
1155 #[tokio::test]
1156 async fn test_local_paths() {
1157 let file_path = TempStdFile::default();
1158 let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
1159 writer.write_all(b"LOCAL").await.unwrap();
1160 writer.shutdown().await.unwrap();
1161
1162 let reader = ObjectStore::open_local(&file_path).await.unwrap();
1163 let buf = reader.get_range(0..5).await.unwrap();
1164 assert_eq!(buf.as_ref(), b"LOCAL");
1165 }
1166
1167 #[tokio::test]
1168 async fn test_read_one() {
1169 let file_path = TempStdFile::default();
1170 let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
1171 writer.write_all(b"LOCAL").await.unwrap();
1172 writer.shutdown().await.unwrap();
1173
1174 let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
1175 let obj_store = ObjectStore::local();
1176 let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
1177 assert_eq!(buf.as_ref(), b"LOCAL");
1178
1179 let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
1180 assert_eq!(buf.as_ref(), b"LOCAL");
1181 }
1182
    /// Windows drive-letter paths, with forward or backward slashes, are
    /// treated as local paths.
    #[tokio::test]
    #[cfg(windows)]
    async fn test_windows_paths() {
        use std::path::Component;
        use std::path::Prefix;
        use std::path::Prefix::*;

        /// Returns the prefix component of `path`; panics if there is none.
        fn get_path_prefix(path: &StdPath) -> Prefix {
            match path.components().next().unwrap() {
                Component::Prefix(prefix_component) => prefix_component.kind(),
                _ => panic!(),
            }
        }

        /// Returns the drive letter of a `Disk` prefix; panics for other kinds.
        fn get_drive_letter(prefix: Prefix) -> String {
            match prefix {
                Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
                _ => panic!(),
            }
        }

        // Use the temp file only to discover which drive to write to.
        let tmp_path = TempStdFile::default();
        let prefix = get_path_prefix(&tmp_path);
        let drive_letter = get_drive_letter(prefix);

        write_to_file(
            &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
            "WINDOWS",
        )
        .unwrap();

        // Both separator styles must resolve to the same dataset.
        for uri in &[
            format!("{drive_letter}:/test_folder/test.lance"),
            format!("{drive_letter}:\\test_folder\\test.lance"),
        ] {
            let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &base.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "WINDOWS");
        }
    }
1225
1226 #[tokio::test]
1227 async fn test_cross_filesystem_copy() {
1228 let source_dir = TempStdDir::default();
1230 let dest_dir = TempStdDir::default();
1231
1232 let source_file_name = "test_file.txt";
1234 let source_file = source_dir.join(source_file_name);
1235 std::fs::write(&source_file, b"test content").unwrap();
1236
1237 let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
1239 .await
1240 .unwrap();
1241
1242 let from_path = base_path.child(source_file_name);
1244
1245 let dest_file = dest_dir.join("copied_file.txt");
1247 let dest_str = dest_file.to_str().unwrap();
1248 let to_path = object_store::path::Path::parse(dest_str).unwrap();
1249
1250 store.copy(&from_path, &to_path).await.unwrap();
1252
1253 assert!(dest_file.exists());
1255 let copied_content = std::fs::read(&dest_file).unwrap();
1256 assert_eq!(copied_content, b"test content");
1257 }
1258
1259 #[tokio::test]
1260 async fn test_copy_creates_parent_directories() {
1261 let source_dir = TempStdDir::default();
1262 let dest_dir = TempStdDir::default();
1263
1264 let source_file_name = "test_file.txt";
1266 let source_file = source_dir.join(source_file_name);
1267 std::fs::write(&source_file, b"test content").unwrap();
1268
1269 let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
1271 .await
1272 .unwrap();
1273
1274 let from_path = base_path.child(source_file_name);
1276
1277 let dest_file = dest_dir.join("nested").join("dirs").join("copied_file.txt");
1279 let dest_str = dest_file.to_str().unwrap();
1280 let to_path = object_store::path::Path::parse(dest_str).unwrap();
1281
1282 store.copy(&from_path, &to_path).await.unwrap();
1284
1285 assert!(dest_file.exists());
1287 assert!(dest_file.parent().unwrap().exists());
1288 let copied_content = std::fs::read(&dest_file).unwrap();
1289 assert_eq!(copied_content, b"test content");
1290 }
1291}