1use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use deepsize::DeepSizeOf;
17use futures::{FutureExt, Stream};
18use futures::{StreamExt, TryStreamExt, future, stream::BoxStream};
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22use object_store::DynObjectStore;
23use object_store::ObjectStoreExt as OSObjectStoreExt;
24#[cfg(feature = "aws")]
25use object_store::aws::AwsCredentialProvider;
26#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
27use object_store::{ClientOptions, HeaderMap, HeaderValue};
28use object_store::{ObjectMeta, ObjectStore as OSObjectStore, path::Path};
29use providers::local::FileStoreProvider;
30use providers::memory::MemoryStoreProvider;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35#[cfg(target_os = "linux")]
36use crate::uring::{UringCurrentThreadReader, UringReader};
37#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
38pub(crate) mod dynamic_credentials;
39#[cfg(any(feature = "oss", feature = "huggingface"))]
40pub(crate) mod dynamic_opendal;
41mod list_retry;
42pub mod providers;
43pub mod storage_options;
44#[cfg(test)]
45pub(crate) mod test_utils;
46pub mod throttle;
47mod tracing;
48use crate::object_reader::SmallReader;
49use crate::object_writer::{LocalWriter, WriteResult};
50use crate::traits::Writer;
51use crate::utils::tracking_store::{IOTracker, IoStats};
52use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
53use lance_core::{Error, Result};
54
/// Default I/O parallelism value for local stores.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
/// Default I/O parallelism value for cloud stores.
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

/// Default block size for local stores (4 KiB).
const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024;
/// Default block size for cloud stores (64 KiB); only compiled with a cloud feature.
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024;

/// Resolves the maximum IOP size from the raw `LANCE_MAX_IOP_SIZE` value.
///
/// Returns 16 MiB when the variable is absent (`None`).
///
/// # Panics
///
/// Panics when the value is present but is not a valid `u64`. The panic
/// message names the variable and the offending value so a misconfiguration
/// is easy to diagnose.
fn parse_max_iop_size(raw: Option<&str>) -> u64 {
    match raw {
        None => 16 * 1024 * 1024,
        Some(text) => text.parse().unwrap_or_else(|err| {
            panic!(
                "LANCE_MAX_IOP_SIZE must be an unsigned integer, got '{}': {}",
                text, err
            )
        }),
    }
}

/// Maximum size of a single I/O operation.
///
/// Read once, lazily, from the `LANCE_MAX_IOP_SIZE` environment variable;
/// defaults to 16 MiB when the variable is unset or not valid Unicode.
pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock<u64> = std::sync::LazyLock::new(|| {
    parse_max_iop_size(std::env::var("LANCE_MAX_IOP_SIZE").ok().as_deref())
});

/// Default number of download retries.
pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
74
75pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
76pub use storage_options::{
77 EXPIRES_AT_MILLIS_KEY, LanceNamespaceStorageOptionsProvider, REFRESH_OFFSET_MILLIS_KEY,
78 StorageOptionsAccessor, StorageOptionsProvider,
79};
80
/// Convenience helpers layered on top of an `object_store` store.
#[async_trait]
pub trait ObjectStoreExt {
    /// Returns `true` if an object exists at `path`.
    async fn exists(&self, path: &Path) -> Result<bool>;

