1use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use deepsize::DeepSizeOf;
17use futures::{FutureExt, Stream};
18use futures::{StreamExt, TryStreamExt, future, stream::BoxStream};
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22use object_store::DynObjectStore;
23use object_store::Error as ObjectStoreError;
24#[cfg(feature = "aws")]
25use object_store::aws::AwsCredentialProvider;
26#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
27use object_store::{ClientOptions, HeaderMap, HeaderValue};
28use object_store::{ObjectMeta, ObjectStore as OSObjectStore, path::Path};
29use providers::local::FileStoreProvider;
30use providers::memory::MemoryStoreProvider;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35mod list_retry;
36pub mod providers;
37pub mod storage_options;
38mod tracing;
39use crate::object_reader::SmallReader;
40use crate::object_writer::{LocalWriter, WriteResult};
41use crate::traits::Writer;
42use crate::utils::tracking_store::{IOTracker, IoStats};
43use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
44use lance_core::{Error, Result};
45
/// Default I/O parallelism for local storage.
// NOTE(review): presumably consumed by the local provider, which is outside this file — confirm.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
/// Default I/O parallelism for cloud object stores.
///
/// Also used for stores built from a caller-supplied `object_store` in
/// `ObjectStore::from_uri_and_params`.
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;
53
/// Default block size, in bytes, for local storage (4 KiB).
const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024;
/// Default block size, in bytes, for cloud object stores (64 KiB).
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024;

/// Default maximum size, in bytes, of a single I/O operation.
///
/// Read once from the `LANCE_MAX_IOP_SIZE` environment variable; defaults to
/// 16 MiB when the variable is unset (or not valid Unicode).
///
/// # Panics
///
/// Panics on first access if `LANCE_MAX_IOP_SIZE` is set but is not an
/// unsigned integer. The panic message names the variable and the offending
/// value so the misconfiguration is easy to locate.
pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock<u64> = std::sync::LazyLock::new(|| {
    match std::env::var("LANCE_MAX_IOP_SIZE") {
        Ok(raw) => raw.parse::<u64>().unwrap_or_else(|err| {
            panic!(
                "LANCE_MAX_IOP_SIZE must be an unsigned integer number of bytes, got {raw:?}: {err}"
            )
        }),
        Err(_) => 16 * 1024 * 1024,
    }
});
63
/// Default value for an object store's `download_retry_count`, which is handed
/// to the readers created by `ObjectStore::open` / `open_with_size`.
pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
65
66pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
67pub use storage_options::{
68 EXPIRES_AT_MILLIS_KEY, LanceNamespaceStorageOptionsProvider, REFRESH_OFFSET_MILLIS_KEY,
69 StorageOptionsAccessor, StorageOptionsProvider,
70};
71
/// Convenience extensions implemented for every [`OSObjectStore`].
#[async_trait]
pub trait ObjectStoreExt {
    /// Returns `true` if an object exists at `path`.
    async fn exists(&self, path: &Path) -> Result<bool>;

