1use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use futures::{FutureExt, Stream};
17use futures::{StreamExt, TryStreamExt, future, stream::BoxStream};
18use lance_core::deepsize::DeepSizeOf;
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22use object_store::DynObjectStore;
23use object_store::ObjectStoreExt as OSObjectStoreExt;
24#[cfg(feature = "aws")]
25use object_store::aws::AwsCredentialProvider;
26#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
27use object_store::{ClientOptions, HeaderMap, HeaderValue};
28use object_store::{ObjectMeta, ObjectStore as OSObjectStore, path::Path};
29use providers::local::FileStoreProvider;
30use providers::memory::MemoryStoreProvider;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35#[cfg(target_os = "linux")]
36use crate::uring::{UringCurrentThreadReader, UringReader};
37#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
38pub(crate) mod dynamic_credentials;
39#[cfg(any(feature = "oss", feature = "huggingface", feature = "tos"))]
40pub(crate) mod dynamic_opendal;
41mod list_retry;
42pub mod providers;
43pub mod storage_options;
44#[cfg(test)]
45pub(crate) mod test_utils;
46pub mod throttle;
47mod tracing;
48use crate::object_reader::SmallReader;
49use crate::object_writer::{LocalWriter, WriteResult};
50use crate::traits::{WriteExt, Writer};
51use crate::utils::tracking_store::{IOTracker, IoStats};
52use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
53use lance_core::{Error, Result};
54
/// Default number of concurrent IO requests for local storage.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
/// Default number of concurrent IO requests for cloud storage.
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

/// Default block size (4 KiB) for local storage.
const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024;
/// Default block size (64 KiB) for cloud storage. Only compiled when at least
/// one cloud backend feature is enabled.
#[cfg(any(
    feature = "aws",
    feature = "gcp",
    feature = "azure",
    feature = "oss",
    feature = "tencent",
    feature = "huggingface",
    feature = "tos",
))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024;

/// Maximum size, in bytes, of a single IO operation.
///
/// Read once from the `LANCE_MAX_IOP_SIZE` environment variable and defaults to
/// 16 MiB. A value that does not parse as a `u64` is ignored and the default is
/// used. Panicking inside a `LazyLock` initializer would poison it and make
/// every later access panic as well.
pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock<u64> = std::sync::LazyLock::new(|| {
    std::env::var("LANCE_MAX_IOP_SIZE")
        .ok()
        .and_then(|val| val.parse::<u64>().ok())
        .unwrap_or(16 * 1024 * 1024)
});