    /// Streams the metadata of the objects listed under `dir_path`.
    ///
    /// When `unmodified_since` is `Some`, the blanket implementation in this
    /// file keeps only entries whose `last_modified` is at or before it.
    fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>>;
}
95
96#[async_trait]
97impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
98 fn read_dir_all<'a, 'b>(
99 &'a self,
100 dir_path: impl Into<&'b Path> + Send,
101 unmodified_since: Option<DateTime<Utc>>,
102 ) -> BoxStream<'a, Result<ObjectMeta>> {
103 let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
104 if let Some(unmodified_since_val) = unmodified_since {
105 output
106 .try_filter(move |file| future::ready(file.last_modified <= unmodified_since_val))
107 .boxed()
108 } else {
109 output.boxed()
110 }
111 }
112
113 async fn exists(&self, path: &Path) -> Result<bool> {
114 match self.head(path).await {
115 Ok(_) => Ok(true),
116 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
117 Err(e) => Err(e.into()),
118 }
119 }
120}
121
/// Wrapper around an `object_store` implementation that carries the I/O
/// settings used by the readers and writers in this crate.
#[derive(Debug, Clone)]
pub struct ObjectStore {
    /// The underlying store. The constructors in this file store the
    /// `IOTracker`-wrapped store here.
    pub inner: Arc<dyn OSObjectStore>,
    /// URL scheme of the store (compared against e.g. `"file"`, `"memory"`).
    scheme: String,
    /// Block size handed to the readers opened from this store.
    block_size: usize,
    /// Maximum I/O operation size; initialised from `DEFAULT_MAX_IOP_SIZE`.
    max_iop_size: u64,
    /// Copied from the creation parameters.
    /// NOTE(review): presumably forces equally sized multipart upload parts — confirm.
    pub use_constant_size_upload_parts: bool,
    /// Copied from the creation parameters.
    /// NOTE(review): presumably whether `list` returns lexically ordered results — confirm.
    pub list_is_lexically_ordered: bool,
    /// Parallelism returned by `io_parallelism()` unless `LANCE_IO_THREADS` is set.
    io_parallelism: usize,
    /// Retry count handed to the cloud and small-object readers.
    download_retry_count: usize,
    /// Tracker wrapped around `inner`; the source of the I/O statistics.
    io_tracker: IOTracker,
    /// Prefix computed for this store (via `calculate_object_store_prefix`
    /// or guessed from scheme and authority).
    pub store_prefix: String,
}
146
147impl DeepSizeOf for ObjectStore {
148 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
149 self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
154 }
155}
156
157impl std::fmt::Display for ObjectStore {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 write!(f, "ObjectStore({})", self.scheme)
160 }
161}
162
/// A hook that receives an object store and returns a (possibly decorated)
/// store to use in its place.
pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
    /// Wraps `original`, returning the store that should be used instead.
    ///
    /// `store_prefix` is the prefix computed for the store being wrapped.
    fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
}
170
171#[derive(Debug, Clone)]
172pub struct ChainedWrappingObjectStore {
173 wrappers: Vec<Arc<dyn WrappingObjectStore>>,
174}
175
176impl ChainedWrappingObjectStore {
177 pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
178 Self { wrappers }
179 }
180
181 pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
182 self.wrappers.push(wrapper);
183 }
184}
185
186impl WrappingObjectStore for ChainedWrappingObjectStore {
187 fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
188 self.wrappers
189 .iter()
190 .fold(original, |acc, wrapper| wrapper.wrap(store_prefix, acc))
191 }
192}
193
/// Parameters used when resolving a URI to an [`ObjectStore`].
///
/// `Hash` and `PartialEq` are implemented by hand for this type (see the
/// impls in this file).
#[derive(Debug, Clone)]
pub struct ObjectStoreParams {
    /// Explicit block size; when `None` a default is chosen for the store.
    pub block_size: Option<usize>,
    /// A pre-built store and its URL. When set, `from_uri_and_params` uses it
    /// directly instead of asking the registry for a store.
    #[deprecated(note = "Implement an ObjectStoreProvider instead")]
    pub object_store: Option<(Arc<DynObjectStore>, Url)>,
    /// Defaults to 60 seconds.
    /// NOTE(review): presumably how long before expiry S3 credentials are refreshed — confirm.
    pub s3_credentials_refresh_offset: Duration,
    /// Optional AWS credential provider (only with the `aws` feature).
    #[cfg(feature = "aws")]
    pub aws_credentials: Option<AwsCredentialProvider>,
    /// Optional wrapper applied to the underlying store.
    pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
    /// Accessor through which the storage options are obtained.
    pub storage_options_accessor: Option<Arc<StorageOptionsAccessor>>,
    /// Copied into [`ObjectStore::use_constant_size_upload_parts`].
    pub use_constant_size_upload_parts: bool,
    /// Copied into [`ObjectStore::list_is_lexically_ordered`]; `None` means
    /// "use the default".
    pub list_is_lexically_ordered: Option<bool>,
}
220
221impl Default for ObjectStoreParams {
222 fn default() -> Self {
223 #[allow(deprecated)]
224 Self {
225 object_store: None,
226 block_size: None,
227 s3_credentials_refresh_offset: Duration::from_secs(60),
228 #[cfg(feature = "aws")]
229 aws_credentials: None,
230 object_store_wrapper: None,
231 storage_options_accessor: None,
232 use_constant_size_upload_parts: false,
233 list_is_lexically_ordered: None,
234 }
235 }
236}
237
238impl ObjectStoreParams {
239 pub fn get_accessor(&self) -> Option<Arc<StorageOptionsAccessor>> {
241 self.storage_options_accessor.clone()
242 }
243
244 pub fn storage_options(&self) -> Option<&HashMap<String, String>> {
248 self.storage_options_accessor
249 .as_ref()
250 .and_then(|a| a.initial_storage_options())
251 }
252}
253
254impl std::hash::Hash for ObjectStoreParams {
256 #[allow(deprecated)]
257 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
258 self.block_size.hash(state);
260 if let Some((store, url)) = &self.object_store {
261 Arc::as_ptr(store).hash(state);
262 url.hash(state);
263 }
264 self.s3_credentials_refresh_offset.hash(state);
265 #[cfg(feature = "aws")]
266 if let Some(aws_credentials) = &self.aws_credentials {
267 Arc::as_ptr(aws_credentials).hash(state);
268 }
269 if let Some(wrapper) = &self.object_store_wrapper {
270 Arc::as_ptr(wrapper).hash(state);
271 }
272 if let Some(accessor) = &self.storage_options_accessor {
273 accessor.accessor_id().hash(state);
274 }
275 self.use_constant_size_upload_parts.hash(state);
276 self.list_is_lexically_ordered.hash(state);
277 }
278}
279
280impl Eq for ObjectStoreParams {}
282impl PartialEq for ObjectStoreParams {
283 #[allow(deprecated)]
284 fn eq(&self, other: &Self) -> bool {
285 #[cfg(feature = "aws")]
286 if self.aws_credentials.is_some() != other.aws_credentials.is_some() {
287 return false;
288 }
289
290 self.block_size == other.block_size
293 && self
294 .object_store
295 .as_ref()
296 .map(|(store, url)| (Arc::as_ptr(store), url))
297 == other
298 .object_store
299 .as_ref()
300 .map(|(store, url)| (Arc::as_ptr(store), url))
301 && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
302 && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
303 == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
304 && self
305 .storage_options_accessor
306 .as_ref()
307 .map(|a| a.accessor_id())
308 == other
309 .storage_options_accessor
310 .as_ref()
311 .map(|a| a.accessor_id())
312 && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
313 && self.list_is_lexically_ordered == other.list_is_lexically_ordered
314 }
315}
316
317pub fn uri_to_url(uri: &str) -> Result<Url> {
338 match Url::parse(uri) {
339 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
340 local_path_to_url(uri)
342 }
343 Ok(url) => Ok(url),
344 Err(_) => local_path_to_url(uri),
345 }
346}
347
348fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
349 let str_path = str_path.as_ref();
350 let expanded = expand_tilde_path(str_path).unwrap_or_else(|| str_path.into());
351
352 let mut expanded_path = path_abs::PathAbs::new(expanded)
353 .unwrap()
354 .as_path()
355 .to_path_buf();
356 if let Some(s) = expanded_path.as_path().to_str()
358 && s.is_empty()
359 {
360 expanded_path = std::env::current_dir()?;
361 }
362
363 Ok(expanded_path)
364}
365
/// Expands a leading `~` to the current user's home directory.
///
/// Returns `None` when the home directory is unknown or when `path` is
/// neither `~` nor starts with `~/` (or `~\` on Windows).
fn expand_tilde_path(path: &str) -> Option<std::path::PathBuf> {
    let home = std::env::home_dir()?;
    if path == "~" {
        return Some(home);
    }

    let remainder = path.strip_prefix("~/");
    #[cfg(windows)]
    let remainder = remainder.or_else(|| path.strip_prefix("~\\"));

    remainder.map(|tail| home.join(tail))
}
381
382fn local_path_to_url(str_path: &str) -> Result<Url> {
383 let expanded_path = expand_path(str_path)?;
384
385 Url::from_directory_path(expanded_path).map_err(|_| {
386 Error::invalid_input_source(format!("Invalid table location: '{}'", str_path).into())
387 })
388}
389
/// Extracts the `owner/repo` id from a Hugging Face URL.
///
/// The host (if any) is treated as the first segment, followed by the path
/// segments. An optional leading repo type (`models`, `datasets`, `spaces`)
/// is skipped and an `@revision` suffix on the repo name is dropped.
///
/// # Errors
///
/// Returns an invalid-input error when the owner or repo is missing or the
/// repo name is empty.
#[cfg(feature = "huggingface")]
fn parse_hf_repo_id(url: &Url) -> Result<String> {
    // Empty segments (empty host, leading/trailing or doubled slashes) carry
    // no information and must not be mistaken for an owner or repo name.
    let segments: Vec<&str> = url
        .host_str()
        .into_iter()
        .chain(url.path().split('/'))
        .filter(|segment| !segment.is_empty())
        .collect();

    if segments.len() < 2 {
        return Err(Error::invalid_input(
            "Huggingface URL must contain at least owner and repo",
        ));
    }

    let repo_type_candidates = ["models", "datasets", "spaces"];
    let (owner, repo_with_rev) = if repo_type_candidates.contains(&segments[0]) {
        if segments.len() < 3 {
            return Err(Error::invalid_input(
                "Huggingface URL missing owner/repo after repo type",
            ));
        }
        (segments[1], segments[2])
    } else {
        (segments[0], segments[1])
    };

    let repo = repo_with_rev
        .split_once('@')
        .map(|(r, _)| r)
        .unwrap_or(repo_with_rev);
    if repo.is_empty() {
        return Err(Error::invalid_input(
            "Huggingface URL has an empty repo name",
        ));
    }
    Ok(format!("{owner}/{repo}"))
}
428
429impl ObjectStore {
430 pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
438 let registry = Arc::new(ObjectStoreRegistry::default());
439
440 Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
441 }
442
    /// Resolves `uri` to a store and a path inside it, honouring `params`.
    ///
    /// If the deprecated `params.object_store` is set, that store is used
    /// directly: it is wrapped with `params.object_store_wrapper` (if any),
    /// then with a fresh `IOTracker`, and the path is taken from the URL
    /// stored alongside it. Otherwise the store and path come from the
    /// provider registered in `registry` for the URI's scheme.
    pub async fn from_uri_and_params(
        registry: Arc<ObjectStoreRegistry>,
        uri: &str,
        params: &ObjectStoreParams,
    ) -> Result<(Arc<Self>, Path)> {
        #[allow(deprecated)]
        if let Some((store, path)) = params.object_store.as_ref() {
            let mut inner = store.clone();
            let store_prefix =
                registry.calculate_object_store_prefix(uri, params.storage_options())?;
            if let Some(wrapper) = params.object_store_wrapper.as_ref() {
                inner = wrapper.wrap(&store_prefix, inner);
            }

            // The tracker goes outermost so it sees the wrapped store's I/O.
            let io_tracker = IOTracker::default();
            let tracked_store = io_tracker.wrap("", inner);

            let store = Self {
                inner: tracked_store,
                scheme: path.scheme().to_string(),
                // Falls back to 64 KiB when no block size is supplied.
                block_size: params.block_size.unwrap_or(64 * 1024),
                max_iop_size: *DEFAULT_MAX_IOP_SIZE,
                use_constant_size_upload_parts: params.use_constant_size_upload_parts,
                list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
                io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
                download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
                io_tracker,
                store_prefix,
            };
            let path = Path::parse(path.path())?;
            return Ok((Arc::new(store), path));
        }
        let url = uri_to_url(uri)?;

        let store = registry.get_store(url.clone(), params).await?;
        let provider = registry.get_provider(url.scheme()).expect_ok()?;
        let path = provider.extract_path(&url)?;

        Ok((store, path))
    }