    /// Streams the metadata of everything the store lists under `dir_path`.
    ///
    /// When `unmodified_since` is given, only entries whose `last_modified`
    /// is at or before that instant are yielded.
    fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>>;
}
86
87#[async_trait]
88impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
89 fn read_dir_all<'a, 'b>(
90 &'a self,
91 dir_path: impl Into<&'b Path> + Send,
92 unmodified_since: Option<DateTime<Utc>>,
93 ) -> BoxStream<'a, Result<ObjectMeta>> {
94 let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
95 if let Some(unmodified_since_val) = unmodified_since {
96 output
97 .try_filter(move |file| future::ready(file.last_modified <= unmodified_since_val))
98 .boxed()
99 } else {
100 output.boxed()
101 }
102 }
103
104 async fn exists(&self, path: &Path) -> Result<bool> {
105 match self.head(path).await {
106 Ok(_) => Ok(true),
107 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
108 Err(e) => Err(e.into()),
109 }
110 }
111}
112
/// Wrapper around an [`OSObjectStore`] carrying Lance-specific I/O settings.
#[derive(Debug, Clone)]
pub struct ObjectStore {
    /// The underlying store. The constructors in this file store the
    /// `IOTracker`-wrapped store here.
    pub inner: Arc<dyn OSObjectStore>,
    /// URL scheme of the store (e.g. `"file"`, `"memory"`, `"s3"`).
    scheme: String,
    /// Block size, in bytes, handed to the readers created by `open*`.
    block_size: usize,
    /// Maximum size of a single I/O operation; initialised from
    /// `DEFAULT_MAX_IOP_SIZE`.
    max_iop_size: u64,
    /// Copied from `ObjectStoreParams::use_constant_size_upload_parts`.
    // NOTE(review): presumably consumed by the multipart writer — confirm.
    pub use_constant_size_upload_parts: bool,
    /// Whether `list` results may be assumed to be in lexical order.
    // NOTE(review): semantics inferred from the name — confirm against callers.
    pub list_is_lexically_ordered: bool,
    /// Default I/O parallelism; see `io_parallelism()` for the env override.
    io_parallelism: usize,
    /// Retry count handed to the readers created by `open*`.
    download_retry_count: usize,
    /// Tracker that wraps `inner` and records I/O statistics.
    io_tracker: IOTracker,
    /// Prefix identifying this store, as computed by the provider's
    /// `calculate_object_store_prefix` and passed to wrappers.
    pub store_prefix: String,
}
137
138impl DeepSizeOf for ObjectStore {
139 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
140 self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
145 }
146}
147
148impl std::fmt::Display for ObjectStore {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 write!(f, "ObjectStore({})", self.scheme)
151 }
152}
153
/// Hook for layering custom behaviour on top of an object store.
pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
    /// Returns a store that wraps `original`.
    ///
    /// `store_prefix` is the prefix computed for the store being wrapped
    /// (see `ObjectStore::store_prefix`).
    fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
}
161
162#[derive(Debug, Clone)]
163pub struct ChainedWrappingObjectStore {
164 wrappers: Vec<Arc<dyn WrappingObjectStore>>,
165}
166
167impl ChainedWrappingObjectStore {
168 pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
169 Self { wrappers }
170 }
171
172 pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
173 self.wrappers.push(wrapper);
174 }
175}
176
177impl WrappingObjectStore for ChainedWrappingObjectStore {
178 fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
179 self.wrappers
180 .iter()
181 .fold(original, |acc, wrapper| wrapper.wrap(store_prefix, acc))
182 }
183}
184
/// Parameters used to create an [`ObjectStore`].
#[derive(Debug, Clone)]
pub struct ObjectStoreParams {
    /// Explicit block size in bytes; `None` lets the store pick a default.
    pub block_size: Option<usize>,
    /// Caller-supplied store and its base URL. When set,
    /// `ObjectStore::from_uri_and_params` uses it instead of the registry.
    #[deprecated(note = "Implement an ObjectStoreProvider instead")]
    pub object_store: Option<(Arc<DynObjectStore>, Url)>,
    /// Refresh offset for S3 credentials (defaults to 60 seconds).
    // NOTE(review): presumably how long before expiry credentials are refreshed — confirm.
    pub s3_credentials_refresh_offset: Duration,
    /// Optional custom AWS credential provider.
    #[cfg(feature = "aws")]
    pub aws_credentials: Option<AwsCredentialProvider>,
    /// Optional wrapper applied to the underlying store.
    pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
    /// Accessor for storage options; see `storage_options()`.
    pub storage_options_accessor: Option<Arc<StorageOptionsAccessor>>,
    /// Copied onto `ObjectStore::use_constant_size_upload_parts`.
    pub use_constant_size_upload_parts: bool,
    /// Override for `ObjectStore::list_is_lexically_ordered`; `None` means
    /// "use the default".
    pub list_is_lexically_ordered: Option<bool>,
}
211
212impl Default for ObjectStoreParams {
213 fn default() -> Self {
214 #[allow(deprecated)]
215 Self {
216 object_store: None,
217 block_size: None,
218 s3_credentials_refresh_offset: Duration::from_secs(60),
219 #[cfg(feature = "aws")]
220 aws_credentials: None,
221 object_store_wrapper: None,
222 storage_options_accessor: None,
223 use_constant_size_upload_parts: false,
224 list_is_lexically_ordered: None,
225 }
226 }
227}
228
229impl ObjectStoreParams {
230 pub fn get_accessor(&self) -> Option<Arc<StorageOptionsAccessor>> {
232 self.storage_options_accessor.clone()
233 }
234
235 pub fn storage_options(&self) -> Option<&HashMap<String, String>> {
239 self.storage_options_accessor
240 .as_ref()
241 .and_then(|a| a.initial_storage_options())
242 }
243}
244
impl std::hash::Hash for ObjectStoreParams {
    /// Hashes the parameters field by field.
    ///
    /// Trait-object fields (`object_store`, `aws_credentials`,
    /// `object_store_wrapper`) are hashed by pointer identity, and the
    /// accessor by its `accessor_id()`. Absent optional fields contribute
    /// nothing to the hash.
    // `object_store` is deprecated but still part of the identity.
    #[allow(deprecated)]
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.block_size.hash(state);
        if let Some((store, url)) = &self.object_store {
            Arc::as_ptr(store).hash(state);
            url.hash(state);
        }
        self.s3_credentials_refresh_offset.hash(state);
        #[cfg(feature = "aws")]
        if let Some(aws_credentials) = &self.aws_credentials {
            Arc::as_ptr(aws_credentials).hash(state);
        }
        if let Some(wrapper) = &self.object_store_wrapper {
            Arc::as_ptr(wrapper).hash(state);
        }
        if let Some(accessor) = &self.storage_options_accessor {
            accessor.accessor_id().hash(state);
        }
        self.use_constant_size_upload_parts.hash(state);
        self.list_is_lexically_ordered.hash(state);
    }
}
270
271impl Eq for ObjectStoreParams {}
273impl PartialEq for ObjectStoreParams {
274 #[allow(deprecated)]
275 fn eq(&self, other: &Self) -> bool {
276 #[cfg(feature = "aws")]
277 if self.aws_credentials.is_some() != other.aws_credentials.is_some() {
278 return false;
279 }
280
281 self.block_size == other.block_size
284 && self
285 .object_store
286 .as_ref()
287 .map(|(store, url)| (Arc::as_ptr(store), url))
288 == other
289 .object_store
290 .as_ref()
291 .map(|(store, url)| (Arc::as_ptr(store), url))
292 && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
293 && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
294 == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
295 && self
296 .storage_options_accessor
297 .as_ref()
298 .map(|a| a.accessor_id())
299 == other
300 .storage_options_accessor
301 .as_ref()
302 .map(|a| a.accessor_id())
303 && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
304 && self.list_is_lexically_ordered == other.list_is_lexically_ordered
305 }
306}
307
308pub fn uri_to_url(uri: &str) -> Result<Url> {
329 match Url::parse(uri) {
330 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
331 local_path_to_url(uri)
333 }
334 Ok(url) => Ok(url),
335 Err(_) => local_path_to_url(uri),
336 }
337}
338
339fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
340 let str_path = str_path.as_ref();
341 let expanded = expand_tilde_path(str_path).unwrap_or_else(|| str_path.into());
342
343 let mut expanded_path = path_abs::PathAbs::new(expanded)
344 .unwrap()
345 .as_path()
346 .to_path_buf();
347 if let Some(s) = expanded_path.as_path().to_str()
349 && s.is_empty()
350 {
351 expanded_path = std::env::current_dir()?;
352 }
353
354 Ok(expanded_path)
355}
356
/// Expands a leading `~` or `~/` (and `~\` on Windows) to the home directory.
///
/// Returns `None` when the path does not start with such a prefix or when
/// the home directory cannot be determined.
fn expand_tilde_path(path: &str) -> Option<std::path::PathBuf> {
    let home = std::env::home_dir()?;
    let remainder = match path {
        "~" => return Some(home),
        _ => path.strip_prefix("~/"),
    };
    #[cfg(windows)]
    let remainder = remainder.or_else(|| path.strip_prefix("~\\"));

    remainder.map(|rest| home.join(rest))
}
372
373fn local_path_to_url(str_path: &str) -> Result<Url> {
374 let expanded_path = expand_path(str_path)?;
375
376 Url::from_directory_path(expanded_path).map_err(|_| {
377 Error::invalid_input_source(format!("Invalid table location: '{}'", str_path).into())
378 })
379}
380
/// Extracts the `owner/repo` id from a Hugging Face URL.
///
/// The host (if any) and the path segments are read as
/// `[repo_type/]owner/repo[@revision]/...`, where `repo_type` is one of
/// `models`, `datasets` or `spaces`. Any `@revision` suffix on the repo is
/// dropped.
///
/// Empty segments (leading, trailing or doubled slashes) are ignored, so
/// `owner/` or `owner//repo` can no longer produce an id with an empty
/// component.
///
/// # Errors
///
/// Returns an invalid-input error when the owner or repo is missing.
#[cfg(feature = "huggingface")]
fn parse_hf_repo_id(url: &Url) -> Result<String> {
    const REPO_TYPES: [&str; 3] = ["models", "datasets", "spaces"];

    let segments: Vec<&str> = url
        .host_str()
        .into_iter()
        .chain(url.path().split('/'))
        .filter(|segment| !segment.is_empty())
        .collect();

    if segments.len() < 2 {
        return Err(Error::invalid_input(
            "Huggingface URL must contain at least owner and repo",
        ));
    }

    let (owner, repo_with_rev) = if REPO_TYPES.contains(&segments[0]) {
        match segments[1..] {
            [owner, repo, ..] => (owner, repo),
            _ => {
                return Err(Error::invalid_input(
                    "Huggingface URL missing owner/repo after repo type",
                ));
            }
        }
    } else {
        (segments[0], segments[1])
    };

    let repo = repo_with_rev
        .split_once('@')
        .map_or(repo_with_rev, |(name, _revision)| name);
    if repo.is_empty() {
        // e.g. `owner/@main`: a revision without a repository name.
        return Err(Error::invalid_input(
            "Huggingface URL must contain at least owner and repo",
        ));
    }
    Ok(format!("{owner}/{repo}"))
}
419
420impl ObjectStore {
421 pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
429 let registry = Arc::new(ObjectStoreRegistry::default());
430
431 Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
432 }
433
434 pub async fn from_uri_and_params(
438 registry: Arc<ObjectStoreRegistry>,
439 uri: &str,
440 params: &ObjectStoreParams,
441 ) -> Result<(Arc<Self>, Path)> {
442 #[allow(deprecated)]
443 if let Some((store, path)) = params.object_store.as_ref() {
444 let mut inner = store.clone();
445 let store_prefix =
446 registry.calculate_object_store_prefix(uri, params.storage_options())?;
447 if let Some(wrapper) = params.object_store_wrapper.as_ref() {
448 inner = wrapper.wrap(&store_prefix, inner);
449 }
450
451 let io_tracker = IOTracker::default();
453 let tracked_store = io_tracker.wrap("", inner);
454
455 let store = Self {
456 inner: tracked_store,
457 scheme: path.scheme().to_string(),
458 block_size: params.block_size.unwrap_or(64 * 1024),
459 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
460 use_constant_size_upload_parts: params.use_constant_size_upload_parts,
461 list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
462 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
463 download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
464 io_tracker,
465 store_prefix,
466 };
467 let path = Path::parse(path.path())?;
468 return Ok((Arc::new(store), path));
469 }
470 let url = uri_to_url(uri)?;
471
472 let store = registry.get_store(url.clone(), params).await?;
473 let provider = registry.get_provider(url.scheme()).expect_ok()?;
475 let path = provider.extract_path(&url)?;
476
477 Ok((store, path))
478 }
479
480 pub fn extract_path_from_uri(registry: Arc<ObjectStoreRegistry>, uri: &str) -> Result<Path> {
494 let url = uri_to_url(uri)?;
495 let provider = registry
496 .get_provider(url.scheme())
497 .ok_or_else(|| Error::invalid_input(format!("Unknown scheme: {}", url.scheme())))?;
498 provider.extract_path(&url)
499 }
500
501 #[deprecated(note = "Use `from_uri` instead")]
502 pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
503 Self::from_uri_and_params(
504 Arc::new(ObjectStoreRegistry::default()),
505 str_path,
506 &Default::default(),
507 )
508 .now_or_never()
509 .unwrap()
510 }
511
512 pub fn local() -> Self {
514 let provider = FileStoreProvider;
515 provider
516 .new_store(Url::parse("file:///").unwrap(), &Default::default())
517 .now_or_never()
518 .unwrap()
519 .unwrap()
520 }
521
522 pub fn memory() -> Self {
524 let provider = MemoryStoreProvider;
525 provider
526 .new_store(Url::parse("memory:///").unwrap(), &Default::default())
527 .now_or_never()
528 .unwrap()
529 .unwrap()
530 }
531
    /// Returns `true` when the store's scheme is `"file"`.
    pub fn is_local(&self) -> bool {
        self.scheme == "file"
    }
