1use std::collections::HashMap;
7use std::ops::Range;
8use std::pin::Pin;
9use std::str::FromStr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use bytes::Bytes;
15use chrono::{DateTime, Utc};
16use deepsize::DeepSizeOf;
17use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
18use futures::{FutureExt, Stream};
19use lance_core::error::LanceOptionExt;
20use lance_core::utils::parse::str_is_truthy;
21use list_retry::ListRetryStream;
22#[cfg(feature = "aws")]
23use object_store::aws::AwsCredentialProvider;
24use object_store::DynObjectStore;
25use object_store::Error as ObjectStoreError;
26use object_store::{path::Path, ObjectMeta, ObjectStore as OSObjectStore};
27use providers::local::FileStoreProvider;
28use providers::memory::MemoryStoreProvider;
29use shellexpand::tilde;
30use snafu::location;
31use tokio::io::AsyncWriteExt;
32use url::Url;
33
34use super::local::LocalObjectReader;
35mod list_retry;
36pub mod providers;
37pub mod storage_options;
38mod tracing;
39use crate::object_reader::SmallReader;
40use crate::object_writer::WriteResult;
41use crate::utils::tracking_store::{IOTracker, IoStats};
42use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
43use lance_core::{Error, Result};
44
/// Default `io_parallelism` for local stores.
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
/// Default `io_parallelism` for cloud stores.
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

/// Default block size for local stores (4 KiB).
const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024;
/// Default block size for cloud stores (64 KiB); only compiled when a cloud feature is enabled.
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024;

/// Default value for `ObjectStore::max_iop_size`.
///
/// Read once from the `LANCE_MAX_IOP_SIZE` environment variable; falls back to
/// 16 MiB when the variable is unset or not valid unicode.
///
/// # Panics
///
/// Panics on first access if `LANCE_MAX_IOP_SIZE` is set but is not a valid `u64`.
/// The panic message names the variable and the offending value.
pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock<u64> = std::sync::LazyLock::new(|| {
    match std::env::var("LANCE_MAX_IOP_SIZE") {
        Ok(val) => val.parse::<u64>().unwrap_or_else(|err| {
            panic!(
                "LANCE_MAX_IOP_SIZE must be an unsigned integer, got {:?}: {}",
                val, err
            )
        }),
        Err(_) => 16 * 1024 * 1024,
    }
});

/// Default number of download retries used when none is configured.
pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
64
65pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
66pub use storage_options::{
67 LanceNamespaceStorageOptionsProvider, StorageOptionsAccessor, StorageOptionsProvider,
68 EXPIRES_AT_MILLIS_KEY, REFRESH_OFFSET_MILLIS_KEY,
69};
70
/// Convenience operations available on any object store.
#[async_trait]
pub trait ObjectStoreExt {
    /// Reports whether an object exists at `path`.
    async fn exists(&self, path: &Path) -> Result<bool>;