488
489 pub fn extract_path_from_uri(registry: Arc<ObjectStoreRegistry>, uri: &str) -> Result<Path> {
503 let url = uri_to_url(uri)?;
504 let provider = registry
505 .get_provider(url.scheme())
506 .ok_or_else(|| Error::invalid_input(format!("Unknown scheme: {}", url.scheme())))?;
507 provider.extract_path(&url)
508 }
509
510 #[deprecated(note = "Use `from_uri` instead")]
511 pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
512 Self::from_uri_and_params(
513 Arc::new(ObjectStoreRegistry::default()),
514 str_path,
515 &Default::default(),
516 )
517 .now_or_never()
518 .unwrap()
519 }
520
521 pub fn local() -> Self {
523 let provider = FileStoreProvider;
524 provider
525 .new_store(Url::parse("file:///").unwrap(), &Default::default())
526 .now_or_never()
527 .unwrap()
528 .unwrap()
529 }
530
531 pub fn memory() -> Self {
533 let provider = MemoryStoreProvider;
534 provider
535 .new_store(Url::parse("memory:///").unwrap(), &Default::default())
536 .now_or_never()
537 .unwrap()
538 .unwrap()
539 }
540
541 pub fn is_local(&self) -> bool {
543 self.scheme == "file" || self.scheme == "file+uring"
544 }
545
546 pub fn is_cloud(&self) -> bool {
547 if self.is_local() || self.scheme == "memory" || self.scheme == "shared-memory" {
548 return false;
549 }
550 true
551 }
552
    /// Returns `true` only for the `file+uring` scheme.
    pub fn prefers_lite_scheduler(&self) -> bool {
        self.scheme == "file+uring"
    }
560
    /// Returns the URL scheme this store was created with.
    pub fn scheme(&self) -> &str {
        &self.scheme
    }
564
    /// Returns the block size configured for this store.
    pub fn block_size(&self) -> usize {
        self.block_size
    }
568
    /// Returns the maximum I/O operation size configured for this store.
    pub fn max_iop_size(&self) -> u64 {
        self.max_iop_size
    }