536
537 pub fn is_cloud(&self) -> bool {
538 self.scheme != "file" && self.scheme != "memory"
539 }
540
    /// The URL scheme this store was created for.
    pub fn scheme(&self) -> &str {
        &self.scheme
    }
544
    /// The block size, in bytes, handed to readers opened from this store.
    pub fn block_size(&self) -> usize {
        self.block_size
    }
548
    /// The maximum size of a single I/O operation (from `DEFAULT_MAX_IOP_SIZE`).
    pub fn max_iop_size(&self) -> u64 {
        self.max_iop_size
    }
552
553 pub fn io_parallelism(&self) -> usize {
554 std::env::var("LANCE_IO_THREADS")
555 .map(|val| val.parse::<usize>().unwrap())
556 .unwrap_or(self.io_parallelism)
557 }
558
    /// Returns the tracker that records this store's I/O.
    pub fn io_tracker(&self) -> &IOTracker {
        &self.io_tracker
    }
566
    /// Returns the tracker's current statistics (`IOTracker::stats`).
    pub fn io_stats_snapshot(&self) -> IoStats {
        self.io_tracker.stats()
    }
574
    /// Returns the tracker's incremental statistics (`IOTracker::incremental_stats`).
    // NOTE(review): presumably the delta since the previous call — confirm in IOTracker.
    pub fn io_stats_incremental(&self) -> IoStats {
        self.io_tracker.incremental_stats()
    }