    /// Streams the metadata of everything listed under `dir_path`.
    ///
    /// When `unmodified_since` is `Some`, the blanket implementation in this
    /// file only yields entries whose `last_modified` is at or before that time.
    fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>>;
}
85
86#[async_trait]
87impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
88 fn read_dir_all<'a, 'b>(
89 &'a self,
90 dir_path: impl Into<&'b Path> + Send,
91 unmodified_since: Option<DateTime<Utc>>,
92 ) -> BoxStream<'a, Result<ObjectMeta>> {
93 let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
94 if let Some(unmodified_since_val) = unmodified_since {
95 output
96 .try_filter(move |file| future::ready(file.last_modified <= unmodified_since_val))
97 .boxed()
98 } else {
99 output.boxed()
100 }
101 }
102
103 async fn exists(&self, path: &Path) -> Result<bool> {
104 match self.head(path).await {
105 Ok(_) => Ok(true),
106 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
107 Err(e) => Err(e.into()),
108 }
109 }
110}
111
/// An object store handle bundled with the I/O settings used when reading and writing through it.
#[derive(Debug, Clone)]
pub struct ObjectStore {
    /// The underlying store. The constructors in this file wrap it with the
    /// store's `IOTracker` before storing it here.
    pub inner: Arc<dyn OSObjectStore>,
    /// URL scheme the store was created for; `is_local`, `is_cloud` and `open` branch on it.
    scheme: String,
    /// Block size passed to the readers created by `open` / `open_with_size`.
    block_size: usize,
    /// Value returned by `max_iop_size()`; initialised from `DEFAULT_MAX_IOP_SIZE`.
    max_iop_size: u64,
    /// Copied from the caller-supplied parameters; not read in this file.
    pub use_constant_size_upload_parts: bool,
    /// Copied from the caller-supplied parameters; not read in this file.
    pub list_is_lexically_ordered: bool,
    /// Value returned by `io_parallelism()` unless `LANCE_IO_THREADS` overrides it.
    io_parallelism: usize,
    /// Retry count passed to the cloud and small-object readers.
    download_retry_count: usize,
    /// Tracker wrapped around `inner`; source of the `io_stats_*` methods.
    io_tracker: IOTracker,
    /// Prefix computed by the registry's `calculate_object_store_prefix` in `new`.
    pub store_prefix: String,
}
136
137impl DeepSizeOf for ObjectStore {
138 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
139 self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
144 }
145}
146
147impl std::fmt::Display for ObjectStore {
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 write!(f, "ObjectStore({})", self.scheme)
150 }
151}
152
/// A hook that takes an object store and returns the store to use in its place.
pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
    /// Returns the store that should replace `original`.
    ///
    /// `store_prefix` is the prefix string the caller computed for the store being wrapped.
    fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
}
160
161#[derive(Debug, Clone)]
162pub struct ChainedWrappingObjectStore {
163 wrappers: Vec<Arc<dyn WrappingObjectStore>>,
164}
165
166impl ChainedWrappingObjectStore {
167 pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
168 Self { wrappers }
169 }
170
171 pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
172 self.wrappers.push(wrapper);
173 }
174}
175
176impl WrappingObjectStore for ChainedWrappingObjectStore {
177 fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
178 self.wrappers
179 .iter()
180 .fold(original, |acc, wrapper| wrapper.wrap(store_prefix, acc))
181 }
182}
183
/// Parameters used to create an [`ObjectStore`].
#[derive(Debug, Clone)]
pub struct ObjectStoreParams {
    /// Block size override; `None` lets the store choose a default.
    pub block_size: Option<usize>,
    /// A pre-built store and its URL. When set, `ObjectStore::from_uri_and_params`
    /// uses it directly instead of consulting the registry.
    #[deprecated(note = "Implement an ObjectStoreProvider instead")]
    pub object_store: Option<(Arc<DynObjectStore>, Url)>,
    /// Defaults to 60 seconds. Not read in this file.
    pub s3_credentials_refresh_offset: Duration,
    /// Optional AWS credential provider (only with the `aws` feature).
    #[cfg(feature = "aws")]
    pub aws_credentials: Option<AwsCredentialProvider>,
    /// Optional wrapper applied to the created store.
    pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
    /// Optional accessor for storage options; `storage_options()` reads its initial options.
    pub storage_options_accessor: Option<Arc<StorageOptionsAccessor>>,
    /// Copied onto the resulting `ObjectStore`.
    pub use_constant_size_upload_parts: bool,
    /// Copied onto the resulting `ObjectStore`; `None` becomes `false` in the
    /// deprecated `object_store` path of `from_uri_and_params`.
    pub list_is_lexically_ordered: Option<bool>,
}
210
211impl Default for ObjectStoreParams {
212 fn default() -> Self {
213 #[allow(deprecated)]
214 Self {
215 object_store: None,
216 block_size: None,
217 s3_credentials_refresh_offset: Duration::from_secs(60),
218 #[cfg(feature = "aws")]
219 aws_credentials: None,
220 object_store_wrapper: None,
221 storage_options_accessor: None,
222 use_constant_size_upload_parts: false,
223 list_is_lexically_ordered: None,
224 }
225 }
226}
227
228impl ObjectStoreParams {
229 pub fn get_accessor(&self) -> Option<Arc<StorageOptionsAccessor>> {
231 self.storage_options_accessor.clone()
232 }
233
234 pub fn storage_options(&self) -> Option<&HashMap<String, String>> {
238 self.storage_options_accessor
239 .as_ref()
240 .and_then(|a| a.initial_storage_options())
241 }
242}
243
244impl std::hash::Hash for ObjectStoreParams {
246 #[allow(deprecated)]
247 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
248 self.block_size.hash(state);
250 if let Some((store, url)) = &self.object_store {
251 Arc::as_ptr(store).hash(state);
252 url.hash(state);
253 }
254 self.s3_credentials_refresh_offset.hash(state);
255 #[cfg(feature = "aws")]
256 if let Some(aws_credentials) = &self.aws_credentials {
257 Arc::as_ptr(aws_credentials).hash(state);
258 }
259 if let Some(wrapper) = &self.object_store_wrapper {
260 Arc::as_ptr(wrapper).hash(state);
261 }
262 if let Some(accessor) = &self.storage_options_accessor {
263 accessor.accessor_id().hash(state);
264 }
265 self.use_constant_size_upload_parts.hash(state);
266 self.list_is_lexically_ordered.hash(state);
267 }
268}
269
270impl Eq for ObjectStoreParams {}
272impl PartialEq for ObjectStoreParams {
273 #[allow(deprecated)]
274 fn eq(&self, other: &Self) -> bool {
275 #[cfg(feature = "aws")]
276 if self.aws_credentials.is_some() != other.aws_credentials.is_some() {
277 return false;
278 }
279
280 self.block_size == other.block_size
283 && self
284 .object_store
285 .as_ref()
286 .map(|(store, url)| (Arc::as_ptr(store), url))
287 == other
288 .object_store
289 .as_ref()
290 .map(|(store, url)| (Arc::as_ptr(store), url))
291 && self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
292 && self.object_store_wrapper.as_ref().map(Arc::as_ptr)
293 == other.object_store_wrapper.as_ref().map(Arc::as_ptr)
294 && self
295 .storage_options_accessor
296 .as_ref()
297 .map(|a| a.accessor_id())
298 == other
299 .storage_options_accessor
300 .as_ref()
301 .map(|a| a.accessor_id())
302 && self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
303 && self.list_is_lexically_ordered == other.list_is_lexically_ordered
304 }
305}
306
307pub fn uri_to_url(uri: &str) -> Result<Url> {
328 match Url::parse(uri) {
329 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
330 local_path_to_url(uri)
332 }
333 Ok(url) => Ok(url),
334 Err(_) => local_path_to_url(uri),
335 }
336}
337
338fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
339 let expanded = tilde(str_path.as_ref()).to_string();
340
341 let mut expanded_path = path_abs::PathAbs::new(expanded)
342 .unwrap()
343 .as_path()
344 .to_path_buf();
345 if let Some(s) = expanded_path.as_path().to_str() {
347 if s.is_empty() {
348 expanded_path = std::env::current_dir()?;
349 }
350 }
351
352 Ok(expanded_path)
353}
354
355fn local_path_to_url(str_path: &str) -> Result<Url> {
356 let expanded_path = expand_path(str_path)?;
357
358 Url::from_directory_path(expanded_path).map_err(|_| Error::InvalidInput {
359 source: format!("Invalid table location: '{}'", str_path).into(),
360 location: location!(),
361 })
362}
363
/// Extracts the `owner/repo` identifier from a Huggingface URL.
///
/// The host and the path segments are read as one sequence. A leading
/// `models`, `datasets` or `spaces` segment is skipped, and a trailing
/// `@revision` on the repo segment is dropped.
///
/// # Errors
///
/// Returns an invalid-input error when the owner or repo segment is missing or empty.
#[cfg(feature = "huggingface")]
fn parse_hf_repo_id(url: &Url) -> Result<String> {
    // Empty segments (from `//` or a trailing `/`) are dropped so they are
    // never mistaken for an owner or repo name.
    let segments: Vec<&str> = url
        .host_str()
        .into_iter()
        .chain(url.path().split('/'))
        .filter(|segment| !segment.is_empty())
        .collect();

    if segments.len() < 2 {
        return Err(Error::invalid_input(
            "Huggingface URL must contain at least owner and repo",
            location!(),
        ));
    }

    let repo_type_candidates = ["models", "datasets", "spaces"];
    let (owner, repo_with_rev) = if repo_type_candidates.contains(&segments[0]) {
        if segments.len() < 3 {
            return Err(Error::invalid_input(
                "Huggingface URL missing owner/repo after repo type",
                location!(),
            ));
        }
        (segments[1], segments[2])
    } else {
        (segments[0], segments[1])
    };

    // Drop an optional `@revision` suffix.
    let repo = repo_with_rev
        .split_once('@')
        .map_or(repo_with_rev, |(name, _rev)| name);
    if repo.is_empty() {
        return Err(Error::invalid_input(
            "Huggingface URL has an empty repo name",
            location!(),
        ));
    }
    Ok(format!("{owner}/{repo}"))
}
404
405impl ObjectStore {
406 pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
414 let registry = Arc::new(ObjectStoreRegistry::default());
415
416 Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
417 }
418
    /// Opens the store for `uri` with explicit parameters.
    ///
    /// If the deprecated `params.object_store` is set, that store is used
    /// directly: it is wrapped with the optional wrapper and an IO tracker, and
    /// the returned path comes from the URL stored next to it. Otherwise `uri`
    /// is converted to a URL, the store is fetched from `registry`, and the
    /// provider for the URL's scheme extracts the path.
    pub async fn from_uri_and_params(
        registry: Arc<ObjectStoreRegistry>,
        uri: &str,
        params: &ObjectStoreParams,
    ) -> Result<(Arc<Self>, Path)> {
        #[allow(deprecated)]
        if let Some((store, path)) = params.object_store.as_ref() {
            let mut inner = store.clone();
            let store_prefix =
                registry.calculate_object_store_prefix(uri, params.storage_options())?;
            if let Some(wrapper) = params.object_store_wrapper.as_ref() {
                inner = wrapper.wrap(&store_prefix, inner);
            }

            // IO tracking is applied last, so it is the outermost layer.
            let io_tracker = IOTracker::default();
            let tracked_store = io_tracker.wrap("", inner);

            let store = Self {
                inner: tracked_store,
                scheme: path.scheme().to_string(),
                block_size: params.block_size.unwrap_or(64 * 1024),
                max_iop_size: *DEFAULT_MAX_IOP_SIZE,
                use_constant_size_upload_parts: params.use_constant_size_upload_parts,
                list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
                io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
                download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
                io_tracker,
                // NOTE(review): the prefix computed above is passed to the wrapper but
                // not recorded on the store — confirm this is intentional.
                store_prefix: String::new(),
            };
            let path = Path::parse(path.path())?;
            return Ok((Arc::new(store), path));
        }
        let url = uri_to_url(uri)?;

        let store = registry.get_store(url.clone(), params).await?;
        let provider = registry.get_provider(url.scheme()).expect_ok()?;
        let path = provider.extract_path(&url)?;

        Ok((store, path))
    }