/// Default number of retries used by readers when downloading objects.
pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
82
83pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
84pub use storage_options::{
85 EXPIRES_AT_MILLIS_KEY, LanceNamespaceStorageOptionsProvider, REFRESH_OFFSET_MILLIS_KEY,
86 StorageOptionsAccessor, StorageOptionsProvider,
87};
88
89#[async_trait]
90pub trait ObjectStoreExt {
91 async fn exists(&self, path: &Path) -> Result<bool>;
93
94 fn read_dir_all<'a, 'b>(
98 &'a self,
99 dir_path: impl Into<&'b Path> + Send,
100 unmodified_since: Option<DateTime<Utc>>,
101 ) -> BoxStream<'a, Result<ObjectMeta>>;
102}
103
104#[async_trait]
105impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
106 fn read_dir_all<'a, 'b>(
107 &'a self,
108 dir_path: impl Into<&'b Path> + Send,
109 unmodified_since: Option<DateTime<Utc>>,
110 ) -> BoxStream<'a, Result<ObjectMeta>> {
111 let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
112 if let Some(unmodified_since_val) = unmodified_since {
113 output
114 .try_filter(move |file| future::ready(file.last_modified <= unmodified_since_val))
115 .boxed()
116 } else {
117 output.boxed()
118 }
119 }
120
121 async fn exists(&self, path: &Path) -> Result<bool> {
122 match self.head(path).await {
123 Ok(_) => Ok(true),
124 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
125 Err(e) => Err(e.into()),
126 }
127 }
128}
129
130#[derive(Debug, Clone)]
132pub struct ObjectStore {
133 pub inner: Arc<dyn OSObjectStore>,
135 scheme: String,
136 block_size: usize,
137 max_iop_size: u64,
138 pub use_constant_size_upload_parts: bool,
141 pub list_is_lexically_ordered: bool,
144 io_parallelism: usize,
145 download_retry_count: usize,
147 io_tracker: IOTracker,
149 pub store_prefix: String,
153}
154
155impl DeepSizeOf for ObjectStore {
156 fn deep_size_of_children(&self, context: &mut lance_core::deepsize::Context) -> usize {
157 self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
162 }
163}
164
165impl std::fmt::Display for ObjectStore {
166 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167 write!(f, "ObjectStore({})", self.scheme)
168 }
169}
170
171pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
172 fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
177}
178
179#[derive(Debug, Clone)]
180pub struct ChainedWrappingObjectStore {
181 wrappers: Vec<Arc<dyn WrappingObjectStore>>,
182}
183
184impl ChainedWrappingObjectStore {
185 pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
186 Self { wrappers }
187 }
188
189 pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
190 self.wrappers.push(wrapper);
191 }
192}
193
194impl WrappingObjectStore for ChainedWrappingObjectStore {
195 fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
196 self.wrappers
197 .iter()
198 .fold(original, |acc, wrapper| wrapper.wrap(store_prefix, acc))
199 }
200}
201
202#[derive(Debug, Clone)]
205pub struct ObjectStoreParams {
206 pub block_size: Option<usize>,
207 #[deprecated(note = "Implement an ObjectStoreProvider instead")]
208 pub object_store: Option<(Arc<DynObjectStore>, Url)>,
209 pub s3_credentials_refresh_offset: Duration,
212 #[cfg(feature = "aws")]
213 pub aws_credentials: Option<AwsCredentialProvider>,
214 pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
215 pub storage_options_accessor: Option<Arc<StorageOptionsAccessor>>,
221 pub use_constant_size_upload_parts: bool,
226 pub list_is_lexically_ordered: Option<bool>,
227}
228
229impl Default for ObjectStoreParams {
230 fn default() -> Self {
231 #[allow(deprecated)]
232 Self {
233 object_store: None,
234 block_size: None,
235 s3_credentials_refresh_offset: Duration::from_secs(60),
236 #[cfg(feature = "aws")]
237 aws_credentials: None,
238 object_store_wrapper: None,
239 storage_options_accessor: None,
240 use_constant_size_upload_parts: false,
241 list_is_lexically_ordered: None,
242 }
243 }
244}
245
246impl ObjectStoreParams {
247 pub fn get_accessor(&self) -> Option<Arc<StorageOptionsAccessor>> {
249 self.storage_options_accessor.clone()
250 }
251
252 pub fn storage_options(&self) -> Option<&HashMap<String, String>> {
256 self.storage_options_accessor
257 .as_ref()
258 .and_then(|a| a.initial_storage_options())
259 }
260}
261
262impl std::hash::Hash for ObjectStoreParams {
264 #[allow(deprecated)]
265 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
266 self.block_size.hash(state);
268 if let Some((store, url)) = &self.object_store {
269 Arc::as_ptr(store).hash(state);
270 url.hash(state);
271 }
272 self.s3_credentials_refresh_offset.hash(state);
273 #[cfg(feature = "aws")]
274 if let Some(aws_credentials) = &self.aws_credentials {
275 Arc::as_ptr(aws_credentials).hash(state);
276 }
277 if let Some(wrapper) = &self.object_store_wrapper {
278 Arc::as_ptr(wrapper).hash(state);
279 }
280 if let Some(accessor) = &self.storage_options_accessor {
281 accessor.accessor_id().hash(state);
282 }
283 self.use_constant_size_upload_parts.hash(state);
284 self.list_is_lexically_ordered.hash(state);
285 }
286}
287
288impl Eq for ObjectStoreParams {}
290impl PartialEq for ObjectStoreParams {
291 #[allow(deprecated)]
292 fn eq(&self, other: &Self) -> bool {
293 #[cfg(feature = "aws")]
294 if self.aws_credentials.is_some() != other.aws_credentials.is_some() {
295 return false;
296 }
297
298 self.block_size == other.block_size
301 && self
302 .object_store
303 .as_ref()
304 .map(|(store, url)| (Arc::as_ptr(store), url))
305 == other
306 .object_store
307 .as_ref()
308 .map(|(store, url)| (Arc::as_ptr(store), url))
309 && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
310 && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
311 == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
312 && self
313 .storage_options_accessor
314 .as_ref()
315 .map(|a| a.accessor_id())
316 == other
317 .storage_options_accessor
318 .as_ref()
319 .map(|a| a.accessor_id())
320 && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
321 && self.list_is_lexically_ordered == other.list_is_lexically_ordered
322 }
323}
324
325pub fn uri_to_url(uri: &str) -> Result<Url> {
346 match Url::parse(uri) {
347 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
348 local_path_to_url(uri)
350 }
351 Ok(url) => Ok(url),
352 Err(_) => local_path_to_url(uri),
353 }
354}
355
356fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
357 let str_path = str_path.as_ref();
358 let expanded = expand_tilde_path(str_path).unwrap_or_else(|| str_path.into());
359
360 let mut expanded_path = path_abs::PathAbs::new(expanded)
361 .unwrap()
362 .as_path()
363 .to_path_buf();
364 if let Some(s) = expanded_path.as_path().to_str()
366 && s.is_empty()
367 {
368 expanded_path = std::env::current_dir()?;
369 }
370
371 Ok(expanded_path)
372}
373
/// Expands a leading `~` (alone, or followed by a path separator) to the
/// user's home directory.
///
/// Returns `None` if the home directory is unknown or `path` does not start
/// with such a tilde. `~user` forms are not expanded. The `~\` separator is
/// only recognized on Windows.
fn expand_tilde_path(path: &str) -> Option<std::path::PathBuf> {
    let home = std::env::home_dir()?;
    if path == "~" {
        return Some(home);
    }

    #[cfg(windows)]
    let prefixes: &[&str] = &["~/", "~\\"];
    #[cfg(not(windows))]
    let prefixes: &[&str] = &["~/"];

    prefixes
        .iter()
        .find_map(|prefix| path.strip_prefix(*prefix))
        .map(|rest| home.join(rest))
}
389
390fn local_path_to_url(str_path: &str) -> Result<Url> {
391 let expanded_path = expand_path(str_path)?;
392
393 Url::from_directory_path(expanded_path).map_err(|_| {
394 Error::invalid_input_source(format!("Invalid table location: '{}'", str_path).into())
395 })
396}
397
/// Extracts the `owner/repo` id from a Hugging Face URL.
///
/// The host and the path segments are read together. An optional leading repo
/// type (`models`, `datasets`, `spaces`) is skipped, and an `@revision` suffix
/// on the repo is dropped. Empty segments (from leading, trailing or doubled
/// slashes, or an empty host) are ignored, so they can no longer be taken as an
/// owner or repo name.
///
/// # Errors
/// Returns an invalid-input error if the owner or repo is missing or the repo
/// name is empty.
#[cfg(feature = "huggingface")]
fn parse_hf_repo_id(url: &Url) -> Result<String> {
    let segments: Vec<&str> = url
        .host_str()
        .into_iter()
        .chain(url.path().split('/'))
        .filter(|segment| !segment.is_empty())
        .collect();

    if segments.len() < 2 {
        return Err(Error::invalid_input(
            "Huggingface URL must contain at least owner and repo",
        ));
    }

    let repo_type_candidates = ["models", "datasets", "spaces"];
    let (owner, repo_with_rev) = if repo_type_candidates.contains(&segments[0]) {
        if segments.len() < 3 {
            return Err(Error::invalid_input(
                "Huggingface URL missing owner/repo after repo type",
            ));
        }
        (segments[1], segments[2])
    } else {
        (segments[0], segments[1])
    };

    // Drop an optional `@revision` suffix.
    let repo = repo_with_rev
        .split_once('@')
        .map_or(repo_with_rev, |(repo, _)| repo);
    if repo.is_empty() {
        return Err(Error::invalid_input(
            "Huggingface URL has an empty repo name",
        ));
    }

    Ok(format!("{owner}/{repo}"))
}
436
437impl ObjectStore {
438 pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
446 let registry = Arc::new(ObjectStoreRegistry::default());
447
448 Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
449 }
450
451 pub async fn from_uri_and_params(
455 registry: Arc<ObjectStoreRegistry>,
456 uri: &str,
457 params: &ObjectStoreParams,
458 ) -> Result<(Arc<Self>, Path)> {
459 #[allow(deprecated)]
460 if let Some((store, path)) = params.object_store.as_ref() {
461 let mut inner = store.clone();
462 let store_prefix =
463 registry.calculate_object_store_prefix(uri, params.storage_options())?;
464 if let Some(wrapper) = params.object_store_wrapper.as_ref() {
465 inner = wrapper.wrap(&store_prefix, inner);
466 }
467
468 let io_tracker = IOTracker::default();
470 let tracked_store = io_tracker.wrap("", inner);
471
472 let store = Self {
473 inner: tracked_store,
474 scheme: path.scheme().to_string(),
475 block_size: params.block_size.unwrap_or(64 * 1024),
476 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
477 use_constant_size_upload_parts: params.use_constant_size_upload_parts,
478 list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
479 io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
480 download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
481 io_tracker,
482 store_prefix,
483 };
484 let path = Path::parse(path.path())?;
485 return Ok((Arc::new(store), path));
486 }
487 let url = uri_to_url(uri)?;
488
489 let store = registry.get_store(url.clone(), params).await?;
490 let provider = registry.get_provider(url.scheme()).expect_ok()?;
492 let path = provider.extract_path(&url)?;
493
494 Ok((store, path))
495 }
496
497 pub fn extract_path_from_uri(registry: Arc<ObjectStoreRegistry>, uri: &str) -> Result<Path> {
511 let url = uri_to_url(uri)?;
512 let provider = registry
513 .get_provider(url.scheme())
514 .ok_or_else(|| Error::invalid_input(format!("Unknown scheme: {}", url.scheme())))?;
515 provider.extract_path(&url)
516 }
517
518 #[deprecated(note = "Use `from_uri` instead")]
519 pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
520 Self::from_uri_and_params(
521 Arc::new(ObjectStoreRegistry::default()),
522 str_path,
523 &Default::default(),
524 )
525 .now_or_never()
526 .unwrap()
527 }
528
529 pub fn local() -> Self {
531 let provider = FileStoreProvider;
532 provider
533 .new_store(Url::parse("file:///").unwrap(), &Default::default())
534 .now_or_never()
535 .unwrap()
536 .unwrap()
537 }
538
539 pub fn memory() -> Self {
541 let provider = MemoryStoreProvider;
542 provider
543 .new_store(Url::parse("memory:///").unwrap(), &Default::default())
544 .now_or_never()
545 .unwrap()
546 .unwrap()
547 }
548
549 pub fn is_local(&self) -> bool {
551 self.scheme == "file" || self.scheme == "file+uring"
552 }
553
554 pub fn is_cloud(&self) -> bool {
555 if self.is_local() || self.scheme == "memory" || self.scheme == "shared-memory" {
556 return false;
557 }
558 true
559 }
560
561 pub fn prefers_lite_scheduler(&self) -> bool {
566 self.scheme == "file+uring"
567 }
568
569 pub fn scheme(&self) -> &str {
570 &self.scheme
571 }
572
573 pub fn block_size(&self) -> usize {
574 self.block_size
575 }
576
577 pub fn max_iop_size(&self) -> u64 {
578 self.max_iop_size
579 }
580
581 pub fn io_parallelism(&self) -> usize {
582 std::env::var("LANCE_IO_THREADS")
583 .map(|val| val.parse::<usize>().unwrap())
584 .unwrap_or(self.io_parallelism)
585 }
586
587 pub fn io_tracker(&self) -> &IOTracker {
592 &self.io_tracker
593 }
594
595 pub fn io_stats_snapshot(&self) -> IoStats {
600 self.io_tracker.stats()
601 }
602
603 pub fn io_stats_incremental(&self) -> IoStats {
609 self.io_tracker.incremental_stats()
610 }
611
612 pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
617 match self.scheme.as_str() {
618 "file" => {
619 LocalObjectReader::open_with_tracker(
620 path,
621 self.block_size,
622 None,
623 Arc::new(self.io_tracker.clone()),
624 )
625 .await
626 }
627 #[cfg(target_os = "linux")]
628 "file+uring" => {
629 let use_current_thread = std::env::var("LANCE_URING_CURRENT_THREAD")
631 .map(|v| str_is_truthy(&v))
632 .unwrap_or(false);
633
634 if use_current_thread {
635 UringCurrentThreadReader::open(
636 path,
637 self.block_size,
638 None,
639 Arc::new(self.io_tracker.clone()),
640 )
641 .await
642 } else {
643 UringReader::open(
644 path,
645 self.block_size,
646 None,
647 Arc::new(self.io_tracker.clone()),
648 )
649 .await
650 }
651 }
652 _ => Ok(Box::new(CloudObjectReader::new(
653 self.inner.clone(),
654 path.clone(),
655 self.block_size,
656 None,
657 self.download_retry_count,
658 )?)),
659 }
660 }
661
662 pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
668 if known_size <= self.block_size {
671 return Ok(Box::new(SmallReader::new(
672 self.inner.clone(),
673 path.clone(),
674 self.download_retry_count,
675 known_size,
676 )));
677 }
678
679 match self.scheme.as_str() {
680 "file" => {
681 LocalObjectReader::open_with_tracker(
682 path,
683 self.block_size,
684 Some(known_size),
685 Arc::new(self.io_tracker.clone()),
686 )
687 .await
688 }
689 #[cfg(target_os = "linux")]
690 "file+uring" => {
691 let use_current_thread = std::env::var("LANCE_URING_CURRENT_THREAD")
693 .map(|v| str_is_truthy(&v))
694 .unwrap_or(false);
695
696 if use_current_thread {
697 UringCurrentThreadReader::open(
698 path,
699 self.block_size,
700 Some(known_size),
701 Arc::new(self.io_tracker.clone()),
702 )
703 .await
704 } else {
705 UringReader::open(
706 path,
707 self.block_size,
708 Some(known_size),
709 Arc::new(self.io_tracker.clone()),
710 )
711 .await
712 }
713 }
714 _ => Ok(Box::new(CloudObjectReader::new(
715 self.inner.clone(),
716 path.clone(),
717 self.block_size,
718 Some(known_size),
719 self.download_retry_count,
720 )?)),
721 }
722 }
723
724 pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
726 let object_store = Self::local();
727 let absolute_path = expand_path(path.to_string_lossy())?;
728 let os_path = Path::from_absolute_path(absolute_path)?;
729 ObjectWriter::new(&object_store, &os_path).await
730 }
731
732 pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
734 let object_store = Self::local();
735 let absolute_path = expand_path(path.to_string_lossy())?;
736 let os_path = Path::from_absolute_path(absolute_path)?;
737 object_store.open(&os_path).await
738 }
739
740 pub async fn create(&self, path: &Path) -> Result<Box<dyn Writer>> {
742 match self.scheme.as_str() {
743 "file" => {
744 let local_path = super::local::to_local_path(path);
745 let local_path = std::path::PathBuf::from(&local_path);
746 if let Some(parent) = local_path.parent() {
747 tokio::fs::create_dir_all(parent).await?;
748 }
749 let parent = local_path
750 .parent()
751 .expect("file path must have parent")
752 .to_owned();
753 let named_temp =
754 tokio::task::spawn_blocking(move || tempfile::NamedTempFile::new_in(parent))
755 .await
756 .map_err(|e| Error::io(format!("spawn_blocking failed: {}", e)))??;
757 let (std_file, temp_path) = named_temp.into_parts();
758 let file = tokio::fs::File::from_std(std_file);
759 Ok(Box::new(LocalWriter::new(
760 file,
761 path.clone(),
762 temp_path,
763 Arc::new(self.io_tracker.clone()),
764 )))
765 }
766 _ => Ok(Box::new(ObjectWriter::new(self, path).await?)),
767 }
768 }
769
770 pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
772 let mut writer = self.create(path).await?;
773 writer.write_all(content).await?;
774 Writer::shutdown(writer.as_mut()).await
775 }
776
777 pub async fn delete(&self, path: &Path) -> Result<()> {
778 self.inner.delete(path).await?;
779 Ok(())
780 }
781
782 const MAX_SINGLE_COPY_BYTES: u64 = 5 * 1024 * 1024 * 1024; pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
787 let multipart_copy_fallback = matches!(self.scheme.as_str(), "s3" | "s3+ddb" | "gs");
793 self.copy_impl(
794 from,
795 to,
796 multipart_copy_fallback,
797 Self::MAX_SINGLE_COPY_BYTES,
798 )
799 .await
800 }
801
802 async fn copy_impl(
808 &self,
809 from: &Path,
810 to: &Path,
811 multipart_copy_fallback: bool,
812 max_single_copy: u64,
813 ) -> Result<()> {
814 if self.is_local() {
815 return super::local::copy_file(from, to);
817 }
818 if multipart_copy_fallback {
819 let reader = self.open(from).await?;
822 if reader.size().await? as u64 > max_single_copy {
823 let mut writer = self.create(to).await?;
824 writer.copy_from_reader(reader.as_ref()).await?;
825 Writer::shutdown(writer.as_mut()).await?;
826 return Ok(());
827 }
828 }
829 Ok(self.inner.copy(from, to).await?)
830 }
831
832 pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
834 let path = dir_path.into();
835 let path = Path::parse(&path)?;
836 let output = self.inner.list_with_delimiter(Some(&path)).await?;
837 Ok(output
838 .common_prefixes
839 .iter()
840 .chain(output.objects.iter().map(|o| &o.location))
841 .filter_map(|s| s.filename().map(|f| f.to_string()))
842 .collect())
843 }
844
845 pub fn list(
846 &self,
847 path: Option<Path>,
848 ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
849 Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
850 }
851
852 pub fn read_dir_all<'a, 'b>(
856 &'a self,
857 dir_path: impl Into<&'b Path> + Send,
858 unmodified_since: Option<DateTime<Utc>>,
859 ) -> BoxStream<'a, Result<ObjectMeta>> {
860 self.inner.read_dir_all(dir_path, unmodified_since)
861 }
862
863 pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
865 let path = dir_path.into();
866 let path = Path::parse(&path)?;
867
868 if self.is_local() {
869 return super::local::remove_dir_all(&path);
871 }
872 let sub_entries = self
873 .inner
874 .list(Some(&path))
875 .map(|m| m.map(|meta| meta.location))
876 .boxed();
877 self.inner
878 .delete_stream(sub_entries)
879 .try_collect::<Vec<_>>()
880 .await?;
881 if self.scheme == "file-object-store" {
882 return super::local::remove_dir_all(&path);
885 }
886 Ok(())
887 }
888
889 pub fn remove_stream<'a>(
890 &'a self,
891 locations: BoxStream<'a, Result<Path>>,
892 ) -> BoxStream<'a, Result<Path>> {
893 let store = Arc::clone(&self.inner);
894 locations
895 .and_then(move |location| {
896 let store = Arc::clone(&store);
897 async move {
898 store.delete(&location).await?;
899 Ok(location)
900 }
901 })
902 .boxed()
903 }
904
905 pub async fn exists(&self, path: &Path) -> Result<bool> {
907 match self.inner.head(path).await {
908 Ok(_) => Ok(true),
909 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
910 Err(e) => Err(e.into()),
911 }
912 }
913
914 pub async fn size(&self, path: &Path) -> Result<u64> {
916 Ok(self.inner.head(path).await?.size)
917 }
918
919 pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
921 let reader = self.open(path).await?;
922 Ok(reader.get_all().await?)
923 }
924
925 pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
930 let reader = self.open(path).await?;
931 Ok(reader.get_range(range).await?)
932 }
933}
934
935#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
937pub enum LanceConfigKey {
938 DownloadRetryCount,
940}
941
942impl FromStr for LanceConfigKey {
943 type Err = Error;
944
945 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
946 match s.to_ascii_lowercase().as_str() {
947 "download_retry_count" => Ok(Self::DownloadRetryCount),
948 _ => Err(Error::invalid_input_source(
949 format!("Invalid LanceConfigKey: {}", s).into(),
950 )),
951 }
952 }
953}
954
955#[derive(Clone, Debug, Default)]
956pub struct StorageOptions(pub HashMap<String, String>);
957
958impl StorageOptions {
959 pub fn new(options: HashMap<String, String>) -> Self {
961 let mut options = options;
962 if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
963 options.insert("allow_http".into(), value);
964 }
965 if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
966 options.insert("allow_http".into(), value);
967 }
968 if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
969 options.insert("allow_http".into(), value);
970 }
971 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
972 options.insert("client_max_retries".into(), value);
973 }
974 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
975 options.insert("client_retry_timeout".into(), value);
976 }
977 Self(options)
978 }
979
980 pub fn allow_http(&self) -> bool {
982 self.0.iter().any(|(key, value)| {
983 key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
984 })
985 }
986
987 pub fn download_retry_count(&self) -> usize {
989 self.0
990 .iter()
991 .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
992 .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
993 .unwrap_or(3)
994 }
995
996 pub fn client_max_retries(&self) -> usize {
998 self.0
999 .iter()
1000 .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
1001 .and_then(|(_, value)| value.parse::<usize>().ok())
1002 .unwrap_or(3)
1003 }
1004
1005 pub fn client_retry_timeout(&self) -> u64 {
1007 self.0
1008 .iter()
1009 .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
1010 .and_then(|(_, value)| value.parse::<u64>().ok())
1011 .unwrap_or(180)
1012 }
1013
1014 pub fn get(&self, key: &str) -> Option<&String> {
1015 self.0.get(key)
1016 }
1017
1018 #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1026 pub fn client_options(&self) -> Result<ClientOptions> {
1027 let mut headers = HeaderMap::new();
1028 for (key, value) in &self.0 {
1029 if let Some(header_name) = key.strip_prefix("headers.") {
1030 let name = header_name
1031 .parse::<http::header::HeaderName>()
1032 .map_err(|e| {
1033 Error::invalid_input(format!("invalid header name '{header_name}': {e}"))
1034 })?;
1035 let val = HeaderValue::from_str(value).map_err(|e| {
1036 Error::invalid_input(format!("invalid header value for '{header_name}': {e}"))
1037 })?;
1038 headers.insert(name, val);
1039 }
1040 }
1041 let mut client_options = ClientOptions::default();
1042 if !headers.is_empty() {
1043 client_options = client_options.with_default_headers(headers);
1044 }
1045 Ok(client_options)
1046 }
1047
1048 pub fn expires_at_millis(&self) -> Option<u64> {
1050 self.0
1051 .get(EXPIRES_AT_MILLIS_KEY)
1052 .and_then(|s| s.parse::<u64>().ok())
1053 }
1054}
1055
1056impl From<HashMap<String, String>> for StorageOptions {
1057 fn from(value: HashMap<String, String>) -> Self {
1058 Self::new(value)
1059 }
1060}
1061
1062static DEFAULT_OBJECT_STORE_REGISTRY: std::sync::LazyLock<ObjectStoreRegistry> =
1063 std::sync::LazyLock::new(ObjectStoreRegistry::default);
1064
1065impl ObjectStore {
1066 #[allow(clippy::too_many_arguments)]
1067 pub fn new(
1068 store: Arc<DynObjectStore>,
1069 location: Url,
1070 block_size: Option<usize>,
1071 wrapper: Option<Arc<dyn WrappingObjectStore>>,
1072 use_constant_size_upload_parts: bool,
1073 list_is_lexically_ordered: bool,
1074 io_parallelism: usize,
1075 download_retry_count: usize,
1076 storage_options: Option<&HashMap<String, String>>,
1077 ) -> Self {
1078 let scheme = location.scheme();
1079 let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
1080 let store_prefix = match DEFAULT_OBJECT_STORE_REGISTRY.get_provider(scheme) {
1081 Some(provider) => provider
1082 .calculate_object_store_prefix(&location, storage_options)
1083 .unwrap(),
1084 None => {
1085 let store_prefix = format!("{}${}", location.scheme(), location.authority());
1086 log::warn!(
1087 "Guessing that object store prefix is {}, since object store scheme is not found in registry.",
1088 store_prefix
1089 );
1090 store_prefix
1091 }
1092 };
1093 let store = match wrapper {
1094 Some(wrapper) => wrapper.wrap(&store_prefix, store),
1095 None => store,
1096 };
1097
1098 let io_tracker = IOTracker::default();
1100 let tracked_store = io_tracker.wrap("", store);
1101
1102 Self {
1103 inner: tracked_store,
1104 scheme: scheme.into(),
1105 block_size,
1106 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
1107 use_constant_size_upload_parts,
1108 list_is_lexically_ordered,
1109 io_parallelism,
1110 download_retry_count,
1111 io_tracker,
1112 store_prefix,
1113 }
1114 }
1115}
1116
/// Default block size for a store with the given URL scheme.
///
/// Exactly `file` gets 4 KiB. Every other scheme (including `file+uring`,
/// `memory` and cloud schemes) gets 64 KiB.
fn infer_block_size(scheme: &str) -> usize {
    const LOCAL_FILE_BLOCK: usize = 4 * 1024;
    const OTHER_BLOCK: usize = 64 * 1024;

    if scheme == "file" {
        LOCAL_FILE_BLOCK
    } else {
        OTHER_BLOCK
    }
}
1126
1127#[cfg(test)]
1128mod tests {
1129 use super::*;
1130 use async_trait::async_trait;
1131 use bytes::Bytes;
1132 use lance_core::utils::tempfile::{TempStdDir, TempStdFile, TempStrDir};
1133 use object_store::memory::InMemory;
1134 use object_store::{
1135 CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, PutMultipartOptions,
1136 PutOptions, PutPayload, PutResult, Result as OSResult,
1137 };
1138 use rstest::rstest;
1139 use std::env::set_current_dir;
1140 use std::fmt::{Display, Formatter};
1141 use std::fs::{create_dir_all, write};
1142 use std::ops::Range;
1143 use std::path::Path as StdPath;
1144 use std::sync::atomic::{AtomicBool, Ordering};
1145
1146 fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
1148 let path = expand_path(path_str).map_err(std::io::Error::other)?;
1149 std::fs::create_dir_all(path.parent().unwrap())?;
1150 write(path, contents)
1151 }
1152
1153 async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
1154 let test_file_store = store.open(path).await.unwrap();
1155 let size = test_file_store.size().await.unwrap();
1156 let bytes = test_file_store.get_range(0..size).await.unwrap();
1157 let contents = String::from_utf8(bytes.to_vec()).unwrap();
1158 Ok(contents)
1159 }
1160
1161 #[tokio::test]
1162 async fn test_absolute_paths() {
1163 let tmp_path = TempStrDir::default();
1164 write_to_file(
1165 &format!("{tmp_path}/bar/foo.lance/test_file"),
1166 "TEST_CONTENT",
1167 )
1168 .unwrap();
1169
1170 for uri in &[
1172 format!("{tmp_path}/bar/foo.lance"),
1173 format!("{tmp_path}/./bar/foo.lance"),
1174 format!("{tmp_path}/bar/foo.lance/../foo.lance"),
1175 ] {
1176 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
1177 let contents = read_from_store(store.as_ref(), &path.clone().join("test_file"))
1178 .await
1179 .unwrap();
1180 assert_eq!(contents, "TEST_CONTENT");
1181 }
1182 }
1183
1184 #[tokio::test]
1185 async fn test_cloud_paths() {
1186 let uri = "s3://bucket/foo.lance";
1187 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
1188 assert_eq!(store.scheme, "s3");
1189 assert_eq!(path.to_string(), "foo.lance");
1190
1191 let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
1192 .await
1193 .unwrap();
1194 assert_eq!(store.scheme, "s3");
1195 assert_eq!(path.to_string(), "foo.lance");
1196
1197 let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
1198 .await
1199 .unwrap();
1200 assert_eq!(store.scheme, "gs");
1201 assert_eq!(path.to_string(), "foo.lance");
1202
1203 let (store, path) =
1204 ObjectStore::from_uri("abfss://filesystem@account.dfs.core.windows.net/foo.lance")
1205 .await
1206 .unwrap();
1207 assert_eq!(store.scheme, "abfss");
1208 assert_eq!(path.to_string(), "foo.lance");
1209 }
1210
1211 async fn test_block_size_used_test_helper(
1212 uri: &str,
1213 storage_options: Option<HashMap<String, String>>,
1214 default_expected_block_size: usize,
1215 ) {
1216 let registry = Arc::new(ObjectStoreRegistry::default());
1218 let accessor = storage_options
1219 .clone()
1220 .map(|opts| Arc::new(StorageOptionsAccessor::with_static_options(opts)));
1221 let params = ObjectStoreParams {
1222 storage_options_accessor: accessor.clone(),
1223 ..ObjectStoreParams::default()
1224 };
1225 let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
1226 .await
1227 .unwrap();
1228 assert_eq!(store.block_size, default_expected_block_size);
1229
1230 let registry = Arc::new(ObjectStoreRegistry::default());
1232 let params = ObjectStoreParams {
1233 block_size: Some(1024),
1234 storage_options_accessor: accessor,
1235 ..ObjectStoreParams::default()
1236 };
1237 let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
1238 .await
1239 .unwrap();
1240 assert_eq!(store.block_size, 1024);
1241 }
1242
1243 #[rstest]
1244 #[case("s3://bucket/foo.lance", None)]
1245 #[case("gs://bucket/foo.lance", None)]
1246 #[case("az://account/bucket/foo.lance",
1247 Some(HashMap::from([
1248 (String::from("account_name"), String::from("account")),
1249 (String::from("container_name"), String::from("container"))
1250 ])))]
1251 #[case("abfss://filesystem@account.dfs.core.windows.net/foo.lance",
1252 Some(HashMap::from([
1253 (String::from("account_name"), String::from("account")),
1254 (String::from("container_name"), String::from("filesystem"))
1255 ])))]
1256 #[tokio::test]
1257 async fn test_block_size_used_cloud(
1258 #[case] uri: &str,
1259 #[case] storage_options: Option<HashMap<String, String>>,
1260 ) {
1261 test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
1262 }
1263
1264 #[rstest]
1265 #[case("file")]
1266 #[case("file-object-store")]
1267 #[case("memory:///bucket/foo.lance")]
1268 #[tokio::test]
1269 async fn test_block_size_used_file(#[case] prefix: &str) {
1270 let tmp_path = TempStrDir::default();
1271 let path = format!("{tmp_path}/bar/foo.lance/test_file");
1272 write_to_file(&path, "URL").unwrap();
1273 let uri = format!("{prefix}:///{path}");
1274 test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
1275 }
1276
1277 #[tokio::test]
1278 async fn test_relative_paths() {
1279 let tmp_path = TempStrDir::default();
1280 write_to_file(
1281 &format!("{tmp_path}/bar/foo.lance/test_file"),
1282 "RELATIVE_URL",
1283 )
1284 .unwrap();
1285
1286 set_current_dir(StdPath::new(tmp_path.as_ref())).expect("Error changing current dir");
1287 let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();
1288
1289 let contents = read_from_store(store.as_ref(), &path.clone().join("test_file"))
1290 .await
1291 .unwrap();
1292 assert_eq!(contents, "RELATIVE_URL");
1293 }
1294
1295 #[tokio::test]
1296 async fn test_tilde_expansion() {
1297 let uri = "~/foo.lance";
1298 write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
1299 let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
1300 let contents = read_from_store(store.as_ref(), &path.clone().join("test_file"))
1301 .await
1302 .unwrap();
1303 assert_eq!(contents, "TILDE");
1304 }
1305
1306 #[tokio::test]
1307 async fn test_read_directory() {
1308 let path = TempStdDir::default();
1309 create_dir_all(path.join("foo").join("bar")).unwrap();
1310 create_dir_all(path.join("foo").join("zoo")).unwrap();
1311 create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
1312 write_to_file(
1313 path.join("foo").join("test_file").to_str().unwrap(),
1314 "read_dir",
1315 )
1316 .unwrap();
1317 let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
1318
1319 let sub_dirs = store.read_dir(base.clone().join("foo")).await.unwrap();
1320 assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
1321 }
1322
1323 #[tokio::test]
1324 async fn test_delete_directory_local_store() {
1325 test_delete_directory("").await;
1326 }
1327
1328 #[tokio::test]
1329 async fn test_delete_directory_file_object_store() {
1330 test_delete_directory("file-object-store").await;
1331 }
1332
1333 async fn test_delete_directory(scheme: &str) {
1334 let path = TempStdDir::default();
1335 create_dir_all(path.join("foo").join("bar")).unwrap();
1336 create_dir_all(path.join("foo").join("zoo")).unwrap();
1337 create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
1338 write_to_file(
1339 path.join("foo")
1340 .join("bar")
1341 .join("test_file")
1342 .to_str()
1343 .unwrap(),
1344 "delete",
1345 )
1346 .unwrap();
1347 let file_url = Url::from_directory_path(&path).unwrap();
1348 let url = if scheme.is_empty() {
1349 file_url
1350 } else {
1351 let mut url = Url::parse(&format!("{scheme}:///")).unwrap();
1352 url.set_path(file_url.path());
1354 url
1355 };
1356 let (store, base) = ObjectStore::from_uri(url.as_ref()).await.unwrap();
1357 store
1358 .remove_dir_all(base.clone().join("foo"))
1359 .await
1360 .unwrap();
1361
1362 assert!(!path.join("foo").exists());
1363 }
1364
1365 #[derive(Debug)]
1366 struct TestWrapper {
1367 called: AtomicBool,
1368
1369 return_value: Arc<dyn OSObjectStore>,
1370 }
1371
1372 impl WrappingObjectStore for TestWrapper {
1373 fn wrap(
1374 &self,
1375 _store_prefix: &str,
1376 _original: Arc<dyn OSObjectStore>,
1377 ) -> Arc<dyn OSObjectStore> {
1378 self.called.store(true, Ordering::Relaxed);
1379
1380 self.return_value.clone()
1382 }
1383 }
1384
1385 impl TestWrapper {
1386 fn called(&self) -> bool {
1387 self.called.load(Ordering::Relaxed)
1388 }
1389 }
1390
1391 #[tokio::test]
1392 async fn test_wrapping_object_store_option_is_used() {
1393 let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
1395 let registry = Arc::new(ObjectStoreRegistry::default());
1396
1397 assert_eq!(Arc::strong_count(&mock_inner_store), 1);
1398
1399 let wrapper = Arc::new(TestWrapper {
1400 called: AtomicBool::new(false),
1401 return_value: mock_inner_store.clone(),
1402 });
1403
1404 let params = ObjectStoreParams {
1405 object_store_wrapper: Some(wrapper.clone()),
1406 ..ObjectStoreParams::default()
1407 };
1408
1409 assert!(!wrapper.called());
1411
1412 let _ = ObjectStore::from_uri_and_params(registry, "memory:///", ¶ms)
1413 .await
1414 .unwrap();
1415
1416 assert!(wrapper.called());
1418
1419 assert_eq!(Arc::strong_count(&mock_inner_store), 2);
1422 }
1423
1424 #[tokio::test]
1425 async fn test_local_paths() {
1426 let file_path = TempStdFile::default();
1427 let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
1428 writer.write_all(b"LOCAL").await.unwrap();
1429 Writer::shutdown(&mut writer).await.unwrap();
1430
1431 let reader = ObjectStore::open_local(&file_path).await.unwrap();
1432 let buf = reader.get_range(0..5).await.unwrap();
1433 assert_eq!(buf.as_ref(), b"LOCAL");
1434 }
1435
1436 #[tokio::test]
1437 async fn test_read_one() {
1438 let file_path = TempStdFile::default();
1439 let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
1440 writer.write_all(b"LOCAL").await.unwrap();
1441 Writer::shutdown(&mut writer).await.unwrap();
1442
1443 let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
1444 let obj_store = ObjectStore::local();
1445 let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
1446 assert_eq!(buf.as_ref(), b"LOCAL");
1447
1448 let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
1449 assert_eq!(buf.as_ref(), b"LOCAL");
1450 }
1451
1452 #[tokio::test]
1453 #[cfg(windows)]
1454 async fn test_windows_paths() {
1455 use std::path::Component;
1456 use std::path::Prefix;
1457 use std::path::Prefix::*;
1458
1459 fn get_path_prefix(path: &StdPath) -> Prefix<'_> {
1460 match path.components().next().unwrap() {
1461 Component::Prefix(prefix_component) => prefix_component.kind(),
1462 _ => panic!(),
1463 }
1464 }
1465
1466 fn get_drive_letter(prefix: Prefix) -> String {
1467 match prefix {
1468 Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
1469 _ => panic!(),
1470 }
1471 }
1472
1473 let tmp_path = TempStdFile::default();
1474 let prefix = get_path_prefix(&tmp_path);
1475 let drive_letter = get_drive_letter(prefix);
1476
1477 write_to_file(
1478 &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
1479 "WINDOWS",
1480 )
1481 .unwrap();
1482
1483 for uri in &[
1484 format!("{drive_letter}:/test_folder/test.lance"),
1485 format!("{drive_letter}:\\test_folder\\test.lance"),
1486 ] {
1487 let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
1488 let contents = read_from_store(store.as_ref(), &base.clone().join("test_file"))
1489 .await
1490 .unwrap();
1491 assert_eq!(contents, "WINDOWS");
1492 }
1493 }
1494
1495 #[tokio::test]
1496 async fn test_cross_filesystem_copy() {
1497 let source_dir = TempStdDir::default();
1499 let dest_dir = TempStdDir::default();
1500
1501 let source_file_name = "test_file.txt";
1503 let source_file = source_dir.join(source_file_name);
1504 std::fs::write(&source_file, b"test content").unwrap();
1505
1506 let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
1508 .await
1509 .unwrap();
1510
1511 let from_path = base_path.clone().join(source_file_name);
1513
1514 let dest_file = dest_dir.join("copied_file.txt");
1516 let dest_str = dest_file.to_str().unwrap();
1517 let to_path = object_store::path::Path::parse(dest_str).unwrap();
1518
1519 store.copy(&from_path, &to_path).await.unwrap();
1521
1522 assert!(dest_file.exists());
1524 let copied_content = std::fs::read(&dest_file).unwrap();
1525 assert_eq!(copied_content, b"test content");
1526 }
1527
1528 #[tokio::test]
1529 async fn test_copy_creates_parent_directories() {
1530 let source_dir = TempStdDir::default();
1531 let dest_dir = TempStdDir::default();
1532
1533 let source_file_name = "test_file.txt";
1535 let source_file = source_dir.join(source_file_name);
1536 std::fs::write(&source_file, b"test content").unwrap();
1537
1538 let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
1540 .await
1541 .unwrap();
1542
1543 let from_path = base_path.clone().join(source_file_name);
1545
1546 let dest_file = dest_dir.join("nested").join("dirs").join("copied_file.txt");
1548 let dest_str = dest_file.to_str().unwrap();
1549 let to_path = object_store::path::Path::parse(dest_str).unwrap();
1550
1551 store.copy(&from_path, &to_path).await.unwrap();
1553
1554 assert!(dest_file.exists());
1556 assert!(dest_file.parent().unwrap().exists());
1557 let copied_content = std::fs::read(&dest_file).unwrap();
1558 assert_eq!(copied_content, b"test content");
1559 }
1560
1561 #[derive(Debug)]
1566 struct CopyFailingStore {
1567 inner: InMemory,
1568 }
1569
1570 impl Display for CopyFailingStore {
1571 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1572 write!(f, "CopyFailingStore")
1573 }
1574 }
1575
    // Every `ObjectStore` operation is forwarded unchanged to `self.inner`, except
    // `copy_opts`, which always returns an error. This lets a test check which
    // code paths do or do not rely on the store's native copy.
    #[async_trait]
    impl OSObjectStore for CopyFailingStore {
        // Delegates to the in-memory store.
        async fn put_opts(
            &self,
            location: &Path,
            bytes: PutPayload,
            opts: PutOptions,
        ) -> OSResult<PutResult> {
            self.inner.put_opts(location, bytes, opts).await
        }
        // Delegates to the in-memory store.
        async fn put_multipart_opts(
            &self,
            location: &Path,
            opts: PutMultipartOptions,
        ) -> OSResult<Box<dyn MultipartUpload>> {
            self.inner.put_multipart_opts(location, opts).await
        }
        // Delegates to the in-memory store.
        async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
            self.inner.get_opts(location, options).await
        }
        // Delegates to the in-memory store.
        async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> OSResult<Vec<Bytes>> {
            self.inner.get_ranges(location, ranges).await
        }
        // Delegates to the in-memory store.
        fn delete_stream(
            &self,
            locations: BoxStream<'static, OSResult<Path>>,
        ) -> BoxStream<'static, OSResult<Path>> {
            self.inner.delete_stream(locations)
        }
        // Delegates to the in-memory store.
        fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult<ObjectMeta>> {
            self.inner.list(prefix)
        }
        // Delegates to the in-memory store.
        fn list_with_offset(
            &self,
            prefix: Option<&Path>,
            offset: &Path,
        ) -> BoxStream<'static, OSResult<ObjectMeta>> {
            self.inner.list_with_offset(prefix, offset)
        }
        // Delegates to the in-memory store.
        async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
            self.inner.list_with_delimiter(prefix).await
        }
        // Deliberately fails every call, so any caller that reaches the native
        // single-shot copy gets a `Generic` error instead of a copied object.
        async fn copy_opts(&self, _from: &Path, _to: &Path, _opts: CopyOptions) -> OSResult<()> {
            Err(object_store::Error::Generic {
                store: "CopyFailingStore",
                source: "single-shot copy disabled in test".into(),
            })
        }
    }