583
584 pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
589 match self.scheme.as_str() {
590 "file" => {
591 LocalObjectReader::open_with_tracker(
592 path,
593 self.block_size,
594 None,
595 Arc::new(self.io_tracker.clone()),
596 )
597 .await
598 }
599 _ => Ok(Box::new(CloudObjectReader::new(
600 self.inner.clone(),
601 path.clone(),
602 self.block_size,
603 None,
604 self.download_retry_count,
605 )?)),
606 }
607 }
608
609 pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
615 if known_size <= self.block_size {
618 return Ok(Box::new(SmallReader::new(
619 self.inner.clone(),
620 path.clone(),
621 self.download_retry_count,
622 known_size,
623 )));
624 }
625
626 match self.scheme.as_str() {
627 "file" => {
628 LocalObjectReader::open_with_tracker(
629 path,
630 self.block_size,
631 Some(known_size),
632 Arc::new(self.io_tracker.clone()),
633 )
634 .await
635 }
636 _ => Ok(Box::new(CloudObjectReader::new(
637 self.inner.clone(),
638 path.clone(),
639 self.block_size,
640 Some(known_size),
641 self.download_retry_count,
642 )?)),
643 }
644 }
645
646 pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
648 let object_store = Self::local();
649 let absolute_path = expand_path(path.to_string_lossy())?;
650 let os_path = Path::from_absolute_path(absolute_path)?;
651 ObjectWriter::new(&object_store, &os_path).await
652 }
653
654 pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
656 let object_store = Self::local();
657 let absolute_path = expand_path(path.to_string_lossy())?;
658 let os_path = Path::from_absolute_path(absolute_path)?;
659 object_store.open(&os_path).await
660 }
661
662 pub async fn create(&self, path: &Path) -> Result<Box<dyn Writer>> {
664 match self.scheme.as_str() {
665 "file" => {
666 let local_path = super::local::to_local_path(path);
667 let local_path = std::path::PathBuf::from(&local_path);
668 if let Some(parent) = local_path.parent() {
669 tokio::fs::create_dir_all(parent).await?;
670 }
671 let parent = local_path
672 .parent()
673 .expect("file path must have parent")
674 .to_owned();
675 let named_temp =
676 tokio::task::spawn_blocking(move || tempfile::NamedTempFile::new_in(parent))
677 .await
678 .map_err(|e| Error::io(format!("spawn_blocking failed: {}", e)))??;
679 let (std_file, temp_path) = named_temp.into_parts();
680 let file = tokio::fs::File::from_std(std_file);
681 Ok(Box::new(LocalWriter::new(
682 file,
683 path.clone(),
684 temp_path,
685 Arc::new(self.io_tracker.clone()),
686 )))
687 }
688 _ => Ok(Box::new(ObjectWriter::new(self, path).await?)),
689 }
690 }
691
    /// Writes `content` to `path` in one go: creates a writer, writes all
    /// bytes, then shuts the writer down and returns its result.
    pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
        let mut writer = self.create(path).await?;
        writer.write_all(content).await?;
        // Called through the `Writer` trait explicitly; `AsyncWriteExt` is
        // also in scope and has a `shutdown` method of its own.
        Writer::shutdown(writer.as_mut()).await
    }
698
699 pub async fn delete(&self, path: &Path) -> Result<()> {
700 self.inner.delete(path).await?;
701 Ok(())
702 }
703
704 pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
705 if self.is_local() {
706 return super::local::copy_file(from, to);
708 }
709 Ok(self.inner.copy(from, to).await?)
710 }
711
712 pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
714 let path = dir_path.into();
715 let path = Path::parse(&path)?;
716 let output = self.inner.list_with_delimiter(Some(&path)).await?;
717 Ok(output
718 .common_prefixes
719 .iter()
720 .chain(output.objects.iter().map(|o| &o.location))
721 .map(|s| s.filename().unwrap().to_string())
722 .collect())
723 }
724
725 pub fn list(
726 &self,
727 path: Option<Path>,
728 ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
729 Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
730 }
731
    /// Streams the metadata of everything listed under `dir_path`.
    ///
    /// Delegates to `ObjectStoreExt::read_dir_all` on the inner store; when
    /// `unmodified_since` is given, only entries last modified at or before
    /// that instant are yielded.
    pub fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        self.inner.read_dir_all(dir_path, unmodified_since)
    }
