1use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use deepsize::DeepSizeOf;
17use futures::{FutureExt, Stream};
18use futures::{StreamExt, TryStreamExt, future, stream::BoxStream};
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22use object_store::DynObjectStore;
23use object_store::Error as ObjectStoreError;
24#[cfg(feature = "aws")]
25use object_store::aws::AwsCredentialProvider;
26#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
27use object_store::{ClientOptions, HeaderMap, HeaderValue};
28use object_store::{ObjectMeta, ObjectStore as OSObjectStore, path::Path};
29use providers::local::FileStoreProvider;
30use providers::memory::MemoryStoreProvider;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35mod list_retry;
36pub mod providers;
37pub mod storage_options;
38mod tracing;
39use crate::object_reader::SmallReader;
40use crate::object_writer::{LocalWriter, WriteResult};
41use crate::traits::Writer;
42use crate::utils::tracking_store::{IOTracker, IoStats};
43use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
44use lance_core::{Error, Result};
45
/// Default number of concurrent IO operations used for local stores.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
/// Default number of concurrent IO operations used for cloud stores.
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

/// Default read block size for local file systems (4 KiB).
const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024;
/// Default read block size for cloud object stores (64 KiB).
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024;

/// Maximum size in bytes of a single IO request.
///
/// Defaults to 16 MiB. The `LANCE_MAX_IOP_SIZE` environment variable overrides it;
/// the variable is read once, on first access. If the variable is unset or not
/// valid Unicode, the default is used.
///
/// # Panics
///
/// Panics on first access if `LANCE_MAX_IOP_SIZE` is set but is not an unsigned
/// integer. The message names the variable and the offending value, so the
/// misconfiguration can be found without a debugger.
pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock<u64> =
    std::sync::LazyLock::new(|| match std::env::var("LANCE_MAX_IOP_SIZE") {
        Ok(val) => val.parse::<u64>().unwrap_or_else(|e| {
            panic!("LANCE_MAX_IOP_SIZE must be an unsigned integer, got '{val}': {e}")
        }),
        Err(_) => 16 * 1024 * 1024,
    });