464
465 pub fn extract_path_from_uri(registry: Arc<ObjectStoreRegistry>, uri: &str) -> Result<Path> {
479 let url = uri_to_url(uri)?;
480 let provider = registry.get_provider(url.scheme()).ok_or_else(|| {
481 Error::invalid_input(format!("Unknown scheme: {}", url.scheme()), location!())
482 })?;
483 provider.extract_path(&url)
484 }
485
486 #[deprecated(note = "Use `from_uri` instead")]
487 pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
488 Self::from_uri_and_params(
489 Arc::new(ObjectStoreRegistry::default()),
490 str_path,
491 &Default::default(),
492 )
493 .now_or_never()
494 .unwrap()
495 }
496
    /// Creates a store for `file:///` through `FileStoreProvider`, with default parameters.
    ///
    /// # Panics
    ///
    /// Panics if the provider's future is not ready on the first poll or returns an error.
    pub fn local() -> Self {
        let provider = FileStoreProvider;
        provider
            .new_store(Url::parse("file:///").unwrap(), &Default::default())
            .now_or_never()
            .unwrap()
            .unwrap()
    }
506
    /// Creates a store for `memory:///` through `MemoryStoreProvider`, with default parameters.
    ///
    /// # Panics
    ///
    /// Panics if the provider's future is not ready on the first poll or returns an error.
    pub fn memory() -> Self {
        let provider = MemoryStoreProvider;
        provider
            .new_store(Url::parse("memory:///").unwrap(), &Default::default())
            .now_or_never()
            .unwrap()
            .unwrap()
    }