742
743 pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
745 let path = dir_path.into();
746 let path = Path::parse(&path)?;
747
748 if self.is_local() {
749 return super::local::remove_dir_all(&path);
751 }
752 let sub_entries = self
753 .inner
754 .list(Some(&path))
755 .map(|m| m.map(|meta| meta.location))
756 .boxed();
757 self.inner
758 .delete_stream(sub_entries)
759 .try_collect::<Vec<_>>()
760 .await?;
761 if self.scheme == "file-object-store" {
762 return super::local::remove_dir_all(&path);
765 }
766 Ok(())
767 }
768
    /// Deletes every path produced by `locations` via the inner store's
    /// `delete_stream`, converting errors between the Lance and object-store
    /// error types on the way in and out.
    pub fn remove_stream<'a>(
        &'a self,
        locations: BoxStream<'a, Result<Path>>,
    ) -> BoxStream<'a, Result<Path>> {
        self.inner
            .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
            .err_into::<Error>()
            .boxed()
    }
778
779 pub async fn exists(&self, path: &Path) -> Result<bool> {
781 match self.inner.head(path).await {
782 Ok(_) => Ok(true),
783 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
784 Err(e) => Err(e.into()),
785 }
786 }
787
788 pub async fn size(&self, path: &Path) -> Result<u64> {
790 Ok(self.inner.head(path).await?.size)
791 }
792
793 pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
795 let reader = self.open(path).await?;
796 Ok(reader.get_all().await?)
797 }
798
799 pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
804 let reader = self.open(path).await?;
805 Ok(reader.get_range(range).await?)
806 }
807}
808
/// Lance-specific configuration keys, parsed case-insensitively via `FromStr`.
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub enum LanceConfigKey {
    /// Parsed from the string `"download_retry_count"`.
    DownloadRetryCount,
}
815
816impl FromStr for LanceConfigKey {
817 type Err = Error;
818
819 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
820 match s.to_ascii_lowercase().as_str() {
821 "download_retry_count" => Ok(Self::DownloadRetryCount),
822 _ => Err(Error::invalid_input_source(
823 format!("Invalid LanceConfigKey: {}", s).into(),
824 )),
825 }
826 }
827}
828
/// String key/value options for configuring an object store.
///
/// Note that `StorageOptions::new` may add entries taken from environment
/// variables; constructing the tuple struct directly does not.
#[derive(Clone, Debug, Default)]
pub struct StorageOptions(pub HashMap<String, String>);
831
832impl StorageOptions {
833 pub fn new(options: HashMap<String, String>) -> Self {
835 let mut options = options;
836 if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
837 options.insert("allow_http".into(), value);
838 }
839 if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
840 options.insert("allow_http".into(), value);
841 }
842 if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
843 options.insert("allow_http".into(), value);
844 }
845 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
846 options.insert("client_max_retries".into(), value);
847 }
848 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
849 options.insert("client_retry_timeout".into(), value);
850 }
851 Self(options)
852 }
853
854 pub fn allow_http(&self) -> bool {
856 self.0.iter().any(|(key, value)| {
857 key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
858 })
859 }
860
861 pub fn download_retry_count(&self) -> usize {
863 self.0
864 .iter()
865 .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
866 .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
867 .unwrap_or(3)
868 }
869
870 pub fn client_max_retries(&self) -> usize {
872 self.0
873 .iter()
874 .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
875 .and_then(|(_, value)| value.parse::<usize>().ok())
876 .unwrap_or(10)
877 }
878
879 pub fn client_retry_timeout(&self) -> u64 {
881 self.0
882 .iter()
883 .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
884 .and_then(|(_, value)| value.parse::<u64>().ok())
885 .unwrap_or(180)
886 }
887
    /// Looks up an option by its exact key (case-sensitive, unlike the
    /// typed getters on this type).
    pub fn get(&self, key: &str) -> Option<&String> {
        self.0.get(key)
    }
891
892 #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
900 pub fn client_options(&self) -> Result<ClientOptions> {
901 let mut headers = HeaderMap::new();
902 for (key, value) in &self.0 {
903 if let Some(header_name) = key.strip_prefix("headers.") {
904 let name = header_name
905 .parse::<http::header::HeaderName>()
906 .map_err(|e| {
907 Error::invalid_input(format!("invalid header name '{header_name}': {e}"))
908 })?;
909 let val = HeaderValue::from_str(value).map_err(|e| {
910 Error::invalid_input(format!("invalid header value for '{header_name}': {e}"))
911 })?;
912 headers.insert(name, val);
913 }
914 }
915 let mut client_options = ClientOptions::default();
916 if !headers.is_empty() {
917 client_options = client_options.with_default_headers(headers);
918 }
919 Ok(client_options)
920 }
921
922 pub fn expires_at_millis(&self) -> Option<u64> {
924 self.0
925 .get(EXPIRES_AT_MILLIS_KEY)
926 .and_then(|s| s.parse::<u64>().ok())
927 }
928}
929
impl From<HashMap<String, String>> for StorageOptions {
    /// Equivalent to `StorageOptions::new`, including its environment
    /// variable overrides.
    fn from(value: HashMap<String, String>) -> Self {
        Self::new(value)
    }
}
935
/// Lazily created default registry, used by `ObjectStore::new` to look up
/// the provider for a scheme when computing the store prefix.
static DEFAULT_OBJECT_STORE_REGISTRY: std::sync::LazyLock<ObjectStoreRegistry> =
    std::sync::LazyLock::new(ObjectStoreRegistry::default);