/// Default number of times a failed download is retried.
pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
65
66pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
67pub use storage_options::{
68 EXPIRES_AT_MILLIS_KEY, LanceNamespaceStorageOptionsProvider, REFRESH_OFFSET_MILLIS_KEY,
69 StorageOptionsAccessor, StorageOptionsProvider,
70};
71
72#[async_trait]
73pub trait ObjectStoreExt {
74 async fn exists(&self, path: &Path) -> Result<bool>;
76
77 fn read_dir_all<'a, 'b>(
81 &'a self,
82 dir_path: impl Into<&'b Path> + Send,
83 unmodified_since: Option<DateTime<Utc>>,
84 ) -> BoxStream<'a, Result<ObjectMeta>>;
85}
86
87#[async_trait]
88impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
89 fn read_dir_all<'a, 'b>(
90 &'a self,
91 dir_path: impl Into<&'b Path> + Send,
92 unmodified_since: Option<DateTime<Utc>>,
93 ) -> BoxStream<'a, Result<ObjectMeta>> {
94 let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
95 if let Some(unmodified_since_val) = unmodified_since {
96 output
97 .try_filter(move |file| future::ready(file.last_modified <= unmodified_since_val))
98 .boxed()
99 } else {
100 output.boxed()
101 }
102 }
103
104 async fn exists(&self, path: &Path) -> Result<bool> {
105 match self.head(path).await {
106 Ok(_) => Ok(true),
107 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
108 Err(e) => Err(e.into()),
109 }
110 }
111}
112
113#[derive(Debug, Clone)]
115pub struct ObjectStore {
116 pub inner: Arc<dyn OSObjectStore>,
118 scheme: String,
119 block_size: usize,
120 max_iop_size: u64,
121 pub use_constant_size_upload_parts: bool,
124 pub list_is_lexically_ordered: bool,
127 io_parallelism: usize,
128 download_retry_count: usize,
130 io_tracker: IOTracker,
132 pub store_prefix: String,
136}
137
138impl DeepSizeOf for ObjectStore {
139 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
140 self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
145 }
146}
147
148impl std::fmt::Display for ObjectStore {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 write!(f, "ObjectStore({})", self.scheme)
151 }
152}
153
154pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
155 fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
160}
161
162#[derive(Debug, Clone)]
163pub struct ChainedWrappingObjectStore {
164 wrappers: Vec<Arc<dyn WrappingObjectStore>>,
165}
166
167impl ChainedWrappingObjectStore {
168 pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
169 Self { wrappers }
170 }
171
172 pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
173 self.wrappers.push(wrapper);
174 }
175}
176
177impl WrappingObjectStore for ChainedWrappingObjectStore {
178 fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
179 self.wrappers
180 .iter()
181 .fold(original, |acc, wrapper| wrapper.wrap(store_prefix, acc))
182 }
183}
184
185#[derive(Debug, Clone)]
188pub struct ObjectStoreParams {
189 pub block_size: Option<usize>,
190 #[deprecated(note = "Implement an ObjectStoreProvider instead")]
191 pub object_store: Option<(Arc<DynObjectStore>, Url)>,
192 pub s3_credentials_refresh_offset: Duration,
195 #[cfg(feature = "aws")]
196 pub aws_credentials: Option<AwsCredentialProvider>,
197 pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
198 pub storage_options_accessor: Option<Arc<StorageOptionsAccessor>>,
204 pub use_constant_size_upload_parts: bool,
209 pub list_is_lexically_ordered: Option<bool>,
210}
211
212impl Default for ObjectStoreParams {
213 fn default() -> Self {
214 #[allow(deprecated)]
215 Self {
216 object_store: None,
217 block_size: None,
218 s3_credentials_refresh_offset: Duration::from_secs(60),
219 #[cfg(feature = "aws")]
220 aws_credentials: None,
221 object_store_wrapper: None,
222 storage_options_accessor: None,
223 use_constant_size_upload_parts: false,
224 list_is_lexically_ordered: None,
225 }
226 }
227}
228
229impl ObjectStoreParams {
230 pub fn get_accessor(&self) -> Option<Arc<StorageOptionsAccessor>> {
232 self.storage_options_accessor.clone()
233 }
234
235 pub fn storage_options(&self) -> Option<&HashMap<String, String>> {
239 self.storage_options_accessor
240 .as_ref()
241 .and_then(|a| a.initial_storage_options())
242 }
243}
244
245impl std::hash::Hash for ObjectStoreParams {
247 #[allow(deprecated)]
248 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
249 self.block_size.hash(state);
251 if let Some((store, url)) = &self.object_store {
252 Arc::as_ptr(store).hash(state);
253 url.hash(state);
254 }
255 self.s3_credentials_refresh_offset.hash(state);
256 #[cfg(feature = "aws")]
257 if let Some(aws_credentials) = &self.aws_credentials {
258 Arc::as_ptr(aws_credentials).hash(state);
259 }
260 if let Some(wrapper) = &self.object_store_wrapper {
261 Arc::as_ptr(wrapper).hash(state);
262 }
263 if let Some(accessor) = &self.storage_options_accessor {
264 accessor.accessor_id().hash(state);
265 }
266 self.use_constant_size_upload_parts.hash(state);
267 self.list_is_lexically_ordered.hash(state);
268 }
269}
270
271impl Eq for ObjectStoreParams {}
273impl PartialEq for ObjectStoreParams {
274 #[allow(deprecated)]
275 fn eq(&self, other: &Self) -> bool {
276 #[cfg(feature = "aws")]
277 if self.aws_credentials.is_some() != other.aws_credentials.is_some() {
278 return false;
279 }
280
281 self.block_size == other.block_size
284 && self
285 .object_store
286 .as_ref()
287 .map(|(store, url)| (Arc::as_ptr(store), url))
288 == other
289 .object_store
290 .as_ref()
291 .map(|(store, url)| (Arc::as_ptr(store), url))
292 && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
293 && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
294 == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
295 && self
296 .storage_options_accessor
297 .as_ref()
298 .map(|a| a.accessor_id())
299 == other
300 .storage_options_accessor
301 .as_ref()
302 .map(|a| a.accessor_id())
303 && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
304 && self.list_is_lexically_ordered == other.list_is_lexically_ordered
305 }
306}
307
308pub fn uri_to_url(uri: &str) -> Result<Url> {
329 match Url::parse(uri) {
330 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
331 local_path_to_url(uri)
333 }
334 Ok(url) => Ok(url),
335 Err(_) => local_path_to_url(uri),
336 }
337}
338
339fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
340 let str_path = str_path.as_ref();
341 let expanded = expand_tilde_path(str_path).unwrap_or_else(|| str_path.into());
342
343 let mut expanded_path = path_abs::PathAbs::new(expanded)
344 .unwrap()
345 .as_path()
346 .to_path_buf();
347 if let Some(s) = expanded_path.as_path().to_str()
349 && s.is_empty()
350 {
351 expanded_path = std::env::current_dir()?;
352 }
353
354 Ok(expanded_path)
355}
356
/// Expands `~` and `~/...` (plus `~\...` on Windows) against the user's home directory.
///
/// Returns `None` if the home directory is unknown or `path` has no such prefix.
/// Forms like `~user` are not expanded.
fn expand_tilde_path(path: &str) -> Option<std::path::PathBuf> {
    let home = std::env::home_dir()?;
    if path == "~" {
        return Some(home);
    }

    #[cfg(windows)]
    const PREFIXES: &[&str] = &["~/", "~\\"];
    #[cfg(not(windows))]
    const PREFIXES: &[&str] = &["~/"];

    PREFIXES
        .iter()
        .find_map(|prefix| path.strip_prefix(prefix))
        .map(|rest| home.join(rest))
}
372
373fn local_path_to_url(str_path: &str) -> Result<Url> {
374 let expanded_path = expand_path(str_path)?;
375
376 Url::from_directory_path(expanded_path).map_err(|_| {
377 Error::invalid_input_source(format!("Invalid table location: '{}'", str_path).into())
378 })
379}
380
/// Extracts `owner/repo` from a Hugging Face URL.
///
/// The URL host counts as the first path segment. Two shapes are accepted:
/// `<owner>/<repo>...` and `<models|datasets|spaces>/<owner>/<repo>...`.
/// A trailing `@revision` on the repo segment is stripped.
///
/// Empty segments (from doubled or trailing slashes) are ignored. So `owner//repo`
/// yields `owner/repo`, and `owner/` is rejected rather than producing `owner/`.
///
/// # Errors
///
/// Returns an invalid-input error if owner or repo is missing, or the repo name is
/// empty (e.g. `owner/@rev`).
#[cfg(feature = "huggingface")]
fn parse_hf_repo_id(url: &Url) -> Result<String> {
    let segments: Vec<&str> = url
        .host_str()
        .into_iter()
        .chain(url.path().split('/'))
        .filter(|segment| !segment.is_empty())
        .collect();

    if segments.len() < 2 {
        return Err(Error::invalid_input(
            "Huggingface URL must contain at least owner and repo",
        ));
    }

    let repo_type_candidates = ["models", "datasets", "spaces"];
    let (owner, repo_with_rev) = if repo_type_candidates.contains(&segments[0]) {
        if segments.len() < 3 {
            return Err(Error::invalid_input(
                "Huggingface URL missing owner/repo after repo type",
            ));
        }
        (segments[1], segments[2])
    } else {
        (segments[0], segments[1])
    };

    let repo = repo_with_rev
        .split_once('@')
        .map_or(repo_with_rev, |(repo, _)| repo);
    if repo.is_empty() {
        return Err(Error::invalid_input(
            "Huggingface URL has an empty repo name",
        ));
    }
    Ok(format!("{owner}/{repo}"))
}
419
420impl ObjectStore {
421 pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
429 let registry = Arc::new(ObjectStoreRegistry::default());
430
431 Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
432 }
433
434 pub async fn from_uri_and_params(
438 registry: Arc<ObjectStoreRegistry>,
439 uri: &str,
440 params: &ObjectStoreParams,
441 ) -> Result<(Arc<Self>, Path)> {
442 #[allow(deprecated)]
443 if let Some((store, path)) = params.object_store.as_ref() {
444 let mut inner = store.clone();
445 let store_prefix =
446 registry.calculate_object_store_prefix(uri, params.storage_options())?;
447 if let Some(wrapper) = params.object_store_wrapper.as_ref() {
448 inner = wrapper.wrap(&store_prefix, inner);
449 }
450
451 let io_tracker = IOTracker::default();
453 let tracked_store = io_tracker.wrap("", inner);
454
455 let store = Self {
456 inner: tracked_store,
457 scheme: path.scheme().to_string(),
458 block_size: params.block_size.unwrap_or(64 * 1024),
459 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
460 use_constant_size_upload_parts: params.use_constant_size_upload_parts,
461 list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
462 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
463 download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
464 io_tracker,
465 store_prefix,
466 };
467 let path = Path::parse(path.path())?;
468 return Ok((Arc::new(store), path));
469 }
470 let url = uri_to_url(uri)?;
471
472 let store = registry.get_store(url.clone(), params).await?;
473 let provider = registry.get_provider(url.scheme()).expect_ok()?;
475 let path = provider.extract_path(&url)?;
476
477 Ok((store, path))
478 }
479
480 pub fn extract_path_from_uri(registry: Arc<ObjectStoreRegistry>, uri: &str) -> Result<Path> {
494 let url = uri_to_url(uri)?;
495 let provider = registry
496 .get_provider(url.scheme())
497 .ok_or_else(|| Error::invalid_input(format!("Unknown scheme: {}", url.scheme())))?;
498 provider.extract_path(&url)
499 }
500
501 #[deprecated(note = "Use `from_uri` instead")]
502 pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
503 Self::from_uri_and_params(
504 Arc::new(ObjectStoreRegistry::default()),
505 str_path,
506 &Default::default(),
507 )
508 .now_or_never()
509 .unwrap()
510 }
511
512 pub fn local() -> Self {
514 let provider = FileStoreProvider;
515 provider
516 .new_store(Url::parse("file:///").unwrap(), &Default::default())
517 .now_or_never()
518 .unwrap()
519 .unwrap()
520 }
521
522 pub fn memory() -> Self {
524 let provider = MemoryStoreProvider;
525 provider
526 .new_store(Url::parse("memory:///").unwrap(), &Default::default())
527 .now_or_never()
528 .unwrap()
529 .unwrap()
530 }
531
532 pub fn is_local(&self) -> bool {
534 self.scheme == "file"
535 }
536
537 pub fn is_cloud(&self) -> bool {
538 self.scheme != "file" && self.scheme != "memory"
539 }
540
541 pub fn scheme(&self) -> &str {
542 &self.scheme
543 }
544
545 pub fn block_size(&self) -> usize {
546 self.block_size
547 }
548
549 pub fn max_iop_size(&self) -> u64 {
550 self.max_iop_size
551 }
552
553 pub fn io_parallelism(&self) -> usize {
554 std::env::var("LANCE_IO_THREADS")
555 .map(|val| val.parse::<usize>().unwrap())
556 .unwrap_or(self.io_parallelism)
557 }
558
559 pub fn io_tracker(&self) -> &IOTracker {
564 &self.io_tracker
565 }
566
567 pub fn io_stats_snapshot(&self) -> IoStats {
572 self.io_tracker.stats()
573 }
574
575 pub fn io_stats_incremental(&self) -> IoStats {
581 self.io_tracker.incremental_stats()
582 }
583
584 pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
589 match self.scheme.as_str() {
590 "file" => {
591 LocalObjectReader::open_with_tracker(
592 path,
593 self.block_size,
594 None,
595 Arc::new(self.io_tracker.clone()),
596 )
597 .await
598 }
599 _ => Ok(Box::new(CloudObjectReader::new(
600 self.inner.clone(),
601 path.clone(),
602 self.block_size,
603 None,
604 self.download_retry_count,
605 )?)),
606 }
607 }
608
609 pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
615 if known_size <= self.block_size {
618 return Ok(Box::new(SmallReader::new(
619 self.inner.clone(),
620 path.clone(),
621 self.download_retry_count,
622 known_size,
623 )));
624 }
625
626 match self.scheme.as_str() {
627 "file" => {
628 LocalObjectReader::open_with_tracker(
629 path,
630 self.block_size,
631 Some(known_size),
632 Arc::new(self.io_tracker.clone()),
633 )
634 .await
635 }
636 _ => Ok(Box::new(CloudObjectReader::new(
637 self.inner.clone(),
638 path.clone(),
639 self.block_size,
640 Some(known_size),
641 self.download_retry_count,
642 )?)),
643 }
644 }
645
646 pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
648 let object_store = Self::local();
649 let absolute_path = expand_path(path.to_string_lossy())?;
650 let os_path = Path::from_absolute_path(absolute_path)?;
651 ObjectWriter::new(&object_store, &os_path).await
652 }
653
654 pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
656 let object_store = Self::local();
657 let absolute_path = expand_path(path.to_string_lossy())?;
658 let os_path = Path::from_absolute_path(absolute_path)?;
659 object_store.open(&os_path).await
660 }
661
662 pub async fn create(&self, path: &Path) -> Result<Box<dyn Writer>> {
664 match self.scheme.as_str() {
665 "file" => {
666 let local_path = super::local::to_local_path(path);
667 let local_path = std::path::PathBuf::from(&local_path);
668 if let Some(parent) = local_path.parent() {
669 tokio::fs::create_dir_all(parent).await?;
670 }
671 let parent = local_path
672 .parent()
673 .expect("file path must have parent")
674 .to_owned();
675 let named_temp =
676 tokio::task::spawn_blocking(move || tempfile::NamedTempFile::new_in(parent))
677 .await
678 .map_err(|e| Error::io(format!("spawn_blocking failed: {}", e)))??;
679 let (std_file, temp_path) = named_temp.into_parts();
680 let file = tokio::fs::File::from_std(std_file);
681 Ok(Box::new(LocalWriter::new(
682 file,
683 path.clone(),
684 temp_path,
685 Arc::new(self.io_tracker.clone()),
686 )))
687 }
688 _ => Ok(Box::new(ObjectWriter::new(self, path).await?)),
689 }
690 }
691
692 pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
694 let mut writer = self.create(path).await?;
695 writer.write_all(content).await?;
696 Writer::shutdown(writer.as_mut()).await
697 }
698
699 pub async fn delete(&self, path: &Path) -> Result<()> {
700 self.inner.delete(path).await?;
701 Ok(())
702 }
703
704 pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
705 if self.is_local() {
706 return super::local::copy_file(from, to);
708 }
709 Ok(self.inner.copy(from, to).await?)
710 }
711
712 pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
714 let path = dir_path.into();
715 let path = Path::parse(&path)?;
716 let output = self.inner.list_with_delimiter(Some(&path)).await?;
717 Ok(output
718 .common_prefixes
719 .iter()
720 .chain(output.objects.iter().map(|o| &o.location))
721 .map(|s| s.filename().unwrap().to_string())
722 .collect())
723 }
724
725 pub fn list(
726 &self,
727 path: Option<Path>,
728 ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
729 Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
730 }
731
732 pub fn read_dir_all<'a, 'b>(
736 &'a self,
737 dir_path: impl Into<&'b Path> + Send,
738 unmodified_since: Option<DateTime<Utc>>,
739 ) -> BoxStream<'a, Result<ObjectMeta>> {
740 self.inner.read_dir_all(dir_path, unmodified_since)
741 }
742
743 pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
745 let path = dir_path.into();
746 let path = Path::parse(&path)?;
747
748 if self.is_local() {
749 return super::local::remove_dir_all(&path);
751 }
752 let sub_entries = self
753 .inner
754 .list(Some(&path))
755 .map(|m| m.map(|meta| meta.location))
756 .boxed();
757 self.inner
758 .delete_stream(sub_entries)
759 .try_collect::<Vec<_>>()
760 .await?;
761 if self.scheme == "file-object-store" {
762 return super::local::remove_dir_all(&path);
765 }
766 Ok(())
767 }
768
769 pub fn remove_stream<'a>(
770 &'a self,
771 locations: BoxStream<'a, Result<Path>>,
772 ) -> BoxStream<'a, Result<Path>> {
773 self.inner
774 .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
775 .err_into::<Error>()
776 .boxed()
777 }
778
779 pub async fn exists(&self, path: &Path) -> Result<bool> {
781 match self.inner.head(path).await {
782 Ok(_) => Ok(true),
783 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
784 Err(e) => Err(e.into()),
785 }
786 }
787
788 pub async fn size(&self, path: &Path) -> Result<u64> {
790 Ok(self.inner.head(path).await?.size)
791 }
792
793 pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
795 let reader = self.open(path).await?;
796 Ok(reader.get_all().await?)
797 }
798
799 pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
804 let reader = self.open(path).await?;
805 Ok(reader.get_range(range).await?)
806 }
807}
808
/// Lance-specific configuration keys that may appear in storage options.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum LanceConfigKey {
    /// `download_retry_count`: retries for failed downloads.
    DownloadRetryCount,
}
815
816impl FromStr for LanceConfigKey {
817 type Err = Error;
818
819 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
820 match s.to_ascii_lowercase().as_str() {
821 "download_retry_count" => Ok(Self::DownloadRetryCount),
822 _ => Err(Error::invalid_input_source(
823 format!("Invalid LanceConfigKey: {}", s).into(),
824 )),
825 }
826 }
827}
828
/// Free-form key/value options handed to object store providers.
#[derive(Debug, Default, Clone)]
pub struct StorageOptions(pub HashMap<String, String>);
831
832impl StorageOptions {
833 pub fn new(options: HashMap<String, String>) -> Self {
835 let mut options = options;
836 if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
837 options.insert("allow_http".into(), value);
838 }
839 if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
840 options.insert("allow_http".into(), value);
841 }
842 if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
843 options.insert("allow_http".into(), value);
844 }
845 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
846 options.insert("client_max_retries".into(), value);
847 }
848 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
849 options.insert("client_retry_timeout".into(), value);
850 }
851 Self(options)
852 }
853
854 pub fn allow_http(&self) -> bool {
856 self.0.iter().any(|(key, value)| {
857 key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
858 })
859 }
860
861 pub fn download_retry_count(&self) -> usize {
863 self.0
864 .iter()
865 .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
866 .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
867 .unwrap_or(3)
868 }
869
870 pub fn client_max_retries(&self) -> usize {
872 self.0
873 .iter()
874 .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
875 .and_then(|(_, value)| value.parse::<usize>().ok())
876 .unwrap_or(10)
877 }
878
879 pub fn client_retry_timeout(&self) -> u64 {
881 self.0
882 .iter()
883 .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
884 .and_then(|(_, value)| value.parse::<u64>().ok())
885 .unwrap_or(180)
886 }
887
888 pub fn get(&self, key: &str) -> Option<&String> {
889 self.0.get(key)
890 }
891
892 #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
900 pub fn client_options(&self) -> Result<ClientOptions> {
901 let mut headers = HeaderMap::new();
902 for (key, value) in &self.0 {
903 if let Some(header_name) = key.strip_prefix("headers.") {
904 let name = header_name
905 .parse::<http::header::HeaderName>()
906 .map_err(|e| {
907 Error::invalid_input(format!("invalid header name '{header_name}': {e}"))
908 })?;
909 let val = HeaderValue::from_str(value).map_err(|e| {
910 Error::invalid_input(format!("invalid header value for '{header_name}': {e}"))
911 })?;
912 headers.insert(name, val);
913 }
914 }
915 let mut client_options = ClientOptions::default();
916 if !headers.is_empty() {
917 client_options = client_options.with_default_headers(headers);
918 }
919 Ok(client_options)
920 }
921
922 pub fn expires_at_millis(&self) -> Option<u64> {
924 self.0
925 .get(EXPIRES_AT_MILLIS_KEY)
926 .and_then(|s| s.parse::<u64>().ok())
927 }
928}
929
930impl From<HashMap<String, String>> for StorageOptions {
931 fn from(value: HashMap<String, String>) -> Self {
932 Self::new(value)
933 }
934}
935
936static DEFAULT_OBJECT_STORE_REGISTRY: std::sync::LazyLock<ObjectStoreRegistry> =
937 std::sync::LazyLock::new(ObjectStoreRegistry::default);
938
939impl ObjectStore {
940 #[allow(clippy::too_many_arguments)]
941 pub fn new(
942 store: Arc<DynObjectStore>,
943 location: Url,
944 block_size: Option<usize>,
945 wrapper: Option<Arc<dyn WrappingObjectStore>>,
946 use_constant_size_upload_parts: bool,
947 list_is_lexically_ordered: bool,
948 io_parallelism: usize,
949 download_retry_count: usize,
950 storage_options: Option<&HashMap<String, String>>,
951 ) -> Self {
952 let scheme = location.scheme();
953 let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
954 let store_prefix = match DEFAULT_OBJECT_STORE_REGISTRY.get_provider(scheme) {
955 Some(provider) => provider
956 .calculate_object_store_prefix(&location, storage_options)
957 .unwrap(),
958 None => {
959 let store_prefix = format!("{}${}", location.scheme(), location.authority());
960 log::warn!(
961 "Guessing that object store prefix is {}, since object store scheme is not found in registry.",
962 store_prefix
963 );
964 store_prefix
965 }
966 };
967 let store = match wrapper {
968 Some(wrapper) => wrapper.wrap(&store_prefix, store),
969 None => store,
970 };
971
972 let io_tracker = IOTracker::default();
974 let tracked_store = io_tracker.wrap("", store);
975
976 Self {
977 inner: tracked_store,
978 scheme: scheme.into(),
979 block_size,
980 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
981 use_constant_size_upload_parts,
982 list_is_lexically_ordered,
983 io_parallelism,
984 download_retry_count,
985 io_tracker,
986 store_prefix,
987 }
988 }
989}
990
/// Default read block size for a URL scheme: 4 KiB for `file`, 64 KiB for any other
/// scheme.
fn infer_block_size(scheme: &str) -> usize {
    if scheme == "file" {
        4 * 1024
    } else {
        64 * 1024
    }
}
1000
1001#[cfg(test)]
1002mod tests {
1003 use super::*;
1004 use lance_core::utils::tempfile::{TempStdDir, TempStdFile, TempStrDir};
1005 use object_store::memory::InMemory;
1006 use rstest::rstest;
1007 use std::env::set_current_dir;
1008 use std::fs::{create_dir_all, write};
1009 use std::path::Path as StdPath;
1010 use std::sync::atomic::{AtomicBool, Ordering};
1011
1012 fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
1014 let path = expand_path(path_str).map_err(std::io::Error::other)?;
1015 std::fs::create_dir_all(path.parent().unwrap())?;
1016 write(path, contents)
1017 }
1018
1019 async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
1020 let test_file_store = store.open(path).await.unwrap();
1021 let size = test_file_store.size().await.unwrap();
1022 let bytes = test_file_store.get_range(0..size).await.unwrap();
1023 let contents = String::from_utf8(bytes.to_vec()).unwrap();
1024 Ok(contents)
1025 }
1026
1027 #[tokio::test]
1028 async fn test_absolute_paths() {
1029 let tmp_path = TempStrDir::default();
1030 write_to_file(
1031 &format!("{tmp_path}/bar/foo.lance/test_file"),
1032 "TEST_CONTENT",
1033 )
1034 .unwrap();
1035
1036 for uri in &[
1038 format!("{tmp_path}/bar/foo.lance"),
1039 format!("{tmp_path}/./bar/foo.lance"),
1040 format!("{tmp_path}/bar/foo.lance/../foo.lance"),
1041 ] {
1042 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
1043 let contents = read_from_store(store.as_ref(), &path.child("test_file"))
1044 .await
1045 .unwrap();
1046 assert_eq!(contents, "TEST_CONTENT");
1047 }
1048 }
1049
1050 #[tokio::test]
1051 async fn test_cloud_paths() {
1052 let uri = "s3://bucket/foo.lance";
1053 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
1054 assert_eq!(store.scheme, "s3");
1055 assert_eq!(path.to_string(), "foo.lance");
1056
1057 let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
1058 .await
1059 .unwrap();
1060 assert_eq!(store.scheme, "s3");
1061 assert_eq!(path.to_string(), "foo.lance");
1062
1063 let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
1064 .await
1065 .unwrap();
1066 assert_eq!(store.scheme, "gs");
1067 assert_eq!(path.to_string(), "foo.lance");
1068
1069 let (store, path) =
1070 ObjectStore::from_uri("abfss://filesystem@account.dfs.core.windows.net/foo.lance")
1071 .await
1072 .unwrap();
1073 assert_eq!(store.scheme, "abfss");
1074 assert_eq!(path.to_string(), "foo.lance");
1075 }
1076
1077 async fn test_block_size_used_test_helper(
1078 uri: &str,
1079 storage_options: Option<HashMap<String, String>>,
1080 default_expected_block_size: usize,
1081 ) {
1082 let registry = Arc::new(ObjectStoreRegistry::default());
1084 let accessor = storage_options
1085 .clone()
1086 .map(|opts| Arc::new(StorageOptionsAccessor::with_static_options(opts)));
1087 let params = ObjectStoreParams {
1088 storage_options_accessor: accessor.clone(),
1089 ..ObjectStoreParams::default()
1090 };
1091 let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
1092 .await
1093 .unwrap();
1094 assert_eq!(store.block_size, default_expected_block_size);
1095
1096 let registry = Arc::new(ObjectStoreRegistry::default());
1098 let params = ObjectStoreParams {
1099 block_size: Some(1024),
1100 storage_options_accessor: accessor,
1101 ..ObjectStoreParams::default()
1102 };
1103 let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
1104 .await
1105 .unwrap();
1106 assert_eq!(store.block_size, 1024);
1107 }
1108
1109 #[rstest]
1110 #[case("s3://bucket/foo.lance", None)]
1111 #[case("gs://bucket/foo.lance", None)]
1112 #[case("az://account/bucket/foo.lance",
1113 Some(HashMap::from([
1114 (String::from("account_name"), String::from("account")),
1115 (String::from("container_name"), String::from("container"))
1116 ])))]
1117 #[case("abfss://filesystem@account.dfs.core.windows.net/foo.lance",
1118 Some(HashMap::from([
1119 (String::from("account_name"), String::from("account")),
1120 (String::from("container_name"), String::from("filesystem"))
1121 ])))]
1122 #[tokio::test]
1123 async fn test_block_size_used_cloud(
1124 #[case] uri: &str,
1125 #[case] storage_options: Option<HashMap<String, String>>,
1126 ) {
1127 test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
1128 }
1129
1130 #[rstest]
1131 #[case("file")]
1132 #[case("file-object-store")]
1133 #[case("memory:///bucket/foo.lance")]
1134 #[tokio::test]
1135 async fn test_block_size_used_file(#[case] prefix: &str) {
1136 let tmp_path = TempStrDir::default();
1137 let path = format!("{tmp_path}/bar/foo.lance/test_file");
1138 write_to_file(&path, "URL").unwrap();
1139 let uri = format!("{prefix}:///{path}");
1140 test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
1141 }
1142
1143 #[tokio::test]
1144 async fn test_relative_paths() {
1145 let tmp_path = TempStrDir::default();
1146 write_to_file(
1147 &format!("{tmp_path}/bar/foo.lance/test_file"),
1148 "RELATIVE_URL",
1149 )
1150 .unwrap();
1151
1152 set_current_dir(StdPath::new(tmp_path.as_ref())).expect("Error changing current dir");
1153 let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();
1154
1155 let contents = read_from_store(store.as_ref(), &path.child("test_file"))
1156 .await
1157 .unwrap();
1158 assert_eq!(contents, "RELATIVE_URL");
1159 }
1160
1161 #[tokio::test]
1162 async fn test_tilde_expansion() {
1163 let uri = "~/foo.lance";
1164 write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
1165 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
1166 let contents = read_from_store(store.as_ref(), &path.child("test_file"))
1167 .await
1168 .unwrap();
1169 assert_eq!(contents, "TILDE");
1170 }
1171
1172 #[tokio::test]
1173 async fn test_read_directory() {
1174 let path = TempStdDir::default();
1175 create_dir_all(path.join("foo").join("bar")).unwrap();
1176 create_dir_all(path.join("foo").join("zoo")).unwrap();
1177 create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
1178 write_to_file(
1179 path.join("foo").join("test_file").to_str().unwrap(),
1180 "read_dir",
1181 )
1182 .unwrap();
1183 let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
1184
1185 let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
1186 assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
1187 }
1188
1189 #[tokio::test]
1190 async fn test_delete_directory_local_store() {
1191 test_delete_directory("").await;
1192 }
1193
1194 #[tokio::test]
1195 async fn test_delete_directory_file_object_store() {
1196 test_delete_directory("file-object-store").await;
1197 }
1198
1199 async fn test_delete_directory(scheme: &str) {
1200 let path = TempStdDir::default();
1201 create_dir_all(path.join("foo").join("bar")).unwrap();
1202 create_dir_all(path.join("foo").join("zoo")).unwrap();
1203 create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
1204 write_to_file(
1205 path.join("foo")
1206 .join("bar")
1207 .join("test_file")
1208 .to_str()
1209 .unwrap(),
1210 "delete",
1211 )
1212 .unwrap();
1213 let file_url = Url::from_directory_path(&path).unwrap();
1214 let url = if scheme.is_empty() {
1215 file_url
1216 } else {
1217 let mut url = Url::parse(&format!("{scheme}:///")).unwrap();
1218 url.set_path(file_url.path());
1220 url
1221 };
1222 let (store, base) = ObjectStore::from_uri(url.as_ref()).await.unwrap();
1223 store.remove_dir_all(base.child("foo")).await.unwrap();
1224
1225 assert!(!path.join("foo").exists());
1226 }
1227
1228 #[derive(Debug)]
1229 struct TestWrapper {
1230 called: AtomicBool,
1231
1232 return_value: Arc<dyn OSObjectStore>,
1233 }
1234
1235 impl WrappingObjectStore for TestWrapper {
1236 fn wrap(
1237 &self,
1238 _store_prefix: &str,
1239 _original: Arc<dyn OSObjectStore>,
1240 ) -> Arc<dyn OSObjectStore> {
1241 self.called.store(true, Ordering::Relaxed);
1242
1243 self.return_value.clone()
1245 }
1246 }
1247
1248 impl TestWrapper {
1249 fn called(&self) -> bool {
1250 self.called.load(Ordering::Relaxed)
1251 }
1252 }
1253
1254 #[tokio::test]
1255 async fn test_wrapping_object_store_option_is_used() {
1256 let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
1258 let registry = Arc::new(ObjectStoreRegistry::default());
1259
1260 assert_eq!(Arc::strong_count(&mock_inner_store), 1);
1261
1262 let wrapper = Arc::new(TestWrapper {
1263 called: AtomicBool::new(false),
1264 return_value: mock_inner_store.clone(),
1265 });
1266
1267 let params = ObjectStoreParams {
1268 object_store_wrapper: Some(wrapper.clone()),
1269 ..ObjectStoreParams::default()
1270 };
1271
1272 assert!(!wrapper.called());
1274
1275 let _ = ObjectStore::from_uri_and_params(registry, "memory:///", ¶ms)
1276 .await
1277 .unwrap();
1278
1279 assert!(wrapper.called());
1281
1282 assert_eq!(Arc::strong_count(&mock_inner_store), 2);
1285 }
1286
1287 #[tokio::test]
1288 async fn test_local_paths() {
1289 let file_path = TempStdFile::default();
1290 let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
1291 writer.write_all(b"LOCAL").await.unwrap();
1292 Writer::shutdown(&mut writer).await.unwrap();
1293
1294 let reader = ObjectStore::open_local(&file_path).await.unwrap();
1295 let buf = reader.get_range(0..5).await.unwrap();
1296 assert_eq!(buf.as_ref(), b"LOCAL");
1297 }
1298
1299 #[tokio::test]
1300 async fn test_read_one() {
1301 let file_path = TempStdFile::default();
1302 let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
1303 writer.write_all(b"LOCAL").await.unwrap();
1304 Writer::shutdown(&mut writer).await.unwrap();
1305
1306 let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
1307 let obj_store = ObjectStore::local();
1308 let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
1309 assert_eq!(buf.as_ref(), b"LOCAL");
1310
1311 let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
1312 assert_eq!(buf.as_ref(), b"LOCAL");
1313 }
1314
1315 #[tokio::test]
1316 #[cfg(windows)]
1317 async fn test_windows_paths() {
1318 use std::path::Component;
1319 use std::path::Prefix;
1320 use std::path::Prefix::*;
1321
1322 fn get_path_prefix(path: &StdPath) -> Prefix {
1323 match path.components().next().unwrap() {
1324 Component::Prefix(prefix_component) => prefix_component.kind(),
1325 _ => panic!(),
1326 }
1327 }
1328
1329 fn get_drive_letter(prefix: Prefix) -> String {
1330 match prefix {
1331 Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
1332 _ => panic!(),
1333 }
1334 }
1335
1336 let tmp_path = TempStdFile::default();
1337 let prefix = get_path_prefix(&tmp_path);
1338 let drive_letter = get_drive_letter(prefix);
1339
1340 write_to_file(
1341 &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
1342 "WINDOWS",
1343 )
1344 .unwrap();
1345
1346 for uri in &[
1347 format!("{drive_letter}:/test_folder/test.lance"),
1348 format!("{drive_letter}:\\test_folder\\test.lance"),
1349 ] {
1350 let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
1351 let contents = read_from_store(store.as_ref(), &base.child("test_file"))
1352 .await
1353 .unwrap();
1354 assert_eq!(contents, "WINDOWS");
1355 }
1356 }
1357
1358 #[tokio::test]
1359 async fn test_cross_filesystem_copy() {
1360 let source_dir = TempStdDir::default();
1362 let dest_dir = TempStdDir::default();
1363
1364 let source_file_name = "test_file.txt";
1366 let source_file = source_dir.join(source_file_name);
1367 std::fs::write(&source_file, b"test content").unwrap();
1368
1369 let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
1371 .await
1372 .unwrap();
1373
1374 let from_path = base_path.child(source_file_name);
1376
1377 let dest_file = dest_dir.join("copied_file.txt");
1379 let dest_str = dest_file.to_str().unwrap();
1380 let to_path = object_store::path::Path::parse(dest_str).unwrap();
1381
1382 store.copy(&from_path, &to_path).await.unwrap();
1384
1385 assert!(dest_file.exists());
1387 let copied_content = std::fs::read(&dest_file).unwrap();
1388 assert_eq!(copied_content, b"test content");
1389 }
1390
1391 #[tokio::test]
1392 async fn test_copy_creates_parent_directories() {
1393 let source_dir = TempStdDir::default();
1394 let dest_dir = TempStdDir::default();
1395
1396 let source_file_name = "test_file.txt";
1398 let source_file = source_dir.join(source_file_name);
1399 std::fs::write(&source_file, b"test content").unwrap();
1400
1401 let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
1403 .await
1404 .unwrap();
1405
1406 let from_path = base_path.child(source_file_name);
1408
1409 let dest_file = dest_dir.join("nested").join("dirs").join("copied_file.txt");
1411 let dest_str = dest_file.to_str().unwrap();
1412 let to_path = object_store::path::Path::parse(dest_str).unwrap();
1413
1414 store.copy(&from_path, &to_path).await.unwrap();
1416
1417 assert!(dest_file.exists());
1419 assert!(dest_file.parent().unwrap().exists());
1420 let copied_content = std::fs::read(&dest_file).unwrap();
1421 assert_eq!(copied_content, b"test content");
1422 }
1423
1424 #[test]
1425 #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1426 fn test_client_options_extracts_headers() {
1427 let opts = StorageOptions(HashMap::from([
1428 ("headers.x-custom-foo".to_string(), "bar".to_string()),
1429 ("headers.x-ms-version".to_string(), "2023-11-03".to_string()),
1430 ("region".to_string(), "us-west-2".to_string()),
1431 ]));
1432 let client_options = opts.client_options().unwrap();
1433
1434 let opts_no_headers = StorageOptions(HashMap::from([(
1437 "region".to_string(),
1438 "us-west-2".to_string(),
1439 )]));
1440 opts_no_headers.client_options().unwrap();
1441
1442 #[cfg(feature = "gcp")]
1446 {
1447 use object_store::gcp::GoogleCloudStorageBuilder;
1448 let _builder = GoogleCloudStorageBuilder::new()
1449 .with_client_options(client_options)
1450 .with_url("gs://test-bucket");
1451 }
1452 }
1453
1454 #[test]
1455 #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1456 fn test_client_options_rejects_invalid_header_name() {
1457 let opts = StorageOptions(HashMap::from([(
1458 "headers.bad header".to_string(),
1459 "value".to_string(),
1460 )]));
1461 let err = opts.client_options().unwrap_err();
1462 assert!(err.to_string().contains("invalid header name"));
1463 }
1464
1465 #[test]
1466 #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1467 fn test_client_options_rejects_invalid_header_value() {
1468 let opts = StorageOptions(HashMap::from([(
1469 "headers.x-good-name".to_string(),
1470 "bad\x01value".to_string(),
1471 )]));
1472 let err = opts.client_options().unwrap_err();
1473 assert!(err.to_string().contains("invalid header value"));
1474 }
1475
1476 #[test]
1477 #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1478 fn test_client_options_empty_when_no_header_keys() {
1479 let opts = StorageOptions(HashMap::from([
1480 ("region".to_string(), "us-east-1".to_string()),
1481 ("access_key_id".to_string(), "AKID".to_string()),
1482 ]));
1483 opts.client_options().unwrap();
1484 }
1485}