516
    /// Returns true when the store's scheme is `file`.
    pub fn is_local(&self) -> bool {
        self.scheme == "file"
    }
521
522 pub fn is_cloud(&self) -> bool {
523 self.scheme != "file" && self.scheme != "memory"
524 }
525
    /// The URL scheme this store was created for.
    pub fn scheme(&self) -> &str {
        &self.scheme
    }
529
    /// The block size passed to readers opened from this store.
    pub fn block_size(&self) -> usize {
        self.block_size
    }
533
    /// The configured maximum I/O operation size (see `DEFAULT_MAX_IOP_SIZE`).
    pub fn max_iop_size(&self) -> u64 {
        self.max_iop_size
    }
537
538 pub fn io_parallelism(&self) -> usize {
539 std::env::var("LANCE_IO_THREADS")
540 .map(|val| val.parse::<usize>().unwrap())
541 .unwrap_or(self.io_parallelism)
542 }
543
    /// Borrows the IO tracker that wraps this store.
    pub fn io_tracker(&self) -> &IOTracker {
        &self.io_tracker
    }
551
    /// Returns the tracker's `stats()`.
    pub fn io_stats_snapshot(&self) -> IoStats {
        self.io_tracker.stats()
    }
559
    /// Returns the tracker's `incremental_stats()`.
    // NOTE(review): presumably this reports activity since the previous call — confirm
    // against `IOTracker`.
    pub fn io_stats_incremental(&self) -> IoStats {
        self.io_tracker.incremental_stats()
    }
568
569 pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
574 match self.scheme.as_str() {
575 "file" => {
576 LocalObjectReader::open_with_tracker(
577 path,
578 self.block_size,
579 None,
580 Arc::new(self.io_tracker.clone()),
581 )
582 .await
583 }
584 _ => Ok(Box::new(CloudObjectReader::new(
585 self.inner.clone(),
586 path.clone(),
587 self.block_size,
588 None,
589 self.download_retry_count,
590 )?)),
591 }
592 }
593
594 pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
600 if known_size <= self.block_size {
603 return Ok(Box::new(SmallReader::new(
604 self.inner.clone(),
605 path.clone(),
606 self.download_retry_count,
607 known_size,
608 )));
609 }
610
611 match self.scheme.as_str() {
612 "file" => {
613 LocalObjectReader::open_with_tracker(
614 path,
615 self.block_size,
616 Some(known_size),
617 Arc::new(self.io_tracker.clone()),
618 )
619 .await
620 }
621 _ => Ok(Box::new(CloudObjectReader::new(
622 self.inner.clone(),
623 path.clone(),
624 self.block_size,
625 Some(known_size),
626 self.download_retry_count,
627 )?)),
628 }
629 }
630
631 pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
633 let object_store = Self::local();
634 let absolute_path = expand_path(path.to_string_lossy())?;
635 let os_path = Path::from_absolute_path(absolute_path)?;
636 object_store.create(&os_path).await
637 }
638
639 pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
641 let object_store = Self::local();
642 let absolute_path = expand_path(path.to_string_lossy())?;
643 let os_path = Path::from_absolute_path(absolute_path)?;
644 object_store.open(&os_path).await
645 }
646
    /// Creates an `ObjectWriter` for `path` on this store.
    pub async fn create(&self, path: &Path) -> Result<ObjectWriter> {
        ObjectWriter::new(self, path).await
    }