572
573 pub fn io_parallelism(&self) -> usize {
574 std::env::var("LANCE_IO_THREADS")
575 .map(|val| val.parse::<usize>().unwrap())
576 .unwrap_or(self.io_parallelism)
577 }
578
    /// Returns the tracker that wraps this store's underlying store.
    pub fn io_tracker(&self) -> &IOTracker {
        &self.io_tracker
    }
586
    /// Returns the tracker's current statistics (`IOTracker::stats`).
    pub fn io_stats_snapshot(&self) -> IoStats {
        self.io_tracker.stats()
    }
594
    /// Returns the tracker's incremental statistics
    /// (`IOTracker::incremental_stats`).
    /// NOTE(review): presumably the stats since the previous call — confirm.
    pub fn io_stats_incremental(&self) -> IoStats {
        self.io_tracker.incremental_stats()
    }
603
    /// Opens a reader for `path` without a known file size.
    ///
    /// The reader type depends on the scheme: a local reader for `file`, a
    /// uring reader for `file+uring` (Linux only), and a cloud reader for
    /// everything else.
    pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
        match self.scheme.as_str() {
            "file" => {
                LocalObjectReader::open_with_tracker(
                    path,
                    self.block_size,
                    None,
                    Arc::new(self.io_tracker.clone()),
                )
                .await
            }
            #[cfg(target_os = "linux")]
            "file+uring" => {
                // LANCE_URING_CURRENT_THREAD selects the current-thread uring reader.
                let use_current_thread = std::env::var("LANCE_URING_CURRENT_THREAD")
                    .map(|v| str_is_truthy(&v))
                    .unwrap_or(false);

                if use_current_thread {
                    UringCurrentThreadReader::open(
                        path,
                        self.block_size,
                        None,
                        Arc::new(self.io_tracker.clone()),
                    )
                    .await
                } else {
                    UringReader::open(
                        path,
                        self.block_size,
                        None,
                        Arc::new(self.io_tracker.clone()),
                    )
                    .await
                }
            }
            _ => Ok(Box::new(CloudObjectReader::new(
                self.inner.clone(),
                path.clone(),
                self.block_size,
                None,
                self.download_retry_count,
            )?)),
        }
    }
653
    /// Opens a reader for `path` whose size is already known.
    ///
    /// Files no larger than the block size get a `SmallReader` regardless of
    /// scheme; otherwise the reader is chosen by scheme exactly as in
    /// [`Self::open`], with `known_size` passed along.
    pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
        if known_size <= self.block_size {
            return Ok(Box::new(SmallReader::new(
                self.inner.clone(),
                path.clone(),
                self.download_retry_count,
                known_size,
            )));
        }

        match self.scheme.as_str() {
            "file" => {
                LocalObjectReader::open_with_tracker(
                    path,
                    self.block_size,
                    Some(known_size),
                    Arc::new(self.io_tracker.clone()),
                )
                .await
            }
            #[cfg(target_os = "linux")]
            "file+uring" => {
                // LANCE_URING_CURRENT_THREAD selects the current-thread uring reader.
                let use_current_thread = std::env::var("LANCE_URING_CURRENT_THREAD")
                    .map(|v| str_is_truthy(&v))
                    .unwrap_or(false);

                if use_current_thread {
                    UringCurrentThreadReader::open(
                        path,
                        self.block_size,
                        Some(known_size),
                        Arc::new(self.io_tracker.clone()),
                    )
                    .await
                } else {
                    UringReader::open(
                        path,
                        self.block_size,
                        Some(known_size),
                        Arc::new(self.io_tracker.clone()),
                    )
                    .await
                }
            }
            _ => Ok(Box::new(CloudObjectReader::new(
                self.inner.clone(),
                path.clone(),
                self.block_size,
                Some(known_size),
                self.download_retry_count,
            )?)),
        }
    }
715
716 pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
718 let object_store = Self::local();
719 let absolute_path = expand_path(path.to_string_lossy())?;
720 let os_path = Path::from_absolute_path(absolute_path)?;
721 ObjectWriter::new(&object_store, &os_path).await
722 }
723
724 pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
726 let object_store = Self::local();
727 let absolute_path = expand_path(path.to_string_lossy())?;
728 let os_path = Path::from_absolute_path(absolute_path)?;
729 object_store.open(&os_path).await
730 }
731
732 pub async fn create(&self, path: &Path) -> Result<Box<dyn Writer>> {
734 match self.scheme.as_str() {
735 "file" => {
736 let local_path = super::local::to_local_path(path);
737 let local_path = std::path::PathBuf::from(&local_path);
738 if let Some(parent) = local_path.parent() {
739 tokio::fs::create_dir_all(parent).await?;
740 }
741 let parent = local_path
742 .parent()
743 .expect("file path must have parent")
744 .to_owned();
745 let named_temp =
746 tokio::task::spawn_blocking(move || tempfile::NamedTempFile::new_in(parent))
747 .await
748 .map_err(|e| Error::io(format!("spawn_blocking failed: {}", e)))??;
749 let (std_file, temp_path) = named_temp.into_parts();
750 let file = tokio::fs::File::from_std(std_file);
751 Ok(Box::new(LocalWriter::new(
752 file,
753 path.clone(),
754 temp_path,
755 Arc::new(self.io_tracker.clone()),
756 )))
757 }
758 _ => Ok(Box::new(ObjectWriter::new(self, path).await?)),
759 }
760 }
761
762 pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
764 let mut writer = self.create(path).await?;
765 writer.write_all(content).await?;
766 Writer::shutdown(writer.as_mut()).await
767 }
768
769 pub async fn delete(&self, path: &Path) -> Result<()> {
770 self.inner.delete(path).await?;
771 Ok(())
772 }
773
774 pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
775 if self.is_local() {
776 return super::local::copy_file(from, to);
778 }
779 Ok(self.inner.copy(from, to).await?)
780 }
781
782 pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
784 let path = dir_path.into();
785 let path = Path::parse(&path)?;
786 let output = self.inner.list_with_delimiter(Some(&path)).await?;
787 Ok(output
788 .common_prefixes
789 .iter()
790 .chain(output.objects.iter().map(|o| &o.location))
791 .map(|s| s.filename().unwrap().to_string())
792 .collect())
793 }
794
795 pub fn list(
796 &self,
797 path: Option<Path>,
798 ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
799 Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
800 }
801
    /// Streams the metadata of the objects under `dir_path`, delegating to
    /// [`ObjectStoreExt::read_dir_all`] on the underlying store.
    ///
    /// `unmodified_since`, when set, restricts the result to objects last
    /// modified at or before that instant.
    pub fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        self.inner.read_dir_all(dir_path, unmodified_since)
    }
