1use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use deepsize::DeepSizeOf;
17use futures::{FutureExt, Stream};
18use futures::{StreamExt, TryStreamExt, future, stream::BoxStream};
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22use object_store::DynObjectStore;
23use object_store::Error as ObjectStoreError;
24#[cfg(feature = "aws")]
25use object_store::aws::AwsCredentialProvider;
26#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
27use object_store::{ClientOptions, HeaderMap, HeaderValue};
28use object_store::{ObjectMeta, ObjectStore as OSObjectStore, path::Path};
29use providers::local::FileStoreProvider;
30use providers::memory::MemoryStoreProvider;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35#[cfg(target_os = "linux")]
36use crate::uring::{UringCurrentThreadReader, UringReader};
37#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
38pub(crate) mod dynamic_credentials;
39#[cfg(any(feature = "oss", feature = "huggingface"))]
40pub(crate) mod dynamic_opendal;
41mod list_retry;
42pub mod providers;
43pub mod storage_options;
44#[cfg(test)]
45pub(crate) mod test_utils;
46pub mod throttle;
47mod tracing;
48use crate::object_reader::SmallReader;
49use crate::object_writer::{LocalWriter, WriteResult};
50use crate::traits::Writer;
51use crate::utils::tracking_store::{IOTracker, IoStats};
52use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
53use lance_core::{Error, Result};
54
55pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
60pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;
62
63const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024; #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
65const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024; pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock<u64> = std::sync::LazyLock::new(|| {
68 std::env::var("LANCE_MAX_IOP_SIZE")
69 .map(|val| val.parse().unwrap())
70 .unwrap_or(16 * 1024 * 1024)
71});
72
73pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
74
75pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
76pub use storage_options::{
77 EXPIRES_AT_MILLIS_KEY, LanceNamespaceStorageOptionsProvider, REFRESH_OFFSET_MILLIS_KEY,
78 StorageOptionsAccessor, StorageOptionsProvider,
79};
80
81#[async_trait]
82pub trait ObjectStoreExt {
83 async fn exists(&self, path: &Path) -> Result<bool>;
85
86 fn read_dir_all<'a, 'b>(
90 &'a self,
91 dir_path: impl Into<&'b Path> + Send,
92 unmodified_since: Option<DateTime<Utc>>,
93 ) -> BoxStream<'a, Result<ObjectMeta>>;
94}
95
96#[async_trait]
97impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
98 fn read_dir_all<'a, 'b>(
99 &'a self,
100 dir_path: impl Into<&'b Path> + Send,
101 unmodified_since: Option<DateTime<Utc>>,
102 ) -> BoxStream<'a, Result<ObjectMeta>> {
103 let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
104 if let Some(unmodified_since_val) = unmodified_since {
105 output
106 .try_filter(move |file| future::ready(file.last_modified <= unmodified_since_val))
107 .boxed()
108 } else {
109 output.boxed()
110 }
111 }
112
113 async fn exists(&self, path: &Path) -> Result<bool> {
114 match self.head(path).await {
115 Ok(_) => Ok(true),
116 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
117 Err(e) => Err(e.into()),
118 }
119 }
120}
121
122#[derive(Debug, Clone)]
124pub struct ObjectStore {
125 pub inner: Arc<dyn OSObjectStore>,
127 scheme: String,
128 block_size: usize,
129 max_iop_size: u64,
130 pub use_constant_size_upload_parts: bool,
133 pub list_is_lexically_ordered: bool,
136 io_parallelism: usize,
137 download_retry_count: usize,
139 io_tracker: IOTracker,
141 pub store_prefix: String,
145}
146
147impl DeepSizeOf for ObjectStore {
148 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
149 self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
154 }
155}
156
157impl std::fmt::Display for ObjectStore {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 write!(f, "ObjectStore({})", self.scheme)
160 }
161}
162
163pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
164 fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
169}
170
171#[derive(Debug, Clone)]
172pub struct ChainedWrappingObjectStore {
173 wrappers: Vec<Arc<dyn WrappingObjectStore>>,
174}
175
176impl ChainedWrappingObjectStore {
177 pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
178 Self { wrappers }
179 }
180
181 pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
182 self.wrappers.push(wrapper);
183 }
184}
185
186impl WrappingObjectStore for ChainedWrappingObjectStore {
187 fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
188 self.wrappers
189 .iter()
190 .fold(original, |acc, wrapper| wrapper.wrap(store_prefix, acc))
191 }
192}
193
194#[derive(Debug, Clone)]
197pub struct ObjectStoreParams {
198 pub block_size: Option<usize>,
199 #[deprecated(note = "Implement an ObjectStoreProvider instead")]
200 pub object_store: Option<(Arc<DynObjectStore>, Url)>,
201 pub s3_credentials_refresh_offset: Duration,
204 #[cfg(feature = "aws")]
205 pub aws_credentials: Option<AwsCredentialProvider>,
206 pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
207 pub storage_options_accessor: Option<Arc<StorageOptionsAccessor>>,
213 pub use_constant_size_upload_parts: bool,
218 pub list_is_lexically_ordered: Option<bool>,
219}
220
221impl Default for ObjectStoreParams {
222 fn default() -> Self {
223 #[allow(deprecated)]
224 Self {
225 object_store: None,
226 block_size: None,
227 s3_credentials_refresh_offset: Duration::from_secs(60),
228 #[cfg(feature = "aws")]
229 aws_credentials: None,
230 object_store_wrapper: None,
231 storage_options_accessor: None,
232 use_constant_size_upload_parts: false,
233 list_is_lexically_ordered: None,
234 }
235 }
236}
237
238impl ObjectStoreParams {
239 pub fn get_accessor(&self) -> Option<Arc<StorageOptionsAccessor>> {
241 self.storage_options_accessor.clone()
242 }
243
244 pub fn storage_options(&self) -> Option<&HashMap<String, String>> {
248 self.storage_options_accessor
249 .as_ref()
250 .and_then(|a| a.initial_storage_options())
251 }
252}
253
254impl std::hash::Hash for ObjectStoreParams {
256 #[allow(deprecated)]
257 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
258 self.block_size.hash(state);
260 if let Some((store, url)) = &self.object_store {
261 Arc::as_ptr(store).hash(state);
262 url.hash(state);
263 }
264 self.s3_credentials_refresh_offset.hash(state);
265 #[cfg(feature = "aws")]
266 if let Some(aws_credentials) = &self.aws_credentials {
267 Arc::as_ptr(aws_credentials).hash(state);
268 }
269 if let Some(wrapper) = &self.object_store_wrapper {
270 Arc::as_ptr(wrapper).hash(state);
271 }
272 if let Some(accessor) = &self.storage_options_accessor {
273 accessor.accessor_id().hash(state);
274 }
275 self.use_constant_size_upload_parts.hash(state);
276 self.list_is_lexically_ordered.hash(state);
277 }
278}
279
280impl Eq for ObjectStoreParams {}
282impl PartialEq for ObjectStoreParams {
283 #[allow(deprecated)]
284 fn eq(&self, other: &Self) -> bool {
285 #[cfg(feature = "aws")]
286 if self.aws_credentials.is_some() != other.aws_credentials.is_some() {
287 return false;
288 }
289
290 self.block_size == other.block_size
293 && self
294 .object_store
295 .as_ref()
296 .map(|(store, url)| (Arc::as_ptr(store), url))
297 == other
298 .object_store
299 .as_ref()
300 .map(|(store, url)| (Arc::as_ptr(store), url))
301 && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
302 && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
303 == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
304 && self
305 .storage_options_accessor
306 .as_ref()
307 .map(|a| a.accessor_id())
308 == other
309 .storage_options_accessor
310 .as_ref()
311 .map(|a| a.accessor_id())
312 && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
313 && self.list_is_lexically_ordered == other.list_is_lexically_ordered
314 }
315}
316
317pub fn uri_to_url(uri: &str) -> Result<Url> {
338 match Url::parse(uri) {
339 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
340 local_path_to_url(uri)
342 }
343 Ok(url) => Ok(url),
344 Err(_) => local_path_to_url(uri),
345 }
346}
347
348fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
349 let str_path = str_path.as_ref();
350 let expanded = expand_tilde_path(str_path).unwrap_or_else(|| str_path.into());
351
352 let mut expanded_path = path_abs::PathAbs::new(expanded)
353 .unwrap()
354 .as_path()
355 .to_path_buf();
356 if let Some(s) = expanded_path.as_path().to_str()
358 && s.is_empty()
359 {
360 expanded_path = std::env::current_dir()?;
361 }
362
363 Ok(expanded_path)
364}
365
366fn expand_tilde_path(path: &str) -> Option<std::path::PathBuf> {
367 let home_dir = std::env::home_dir()?;
368 if path == "~" {
369 return Some(home_dir);
370 }
371 if let Some(stripped) = path.strip_prefix("~/") {
372 return Some(home_dir.join(stripped));
373 }
374 #[cfg(windows)]
375 if let Some(stripped) = path.strip_prefix("~\\") {
376 return Some(home_dir.join(stripped));
377 }
378
379 None
380}
381
382fn local_path_to_url(str_path: &str) -> Result<Url> {
383 let expanded_path = expand_path(str_path)?;
384
385 Url::from_directory_path(expanded_path).map_err(|_| {
386 Error::invalid_input_source(format!("Invalid table location: '{}'", str_path).into())
387 })
388}
389
390#[cfg(feature = "huggingface")]
391fn parse_hf_repo_id(url: &Url) -> Result<String> {
392 let mut segments: Vec<String> = Vec::new();
394 if let Some(host) = url.host_str() {
395 segments.push(host.to_string());
396 }
397 segments.extend(
398 url.path()
399 .trim_start_matches('/')
400 .split('/')
401 .map(|s| s.to_string()),
402 );
403
404 if segments.len() < 2 {
405 return Err(Error::invalid_input(
406 "Huggingface URL must contain at least owner and repo",
407 ));
408 }
409
410 let repo_type_candidates = ["models", "datasets", "spaces"];
411 let (owner, repo_with_rev) = if repo_type_candidates.contains(&segments[0].as_str()) {
412 if segments.len() < 3 {
413 return Err(Error::invalid_input(
414 "Huggingface URL missing owner/repo after repo type",
415 ));
416 }
417 (segments[1].as_str(), segments[2].as_str())
418 } else {
419 (segments[0].as_str(), segments[1].as_str())
420 };
421
422 let repo = repo_with_rev
423 .split_once('@')
424 .map(|(r, _)| r)
425 .unwrap_or(repo_with_rev);
426 Ok(format!("{owner}/{repo}"))
427}
428
429impl ObjectStore {
430 pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
438 let registry = Arc::new(ObjectStoreRegistry::default());
439
440 Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
441 }
442
443 pub async fn from_uri_and_params(
447 registry: Arc<ObjectStoreRegistry>,
448 uri: &str,
449 params: &ObjectStoreParams,
450 ) -> Result<(Arc<Self>, Path)> {
451 #[allow(deprecated)]
452 if let Some((store, path)) = params.object_store.as_ref() {
453 let mut inner = store.clone();
454 let store_prefix =
455 registry.calculate_object_store_prefix(uri, params.storage_options())?;
456 if let Some(wrapper) = params.object_store_wrapper.as_ref() {
457 inner = wrapper.wrap(&store_prefix, inner);
458 }
459
460 let io_tracker = IOTracker::default();
462 let tracked_store = io_tracker.wrap("", inner);
463
464 let store = Self {
465 inner: tracked_store,
466 scheme: path.scheme().to_string(),
467 block_size: params.block_size.unwrap_or(64 * 1024),
468 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
469 use_constant_size_upload_parts: params.use_constant_size_upload_parts,
470 list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
471 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
472 download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
473 io_tracker,
474 store_prefix,
475 };
476 let path = Path::parse(path.path())?;
477 return Ok((Arc::new(store), path));
478 }
479 let url = uri_to_url(uri)?;
480
481 let store = registry.get_store(url.clone(), params).await?;
482 let provider = registry.get_provider(url.scheme()).expect_ok()?;
484 let path = provider.extract_path(&url)?;
485
486 Ok((store, path))
487 }
488
489 pub fn extract_path_from_uri(registry: Arc<ObjectStoreRegistry>, uri: &str) -> Result<Path> {
503 let url = uri_to_url(uri)?;
504 let provider = registry
505 .get_provider(url.scheme())
506 .ok_or_else(|| Error::invalid_input(format!("Unknown scheme: {}", url.scheme())))?;
507 provider.extract_path(&url)
508 }
509
510 #[deprecated(note = "Use `from_uri` instead")]
511 pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
512 Self::from_uri_and_params(
513 Arc::new(ObjectStoreRegistry::default()),
514 str_path,
515 &Default::default(),
516 )
517 .now_or_never()
518 .unwrap()
519 }
520
521 pub fn local() -> Self {
523 let provider = FileStoreProvider;
524 provider
525 .new_store(Url::parse("file:///").unwrap(), &Default::default())
526 .now_or_never()
527 .unwrap()
528 .unwrap()
529 }
530
531 pub fn memory() -> Self {
533 let provider = MemoryStoreProvider;
534 provider
535 .new_store(Url::parse("memory:///").unwrap(), &Default::default())
536 .now_or_never()
537 .unwrap()
538 .unwrap()
539 }
540
541 pub fn is_local(&self) -> bool {
543 self.scheme == "file" || self.scheme == "file+uring"
544 }
545
546 pub fn is_cloud(&self) -> bool {
547 !self.is_local() && self.scheme != "memory"
548 }
549
550 pub fn prefers_lite_scheduler(&self) -> bool {
555 self.scheme == "file+uring"
556 }
557
558 pub fn scheme(&self) -> &str {
559 &self.scheme
560 }
561
562 pub fn block_size(&self) -> usize {
563 self.block_size
564 }
565
566 pub fn max_iop_size(&self) -> u64 {
567 self.max_iop_size
568 }
569
570 pub fn io_parallelism(&self) -> usize {
571 std::env::var("LANCE_IO_THREADS")
572 .map(|val| val.parse::<usize>().unwrap())
573 .unwrap_or(self.io_parallelism)
574 }
575
576 pub fn io_tracker(&self) -> &IOTracker {
581 &self.io_tracker
582 }
583
584 pub fn io_stats_snapshot(&self) -> IoStats {
589 self.io_tracker.stats()
590 }
591
592 pub fn io_stats_incremental(&self) -> IoStats {
598 self.io_tracker.incremental_stats()
599 }
600
601 pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
606 match self.scheme.as_str() {
607 "file" => {
608 LocalObjectReader::open_with_tracker(
609 path,
610 self.block_size,
611 None,
612 Arc::new(self.io_tracker.clone()),
613 )
614 .await
615 }
616 #[cfg(target_os = "linux")]
617 "file+uring" => {
618 let use_current_thread = std::env::var("LANCE_URING_CURRENT_THREAD")
620 .map(|v| str_is_truthy(&v))
621 .unwrap_or(false);
622
623 if use_current_thread {
624 UringCurrentThreadReader::open(
625 path,
626 self.block_size,
627 None,
628 Arc::new(self.io_tracker.clone()),
629 )
630 .await
631 } else {
632 UringReader::open(
633 path,
634 self.block_size,
635 None,
636 Arc::new(self.io_tracker.clone()),
637 )
638 .await
639 }
640 }
641 _ => Ok(Box::new(CloudObjectReader::new(
642 self.inner.clone(),
643 path.clone(),
644 self.block_size,
645 None,
646 self.download_retry_count,
647 )?)),
648 }
649 }
650
651 pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
657 if known_size <= self.block_size {
660 return Ok(Box::new(SmallReader::new(
661 self.inner.clone(),
662 path.clone(),
663 self.download_retry_count,
664 known_size,
665 )));
666 }
667
668 match self.scheme.as_str() {
669 "file" => {
670 LocalObjectReader::open_with_tracker(
671 path,
672 self.block_size,
673 Some(known_size),
674 Arc::new(self.io_tracker.clone()),
675 )
676 .await
677 }
678 #[cfg(target_os = "linux")]
679 "file+uring" => {
680 let use_current_thread = std::env::var("LANCE_URING_CURRENT_THREAD")
682 .map(|v| str_is_truthy(&v))
683 .unwrap_or(false);
684
685 if use_current_thread {
686 UringCurrentThreadReader::open(
687 path,
688 self.block_size,
689 Some(known_size),
690 Arc::new(self.io_tracker.clone()),
691 )
692 .await
693 } else {
694 UringReader::open(
695 path,
696 self.block_size,
697 Some(known_size),
698 Arc::new(self.io_tracker.clone()),
699 )
700 .await
701 }
702 }
703 _ => Ok(Box::new(CloudObjectReader::new(
704 self.inner.clone(),
705 path.clone(),
706 self.block_size,
707 Some(known_size),
708 self.download_retry_count,
709 )?)),
710 }
711 }
712
713 pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
715 let object_store = Self::local();
716 let absolute_path = expand_path(path.to_string_lossy())?;
717 let os_path = Path::from_absolute_path(absolute_path)?;
718 ObjectWriter::new(&object_store, &os_path).await
719 }
720
721 pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
723 let object_store = Self::local();
724 let absolute_path = expand_path(path.to_string_lossy())?;
725 let os_path = Path::from_absolute_path(absolute_path)?;
726 object_store.open(&os_path).await
727 }
728
729 pub async fn create(&self, path: &Path) -> Result<Box<dyn Writer>> {
731 match self.scheme.as_str() {
732 "file" => {
733 let local_path = super::local::to_local_path(path);
734 let local_path = std::path::PathBuf::from(&local_path);
735 if let Some(parent) = local_path.parent() {
736 tokio::fs::create_dir_all(parent).await?;
737 }
738 let parent = local_path
739 .parent()
740 .expect("file path must have parent")
741 .to_owned();
742 let named_temp =
743 tokio::task::spawn_blocking(move || tempfile::NamedTempFile::new_in(parent))
744 .await
745 .map_err(|e| Error::io(format!("spawn_blocking failed: {}", e)))??;
746 let (std_file, temp_path) = named_temp.into_parts();
747 let file = tokio::fs::File::from_std(std_file);
748 Ok(Box::new(LocalWriter::new(
749 file,
750 path.clone(),
751 temp_path,
752 Arc::new(self.io_tracker.clone()),
753 )))
754 }
755 _ => Ok(Box::new(ObjectWriter::new(self, path).await?)),
756 }
757 }
758
759 pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
761 let mut writer = self.create(path).await?;
762 writer.write_all(content).await?;
763 Writer::shutdown(writer.as_mut()).await
764 }
765
766 pub async fn delete(&self, path: &Path) -> Result<()> {
767 self.inner.delete(path).await?;
768 Ok(())
769 }
770
771 pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
772 if self.is_local() {
773 return super::local::copy_file(from, to);
775 }
776 Ok(self.inner.copy(from, to).await?)
777 }
778
779 pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
781 let path = dir_path.into();
782 let path = Path::parse(&path)?;
783 let output = self.inner.list_with_delimiter(Some(&path)).await?;
784 Ok(output
785 .common_prefixes
786 .iter()
787 .chain(output.objects.iter().map(|o| &o.location))
788 .map(|s| s.filename().unwrap().to_string())
789 .collect())
790 }
791
792 pub fn list(
793 &self,
794 path: Option<Path>,
795 ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
796 Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
797 }
798
799 pub fn read_dir_all<'a, 'b>(
803 &'a self,
804 dir_path: impl Into<&'b Path> + Send,
805 unmodified_since: Option<DateTime<Utc>>,
806 ) -> BoxStream<'a, Result<ObjectMeta>> {
807 self.inner.read_dir_all(dir_path, unmodified_since)
808 }
809
810 pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
812 let path = dir_path.into();
813 let path = Path::parse(&path)?;
814
815 if self.is_local() {
816 return super::local::remove_dir_all(&path);
818 }
819 let sub_entries = self
820 .inner
821 .list(Some(&path))
822 .map(|m| m.map(|meta| meta.location))
823 .boxed();
824 self.inner
825 .delete_stream(sub_entries)
826 .try_collect::<Vec<_>>()
827 .await?;
828 if self.scheme == "file-object-store" {
829 return super::local::remove_dir_all(&path);
832 }
833 Ok(())
834 }
835
836 pub fn remove_stream<'a>(
837 &'a self,
838 locations: BoxStream<'a, Result<Path>>,
839 ) -> BoxStream<'a, Result<Path>> {
840 self.inner
841 .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
842 .err_into::<Error>()
843 .boxed()
844 }
845
846 pub async fn exists(&self, path: &Path) -> Result<bool> {
848 match self.inner.head(path).await {
849 Ok(_) => Ok(true),
850 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
851 Err(e) => Err(e.into()),
852 }
853 }
854
855 pub async fn size(&self, path: &Path) -> Result<u64> {
857 Ok(self.inner.head(path).await?.size)
858 }
859
860 pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
862 let reader = self.open(path).await?;
863 Ok(reader.get_all().await?)
864 }
865
866 pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
871 let reader = self.open(path).await?;
872 Ok(reader.get_range(range).await?)
873 }
874}
875
876#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
878pub enum LanceConfigKey {
879 DownloadRetryCount,
881}
882
883impl FromStr for LanceConfigKey {
884 type Err = Error;
885
886 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
887 match s.to_ascii_lowercase().as_str() {
888 "download_retry_count" => Ok(Self::DownloadRetryCount),
889 _ => Err(Error::invalid_input_source(
890 format!("Invalid LanceConfigKey: {}", s).into(),
891 )),
892 }
893 }
894}
895
896#[derive(Clone, Debug, Default)]
897pub struct StorageOptions(pub HashMap<String, String>);
898
899impl StorageOptions {
900 pub fn new(options: HashMap<String, String>) -> Self {
902 let mut options = options;
903 if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
904 options.insert("allow_http".into(), value);
905 }
906 if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
907 options.insert("allow_http".into(), value);
908 }
909 if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
910 options.insert("allow_http".into(), value);
911 }
912 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
913 options.insert("client_max_retries".into(), value);
914 }
915 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
916 options.insert("client_retry_timeout".into(), value);
917 }
918 Self(options)
919 }
920
921 pub fn allow_http(&self) -> bool {
923 self.0.iter().any(|(key, value)| {
924 key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
925 })
926 }
927
928 pub fn download_retry_count(&self) -> usize {
930 self.0
931 .iter()
932 .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
933 .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
934 .unwrap_or(3)
935 }
936
937 pub fn client_max_retries(&self) -> usize {
939 self.0
940 .iter()
941 .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
942 .and_then(|(_, value)| value.parse::<usize>().ok())
943 .unwrap_or(3)
944 }
945
946 pub fn client_retry_timeout(&self) -> u64 {
948 self.0
949 .iter()
950 .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
951 .and_then(|(_, value)| value.parse::<u64>().ok())
952 .unwrap_or(180)
953 }
954
955 pub fn get(&self, key: &str) -> Option<&String> {
956 self.0.get(key)
957 }
958
959 #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
967 pub fn client_options(&self) -> Result<ClientOptions> {
968 let mut headers = HeaderMap::new();
969 for (key, value) in &self.0 {
970 if let Some(header_name) = key.strip_prefix("headers.") {
971 let name = header_name
972 .parse::<http::header::HeaderName>()
973 .map_err(|e| {
974 Error::invalid_input(format!("invalid header name '{header_name}': {e}"))
975 })?;
976 let val = HeaderValue::from_str(value).map_err(|e| {
977 Error::invalid_input(format!("invalid header value for '{header_name}': {e}"))
978 })?;
979 headers.insert(name, val);
980 }
981 }
982 let mut client_options = ClientOptions::default();
983 if !headers.is_empty() {
984 client_options = client_options.with_default_headers(headers);
985 }
986 Ok(client_options)
987 }
988
989 pub fn expires_at_millis(&self) -> Option<u64> {
991 self.0
992 .get(EXPIRES_AT_MILLIS_KEY)
993 .and_then(|s| s.parse::<u64>().ok())
994 }
995}
996
997impl From<HashMap<String, String>> for StorageOptions {
998 fn from(value: HashMap<String, String>) -> Self {
999 Self::new(value)
1000 }
1001}
1002
1003static DEFAULT_OBJECT_STORE_REGISTRY: std::sync::LazyLock<ObjectStoreRegistry> =
1004 std::sync::LazyLock::new(ObjectStoreRegistry::default);
1005
1006impl ObjectStore {
1007 #[allow(clippy::too_many_arguments)]
1008 pub fn new(
1009 store: Arc<DynObjectStore>,
1010 location: Url,
1011 block_size: Option<usize>,
1012 wrapper: Option<Arc<dyn WrappingObjectStore>>,
1013 use_constant_size_upload_parts: bool,
1014 list_is_lexically_ordered: bool,
1015 io_parallelism: usize,
1016 download_retry_count: usize,
1017 storage_options: Option<&HashMap<String, String>>,
1018 ) -> Self {
1019 let scheme = location.scheme();
1020 let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
1021 let store_prefix = match DEFAULT_OBJECT_STORE_REGISTRY.get_provider(scheme) {
1022 Some(provider) => provider
1023 .calculate_object_store_prefix(&location, storage_options)
1024 .unwrap(),
1025 None => {
1026 let store_prefix = format!("{}${}", location.scheme(), location.authority());
1027 log::warn!(
1028 "Guessing that object store prefix is {}, since object store scheme is not found in registry.",
1029 store_prefix
1030 );
1031 store_prefix
1032 }
1033 };
1034 let store = match wrapper {
1035 Some(wrapper) => wrapper.wrap(&store_prefix, store),
1036 None => store,
1037 };
1038
1039 let io_tracker = IOTracker::default();
1041 let tracked_store = io_tracker.wrap("", store);
1042
1043 Self {
1044 inner: tracked_store,
1045 scheme: scheme.into(),
1046 block_size,
1047 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
1048 use_constant_size_upload_parts,
1049 list_is_lexically_ordered,
1050 io_parallelism,
1051 download_retry_count,
1052 io_tracker,
1053 store_prefix,
1054 }
1055 }
1056}
1057
1058fn infer_block_size(scheme: &str) -> usize {
1059 match scheme {
1063 "file" => 4 * 1024,
1064 _ => 64 * 1024,
1065 }
1066}
1067
1068#[cfg(test)]
1069mod tests {
1070 use super::*;
1071 use lance_core::utils::tempfile::{TempStdDir, TempStdFile, TempStrDir};
1072 use object_store::memory::InMemory;
1073 use rstest::rstest;
1074 use std::env::set_current_dir;
1075 use std::fs::{create_dir_all, write};
1076 use std::path::Path as StdPath;
1077 use std::sync::atomic::{AtomicBool, Ordering};
1078
1079 fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
1081 let path = expand_path(path_str).map_err(std::io::Error::other)?;
1082 std::fs::create_dir_all(path.parent().unwrap())?;
1083 write(path, contents)
1084 }
1085
1086 async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
1087 let test_file_store = store.open(path).await.unwrap();
1088 let size = test_file_store.size().await.unwrap();
1089 let bytes = test_file_store.get_range(0..size).await.unwrap();
1090 let contents = String::from_utf8(bytes.to_vec()).unwrap();
1091 Ok(contents)
1092 }
1093
1094 #[tokio::test]
1095 async fn test_absolute_paths() {
1096 let tmp_path = TempStrDir::default();
1097 write_to_file(
1098 &format!("{tmp_path}/bar/foo.lance/test_file"),
1099 "TEST_CONTENT",
1100 )
1101 .unwrap();
1102
1103 for uri in &[
1105 format!("{tmp_path}/bar/foo.lance"),
1106 format!("{tmp_path}/./bar/foo.lance"),
1107 format!("{tmp_path}/bar/foo.lance/../foo.lance"),
1108 ] {
1109 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
1110 let contents = read_from_store(store.as_ref(), &path.child("test_file"))
1111 .await
1112 .unwrap();
1113 assert_eq!(contents, "TEST_CONTENT");
1114 }
1115 }
1116
1117 #[tokio::test]
1118 async fn test_cloud_paths() {
1119 let uri = "s3://bucket/foo.lance";
1120 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
1121 assert_eq!(store.scheme, "s3");
1122 assert_eq!(path.to_string(), "foo.lance");
1123
1124 let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
1125 .await
1126 .unwrap();
1127 assert_eq!(store.scheme, "s3");
1128 assert_eq!(path.to_string(), "foo.lance");
1129
1130 let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
1131 .await
1132 .unwrap();
1133 assert_eq!(store.scheme, "gs");
1134 assert_eq!(path.to_string(), "foo.lance");
1135
1136 let (store, path) =
1137 ObjectStore::from_uri("abfss://filesystem@account.dfs.core.windows.net/foo.lance")
1138 .await
1139 .unwrap();
1140 assert_eq!(store.scheme, "abfss");
1141 assert_eq!(path.to_string(), "foo.lance");
1142 }
1143
1144 async fn test_block_size_used_test_helper(
1145 uri: &str,
1146 storage_options: Option<HashMap<String, String>>,
1147 default_expected_block_size: usize,
1148 ) {
1149 let registry = Arc::new(ObjectStoreRegistry::default());
1151 let accessor = storage_options
1152 .clone()
1153 .map(|opts| Arc::new(StorageOptionsAccessor::with_static_options(opts)));
1154 let params = ObjectStoreParams {
1155 storage_options_accessor: accessor.clone(),
1156 ..ObjectStoreParams::default()
1157 };
1158 let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
1159 .await
1160 .unwrap();
1161 assert_eq!(store.block_size, default_expected_block_size);
1162
1163 let registry = Arc::new(ObjectStoreRegistry::default());
1165 let params = ObjectStoreParams {
1166 block_size: Some(1024),
1167 storage_options_accessor: accessor,
1168 ..ObjectStoreParams::default()
1169 };
1170 let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
1171 .await
1172 .unwrap();
1173 assert_eq!(store.block_size, 1024);
1174 }
1175
1176 #[rstest]
1177 #[case("s3://bucket/foo.lance", None)]
1178 #[case("gs://bucket/foo.lance", None)]
1179 #[case("az://account/bucket/foo.lance",
1180 Some(HashMap::from([
1181 (String::from("account_name"), String::from("account")),
1182 (String::from("container_name"), String::from("container"))
1183 ])))]
1184 #[case("abfss://filesystem@account.dfs.core.windows.net/foo.lance",
1185 Some(HashMap::from([
1186 (String::from("account_name"), String::from("account")),
1187 (String::from("container_name"), String::from("filesystem"))
1188 ])))]
1189 #[tokio::test]
1190 async fn test_block_size_used_cloud(
1191 #[case] uri: &str,
1192 #[case] storage_options: Option<HashMap<String, String>>,
1193 ) {
1194 test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
1195 }
1196
1197 #[rstest]
1198 #[case("file")]
1199 #[case("file-object-store")]
1200 #[case("memory:///bucket/foo.lance")]
1201 #[tokio::test]
1202 async fn test_block_size_used_file(#[case] prefix: &str) {
1203 let tmp_path = TempStrDir::default();
1204 let path = format!("{tmp_path}/bar/foo.lance/test_file");
1205 write_to_file(&path, "URL").unwrap();
1206 let uri = format!("{prefix}:///{path}");
1207 test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
1208 }
1209
1210 #[tokio::test]
1211 async fn test_relative_paths() {
1212 let tmp_path = TempStrDir::default();
1213 write_to_file(
1214 &format!("{tmp_path}/bar/foo.lance/test_file"),
1215 "RELATIVE_URL",
1216 )
1217 .unwrap();
1218
1219 set_current_dir(StdPath::new(tmp_path.as_ref())).expect("Error changing current dir");
1220 let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();
1221
1222 let contents = read_from_store(store.as_ref(), &path.child("test_file"))
1223 .await
1224 .unwrap();
1225 assert_eq!(contents, "RELATIVE_URL");
1226 }
1227
1228 #[tokio::test]
1229 async fn test_tilde_expansion() {
1230 let uri = "~/foo.lance";
1231 write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
1232 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
1233 let contents = read_from_store(store.as_ref(), &path.child("test_file"))
1234 .await
1235 .unwrap();
1236 assert_eq!(contents, "TILDE");
1237 }
1238
1239 #[tokio::test]
1240 async fn test_read_directory() {
1241 let path = TempStdDir::default();
1242 create_dir_all(path.join("foo").join("bar")).unwrap();
1243 create_dir_all(path.join("foo").join("zoo")).unwrap();
1244 create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
1245 write_to_file(
1246 path.join("foo").join("test_file").to_str().unwrap(),
1247 "read_dir",
1248 )
1249 .unwrap();
1250 let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
1251
1252 let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
1253 assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
1254 }
1255
1256 #[tokio::test]
1257 async fn test_delete_directory_local_store() {
1258 test_delete_directory("").await;
1259 }
1260
1261 #[tokio::test]
1262 async fn test_delete_directory_file_object_store() {
1263 test_delete_directory("file-object-store").await;
1264 }
1265
1266 async fn test_delete_directory(scheme: &str) {
1267 let path = TempStdDir::default();
1268 create_dir_all(path.join("foo").join("bar")).unwrap();
1269 create_dir_all(path.join("foo").join("zoo")).unwrap();
1270 create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
1271 write_to_file(
1272 path.join("foo")
1273 .join("bar")
1274 .join("test_file")
1275 .to_str()
1276 .unwrap(),
1277 "delete",
1278 )
1279 .unwrap();
1280 let file_url = Url::from_directory_path(&path).unwrap();
1281 let url = if scheme.is_empty() {
1282 file_url
1283 } else {
1284 let mut url = Url::parse(&format!("{scheme}:///")).unwrap();
1285 url.set_path(file_url.path());
1287 url
1288 };
1289 let (store, base) = ObjectStore::from_uri(url.as_ref()).await.unwrap();
1290 store.remove_dir_all(base.child("foo")).await.unwrap();
1291
1292 assert!(!path.join("foo").exists());
1293 }
1294
1295 #[derive(Debug)]
1296 struct TestWrapper {
1297 called: AtomicBool,
1298
1299 return_value: Arc<dyn OSObjectStore>,
1300 }
1301
1302 impl WrappingObjectStore for TestWrapper {
1303 fn wrap(
1304 &self,
1305 _store_prefix: &str,
1306 _original: Arc<dyn OSObjectStore>,
1307 ) -> Arc<dyn OSObjectStore> {
1308 self.called.store(true, Ordering::Relaxed);
1309
1310 self.return_value.clone()
1312 }
1313 }
1314
1315 impl TestWrapper {
1316 fn called(&self) -> bool {
1317 self.called.load(Ordering::Relaxed)
1318 }
1319 }
1320
1321 #[tokio::test]
1322 async fn test_wrapping_object_store_option_is_used() {
1323 let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
1325 let registry = Arc::new(ObjectStoreRegistry::default());
1326
1327 assert_eq!(Arc::strong_count(&mock_inner_store), 1);
1328
1329 let wrapper = Arc::new(TestWrapper {
1330 called: AtomicBool::new(false),
1331 return_value: mock_inner_store.clone(),
1332 });
1333
1334 let params = ObjectStoreParams {
1335 object_store_wrapper: Some(wrapper.clone()),
1336 ..ObjectStoreParams::default()
1337 };
1338
1339 assert!(!wrapper.called());
1341
1342 let _ = ObjectStore::from_uri_and_params(registry, "memory:///", ¶ms)
1343 .await
1344 .unwrap();
1345
1346 assert!(wrapper.called());
1348
1349 assert_eq!(Arc::strong_count(&mock_inner_store), 2);
1352 }
1353
1354 #[tokio::test]
1355 async fn test_local_paths() {
1356 let file_path = TempStdFile::default();
1357 let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
1358 writer.write_all(b"LOCAL").await.unwrap();
1359 Writer::shutdown(&mut writer).await.unwrap();
1360
1361 let reader = ObjectStore::open_local(&file_path).await.unwrap();
1362 let buf = reader.get_range(0..5).await.unwrap();
1363 assert_eq!(buf.as_ref(), b"LOCAL");
1364 }
1365
1366 #[tokio::test]
1367 async fn test_read_one() {
1368 let file_path = TempStdFile::default();
1369 let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
1370 writer.write_all(b"LOCAL").await.unwrap();
1371 Writer::shutdown(&mut writer).await.unwrap();
1372
1373 let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
1374 let obj_store = ObjectStore::local();
1375 let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
1376 assert_eq!(buf.as_ref(), b"LOCAL");
1377
1378 let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
1379 assert_eq!(buf.as_ref(), b"LOCAL");
1380 }
1381
1382 #[tokio::test]
1383 #[cfg(windows)]
1384 async fn test_windows_paths() {
1385 use std::path::Component;
1386 use std::path::Prefix;
1387 use std::path::Prefix::*;
1388
1389 fn get_path_prefix(path: &StdPath) -> Prefix {
1390 match path.components().next().unwrap() {
1391 Component::Prefix(prefix_component) => prefix_component.kind(),
1392 _ => panic!(),
1393 }
1394 }
1395
1396 fn get_drive_letter(prefix: Prefix) -> String {
1397 match prefix {
1398 Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
1399 _ => panic!(),
1400 }
1401 }
1402
1403 let tmp_path = TempStdFile::default();
1404 let prefix = get_path_prefix(&tmp_path);
1405 let drive_letter = get_drive_letter(prefix);
1406
1407 write_to_file(
1408 &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
1409 "WINDOWS",
1410 )
1411 .unwrap();
1412
1413 for uri in &[
1414 format!("{drive_letter}:/test_folder/test.lance"),
1415 format!("{drive_letter}:\\test_folder\\test.lance"),
1416 ] {
1417 let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
1418 let contents = read_from_store(store.as_ref(), &base.child("test_file"))
1419 .await
1420 .unwrap();
1421 assert_eq!(contents, "WINDOWS");
1422 }
1423 }
1424
1425 #[tokio::test]
1426 async fn test_cross_filesystem_copy() {
1427 let source_dir = TempStdDir::default();
1429 let dest_dir = TempStdDir::default();
1430
1431 let source_file_name = "test_file.txt";
1433 let source_file = source_dir.join(source_file_name);
1434 std::fs::write(&source_file, b"test content").unwrap();
1435
1436 let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
1438 .await
1439 .unwrap();
1440
1441 let from_path = base_path.child(source_file_name);
1443
1444 let dest_file = dest_dir.join("copied_file.txt");
1446 let dest_str = dest_file.to_str().unwrap();
1447 let to_path = object_store::path::Path::parse(dest_str).unwrap();
1448
1449 store.copy(&from_path, &to_path).await.unwrap();
1451
1452 assert!(dest_file.exists());
1454 let copied_content = std::fs::read(&dest_file).unwrap();
1455 assert_eq!(copied_content, b"test content");
1456 }
1457
1458 #[tokio::test]
1459 async fn test_copy_creates_parent_directories() {
1460 let source_dir = TempStdDir::default();
1461 let dest_dir = TempStdDir::default();
1462
1463 let source_file_name = "test_file.txt";
1465 let source_file = source_dir.join(source_file_name);
1466 std::fs::write(&source_file, b"test content").unwrap();
1467
1468 let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
1470 .await
1471 .unwrap();
1472
1473 let from_path = base_path.child(source_file_name);
1475
1476 let dest_file = dest_dir.join("nested").join("dirs").join("copied_file.txt");
1478 let dest_str = dest_file.to_str().unwrap();
1479 let to_path = object_store::path::Path::parse(dest_str).unwrap();
1480
1481 store.copy(&from_path, &to_path).await.unwrap();
1483
1484 assert!(dest_file.exists());
1486 assert!(dest_file.parent().unwrap().exists());
1487 let copied_content = std::fs::read(&dest_file).unwrap();
1488 assert_eq!(copied_content, b"test content");
1489 }
1490
1491 #[test]
1492 #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1493 fn test_client_options_extracts_headers() {
1494 let opts = StorageOptions(HashMap::from([
1495 ("headers.x-custom-foo".to_string(), "bar".to_string()),
1496 ("headers.x-ms-version".to_string(), "2023-11-03".to_string()),
1497 ("region".to_string(), "us-west-2".to_string()),
1498 ]));
1499 let client_options = opts.client_options().unwrap();
1500
1501 let opts_no_headers = StorageOptions(HashMap::from([(
1504 "region".to_string(),
1505 "us-west-2".to_string(),
1506 )]));
1507 opts_no_headers.client_options().unwrap();
1508
1509 #[cfg(feature = "gcp")]
1513 {
1514 use object_store::gcp::GoogleCloudStorageBuilder;
1515 let _builder = GoogleCloudStorageBuilder::new()
1516 .with_client_options(client_options)
1517 .with_url("gs://test-bucket");
1518 }
1519 }
1520
1521 #[test]
1522 #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1523 fn test_client_options_rejects_invalid_header_name() {
1524 let opts = StorageOptions(HashMap::from([(
1525 "headers.bad header".to_string(),
1526 "value".to_string(),
1527 )]));
1528 let err = opts.client_options().unwrap_err();
1529 assert!(err.to_string().contains("invalid header name"));
1530 }
1531
1532 #[test]
1533 #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1534 fn test_client_options_rejects_invalid_header_value() {
1535 let opts = StorageOptions(HashMap::from([(
1536 "headers.x-good-name".to_string(),
1537 "bad\x01value".to_string(),
1538 )]));
1539 let err = opts.client_options().unwrap_err();
1540 assert!(err.to_string().contains("invalid header value"));
1541 }
1542
1543 #[test]
1544 #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1545 fn test_client_options_empty_when_no_header_keys() {
1546 let opts = StorageOptions(HashMap::from([
1547 ("region".to_string(), "us-east-1".to_string()),
1548 ("access_key_id".to_string(), "AKID".to_string()),
1549 ]));
1550 opts.client_options().unwrap();
1551 }
1552}