1625
1626 #[tokio::test]
1627 async fn test_copy_streams_objects_larger_than_threshold() {
1628 let mut store = ObjectStore::memory();
1634 store.inner = Arc::new(CopyFailingStore {
1635 inner: InMemory::new(),
1636 });
1637
1638 let from = Path::from("source.bin");
1639 let contents = b"streaming multipart copy payload well past the tiny threshold";
1640 store.put(&from, contents).await.unwrap();
1641
1642 let streamed = Path::from("streamed.bin");
1645 store.copy_impl(&from, &streamed, true, 8).await.unwrap();
1646 let copied = store.read_one_all(&streamed).await.unwrap();
1647 assert_eq!(copied.as_ref(), contents.as_slice());
1648
1649 let native = Path::from("native.bin");
1653 assert!(
1654 store
1655 .copy_impl(&from, &native, true, u64::MAX)
1656 .await
1657 .is_err()
1658 );
1659 }
1660
1661 #[test]
1662 #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1663 fn test_client_options_extracts_headers() {
1664 let opts = StorageOptions(HashMap::from([
1665 ("headers.x-custom-foo".to_string(), "bar".to_string()),
1666 ("headers.x-ms-version".to_string(), "2023-11-03".to_string()),
1667 ("region".to_string(), "us-west-2".to_string()),
1668 ]));
1669 let client_options = opts.client_options().unwrap();
1670
1671 let opts_no_headers = StorageOptions(HashMap::from([(
1674 "region".to_string(),
1675 "us-west-2".to_string(),
1676 )]));
1677 opts_no_headers.client_options().unwrap();
1678
1679 #[cfg(feature = "gcp")]
1683 {
1684 use object_store::gcp::GoogleCloudStorageBuilder;
1685 let _builder = GoogleCloudStorageBuilder::new()
1686 .with_client_options(client_options)
1687 .with_url("gs://test-bucket");
1688 }
1689 }
1690
1691 #[test]
1692 #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1693 fn test_client_options_rejects_invalid_header_name() {
1694 let opts = StorageOptions(HashMap::from([(
1695 "headers.bad header".to_string(),
1696 "value".to_string(),
1697 )]));
1698 let err = opts.client_options().unwrap_err();
1699 assert!(err.to_string().contains("invalid header name"));
1700 }
1701
1702 #[test]
1703 #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1704 fn test_client_options_rejects_invalid_header_value() {
1705 let opts = StorageOptions(HashMap::from([(
1706 "headers.x-good-name".to_string(),
1707 "bad\x01value".to_string(),
1708 )]));
1709 let err = opts.client_options().unwrap_err();
1710 assert!(err.to_string().contains("invalid header value"));
1711 }
1712
1713 #[test]
1714 #[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
1715 fn test_client_options_empty_when_no_header_keys() {
1716 let opts = StorageOptions(HashMap::from([
1717 ("region".to_string(), "us-east-1".to_string()),
1718 ("access_key_id".to_string(), "AKID".to_string()),
1719 ]));
1720 opts.client_options().unwrap();
1721 }
1722}