651
    /// Writes `content` to `path` in one call: create a writer, write everything, shut it down.
    pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
        let mut writer = self.create(path).await?;
        writer.write_all(content).await?;
        // Shutting the writer down produces the `WriteResult` that is returned.
        writer.shutdown().await
    }
658
659 pub async fn delete(&self, path: &Path) -> Result<()> {
660 self.inner.delete(path).await?;
661 Ok(())
662 }
663
664 pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
665 if self.is_local() {
666 return super::local::copy_file(from, to);
668 }
669 Ok(self.inner.copy(from, to).await?)
670 }
671
672 pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
674 let path = dir_path.into();
675 let path = Path::parse(&path)?;
676 let output = self.inner.list_with_delimiter(Some(&path)).await?;
677 Ok(output
678 .common_prefixes
679 .iter()
680 .chain(output.objects.iter().map(|o| &o.location))
681 .map(|s| s.filename().unwrap().to_string())
682 .collect())
683 }
684
685 pub fn list(
686 &self,
687 path: Option<Path>,
688 ) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
689 Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
690 }
691
    /// Streams metadata for everything under `dir_path`, delegating to
    /// `ObjectStoreExt::read_dir_all` on the wrapped store.
    ///
    /// When `unmodified_since` is `Some`, only entries whose `last_modified`
    /// is at or before that time are yielded.
    pub fn read_dir_all<'a, 'b>(
        &'a self,
        dir_path: impl Into<&'b Path> + Send,
        unmodified_since: Option<DateTime<Utc>>,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        self.inner.read_dir_all(dir_path, unmodified_since)
    }
702
703 pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
705 let path = dir_path.into();
706 let path = Path::parse(&path)?;
707
708 if self.is_local() {
709 return super::local::remove_dir_all(&path);
711 }
712 let sub_entries = self
713 .inner
714 .list(Some(&path))
715 .map(|m| m.map(|meta| meta.location))
716 .boxed();
717 self.inner
718 .delete_stream(sub_entries)
719 .try_collect::<Vec<_>>()
720 .await?;
721 Ok(())
722 }
723
    /// Deletes every path yielded by `locations` and streams back the wrapped
    /// store's `delete_stream` results.
    ///
    /// Errors are converted to `ObjectStoreError` on the way in and back to
    /// this crate's `Error` on the way out.
    pub fn remove_stream<'a>(
        &'a self,
        locations: BoxStream<'a, Result<Path>>,
    ) -> BoxStream<'a, Result<Path>> {
        self.inner
            .delete_stream(locations.err_into::<ObjectStoreError>().boxed())
            .err_into::<Error>()
            .boxed()
    }