938
939impl ObjectStore {
940 #[allow(clippy::too_many_arguments)]
941 pub fn new(
942 store: Arc<DynObjectStore>,
943 location: Url,
944 block_size: Option<usize>,
945 wrapper: Option<Arc<dyn WrappingObjectStore>>,
946 use_constant_size_upload_parts: bool,
947 list_is_lexically_ordered: bool,
948 io_parallelism: usize,
949 download_retry_count: usize,
950 storage_options: Option<&HashMap<String, String>>,
951 ) -> Self {
952 let scheme = location.scheme();
953 let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
954 let store_prefix = match DEFAULT_OBJECT_STORE_REGISTRY.get_provider(scheme) {
955 Some(provider) => provider
956 .calculate_object_store_prefix(&location, storage_options)
957 .unwrap(),
958 None => {
959 let store_prefix = format!("{}${}", location.scheme(), location.authority());
960 log::warn!(
961 "Guessing that object store prefix is {}, since object store scheme is not found in registry.",
962 store_prefix
963 );
964 store_prefix
965 }
966 };
967 let store = match wrapper {
968 Some(wrapper) => wrapper.wrap(&store_prefix, store),
969 None => store,
970 };
971
972 let io_tracker = IOTracker::default();
974 let tracked_store = io_tracker.wrap("", store);
975
976 Self {
977 inner: tracked_store,
978 scheme: scheme.into(),
979 block_size,
980 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
981 use_constant_size_upload_parts,
982 list_is_lexically_ordered,
983 io_parallelism,
984 download_retry_count,
985 io_tracker,
986 store_prefix,
987 }
988 }
989}
990
/// Picks the default block size for a scheme: 4 KiB for `"file"`, 64 KiB
/// for everything else.
fn infer_block_size(scheme: &str) -> usize {
    const KIB: usize = 1024;
    if scheme == "file" { 4 * KIB } else { 64 * KIB }
}
1000
1001#[cfg(test)]
1002mod tests {
1003 use super::*;
1004 use lance_core::utils::tempfile::{TempStdDir, TempStdFile, TempStrDir};
1005 use object_store::memory::InMemory;
1006 use rstest::rstest;
1007 use std::env::set_current_dir;
1008 use std::fs::{create_dir_all, write};
1009 use std::path::Path as StdPath;
1010 use std::sync::atomic::{AtomicBool, Ordering};
1011
1012 fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
1014 let path = expand_path(path_str).map_err(std::io::Error::other)?;
1015 std::fs::create_dir_all(path.parent().unwrap())?;
1016 write(path, contents)
1017 }
1018
1019 async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
1020 let test_file_store = store.open(path).await.unwrap();
1021 let size = test_file_store.size().await.unwrap();
1022 let bytes = test_file_store.get_range(0..size).await.unwrap();
1023 let contents = String::from_utf8(bytes.to_vec()).unwrap();
1024 Ok(contents)
1025 }
1026
    /// Absolute local paths, including ones containing `.` and `..`
    /// components, must all resolve to the same dataset directory.
    #[tokio::test]
    async fn test_absolute_paths() {
        let tmp_path = TempStrDir::default();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "TEST_CONTENT",
        )
        .unwrap();

        // Plain, `./`-containing and `../`-containing spellings of the same directory.
        for uri in &[
            format!("{tmp_path}/bar/foo.lance"),
            format!("{tmp_path}/./bar/foo.lance"),
            format!("{tmp_path}/bar/foo.lance/../foo.lance"),
        ] {
            let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &path.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "TEST_CONTENT");
        }
    }
1049
1050 #[tokio::test]
1051 async fn test_cloud_paths() {
1052 let uri = "s3://bucket/foo.lance";
1053 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
1054 assert_eq!(store.scheme, "s3");
1055 assert_eq!(path.to_string(), "foo.lance");
1056
1057 let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
1058 .await
1059 .unwrap();
1060 assert_eq!(store.scheme, "s3");
1061 assert_eq!(path.to_string(), "foo.lance");
1062
1063 let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
1064 .await
1065 .unwrap();
1066 assert_eq!(store.scheme, "gs");
1067 assert_eq!(path.to_string(), "foo.lance");
1068 }
1069
1070 async fn test_block_size_used_test_helper(
1071 uri: &str,
1072 storage_options: Option<HashMap<String, String>>,
1073 default_expected_block_size: usize,
1074 ) {
1075 let registry = Arc::new(ObjectStoreRegistry::default());
1077 let accessor = storage_options
1078 .clone()
1079 .map(|opts| Arc::new(StorageOptionsAccessor::with_static_options(opts)));
1080 let params = ObjectStoreParams {
1081 storage_options_accessor: accessor.clone(),
1082 ..ObjectStoreParams::default()
1083 };
1084 let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
1085 .await
1086 .unwrap();
1087 assert_eq!(store.block_size, default_expected_block_size);
1088
1089 let registry = Arc::new(ObjectStoreRegistry::default());
1091 let params = ObjectStoreParams {
1092 block_size: Some(1024),
1093 storage_options_accessor: accessor,
1094 ..ObjectStoreParams::default()
1095 };
1096 let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
1097 .await
1098 .unwrap();
1099 assert_eq!(store.block_size, 1024);
1100 }
1101
    /// Cloud schemes default to a 64 KiB block size.
    #[rstest]
    #[case("s3://bucket/foo.lance", None)]
    #[case("gs://bucket/foo.lance", None)]
    #[case("az://account/bucket/foo.lance",
    Some(HashMap::from([
        (String::from("account_name"), String::from("account")),
        (String::from("container_name"), String::from("container"))
    ])))]
    #[tokio::test]
    async fn test_block_size_used_cloud(
        #[case] uri: &str,
        #[case] storage_options: Option<HashMap<String, String>>,
    ) {
        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
    }
1117
    /// The listed prefixes are expected to default to a 4 KiB block size.
    // NOTE(review): the third case passes a full `memory:///…` URI as the
    // "prefix", so the formatted URI looks odd — confirm this is intended.
    #[rstest]
    #[case("file")]
    #[case("file-object-store")]
    #[case("memory:///bucket/foo.lance")]
    #[tokio::test]
    async fn test_block_size_used_file(#[case] prefix: &str) {
        let tmp_path = TempStrDir::default();
        let path = format!("{tmp_path}/bar/foo.lance/test_file");
        write_to_file(&path, "URL").unwrap();
        let uri = format!("{prefix}:///{path}");
        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
    }