812
813 pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
815 let path = dir_path.into();
816 let path = Path::parse(&path)?;
817
818 if self.is_local() {
819 return super::local::remove_dir_all(&path);
821 }
822 let sub_entries = self
823 .inner
824 .list(Some(&path))
825 .map(|m| m.map(|meta| meta.location))
826 .boxed();
827 self.inner
828 .delete_stream(sub_entries)
829 .try_collect::<Vec<_>>()
830 .await?;
831 if self.scheme == "file-object-store" {
832 return super::local::remove_dir_all(&path);
835 }
836 Ok(())
837 }
838
839 pub fn remove_stream<'a>(
840 &'a self,
841 locations: BoxStream<'a, Result<Path>>,
842 ) -> BoxStream<'a, Result<Path>> {
843 let store = Arc::clone(&self.inner);
844 locations
845 .and_then(move |location| {
846 let store = Arc::clone(&store);
847 async move {
848 store.delete(&location).await?;
849 Ok(location)
850 }
851 })
852 .boxed()
853 }
854
855 pub async fn exists(&self, path: &Path) -> Result<bool> {
857 match self.inner.head(path).await {
858 Ok(_) => Ok(true),
859 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
860 Err(e) => Err(e.into()),
861 }
862 }
863
864 pub async fn size(&self, path: &Path) -> Result<u64> {
866 Ok(self.inner.head(path).await?.size)
867 }
868
869 pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
871 let reader = self.open(path).await?;
872 Ok(reader.get_all().await?)
873 }
874
875 pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
880 let reader = self.open(path).await?;
881 Ok(reader.get_range(range).await?)
882 }
883}
884
/// Configuration keys recognised by Lance itself.
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub enum LanceConfigKey {
    /// Parsed from the string `download_retry_count` (case-insensitive).
    DownloadRetryCount,
}
891
892impl FromStr for LanceConfigKey {
893 type Err = Error;
894
895 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
896 match s.to_ascii_lowercase().as_str() {
897 "download_retry_count" => Ok(Self::DownloadRetryCount),
898 _ => Err(Error::invalid_input_source(
899 format!("Invalid LanceConfigKey: {}", s).into(),
900 )),
901 }
902 }
903}
904
/// String key/value storage options with typed accessors for the options
/// Lance understands.
#[derive(Clone, Debug, Default)]
pub struct StorageOptions(pub HashMap<String, String>);
907
908impl StorageOptions {
909 pub fn new(options: HashMap<String, String>) -> Self {
911 let mut options = options;
912 if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
913 options.insert("allow_http".into(), value);
914 }
915 if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
916 options.insert("allow_http".into(), value);
917 }
918 if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
919 options.insert("allow_http".into(), value);
920 }
921 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
922 options.insert("client_max_retries".into(), value);
923 }
924 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
925 options.insert("client_retry_timeout".into(), value);
926 }
927 Self(options)
928 }
929
930 pub fn allow_http(&self) -> bool {
932 self.0.iter().any(|(key, value)| {
933 key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
934 })
935 }
936
937 pub fn download_retry_count(&self) -> usize {
939 self.0
940 .iter()
941 .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
942 .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
943 .unwrap_or(3)
944 }
945
946 pub fn client_max_retries(&self) -> usize {
948 self.0
949 .iter()
950 .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
951 .and_then(|(_, value)| value.parse::<usize>().ok())
952 .unwrap_or(3)
953 }
954
955 pub fn client_retry_timeout(&self) -> u64 {
957 self.0
958 .iter()
959 .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
960 .and_then(|(_, value)| value.parse::<u64>().ok())
961 .unwrap_or(180)
962 }
963
    /// Looks up `key` exactly as given (case-sensitive).
    pub fn get(&self, key: &str) -> Option<&String> {
        self.0.get(key)
    }
967
968 #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
976 pub fn client_options(&self) -> Result<ClientOptions> {
977 let mut headers = HeaderMap::new();
978 for (key, value) in &self.0 {
979 if let Some(header_name) = key.strip_prefix("headers.") {
980 let name = header_name
981 .parse::<http::header::HeaderName>()
982 .map_err(|e| {
983 Error::invalid_input(format!("invalid header name '{header_name}': {e}"))
984 })?;
985 let val = HeaderValue::from_str(value).map_err(|e| {
986 Error::invalid_input(format!("invalid header value for '{header_name}': {e}"))
987 })?;
988 headers.insert(name, val);
989 }
990 }
991 let mut client_options = ClientOptions::default();
992 if !headers.is_empty() {
993 client_options = client_options.with_default_headers(headers);
994 }
995 Ok(client_options)
996 }
997
998 pub fn expires_at_millis(&self) -> Option<u64> {
1000 self.0
1001 .get(EXPIRES_AT_MILLIS_KEY)
1002 .and_then(|s| s.parse::<u64>().ok())
1003 }
1004}
1005
1006impl From<HashMap<String, String>> for StorageOptions {
1007 fn from(value: HashMap<String, String>) -> Self {
1008 Self::new(value)
1009 }
1010}
1011
/// Lazily created default registry, used by `ObjectStore::new` to look up
/// the provider for a scheme.
static DEFAULT_OBJECT_STORE_REGISTRY: std::sync::LazyLock<ObjectStoreRegistry> =
    std::sync::LazyLock::new(ObjectStoreRegistry::default);