733
734 pub async fn exists(&self, path: &Path) -> Result<bool> {
736 match self.inner.head(path).await {
737 Ok(_) => Ok(true),
738 Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
739 Err(e) => Err(e.into()),
740 }
741 }
742
743 pub async fn size(&self, path: &Path) -> Result<u64> {
745 Ok(self.inner.head(path).await?.size)
746 }
747
748 pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
750 let reader = self.open(path).await?;
751 Ok(reader.get_all().await?)
752 }
753
754 pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
759 let reader = self.open(path).await?;
760 Ok(reader.get_range(range).await?)
761 }
762}
763
764#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
766pub enum LanceConfigKey {
767 DownloadRetryCount,
769}
770
771impl FromStr for LanceConfigKey {
772 type Err = Error;
773
774 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
775 match s.to_ascii_lowercase().as_str() {
776 "download_retry_count" => Ok(Self::DownloadRetryCount),
777 _ => Err(Error::InvalidInput {
778 source: format!("Invalid LanceConfigKey: {}", s).into(),
779 location: location!(),
780 }),
781 }
782 }
783}
784
785#[derive(Clone, Debug, Default)]
786pub struct StorageOptions(pub HashMap<String, String>);
787
788impl StorageOptions {
789 pub fn new(options: HashMap<String, String>) -> Self {
791 let mut options = options;
792 if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
793 options.insert("allow_http".into(), value);
794 }
795 if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
796 options.insert("allow_http".into(), value);
797 }
798 if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
799 options.insert("allow_http".into(), value);
800 }
801 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
802 options.insert("client_max_retries".into(), value);
803 }
804 if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
805 options.insert("client_retry_timeout".into(), value);
806 }
807 Self(options)
808 }
809
810 pub fn allow_http(&self) -> bool {
812 self.0.iter().any(|(key, value)| {
813 key.to_ascii_lowercase().contains("allow_http") & str_is_truthy(value)
814 })
815 }
816
817 pub fn download_retry_count(&self) -> usize {
819 self.0
820 .iter()
821 .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
822 .map(|(_, value)| value.parse::<usize>().unwrap_or(3))
823 .unwrap_or(3)
824 }
825
826 pub fn client_max_retries(&self) -> usize {
828 self.0
829 .iter()
830 .find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
831 .and_then(|(_, value)| value.parse::<usize>().ok())
832 .unwrap_or(10)
833 }
834
835 pub fn client_retry_timeout(&self) -> u64 {
837 self.0
838 .iter()
839 .find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
840 .and_then(|(_, value)| value.parse::<u64>().ok())
841 .unwrap_or(180)
842 }
843
844 pub fn get(&self, key: &str) -> Option<&String> {
845 self.0.get(key)
846 }
847
848 pub fn expires_at_millis(&self) -> Option<u64> {
850 self.0
851 .get(EXPIRES_AT_MILLIS_KEY)
852 .and_then(|s| s.parse::<u64>().ok())
853 }
854}
855
856impl From<HashMap<String, String>> for StorageOptions {
857 fn from(value: HashMap<String, String>) -> Self {
858 Self::new(value)
859 }
860}
861
862static DEFAULT_OBJECT_STORE_REGISTRY: std::sync::LazyLock<ObjectStoreRegistry> =
863 std::sync::LazyLock::new(ObjectStoreRegistry::default);
864
865impl ObjectStore {
866 #[allow(clippy::too_many_arguments)]
867 pub fn new(
868 store: Arc<DynObjectStore>,
869 location: Url,
870 block_size: Option<usize>,
871 wrapper: Option<Arc<dyn WrappingObjectStore>>,
872 use_constant_size_upload_parts: bool,
873 list_is_lexically_ordered: bool,
874 io_parallelism: usize,
875 download_retry_count: usize,
876 storage_options: Option<&HashMap<String, String>>,
877 ) -> Self {
878 let scheme = location.scheme();
879 let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
880
881 let store_prefix = DEFAULT_OBJECT_STORE_REGISTRY
882 .calculate_object_store_prefix(location.as_ref(), storage_options)
883 .unwrap_or_default();
884
885 let store = match wrapper {
886 Some(wrapper) => wrapper.wrap(&store_prefix, store),
887 None => store,
888 };
889
890 let io_tracker = IOTracker::default();
892 let tracked_store = io_tracker.wrap("", store);
893
894 Self {
895 inner: tracked_store,
896 scheme: scheme.into(),
897 block_size,
898 max_iop_size: *DEFAULT_MAX_IOP_SIZE,
899 use_constant_size_upload_parts,
900 list_is_lexically_ordered,
901 io_parallelism,
902 download_retry_count,
903 io_tracker,
904 store_prefix,
905 }
906 }
907}
908
/// Picks the default block size for a URL scheme: 4 KiB for `file`, 64 KiB for everything else.
fn infer_block_size(scheme: &str) -> usize {
    const KIB: usize = 1024;
    if scheme == "file" {
        4 * KIB
    } else {
        64 * KIB
    }
}
918
919#[cfg(test)]
920mod tests {
921 use super::*;
922 use lance_core::utils::tempfile::{TempStdDir, TempStdFile, TempStrDir};
923 use object_store::memory::InMemory;
924 use rstest::rstest;
925 use std::env::set_current_dir;
926 use std::fs::{create_dir_all, write};
927 use std::path::Path as StdPath;
928 use std::sync::atomic::{AtomicBool, Ordering};
929
    /// Test helper: writes `contents` to `path_str` (after `~` expansion),
    /// creating parent directories as needed.
    fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
        let expanded = tilde(path_str).to_string();
        let path = StdPath::new(&expanded);
        std::fs::create_dir_all(path.parent().unwrap())?;
        write(path, contents)
    }
937
    /// Test helper: reads the whole object at `path` through `store` and returns it as UTF-8.
    ///
    /// Panics on any failure; the `Result` return type is never `Err`.
    async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
        let test_file_store = store.open(path).await.unwrap();
        let size = test_file_store.size().await.unwrap();
        let bytes = test_file_store.get_range(0..size).await.unwrap();
        let contents = String::from_utf8(bytes.to_vec()).unwrap();
        Ok(contents)
    }
945
    /// Absolute paths, including ones with `.` and `..` components, resolve to the same dataset.
    #[tokio::test]
    async fn test_absolute_paths() {
        let tmp_path = TempStrDir::default();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "TEST_CONTENT",
        )
        .unwrap();

        // Three spellings of the same directory.
        for uri in &[
            format!("{tmp_path}/bar/foo.lance"),
            format!("{tmp_path}/./bar/foo.lance"),
            format!("{tmp_path}/bar/foo.lance/../foo.lance"),
        ] {
            let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &path.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "TEST_CONTENT");
        }
    }
968
    /// Cloud URIs yield the expected scheme and in-bucket path.
    #[tokio::test]
    async fn test_cloud_paths() {
        let uri = "s3://bucket/foo.lance";
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        // `s3+ddb` is expected to report plain `s3` as its scheme.
        let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "s3");
        assert_eq!(path.to_string(), "foo.lance");

        let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
            .await
            .unwrap();
        assert_eq!(store.scheme, "gs");
        assert_eq!(path.to_string(), "foo.lance");
    }