1130
    /// A relative path is resolved against the current working directory.
    #[tokio::test]
    async fn test_relative_paths() {
        let tmp_path = TempStrDir::default();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "RELATIVE_URL",
        )
        .unwrap();

        // NOTE: changes the working directory of the whole test process.
        set_current_dir(StdPath::new(tmp_path.as_ref())).expect("Error changing current dir");
        let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();

        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "RELATIVE_URL");
    }
1148
    /// `~/…` URIs are expanded to the user's home directory.
    // NOTE: writes `foo.lance/test_file` into the real home directory and
    // does not clean it up.
    #[tokio::test]
    async fn test_tilde_expansion() {
        let uri = "~/foo.lance";
        write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "TILDE");
    }
1159
    /// `read_dir` returns only immediate children: sub-directories first,
    /// then files.
    #[tokio::test]
    async fn test_read_directory() {
        let path = TempStdDir::default();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        // Nested directory that must NOT show up in the listing of `foo`.
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo").join("test_file").to_str().unwrap(),
            "read_dir",
        )
        .unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();

        let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
        assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
    }
1176
    /// `remove_dir_all` through the default `file` scheme.
    #[tokio::test]
    async fn test_delete_directory_local_store() {
        test_delete_directory("").await;
    }
1181
    /// `remove_dir_all` through the `file-object-store` scheme.
    #[tokio::test]
    async fn test_delete_directory_file_object_store() {
        test_delete_directory("file-object-store").await;
    }
1186
    /// Builds a small directory tree, removes `foo` via `remove_dir_all`
    /// and checks it is gone from disk.
    ///
    /// An empty `scheme` uses the plain `file://` URL of the temp directory;
    /// otherwise the same path is addressed through `scheme`.
    async fn test_delete_directory(scheme: &str) {
        let path = TempStdDir::default();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo")
                .join("bar")
                .join("test_file")
                .to_str()
                .unwrap(),
            "delete",
        )
        .unwrap();
        let file_url = Url::from_directory_path(&path).unwrap();
        let url = if scheme.is_empty() {
            file_url
        } else {
            // Same path, different scheme.
            let mut url = Url::parse(&format!("{scheme}:///")).unwrap();
            url.set_path(file_url.path());
            url
        };
        let (store, base) = ObjectStore::from_uri(url.as_ref()).await.unwrap();
        store.remove_dir_all(base.child("foo")).await.unwrap();

        assert!(!path.join("foo").exists());
    }
1215
    /// Test double for [`WrappingObjectStore`] that records whether `wrap`
    /// was called and always returns a fixed store.
    #[derive(Debug)]
    struct TestWrapper {
        /// Set to `true` the first time `wrap` is invoked.
        called: AtomicBool,

        /// Store handed back from `wrap`, regardless of the input.
        return_value: Arc<dyn OSObjectStore>,
    }

    impl WrappingObjectStore for TestWrapper {
        fn wrap(
            &self,
            _store_prefix: &str,
            _original: Arc<dyn OSObjectStore>,
        ) -> Arc<dyn OSObjectStore> {
            self.called.store(true, Ordering::Relaxed);

            // Ignore the original store and return the canned one.
            self.return_value.clone()
        }
    }

    impl TestWrapper {
        /// Whether `wrap` has been called at least once.
        fn called(&self) -> bool {
            self.called.load(Ordering::Relaxed)
        }
    }