1014
1015impl ObjectStore {
1016 #[allow(clippy::too_many_arguments)]
1017 pub fn new(
1018 store: Arc<DynObjectStore>,
1019 location: Url,
1020 block_size: Option<usize>,
1021 wrapper: Option<Arc<dyn WrappingObjectStore>>,
1022 use_constant_size_upload_parts: bool,
1023 list_is_lexically_ordered: bool,
1024 io_parallelism: usize,
1025 download_retry_count: usize,
1026 storage_options: Option<&HashMap<String, String>>,
1027 ) -> Self {
1028 let scheme = location.scheme();
1029 let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
1030 let store_prefix = match DEFAULT_OBJECT_STORE_REGISTRY.get_provider(scheme) {
1031 Some(provider) => provider
1032 .calculate_object_store_prefix(&location, storage_options)
1033 .unwrap(),
1034 None => {
1035 let store_prefix = format!("{}${}", location.scheme(), location.authority());
1036 log::warn!(
1037 "Guessing that object store prefix is {}, since object store scheme is not found in registry.",
1038 store_prefix
1039 );
1040 store_prefix
1041 }
1042 };
1043 let store = match wrapper {
1044 Some(wrapper) => wrapper.wrap(&store_prefix, store),
1045 None => store,
1046 };
1047
1048 let io_tracker = IOTracker::default();
1050 let tracked_store = io_tracker.wrap("", store);
1051
1052 Self {
1053 inner: tracked_store,
1054 scheme: scheme.into(),
1055 block_size,
1056 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
1057 use_constant_size_upload_parts,
1058 list_is_lexically_ordered,
1059 io_parallelism,
1060 download_retry_count,
1061 io_tracker,
1062 store_prefix,
1063 }
1064 }
1065}
1066
/// Picks a block size from the URL scheme: 4 KiB for exactly `file`,
/// 64 KiB for every other scheme.
fn infer_block_size(scheme: &str) -> usize {
    if scheme == "file" {
        4 * 1024
    } else {
        64 * 1024
    }
}
1076
1077#[cfg(test)]
1078mod tests {
1079 use super::*;
1080 use lance_core::utils::tempfile::{TempStdDir, TempStdFile, TempStrDir};
1081 use object_store::memory::InMemory;
1082 use rstest::rstest;
1083 use std::env::set_current_dir;
1084 use std::fs::{create_dir_all, write};
1085 use std::path::Path as StdPath;
1086 use std::sync::atomic::{AtomicBool, Ordering};
1087
1088 fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
1090 let path = expand_path(path_str).map_err(std::io::Error::other)?;
1091 std::fs::create_dir_all(path.parent().unwrap())?;
1092 write(path, contents)
1093 }
1094
1095 async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
1096 let test_file_store = store.open(path).await.unwrap();
1097 let size = test_file_store.size().await.unwrap();
1098 let bytes = test_file_store.get_range(0..size).await.unwrap();
1099 let contents = String::from_utf8(bytes.to_vec()).unwrap();
1100 Ok(contents)
1101 }
1102
1103 #[tokio::test]
1104 async fn test_absolute_paths() {
1105 let tmp_path = TempStrDir::default();
1106 write_to_file(
1107 &format!("{tmp_path}/bar/foo.lance/test_file"),
1108 "TEST_CONTENT",
1109 )
1110 .unwrap();
1111
1112 for uri in &[
1114 format!("{tmp_path}/bar/foo.lance"),
1115 format!("{tmp_path}/./bar/foo.lance"),
1116 format!("{tmp_path}/bar/foo.lance/../foo.lance"),
1117 ] {
1118 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
1119 let contents = read_from_store(store.as_ref(), &path.clone().join("test_file"))
1120 .await
1121 .unwrap();
1122 assert_eq!(contents, "TEST_CONTENT");
1123 }
1124 }
1125
1126 #[tokio::test]
1127 async fn test_cloud_paths() {
1128 let uri = "s3://bucket/foo.lance";
1129 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
1130 assert_eq!(store.scheme, "s3");
1131 assert_eq!(path.to_string(), "foo.lance");
1132
1133 let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
1134 .await
1135 .unwrap();
1136 assert_eq!(store.scheme, "s3");
1137 assert_eq!(path.to_string(), "foo.lance");
1138
1139 let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
1140 .await
1141 .unwrap();
1142 assert_eq!(store.scheme, "gs");
1143 assert_eq!(path.to_string(), "foo.lance");
1144
1145 let (store, path) =
1146 ObjectStore::from_uri("abfss://filesystem@account.dfs.core.windows.net/foo.lance")
1147 .await
1148 .unwrap();
1149 assert_eq!(store.scheme, "abfss");
1150 assert_eq!(path.to_string(), "foo.lance");
1151 }
1152
1153 async fn test_block_size_used_test_helper(
1154 uri: &str,
1155 storage_options: Option<HashMap<String, String>>,
1156 default_expected_block_size: usize,
1157 ) {
1158 let registry = Arc::new(ObjectStoreRegistry::default());
1160 let accessor = storage_options
1161 .clone()
1162 .map(|opts| Arc::new(StorageOptionsAccessor::with_static_options(opts)));
1163 let params = ObjectStoreParams {
1164 storage_options_accessor: accessor.clone(),
1165 ..ObjectStoreParams::default()
1166 };
1167 let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
1168 .await
1169 .unwrap();
1170 assert_eq!(store.block_size, default_expected_block_size);
1171
1172 let registry = Arc::new(ObjectStoreRegistry::default());
1174 let params = ObjectStoreParams {
1175 block_size: Some(1024),
1176 storage_options_accessor: accessor,
1177 ..ObjectStoreParams::default()
1178 };
1179 let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
1180 .await
1181 .unwrap();
1182 assert_eq!(store.block_size, 1024);
1183 }
1184
    /// Cloud URIs default to a 64 KiB block size and honour an explicit one.
    #[rstest]
    #[case("s3://bucket/foo.lance", None)]
    #[case("gs://bucket/foo.lance", None)]
    #[case("az://account/bucket/foo.lance",
    Some(HashMap::from([
        (String::from("account_name"), String::from("account")),
        (String::from("container_name"), String::from("container"))
    ])))]
    #[case("abfss://filesystem@account.dfs.core.windows.net/foo.lance",
    Some(HashMap::from([
        (String::from("account_name"), String::from("account")),
        (String::from("container_name"), String::from("filesystem"))
    ])))]
    #[tokio::test]
    async fn test_block_size_used_cloud(
        #[case] uri: &str,
        #[case] storage_options: Option<HashMap<String, String>>,
    ) {
        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
    }