988
989 async fn test_block_size_used_test_helper(
990 uri: &str,
991 storage_options: Option<HashMap<String, String>>,
992 default_expected_block_size: usize,
993 ) {
994 let registry = Arc::new(ObjectStoreRegistry::default());
996 let accessor = storage_options
997 .clone()
998 .map(|opts| Arc::new(StorageOptionsAccessor::with_static_options(opts)));
999 let params = ObjectStoreParams {
1000 storage_options_accessor: accessor.clone(),
1001 ..ObjectStoreParams::default()
1002 };
1003 let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
1004 .await
1005 .unwrap();
1006 assert_eq!(store.block_size, default_expected_block_size);
1007
1008 let registry = Arc::new(ObjectStoreRegistry::default());
1010 let params = ObjectStoreParams {
1011 block_size: Some(1024),
1012 storage_options_accessor: accessor,
1013 ..ObjectStoreParams::default()
1014 };
1015 let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
1016 .await
1017 .unwrap();
1018 assert_eq!(store.block_size, 1024);
1019 }
1020
    /// Cloud schemes default to a 64 KiB block size.
    #[rstest]
    #[case("s3://bucket/foo.lance", None)]
    #[case("gs://bucket/foo.lance", None)]
    #[case("az://account/bucket/foo.lance",
    Some(HashMap::from([
        (String::from("account_name"), String::from("account")),
        (String::from("container_name"), String::from("container"))
    ])))]
    #[tokio::test]
    async fn test_block_size_used_cloud(
        #[case] uri: &str,
        #[case] storage_options: Option<HashMap<String, String>>,
    ) {
        test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
    }
1036
    /// The cases below are expected to default to a 4 KiB block size.
    // NOTE(review): the third case is a full URI rather than a scheme prefix, so the
    // formatted `uri` looks unusual — confirm it exercises what was intended.
    #[rstest]
    #[case("file")]
    #[case("file-object-store")]
    #[case("memory:///bucket/foo.lance")]
    #[tokio::test]
    async fn test_block_size_used_file(#[case] prefix: &str) {
        let tmp_path = TempStrDir::default();
        let path = format!("{tmp_path}/bar/foo.lance/test_file");
        write_to_file(&path, "URL").unwrap();
        let uri = format!("{prefix}:///{path}");
        test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
    }
1049
    /// A relative path is resolved against the current working directory.
    #[tokio::test]
    async fn test_relative_paths() {
        let tmp_path = TempStrDir::default();
        write_to_file(
            &format!("{tmp_path}/bar/foo.lance/test_file"),
            "RELATIVE_URL",
        )
        .unwrap();

        // This changes the working directory of the whole test process.
        set_current_dir(StdPath::new(tmp_path.as_ref())).expect("Error changing current dir");
        let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();

        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "RELATIVE_URL");
    }
1067
    /// A leading `~` in the URI is expanded to the home directory.
    ///
    /// Writes `~/foo.lance/test_file` in the real home directory.
    #[tokio::test]
    async fn test_tilde_expansion() {
        let uri = "~/foo.lance";
        write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
        let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
        let contents = read_from_store(store.as_ref(), &path.child("test_file"))
            .await
            .unwrap();
        assert_eq!(contents, "TILDE");
    }
1078
    /// `read_dir` returns only immediate children: sub-directories first, then files.
    #[tokio::test]
    async fn test_read_directory() {
        let path = TempStdDir::default();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        // Nested directory that must not show up in the listing of `foo`.
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo").join("test_file").to_str().unwrap(),
            "read_dir",
        )
        .unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();

        let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
        assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
    }
1095
    /// `remove_dir_all` removes a directory with its nested directories and files.
    #[tokio::test]
    async fn test_delete_directory() {
        let path = TempStdDir::default();
        create_dir_all(path.join("foo").join("bar")).unwrap();
        create_dir_all(path.join("foo").join("zoo")).unwrap();
        create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
        write_to_file(
            path.join("foo")
                .join("bar")
                .join("test_file")
                .to_str()
                .unwrap(),
            "delete",
        )
        .unwrap();
        write_to_file(path.join("foo").join("top").to_str().unwrap(), "delete_top").unwrap();
        let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
        store.remove_dir_all(base.child("foo")).await.unwrap();

        assert!(!path.join("foo").exists());
    }
1117
    /// Wrapper test double: records whether `wrap` was called and always
    /// returns a preset store instead of the one it was given.
    #[derive(Debug)]
    struct TestWrapper {
        /// Set to true the first time `wrap` runs.
        called: AtomicBool,

        /// The store handed back from `wrap`.
        return_value: Arc<dyn OSObjectStore>,
    }

    impl WrappingObjectStore for TestWrapper {
        fn wrap(
            &self,
            _store_prefix: &str,
            _original: Arc<dyn OSObjectStore>,
        ) -> Arc<dyn OSObjectStore> {
            self.called.store(true, Ordering::Relaxed);

            // Ignore the original store and return the preset one.
            self.return_value.clone()
        }
    }

    impl TestWrapper {
        /// Whether `wrap` has been invoked.
        fn called(&self) -> bool {
            self.called.load(Ordering::Relaxed)
        }
    }