1241
1242 #[tokio::test]
1243 async fn test_wrapping_object_store_option_is_used() {
1244 let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
1246 let registry = Arc::new(ObjectStoreRegistry::default());
1247
1248 assert_eq!(Arc::strong_count(&mock_inner_store), 1);
1249
1250 let wrapper = Arc::new(TestWrapper {
1251 called: AtomicBool::new(false),
1252 return_value: mock_inner_store.clone(),
1253 });
1254
1255 let params = ObjectStoreParams {
1256 object_store_wrapper: Some(wrapper.clone()),
1257 ..ObjectStoreParams::default()
1258 };
1259
1260 assert!(!wrapper.called());
1262
1263 let _ = ObjectStore::from_uri_and_params(registry, "memory:///", ¶ms)
1264 .await
1265 .unwrap();
1266
1267 assert!(wrapper.called());
1269
1270 assert_eq!(Arc::strong_count(&mock_inner_store), 2);
1273 }
1274
1275 #[tokio::test]
1276 async fn test_local_paths() {
1277 let file_path = TempStdFile::default();
1278 let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
1279 writer.write_all(b"LOCAL").await.unwrap();
1280 Writer::shutdown(&mut writer).await.unwrap();
1281
1282 let reader = ObjectStore::open_local(&file_path).await.unwrap();
1283 let buf = reader.get_range(0..5).await.unwrap();
1284 assert_eq!(buf.as_ref(), b"LOCAL");
1285 }
1286
1287 #[tokio::test]
1288 async fn test_read_one() {
1289 let file_path = TempStdFile::default();
1290 let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
1291 writer.write_all(b"LOCAL").await.unwrap();
1292 Writer::shutdown(&mut writer).await.unwrap();
1293
1294 let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
1295 let obj_store = ObjectStore::local();
1296 let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
1297 assert_eq!(buf.as_ref(), b"LOCAL");
1298
1299 let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
1300 assert_eq!(buf.as_ref(), b"LOCAL");
1301 }
1302
/// Windows only: opens the same dataset directory through a forward-slash
/// and a backslash drive path and reads a file from each.
#[tokio::test]
#[cfg(windows)]
async fn test_windows_paths() {
    use std::path::Component;
    use std::path::Prefix;

    /// Returns the drive letter of `path`; panics when the path does not
    /// start with a `Prefix::Disk` component.
    fn drive_letter_of(path: &StdPath) -> String {
        let first = path.components().next().unwrap();
        let Component::Prefix(prefix_component) = first else {
            panic!()
        };
        match prefix_component.kind() {
            Prefix::Disk(letter) => String::from_utf8(vec![letter]).unwrap(),
            _ => panic!(),
        }
    }

    // The temporary file is only used to find the drive letter to build paths on.
    let tmp_path = TempStdFile::default();
    let drive_letter = drive_letter_of(&tmp_path);

    let forward_slash_uri = format!("{drive_letter}:/test_folder/test.lance");
    let backslash_uri = format!("{drive_letter}:\\test_folder\\test.lance");

    let fixture_file = forward_slash_uri.clone() + "/test_file";
    write_to_file(&fixture_file, "WINDOWS").unwrap();

    for uri in &[forward_slash_uri, backslash_uri] {
        let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
        let contents = read_from_store(store.as_ref(), &base.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "WINDOWS");
    }
}
1345
1346 #[tokio::test]
1347 async fn test_cross_filesystem_copy() {
1348 let source_dir = TempStdDir::default();
1350 let dest_dir = TempStdDir::default();
1351
1352 let source_file_name = "test_file.txt";
1354 let source_file = source_dir.join(source_file_name);
1355 std::fs::write(&source_file, b"test content").unwrap();
1356
1357 let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
1359 .await
1360 .unwrap();
1361
1362 let from_path = base_path.child(source_file_name);
1364
1365 let dest_file = dest_dir.join("copied_file.txt");
1367 let dest_str = dest_file.to_str().unwrap();
1368 let to_path = object_store::path::Path::parse(dest_str).unwrap();
1369
1370 store.copy(&from_path, &to_path).await.unwrap();
1372
1373 assert!(dest_file.exists());
1375 let copied_content = std::fs::read(&dest_file).unwrap();
1376 assert_eq!(copied_content, b"test content");
1377 }
1378
1379 #[tokio::test]
1380 async fn test_copy_creates_parent_directories() {
1381 let source_dir = TempStdDir::default();
1382 let dest_dir = TempStdDir::default();
1383
1384 let source_file_name = "test_file.txt";
1386 let source_file = source_dir.join(source_file_name);
1387 std::fs::write(&source_file, b"test content").unwrap();
1388
1389 let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
1391 .await
1392 .unwrap();
1393
1394 let from_path = base_path.child(source_file_name);
1396
1397 let dest_file = dest_dir.join("nested").join("dirs").join("copied_file.txt");
1399 let dest_str = dest_file.to_str().unwrap();
1400 let to_path = object_store::path::Path::parse(dest_str).unwrap();
1401
1402 store.copy(&from_path, &to_path).await.unwrap();
1404
1405 assert!(dest_file.exists());
1407 assert!(dest_file.parent().unwrap().exists());
1408 let copied_content = std::fs::read(&dest_file).unwrap();
1409 assert_eq!(copied_content, b"test content");
1410 }
1411
/// Checks that `StorageOptions::client_options` succeeds both when
/// `headers.*` keys are present alongside other keys and when no header keys
/// are present at all.
///
/// With the `gcp` feature enabled, the resulting options are additionally
/// handed to a `GoogleCloudStorageBuilder` as a smoke test.
#[test]
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
fn test_client_options_extracts_headers() {
    let opts = StorageOptions(HashMap::from([
        ("headers.x-custom-foo".to_string(), "bar".to_string()),
        ("headers.x-ms-version".to_string(), "2023-11-03".to_string()),
        ("region".to_string(), "us-west-2".to_string()),
    ]));
    let client_options = opts.client_options().unwrap();

    // Options without any `headers.*` key must be accepted as well.
    let opts_no_headers = StorageOptions(HashMap::from([(
        "region".to_string(),
        "us-west-2".to_string(),
    )]));
    opts_no_headers.client_options().unwrap();

    #[cfg(feature = "gcp")]
    {
        use object_store::gcp::GoogleCloudStorageBuilder;
        let _builder = GoogleCloudStorageBuilder::new()
            .with_client_options(client_options)
            .with_url("gs://test-bucket");
    }

    // Without `gcp` nothing above consumes `client_options`; use it explicitly
    // so aws-only and azure-only builds do not emit an unused-variable warning.
    #[cfg(not(feature = "gcp"))]
    let _ = client_options;
}
1441
/// A `headers.*` key whose header name contains a space must make
/// `client_options` fail with an "invalid header name" error.
#[test]
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
fn test_client_options_rejects_invalid_header_name() {
    let entries = [(String::from("headers.bad header"), String::from("value"))];
    let opts = StorageOptions(HashMap::from(entries));

    let message = opts.client_options().unwrap_err().to_string();
    assert!(message.contains("invalid header name"));
}
1452
/// A header value that contains the control byte `\x01` must make
/// `client_options` fail with an "invalid header value" error.
#[test]
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
fn test_client_options_rejects_invalid_header_value() {
    let entries = [(
        String::from("headers.x-good-name"),
        String::from("bad\x01value"),
    )];
    let opts = StorageOptions(HashMap::from(entries));

    let message = opts.client_options().unwrap_err().to_string();
    assert!(message.contains("invalid header value"));
}
1463
/// Options that contain no `headers.*` keys must still produce client
/// options without error.
#[test]
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
fn test_client_options_empty_when_no_header_keys() {
    let entries = [
        (String::from("region"), String::from("us-east-1")),
        (String::from("access_key_id"), String::from("AKID")),
    ];
    let opts = StorageOptions(HashMap::from(entries));

    opts.client_options().unwrap();
}
1473}