1205
    /// The listed prefixes default to a 4 KiB block size and honour an
    /// explicit one.
    ///
    /// Each case is used as the part before `:///` in the test URI.
    /// NOTE(review): the third case is already a full `memory:///...` URI, so
    /// the formatted URI contains `:///` twice — confirm this is intended.
    #[rstest]
    #[case("file")]
    #[case("file-object-store")]
    #[case("memory:///bucket/foo.lance")]
    #[tokio::test]
    async fn test_block_size_used_file(#[case] prefix: &str) {
        let tmp_path = TempStrDir::default();
        let path = format!("{tmp_path}/bar/foo.lance/test_file");
        write_to_file(&path, "URL").unwrap();
        let uri = format!("{prefix}:///{path}");
        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
    }
1218
    /// A relative path is resolved against the current working directory.
    ///
    /// NOTE(review): `set_current_dir` changes the working directory for the
    /// whole test process, so this can interfere with tests running in
    /// parallel that rely on relative paths.
    #[tokio::test]
    async fn test_relative_paths() {
        let tmp_path = TempStrDir::default();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "RELATIVE_URL",
        )
        .unwrap();

        set_current_dir(StdPath::new(tmp_path.as_ref())).expect("Error changing current dir");
        let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();

        let contents = read_from_store(store.as_ref(), &path.clone().join("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "RELATIVE_URL");
    }
1236
    /// A leading `~` in a URI is expanded to the home directory.
    ///
    /// NOTE(review): this writes `~/foo.lance/test_file` into the real home
    /// directory and nothing in the test removes it.
    #[tokio::test]
    async fn test_tilde_expansion() {
        let uri = "~/foo.lance";
        write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        let contents = read_from_store(store.as_ref(), &path.clone().join("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "TILDE");
    }
1247
1248 #[tokio::test]
1249 async fn test_read_directory() {
1250 let path = TempStdDir::default();
1251 create_dir_all(path.join("foo").join("bar")).unwrap();
1252 create_dir_all(path.join("foo").join("zoo")).unwrap();
1253 create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
1254 write_to_file(
1255 path.join("foo").join("test_file").to_str().unwrap(),
1256 "read_dir",
1257 )
1258 .unwrap();
1259 let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
1260
1261 let sub_dirs = store.read_dir(base.clone().join("foo")).await.unwrap();
1262 assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
1263 }
1264
1265 #[tokio::test]
1266 async fn test_delete_directory_local_store() {
1267 test_delete_directory("").await;
1268 }
1269
1270 #[tokio::test]
1271 async fn test_delete_directory_file_object_store() {
1272 test_delete_directory("file-object-store").await;
1273 }
1274
1275 async fn test_delete_directory(scheme: &str) {
1276 let path = TempStdDir::default();
1277 create_dir_all(path.join("foo").join("bar")).unwrap();
1278 create_dir_all(path.join("foo").join("zoo")).unwrap();
1279 create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
1280 write_to_file(
1281 path.join("foo")
1282 .join("bar")
1283 .join("test_file")
1284 .to_str()
1285 .unwrap(),
1286 "delete",
1287 )
1288 .unwrap();
1289 let file_url = Url::from_directory_path(&path).unwrap();
1290 let url = if scheme.is_empty() {
1291 file_url
1292 } else {
1293 let mut url = Url::parse(&format!("{scheme}:///")).unwrap();
1294 url.set_path(file_url.path());
1296 url
1297 };
1298 let (store, base) = ObjectStore::from_uri(url.as_ref()).await.unwrap();
1299 store
1300 .remove_dir_all(base.clone().join("foo"))
1301 .await
1302 .unwrap();
1303
1304 assert!(!path.join("foo").exists());
1305 }
1306
1307 #[derive(Debug)]
1308 struct TestWrapper {
1309 called: AtomicBool,
1310
1311 return_value: Arc<dyn OSObjectStore>,
1312 }
1313
1314 impl WrappingObjectStore for TestWrapper {
1315 fn wrap(
1316 &self,
1317 _store_prefix: &str,
1318 _original: Arc<dyn OSObjectStore>,
1319 ) -> Arc<dyn OSObjectStore> {
1320 self.called.store(true, Ordering::Relaxed);
1321
1322 self.return_value.clone()
1324 }
1325 }
1326
1327 impl TestWrapper {
1328 fn called(&self) -> bool {
1329 self.called.load(Ordering::Relaxed)
1330 }
1331 }
1332
1333 #[tokio::test]
1334 async fn test_wrapping_object_store_option_is_used() {
1335 let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
1337 let registry = Arc::new(ObjectStoreRegistry::default());
1338
1339 assert_eq!(Arc::strong_count(&mock_inner_store), 1);
1340
1341 let wrapper = Arc::new(TestWrapper {
1342 called: AtomicBool::new(false),
1343 return_value: mock_inner_store.clone(),
1344 });
1345
1346 let params = ObjectStoreParams {
1347 object_store_wrapper: Some(wrapper.clone()),
1348 ..ObjectStoreParams::default()
1349 };
1350
1351 assert!(!wrapper.called());
1353
1354 let _ = ObjectStore::from_uri_and_params(registry, "memory:///", ¶ms)
1355 .await
1356 .unwrap();
1357
1358 assert!(wrapper.called());
1360
1361 assert_eq!(Arc::strong_count(&mock_inner_store), 2);
1364 }
1365
1366 #[tokio::test]
1367 async fn test_local_paths() {
1368 let file_path = TempStdFile::default();
1369 let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
1370 writer.write_all(b"LOCAL").await.unwrap();
1371 Writer::shutdown(&mut writer).await.unwrap();
1372
1373 let reader = ObjectStore::open_local(&file_path).await.unwrap();
1374 let buf = reader.get_range(0..5).await.unwrap();
1375 assert_eq!(buf.as_ref(), b"LOCAL");
1376 }
1377
1378 #[tokio::test]
1379 async fn test_read_one() {
1380 let file_path = TempStdFile::default();
1381 let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
1382 writer.write_all(b"LOCAL").await.unwrap();
1383 Writer::shutdown(&mut writer).await.unwrap();
1384
1385 let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
1386 let obj_store = ObjectStore::local();
1387 let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
1388 assert_eq!(buf.as_ref(), b"LOCAL");
1389
1390 let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
1391 assert_eq!(buf.as_ref(), b"LOCAL");
1392 }
1393
// Windows only: a drive-letter path must open with either forward slashes or
// backslashes as separators.
#[tokio::test]
#[cfg(windows)]
async fn test_windows_paths() {
    use std::path::Component;
    use std::path::Prefix;

    // Returns the prefix kind of the first path component; panics when the
    // path is empty or does not start with a prefix component.
    fn get_path_prefix(path: &StdPath) -> Prefix<'_> {
        if let Component::Prefix(prefix_component) = path.components().next().unwrap() {
            prefix_component.kind()
        } else {
            panic!()
        }
    }

    // Extracts the drive letter from a `Disk` prefix; panics for any other kind.
    fn get_drive_letter(prefix: Prefix<'_>) -> String {
        if let Prefix::Disk(letter) = prefix {
            String::from_utf8(vec![letter]).unwrap()
        } else {
            panic!()
        }
    }

    // The temp file is only used to discover which drive to write to.
    let tmp_path = TempStdFile::default();
    let drive_letter = get_drive_letter(get_path_prefix(&tmp_path));

    write_to_file(
        &format!("{drive_letter}:/test_folder/test.lance/test_file"),
        "WINDOWS",
    )
    .unwrap();

    let candidate_uris = [
        format!("{drive_letter}:/test_folder/test.lance"),
        format!("{drive_letter}:\\test_folder\\test.lance"),
    ];
    for uri in &candidate_uris {
        let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
        let target = base.clone().join("test_file");
        let contents = read_from_store(store.as_ref(), &target).await.unwrap();
        assert_eq!(contents, "WINDOWS");
    }
}
1436
1437 #[tokio::test]
1438 async fn test_cross_filesystem_copy() {
1439 let source_dir = TempStdDir::default();
1441 let dest_dir = TempStdDir::default();
1442
1443 let source_file_name = "test_file.txt";
1445 let source_file = source_dir.join(source_file_name);
1446 std::fs::write(&source_file, b"test content").unwrap();
1447
1448 let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
1450 .await
1451 .unwrap();
1452
1453 let from_path = base_path.clone().join(source_file_name);
1455
1456 let dest_file = dest_dir.join("copied_file.txt");
1458 let dest_str = dest_file.to_str().unwrap();
1459 let to_path = object_store::path::Path::parse(dest_str).unwrap();
1460
1461 store.copy(&from_path, &to_path).await.unwrap();
1463
1464 assert!(dest_file.exists());
1466 let copied_content = std::fs::read(&dest_file).unwrap();
1467 assert_eq!(copied_content, b"test content");
1468 }
1469
1470 #[tokio::test]
1471 async fn test_copy_creates_parent_directories() {
1472 let source_dir = TempStdDir::default();
1473 let dest_dir = TempStdDir::default();
1474
1475 let source_file_name = "test_file.txt";
1477 let source_file = source_dir.join(source_file_name);
1478 std::fs::write(&source_file, b"test content").unwrap();
1479
1480 let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
1482 .await
1483 .unwrap();
1484
1485 let from_path = base_path.clone().join(source_file_name);
1487
1488 let dest_file = dest_dir.join("nested").join("dirs").join("copied_file.txt");
1490 let dest_str = dest_file.to_str().unwrap();
1491 let to_path = object_store::path::Path::parse(dest_str).unwrap();
1492
1493 store.copy(&from_path, &to_path).await.unwrap();
1495
1496 assert!(dest_file.exists());
1498 assert!(dest_file.parent().unwrap().exists());
1499 let copied_content = std::fs::read(&dest_file).unwrap();
1500 assert_eq!(copied_content, b"test content");
1501 }
1502
// `client_options()` must succeed both when `headers.*` keys are mixed with
// other options and when no `headers.*` key is present at all.
#[test]
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
fn test_client_options_extracts_headers() {
    let with_headers = StorageOptions(HashMap::from([
        (String::from("headers.x-custom-foo"), String::from("bar")),
        (String::from("headers.x-ms-version"), String::from("2023-11-03")),
        (String::from("region"), String::from("us-west-2")),
    ]));
    let client_options = with_headers.client_options().unwrap();

    // Same call with only a non-header key.
    let without_headers = StorageOptions(HashMap::from([(
        String::from("region"),
        String::from("us-west-2"),
    )]));
    without_headers.client_options().unwrap();

    // Smoke check: the options built above can be handed to a GCS builder.
    #[cfg(feature = "gcp")]
    {
        use object_store::gcp::GoogleCloudStorageBuilder;
        let _builder = GoogleCloudStorageBuilder::new()
            .with_client_options(client_options)
            .with_url("gs://test-bucket");
    }
}
1532
// A `headers.*` key whose header name contains a space must be rejected with
// an error mentioning "invalid header name".
#[test]
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
fn test_client_options_rejects_invalid_header_name() {
    let bad_name = (String::from("headers.bad header"), String::from("value"));
    let opts = StorageOptions(HashMap::from([bad_name]));

    let message = opts.client_options().unwrap_err().to_string();
    assert!(message.contains("invalid header name"));
}
1543
// A header value containing the control byte 0x01 must be rejected with an
// error mentioning "invalid header value".
#[test]
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
fn test_client_options_rejects_invalid_header_value() {
    let bad_value = (
        String::from("headers.x-good-name"),
        String::from("bad\x01value"),
    );
    let opts = StorageOptions(HashMap::from([bad_value]));

    let message = opts.client_options().unwrap_err().to_string();
    assert!(message.contains("invalid header value"));
}
1554
// With no `headers.*` keys present, `client_options()` must still succeed.
#[test]
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
fn test_client_options_empty_when_no_header_keys() {
    let plain_settings = [
        (String::from("region"), String::from("us-east-1")),
        (String::from("access_key_id"), String::from("AKID")),
    ];
    let opts = StorageOptions(HashMap::from(plain_settings));
    opts.client_options().unwrap();
}
1564}