1143
1144 #[tokio::test]
1145 async fn test_wrapping_object_store_option_is_used() {
1146 let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
1148 let registry = Arc::new(ObjectStoreRegistry::default());
1149
1150 assert_eq!(Arc::strong_count(&mock_inner_store), 1);
1151
1152 let wrapper = Arc::new(TestWrapper {
1153 called: AtomicBool::new(false),
1154 return_value: mock_inner_store.clone(),
1155 });
1156
1157 let params = ObjectStoreParams {
1158 object_store_wrapper: Some(wrapper.clone()),
1159 ..ObjectStoreParams::default()
1160 };
1161
1162 assert!(!wrapper.called());
1164
1165 let _ = ObjectStore::from_uri_and_params(registry, "memory:///", ¶ms)
1166 .await
1167 .unwrap();
1168
1169 assert!(wrapper.called());
1171
1172 assert_eq!(Arc::strong_count(&mock_inner_store), 2);
1175 }
1176
    /// Round-trips bytes through `create_local_writer` and `open_local`.
    #[tokio::test]
    async fn test_local_paths() {
        let file_path = TempStdFile::default();
        let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        writer.shutdown().await.unwrap();

        let reader = ObjectStore::open_local(&file_path).await.unwrap();
        let buf = reader.get_range(0..5).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");
    }
1188
    /// `read_one_all` and `read_one_range` return the bytes that were written.
    #[tokio::test]
    async fn test_read_one() {
        let file_path = TempStdFile::default();
        let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
        writer.write_all(b"LOCAL").await.unwrap();
        writer.shutdown().await.unwrap();

        let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
        let obj_store = ObjectStore::local();
        let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");

        let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
        assert_eq!(buf.as_ref(), b"LOCAL");
    }
1204
    /// Windows only: drive-letter paths with forward or backward slashes open the same dataset.
    #[tokio::test]
    #[cfg(windows)]
    async fn test_windows_paths() {
        use std::path::Component;
        use std::path::Prefix;
        use std::path::Prefix::*;

        /// Returns the prefix component of `path`; panics if the path does not start with one.
        fn get_path_prefix(path: &StdPath) -> Prefix {
            match path.components().next().unwrap() {
                Component::Prefix(prefix_component) => prefix_component.kind(),
                _ => panic!(),
            }
        }

        /// Extracts the drive letter from a `Disk` prefix; panics for any other prefix kind.
        fn get_drive_letter(prefix: Prefix) -> String {
            match prefix {
                Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
                _ => panic!(),
            }
        }

        // The temp file is only used to find out which drive the test runs on.
        let tmp_path = TempStdFile::default();
        let prefix = get_path_prefix(&tmp_path);
        let drive_letter = get_drive_letter(prefix);

        write_to_file(
            &(format!("{drive_letter}:/test_folder/test.lance") + "/test_file"),
            "WINDOWS",
        )
        .unwrap();

        for uri in &[
            format!("{drive_letter}:/test_folder/test.lance"),
            format!("{drive_letter}:\\test_folder\\test.lance"),
        ] {
            let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
            let contents = read_from_store(store.as_ref(), &base.child("test_file"))
                .await
                .unwrap();
            assert_eq!(contents, "WINDOWS");
        }
    }
1247
    /// `copy` on a local store copies a file from one temp directory into another.
    #[tokio::test]
    async fn test_cross_filesystem_copy() {
        // Separate temp directories for source and destination.
        let source_dir = TempStdDir::default();
        let dest_dir = TempStdDir::default();

        // Create the source file.
        let source_file_name = "test_file.txt";
        let source_file = source_dir.join(source_file_name);
        std::fs::write(&source_file, b"test content").unwrap();

        // Open a store rooted at the source directory.
        let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
            .await
            .unwrap();

        let from_path = base_path.child(source_file_name);

        // The destination lives outside the store's base directory.
        let dest_file = dest_dir.join("copied_file.txt");
        let dest_str = dest_file.to_str().unwrap();
        let to_path = object_store::path::Path::parse(dest_str).unwrap();

        store.copy(&from_path, &to_path).await.unwrap();

        // The copy must exist and have identical content.
        assert!(dest_file.exists());
        let copied_content = std::fs::read(&dest_file).unwrap();
        assert_eq!(copied_content, b"test content");
    }
1280
    /// `copy` on a local store creates missing parent directories of the destination.
    #[tokio::test]
    async fn test_copy_creates_parent_directories() {
        let source_dir = TempStdDir::default();
        let dest_dir = TempStdDir::default();

        // Create the source file.
        let source_file_name = "test_file.txt";
        let source_file = source_dir.join(source_file_name);
        std::fs::write(&source_file, b"test content").unwrap();

        // Open a store rooted at the source directory.
        let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
            .await
            .unwrap();

        let from_path = base_path.child(source_file_name);

        // The destination sits under directories that do not exist yet.
        let dest_file = dest_dir.join("nested").join("dirs").join("copied_file.txt");
        let dest_str = dest_file.to_str().unwrap();
        let to_path = object_store::path::Path::parse(dest_str).unwrap();

        store.copy(&from_path, &to_path).await.unwrap();

        // Both the file and its parent directories must now exist.
        assert!(dest_file.exists());
        assert!(dest_file.parent().unwrap().exists());
        let copied_content = std::fs::read(&dest_file).unwrap();
        assert_eq!(copied_content, b"test content");
    }
1312 }
1313}