use std::collections::HashMap;
use std::ops::Range;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use deepsize::DeepSizeOf;
use futures::{FutureExt, Stream};
use futures::{StreamExt, TryStreamExt, future, stream::BoxStream};
use lance_core::error::LanceOptionExt;
use lance_core::utils::parse::str_is_truthy;
use list_retry::ListRetryStream;
use object_store::DynObjectStore;
use object_store::Error as ObjectStoreError;
#[cfg(feature = "aws")]
use object_store::aws::AwsCredentialProvider;
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
use object_store::{ClientOptions, HeaderMap, HeaderValue};
use object_store::{ObjectMeta, ObjectStore as OSObjectStore, path::Path};
use providers::local::FileStoreProvider;
use providers::memory::MemoryStoreProvider;
use tokio::io::AsyncWriteExt;
use url::Url;
use super::local::LocalObjectReader;
mod list_retry;
pub mod providers;
pub mod storage_options;
mod tracing;
use crate::object_reader::SmallReader;
use crate::object_writer::{LocalWriter, WriteResult};
use crate::traits::Writer;
use crate::utils::tracking_store::{IOTracker, IoStats};
use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
use lance_core::{Error, Result};
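// IO parallelism defaults: local disks need only a handful of concurrent
// requests, while cloud object stores benefit from high concurrency to hide
// per-request latency.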
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;
const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024;
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024;
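/// Maximum size in bytes of a single IO request. Can be overridden with the
/// `LANCE_MAX_IOP_SIZE` environment variable; defaults to 16 MiB.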
pub static DEFAULT_MAX_IOP_SIZE: std::sync::LazyLock<u64> = std::sync::LazyLock::new(|| {
    std::env::var("LANCE_MAX_IOP_SIZE")
        .map(|val| {
            val.parse()
                .expect("LANCE_MAX_IOP_SIZE must be an integer number of bytes")
        })
        .unwrap_or(16 * 1024 * 1024)
});
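/// Default number of times to retry a failed download.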
pub const DEFAULT_DOWNLOAD_RETRY_COUNT: usize = 3;
pub use providers::{ObjectStoreProvider, ObjectStoreRegistry};
pub use storage_options::{
EXPIRES_AT_MILLIS_KEY, LanceNamespaceStorageOptionsProvider, REFRESH_OFFSET_MILLIS_KEY,
StorageOptionsAccessor, StorageOptionsProvider,
};
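/// Extension trait adding convenience helpers to any [`OSObjectStore`]
/// implementation.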
#[async_trait]
pub trait ObjectStoreExt {
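    /// Returns `true` if an object exists at `path`; a `NotFound` error maps
    /// to `Ok(false)`.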
async fn exists(&self, path: &Path) -> Result<bool>;
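    /// Recursively lists all objects under `dir_path`. When `unmodified_since`
    /// is given, only entries last modified at or before that time are yielded.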
fn read_dir_all<'a, 'b>(
&'a self,
dir_path: impl Into<&'b Path> + Send,
unmodified_since: Option<DateTime<Utc>>,
) -> BoxStream<'a, Result<ObjectMeta>>;
}
#[async_trait]
impl<O: OSObjectStore + ?Sized> ObjectStoreExt for O {
fn read_dir_all<'a, 'b>(
&'a self,
dir_path: impl Into<&'b Path> + Send,
unmodified_since: Option<DateTime<Utc>>,
) -> BoxStream<'a, Result<ObjectMeta>> {
let output = self.list(Some(dir_path.into())).map_err(|e| e.into());
if let Some(unmodified_since_val) = unmodified_since {
output
.try_filter(move |file| future::ready(file.last_modified <= unmodified_since_val))
.boxed()
} else {
output.boxed()
}
}
async fn exists(&self, path: &Path) -> Result<bool> {
match self.head(path).await {
Ok(_) => Ok(true),
Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
Err(e) => Err(e.into()),
}
}
}
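/// A wrapper around an [`OSObjectStore`] that carries Lance-specific
/// configuration (block size, IO parallelism, retry counts) and tracks IO
/// statistics for the store.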
#[derive(Debug, Clone)]
pub struct ObjectStore {
pub inner: Arc<dyn OSObjectStore>,
scheme: String,
block_size: usize,
max_iop_size: u64,
pub use_constant_size_upload_parts: bool,
pub list_is_lexically_ordered: bool,
io_parallelism: usize,
download_retry_count: usize,
io_tracker: IOTracker,
pub store_prefix: String,
}
impl DeepSizeOf for ObjectStore {
fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
self.scheme.deep_size_of_children(context) + self.block_size.deep_size_of_children(context)
}
}
impl std::fmt::Display for ObjectStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ObjectStore({})", self.scheme)
}
}
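/// A trait for wrapping an object store with additional behavior such as
/// metrics or caching. `store_prefix` identifies the store being wrapped.
///
/// A minimal pass-through implementation sketch (the `NoopWrapper` name is
/// illustrative, not part of this crate):
///
/// ```ignore
/// #[derive(Debug)]
/// struct NoopWrapper;
///
/// impl WrappingObjectStore for NoopWrapper {
///     fn wrap(
///         &self,
///         _store_prefix: &str,
///         original: Arc<dyn OSObjectStore>,
///     ) -> Arc<dyn OSObjectStore> {
///         original
///     }
/// }
/// ```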
pub trait WrappingObjectStore: std::fmt::Debug + Send + Sync {
fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore>;
}
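/// Applies a sequence of [`WrappingObjectStore`]s: each wrapper wraps the
/// result of the previous one, so the last wrapper added becomes the
/// outermost store.
///
/// A usage sketch (`first_wrapper` and `second_wrapper` are illustrative):
///
/// ```ignore
/// let mut chain = ChainedWrappingObjectStore::new(vec![first_wrapper]);
/// chain.add_wrapper(second_wrapper); // second_wrapper ends up outermost
/// ```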
#[derive(Debug, Clone)]
pub struct ChainedWrappingObjectStore {
wrappers: Vec<Arc<dyn WrappingObjectStore>>,
}
impl ChainedWrappingObjectStore {
pub fn new(wrappers: Vec<Arc<dyn WrappingObjectStore>>) -> Self {
Self { wrappers }
}
pub fn add_wrapper(&mut self, wrapper: Arc<dyn WrappingObjectStore>) {
self.wrappers.push(wrapper);
}
}
impl WrappingObjectStore for ChainedWrappingObjectStore {
fn wrap(&self, store_prefix: &str, original: Arc<dyn OSObjectStore>) -> Arc<dyn OSObjectStore> {
self.wrappers
.iter()
.fold(original, |acc, wrapper| wrapper.wrap(store_prefix, acc))
}
}
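/// Parameters controlling how an [`ObjectStore`] is constructed, such as the
/// block size, credentials, storage options, and optional store wrappers.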
#[derive(Debug, Clone)]
pub struct ObjectStoreParams {
pub block_size: Option<usize>,
#[deprecated(note = "Implement an ObjectStoreProvider instead")]
pub object_store: Option<(Arc<DynObjectStore>, Url)>,
pub s3_credentials_refresh_offset: Duration,
#[cfg(feature = "aws")]
pub aws_credentials: Option<AwsCredentialProvider>,
pub object_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
pub storage_options_accessor: Option<Arc<StorageOptionsAccessor>>,
pub use_constant_size_upload_parts: bool,
pub list_is_lexically_ordered: Option<bool>,
}
impl Default for ObjectStoreParams {
fn default() -> Self {
#[allow(deprecated)]
Self {
object_store: None,
block_size: None,
s3_credentials_refresh_offset: Duration::from_secs(60),
#[cfg(feature = "aws")]
aws_credentials: None,
object_store_wrapper: None,
storage_options_accessor: None,
use_constant_size_upload_parts: false,
list_is_lexically_ordered: None,
}
}
}
impl ObjectStoreParams {
pub fn get_accessor(&self) -> Option<Arc<StorageOptionsAccessor>> {
self.storage_options_accessor.clone()
}
pub fn storage_options(&self) -> Option<&HashMap<String, String>> {
self.storage_options_accessor
.as_ref()
.and_then(|a| a.initial_storage_options())
}
}
impl std::hash::Hash for ObjectStoreParams {
#[allow(deprecated)]
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.block_size.hash(state);
if let Some((store, url)) = &self.object_store {
Arc::as_ptr(store).hash(state);
url.hash(state);
}
self.s3_credentials_refresh_offset.hash(state);
#[cfg(feature = "aws")]
if let Some(aws_credentials) = &self.aws_credentials {
Arc::as_ptr(aws_credentials).hash(state);
}
if let Some(wrapper) = &self.object_store_wrapper {
Arc::as_ptr(wrapper).hash(state);
}
if let Some(accessor) = &self.storage_options_accessor {
accessor.accessor_id().hash(state);
}
self.use_constant_size_upload_parts.hash(state);
self.list_is_lexically_ordered.hash(state);
}
}
impl Eq for ObjectStoreParams {}
impl PartialEq for ObjectStoreParams {
#[allow(deprecated)]
fn eq(&self, other: &Self) -> bool {
#[cfg(feature = "aws")]
if self.aws_credentials.is_some() != other.aws_credentials.is_some() {
return false;
}
self.block_size == other.block_size
&& self
.object_store
.as_ref()
.map(|(store, url)| (Arc::as_ptr(store), url))
== other
.object_store
.as_ref()
.map(|(store, url)| (Arc::as_ptr(store), url))
&& self.s3_credentials_refresh_offset == other.s3_credentials_refresh_offset
&& self.object_store_wrapper.as_ref().map(Arc::as_ptr)
== other.object_store_wrapper.as_ref().map(Arc::as_ptr)
&& self
.storage_options_accessor
.as_ref()
.map(|a| a.accessor_id())
== other
.storage_options_accessor
.as_ref()
.map(|a| a.accessor_id())
&& self.use_constant_size_upload_parts == other.use_constant_size_upload_parts
&& self.list_is_lexically_ordered == other.list_is_lexically_ordered
}
}
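/// Parses `uri` into a [`Url`], falling back to treating it as a local
/// filesystem path. On Windows, a single-character scheme (e.g. `C:`) is a
/// drive letter rather than a URL scheme.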
pub fn uri_to_url(uri: &str) -> Result<Url> {
match Url::parse(uri) {
Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
local_path_to_url(uri)
}
Ok(url) => Ok(url),
Err(_) => local_path_to_url(uri),
}
}
fn expand_path(str_path: impl AsRef<str>) -> Result<std::path::PathBuf> {
let str_path = str_path.as_ref();
let expanded = expand_tilde_path(str_path).unwrap_or_else(|| str_path.into());
let mut expanded_path = path_abs::PathAbs::new(expanded)
.unwrap()
.as_path()
.to_path_buf();
if let Some(s) = expanded_path.as_path().to_str()
&& s.is_empty()
{
expanded_path = std::env::current_dir()?;
}
Ok(expanded_path)
}
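/// Expands a leading `~` or `~/` (and `~\` on Windows) to the user's home
/// directory; returns `None` if there is no tilde prefix or no home directory.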
fn expand_tilde_path(path: &str) -> Option<std::path::PathBuf> {
let home_dir = std::env::home_dir()?;
if path == "~" {
return Some(home_dir);
}
if let Some(stripped) = path.strip_prefix("~/") {
return Some(home_dir.join(stripped));
}
#[cfg(windows)]
if let Some(stripped) = path.strip_prefix("~\\") {
return Some(home_dir.join(stripped));
}
None
}
fn local_path_to_url(str_path: &str) -> Result<Url> {
let expanded_path = expand_path(str_path)?;
Url::from_directory_path(expanded_path).map_err(|_| {
Error::invalid_input_source(format!("Invalid table location: '{}'", str_path).into())
})
}
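/// Extracts the `owner/repo` identifier from a Hugging Face URL, accepting an
/// optional leading repo type (`models`, `datasets`, `spaces`) and stripping
/// any `@revision` suffix from the repo segment.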
#[cfg(feature = "huggingface")]
fn parse_hf_repo_id(url: &Url) -> Result<String> {
let mut segments: Vec<String> = Vec::new();
if let Some(host) = url.host_str() {
segments.push(host.to_string());
}
segments.extend(
url.path()
.trim_start_matches('/')
.split('/')
.map(|s| s.to_string()),
);
if segments.len() < 2 {
return Err(Error::invalid_input(
"Huggingface URL must contain at least owner and repo",
));
}
let repo_type_candidates = ["models", "datasets", "spaces"];
let (owner, repo_with_rev) = if repo_type_candidates.contains(&segments[0].as_str()) {
if segments.len() < 3 {
return Err(Error::invalid_input(
"Huggingface URL missing owner/repo after repo type",
));
}
(segments[1].as_str(), segments[2].as_str())
} else {
(segments[0].as_str(), segments[1].as_str())
};
let repo = repo_with_rev
.split_once('@')
.map(|(r, _)| r)
.unwrap_or(repo_with_rev);
Ok(format!("{owner}/{repo}"))
}
impl ObjectStore {
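    /// Parses the given URI and opens the corresponding [`ObjectStore`] using
    /// the default registry and parameters, returning the store together with
    /// the path inside it.
    ///
    /// A minimal usage sketch (the `memory` scheme ships with the default
    /// [`ObjectStoreRegistry`]):
    ///
    /// ```ignore
    /// let (store, path) = ObjectStore::from_uri("memory:///tables/foo.lance").await?;
    /// assert_eq!(store.scheme(), "memory");
    /// ```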
pub async fn from_uri(uri: &str) -> Result<(Arc<Self>, Path)> {
let registry = Arc::new(ObjectStoreRegistry::default());
Self::from_uri_and_params(registry, uri, &ObjectStoreParams::default()).await
}
pub async fn from_uri_and_params(
registry: Arc<ObjectStoreRegistry>,
uri: &str,
params: &ObjectStoreParams,
) -> Result<(Arc<Self>, Path)> {
#[allow(deprecated)]
        if let Some((store, url)) = params.object_store.as_ref() {
let mut inner = store.clone();
let store_prefix =
registry.calculate_object_store_prefix(uri, params.storage_options())?;
if let Some(wrapper) = params.object_store_wrapper.as_ref() {
inner = wrapper.wrap(&store_prefix, inner);
}
let io_tracker = IOTracker::default();
let tracked_store = io_tracker.wrap("", inner);
let store = Self {
inner: tracked_store,
                scheme: url.scheme().to_string(),
block_size: params.block_size.unwrap_or(64 * 1024),
max_iop_size: *DEFAULT_MAX_IOP_SIZE,
use_constant_size_upload_parts: params.use_constant_size_upload_parts,
list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or_default(),
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
io_tracker,
store_prefix,
};
            let path = Path::parse(url.path())?;
return Ok((Arc::new(store), path));
}
let url = uri_to_url(uri)?;
let store = registry.get_store(url.clone(), params).await?;
let provider = registry.get_provider(url.scheme()).expect_ok()?;
let path = provider.extract_path(&url)?;
Ok((store, path))
}
pub fn extract_path_from_uri(registry: Arc<ObjectStoreRegistry>, uri: &str) -> Result<Path> {
let url = uri_to_url(uri)?;
let provider = registry
.get_provider(url.scheme())
.ok_or_else(|| Error::invalid_input(format!("Unknown scheme: {}", url.scheme())))?;
provider.extract_path(&url)
}
#[deprecated(note = "Use `from_uri` instead")]
pub fn from_path(str_path: &str) -> Result<(Arc<Self>, Path)> {
Self::from_uri_and_params(
Arc::new(ObjectStoreRegistry::default()),
str_path,
&Default::default(),
)
.now_or_never()
.unwrap()
}
pub fn local() -> Self {
let provider = FileStoreProvider;
provider
.new_store(Url::parse("file:///").unwrap(), &Default::default())
.now_or_never()
.unwrap()
.unwrap()
}
pub fn memory() -> Self {
let provider = MemoryStoreProvider;
provider
.new_store(Url::parse("memory:///").unwrap(), &Default::default())
.now_or_never()
.unwrap()
.unwrap()
}
pub fn is_local(&self) -> bool {
self.scheme == "file"
}
pub fn is_cloud(&self) -> bool {
self.scheme != "file" && self.scheme != "memory"
}
pub fn scheme(&self) -> &str {
&self.scheme
}
pub fn block_size(&self) -> usize {
self.block_size
}
pub fn max_iop_size(&self) -> u64 {
self.max_iop_size
}
pub fn io_parallelism(&self) -> usize {
std::env::var("LANCE_IO_THREADS")
.map(|val| val.parse::<usize>().unwrap())
.unwrap_or(self.io_parallelism)
}
pub fn io_tracker(&self) -> &IOTracker {
&self.io_tracker
}
pub fn io_stats_snapshot(&self) -> IoStats {
self.io_tracker.stats()
}
pub fn io_stats_incremental(&self) -> IoStats {
self.io_tracker.incremental_stats()
}
pub async fn open(&self, path: &Path) -> Result<Box<dyn Reader>> {
match self.scheme.as_str() {
"file" => {
LocalObjectReader::open_with_tracker(
path,
self.block_size,
None,
Arc::new(self.io_tracker.clone()),
)
.await
}
_ => Ok(Box::new(CloudObjectReader::new(
self.inner.clone(),
path.clone(),
self.block_size,
None,
self.download_retry_count,
)?)),
}
}
pub async fn open_with_size(&self, path: &Path, known_size: usize) -> Result<Box<dyn Reader>> {
if known_size <= self.block_size {
return Ok(Box::new(SmallReader::new(
self.inner.clone(),
path.clone(),
self.download_retry_count,
known_size,
)));
}
match self.scheme.as_str() {
"file" => {
LocalObjectReader::open_with_tracker(
path,
self.block_size,
Some(known_size),
Arc::new(self.io_tracker.clone()),
)
.await
}
_ => Ok(Box::new(CloudObjectReader::new(
self.inner.clone(),
path.clone(),
self.block_size,
Some(known_size),
self.download_retry_count,
)?)),
}
}
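    /// Creates an [`ObjectWriter`] for a local filesystem path, expanding `~`
    /// and resolving relative components first.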
pub async fn create_local_writer(path: &std::path::Path) -> Result<ObjectWriter> {
let object_store = Self::local();
let absolute_path = expand_path(path.to_string_lossy())?;
let os_path = Path::from_absolute_path(absolute_path)?;
ObjectWriter::new(&object_store, &os_path).await
}
pub async fn open_local(path: &std::path::Path) -> Result<Box<dyn Reader>> {
let object_store = Self::local();
let absolute_path = expand_path(path.to_string_lossy())?;
let os_path = Path::from_absolute_path(absolute_path)?;
object_store.open(&os_path).await
}
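    /// Creates a writer at `path`. On local filesystems the data is staged in
    /// a temporary file in the destination directory, and parent directories
    /// are created as needed.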
pub async fn create(&self, path: &Path) -> Result<Box<dyn Writer>> {
match self.scheme.as_str() {
"file" => {
let local_path = super::local::to_local_path(path);
let local_path = std::path::PathBuf::from(&local_path);
if let Some(parent) = local_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let parent = local_path
.parent()
.expect("file path must have parent")
.to_owned();
let named_temp =
tokio::task::spawn_blocking(move || tempfile::NamedTempFile::new_in(parent))
.await
.map_err(|e| Error::io(format!("spawn_blocking failed: {}", e)))??;
let (std_file, temp_path) = named_temp.into_parts();
let file = tokio::fs::File::from_std(std_file);
Ok(Box::new(LocalWriter::new(
file,
path.clone(),
temp_path,
Arc::new(self.io_tracker.clone()),
)))
}
_ => Ok(Box::new(ObjectWriter::new(self, path).await?)),
}
}
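    /// Writes the full `content` buffer to a new object at `path`.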
pub async fn put(&self, path: &Path, content: &[u8]) -> Result<WriteResult> {
let mut writer = self.create(path).await?;
writer.write_all(content).await?;
Writer::shutdown(writer.as_mut()).await
}
pub async fn delete(&self, path: &Path) -> Result<()> {
self.inner.delete(path).await?;
Ok(())
}
pub async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
if self.is_local() {
return super::local::copy_file(from, to);
}
Ok(self.inner.copy(from, to).await?)
}
pub async fn read_dir(&self, dir_path: impl Into<Path>) -> Result<Vec<String>> {
let path = dir_path.into();
let path = Path::parse(&path)?;
let output = self.inner.list_with_delimiter(Some(&path)).await?;
Ok(output
.common_prefixes
.iter()
.chain(output.objects.iter().map(|o| &o.location))
.map(|s| s.filename().unwrap().to_string())
.collect())
}
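    /// Lists objects under `path` (or the store root when `None`), retrying
    /// interrupted list streams up to 5 times.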
pub fn list(
&self,
path: Option<Path>,
) -> Pin<Box<dyn Stream<Item = Result<ObjectMeta>> + Send>> {
Box::pin(ListRetryStream::new(self.inner.clone(), path, 5).map(|m| m.map_err(|e| e.into())))
}
pub fn read_dir_all<'a, 'b>(
&'a self,
dir_path: impl Into<&'b Path> + Send,
unmodified_since: Option<DateTime<Utc>>,
) -> BoxStream<'a, Result<ObjectMeta>> {
self.inner.read_dir_all(dir_path, unmodified_since)
}
pub async fn remove_dir_all(&self, dir_path: impl Into<Path>) -> Result<()> {
let path = dir_path.into();
let path = Path::parse(&path)?;
if self.is_local() {
return super::local::remove_dir_all(&path);
}
let sub_entries = self
.inner
.list(Some(&path))
.map(|m| m.map(|meta| meta.location))
.boxed();
self.inner
.delete_stream(sub_entries)
.try_collect::<Vec<_>>()
.await?;
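        // The `file-object-store` scheme deletes through the object store API,
        // which leaves now-empty directories behind on the local filesystem.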
if self.scheme == "file-object-store" {
return super::local::remove_dir_all(&path);
}
Ok(())
}
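    /// Deletes every path yielded by `locations`, returning a stream of the
    /// successfully deleted paths.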
pub fn remove_stream<'a>(
&'a self,
locations: BoxStream<'a, Result<Path>>,
) -> BoxStream<'a, Result<Path>> {
self.inner
.delete_stream(locations.err_into::<ObjectStoreError>().boxed())
.err_into::<Error>()
.boxed()
}
pub async fn exists(&self, path: &Path) -> Result<bool> {
match self.inner.head(path).await {
Ok(_) => Ok(true),
Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
Err(e) => Err(e.into()),
}
}
pub async fn size(&self, path: &Path) -> Result<u64> {
Ok(self.inner.head(path).await?.size)
}
pub async fn read_one_all(&self, path: &Path) -> Result<Bytes> {
let reader = self.open(path).await?;
Ok(reader.get_all().await?)
}
pub async fn read_one_range(&self, path: &Path, range: Range<usize>) -> Result<Bytes> {
let reader = self.open(path).await?;
Ok(reader.get_range(range).await?)
}
}
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)]
pub enum LanceConfigKey {
DownloadRetryCount,
}
impl FromStr for LanceConfigKey {
type Err = Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"download_retry_count" => Ok(Self::DownloadRetryCount),
_ => Err(Error::invalid_input_source(
format!("Invalid LanceConfigKey: {}", s).into(),
)),
}
}
}
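/// A map of string key/value storage options, augmented at construction time
/// with well-known environment variables such as `AWS_ALLOW_HTTP` and
/// `OBJECT_STORE_CLIENT_MAX_RETRIES`.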
#[derive(Clone, Debug, Default)]
pub struct StorageOptions(pub HashMap<String, String>);
impl StorageOptions {
pub fn new(options: HashMap<String, String>) -> Self {
let mut options = options;
if let Ok(value) = std::env::var("AZURE_STORAGE_ALLOW_HTTP") {
options.insert("allow_http".into(), value);
}
if let Ok(value) = std::env::var("AZURE_STORAGE_USE_HTTP") {
options.insert("allow_http".into(), value);
}
if let Ok(value) = std::env::var("AWS_ALLOW_HTTP") {
options.insert("allow_http".into(), value);
}
if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_MAX_RETRIES") {
options.insert("client_max_retries".into(), value);
}
if let Ok(value) = std::env::var("OBJECT_STORE_CLIENT_RETRY_TIMEOUT") {
options.insert("client_retry_timeout".into(), value);
}
Self(options)
}
pub fn allow_http(&self) -> bool {
self.0.iter().any(|(key, value)| {
            key.to_ascii_lowercase().contains("allow_http") && str_is_truthy(value)
})
}
    pub fn download_retry_count(&self) -> usize {
        self.0
            .iter()
            .find(|(key, _)| key.eq_ignore_ascii_case("download_retry_count"))
            .map(|(_, value)| {
                value
                    .parse::<usize>()
                    .unwrap_or(DEFAULT_DOWNLOAD_RETRY_COUNT)
            })
            .unwrap_or(DEFAULT_DOWNLOAD_RETRY_COUNT)
    }
pub fn client_max_retries(&self) -> usize {
self.0
.iter()
.find(|(key, _)| key.eq_ignore_ascii_case("client_max_retries"))
.and_then(|(_, value)| value.parse::<usize>().ok())
.unwrap_or(10)
}
pub fn client_retry_timeout(&self) -> u64 {
self.0
.iter()
.find(|(key, _)| key.eq_ignore_ascii_case("client_retry_timeout"))
.and_then(|(_, value)| value.parse::<u64>().ok())
.unwrap_or(180)
}
pub fn get(&self, key: &str) -> Option<&String> {
self.0.get(key)
}
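    /// Builds [`ClientOptions`] from these storage options, turning every
    /// `headers.<name>` entry into a default HTTP header.
    ///
    /// A usage sketch:
    ///
    /// ```ignore
    /// let opts = StorageOptions(HashMap::from([
    ///     ("headers.x-ms-version".to_string(), "2023-11-03".to_string()),
    /// ]));
    /// let client_options = opts.client_options()?;
    /// ```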
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
pub fn client_options(&self) -> Result<ClientOptions> {
let mut headers = HeaderMap::new();
for (key, value) in &self.0 {
if let Some(header_name) = key.strip_prefix("headers.") {
let name = header_name
.parse::<http::header::HeaderName>()
.map_err(|e| {
Error::invalid_input(format!("invalid header name '{header_name}': {e}"))
})?;
let val = HeaderValue::from_str(value).map_err(|e| {
Error::invalid_input(format!("invalid header value for '{header_name}': {e}"))
})?;
headers.insert(name, val);
}
}
let mut client_options = ClientOptions::default();
if !headers.is_empty() {
client_options = client_options.with_default_headers(headers);
}
Ok(client_options)
}
pub fn expires_at_millis(&self) -> Option<u64> {
self.0
.get(EXPIRES_AT_MILLIS_KEY)
.and_then(|s| s.parse::<u64>().ok())
}
}
impl From<HashMap<String, String>> for StorageOptions {
fn from(value: HashMap<String, String>) -> Self {
Self::new(value)
}
}
static DEFAULT_OBJECT_STORE_REGISTRY: std::sync::LazyLock<ObjectStoreRegistry> =
std::sync::LazyLock::new(ObjectStoreRegistry::default);
impl ObjectStore {
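    /// Creates an [`ObjectStore`] directly from an existing store and its
    /// location, applying the optional `wrapper` and attaching an IO tracker.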
#[allow(clippy::too_many_arguments)]
pub fn new(
store: Arc<DynObjectStore>,
location: Url,
block_size: Option<usize>,
wrapper: Option<Arc<dyn WrappingObjectStore>>,
use_constant_size_upload_parts: bool,
list_is_lexically_ordered: bool,
io_parallelism: usize,
download_retry_count: usize,
storage_options: Option<&HashMap<String, String>>,
) -> Self {
let scheme = location.scheme();
let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
let store_prefix = match DEFAULT_OBJECT_STORE_REGISTRY.get_provider(scheme) {
Some(provider) => provider
.calculate_object_store_prefix(&location, storage_options)
.unwrap(),
None => {
let store_prefix = format!("{}${}", location.scheme(), location.authority());
log::warn!(
"Guessing that object store prefix is {}, since object store scheme is not found in registry.",
store_prefix
);
store_prefix
}
};
let store = match wrapper {
Some(wrapper) => wrapper.wrap(&store_prefix, store),
None => store,
};
let io_tracker = IOTracker::default();
let tracked_store = io_tracker.wrap("", store);
Self {
inner: tracked_store,
scheme: scheme.into(),
block_size,
max_iop_size: *DEFAULT_MAX_IOP_SIZE,
use_constant_size_upload_parts,
list_is_lexically_ordered,
io_parallelism,
download_retry_count,
io_tracker,
store_prefix,
}
}
}
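/// Infers a reasonable read block size from the store scheme: small blocks
/// for local filesystems, larger blocks for cloud stores where per-request
/// latency dominates.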
fn infer_block_size(scheme: &str) -> usize {
match scheme {
"file" => 4 * 1024,
_ => 64 * 1024,
}
}
#[cfg(test)]
mod tests {
use super::*;
use lance_core::utils::tempfile::{TempStdDir, TempStdFile, TempStrDir};
use object_store::memory::InMemory;
use rstest::rstest;
use std::env::set_current_dir;
use std::fs::{create_dir_all, write};
use std::path::Path as StdPath;
use std::sync::atomic::{AtomicBool, Ordering};
fn write_to_file(path_str: &str, contents: &str) -> std::io::Result<()> {
let path = expand_path(path_str).map_err(std::io::Error::other)?;
std::fs::create_dir_all(path.parent().unwrap())?;
write(path, contents)
}
async fn read_from_store(store: &ObjectStore, path: &Path) -> Result<String> {
let test_file_store = store.open(path).await.unwrap();
let size = test_file_store.size().await.unwrap();
let bytes = test_file_store.get_range(0..size).await.unwrap();
let contents = String::from_utf8(bytes.to_vec()).unwrap();
Ok(contents)
}
#[tokio::test]
async fn test_absolute_paths() {
let tmp_path = TempStrDir::default();
write_to_file(
&format!("{tmp_path}/bar/foo.lance/test_file"),
"TEST_CONTENT",
)
.unwrap();
for uri in &[
format!("{tmp_path}/bar/foo.lance"),
format!("{tmp_path}/./bar/foo.lance"),
format!("{tmp_path}/bar/foo.lance/../foo.lance"),
] {
let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
let contents = read_from_store(store.as_ref(), &path.child("test_file"))
.await
.unwrap();
assert_eq!(contents, "TEST_CONTENT");
}
}
#[tokio::test]
async fn test_cloud_paths() {
let uri = "s3://bucket/foo.lance";
let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
assert_eq!(store.scheme, "s3");
assert_eq!(path.to_string(), "foo.lance");
let (store, path) = ObjectStore::from_uri("s3+ddb://bucket/foo.lance")
.await
.unwrap();
assert_eq!(store.scheme, "s3");
assert_eq!(path.to_string(), "foo.lance");
let (store, path) = ObjectStore::from_uri("gs://bucket/foo.lance")
.await
.unwrap();
assert_eq!(store.scheme, "gs");
assert_eq!(path.to_string(), "foo.lance");
let (store, path) =
ObjectStore::from_uri("abfss://filesystem@account.dfs.core.windows.net/foo.lance")
.await
.unwrap();
assert_eq!(store.scheme, "abfss");
assert_eq!(path.to_string(), "foo.lance");
}
async fn test_block_size_used_test_helper(
uri: &str,
storage_options: Option<HashMap<String, String>>,
default_expected_block_size: usize,
) {
let registry = Arc::new(ObjectStoreRegistry::default());
let accessor = storage_options
.clone()
.map(|opts| Arc::new(StorageOptionsAccessor::with_static_options(opts)));
let params = ObjectStoreParams {
storage_options_accessor: accessor.clone(),
..ObjectStoreParams::default()
};
let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
.await
.unwrap();
assert_eq!(store.block_size, default_expected_block_size);
let registry = Arc::new(ObjectStoreRegistry::default());
let params = ObjectStoreParams {
block_size: Some(1024),
storage_options_accessor: accessor,
..ObjectStoreParams::default()
};
let (store, _) = ObjectStore::from_uri_and_params(registry, uri, ¶ms)
.await
.unwrap();
assert_eq!(store.block_size, 1024);
}
#[rstest]
#[case("s3://bucket/foo.lance", None)]
#[case("gs://bucket/foo.lance", None)]
#[case("az://account/bucket/foo.lance",
Some(HashMap::from([
(String::from("account_name"), String::from("account")),
(String::from("container_name"), String::from("container"))
])))]
#[case("abfss://filesystem@account.dfs.core.windows.net/foo.lance",
Some(HashMap::from([
(String::from("account_name"), String::from("account")),
(String::from("container_name"), String::from("filesystem"))
])))]
#[tokio::test]
async fn test_block_size_used_cloud(
#[case] uri: &str,
#[case] storage_options: Option<HashMap<String, String>>,
) {
test_block_size_used_test_helper(uri, storage_options, 64 * 1024).await;
}
#[rstest]
#[case("file")]
#[case("file-object-store")]
#[case("memory:///bucket/foo.lance")]
#[tokio::test]
async fn test_block_size_used_file(#[case] prefix: &str) {
let tmp_path = TempStrDir::default();
let path = format!("{tmp_path}/bar/foo.lance/test_file");
write_to_file(&path, "URL").unwrap();
let uri = format!("{prefix}:///{path}");
test_block_size_used_test_helper(&uri, None, 4 * 1024).await;
}
#[tokio::test]
async fn test_relative_paths() {
let tmp_path = TempStrDir::default();
write_to_file(
&format!("{tmp_path}/bar/foo.lance/test_file"),
"RELATIVE_URL",
)
.unwrap();
set_current_dir(StdPath::new(tmp_path.as_ref())).expect("Error changing current dir");
let (store, path) = ObjectStore::from_uri("./bar/foo.lance").await.unwrap();
let contents = read_from_store(store.as_ref(), &path.child("test_file"))
.await
.unwrap();
assert_eq!(contents, "RELATIVE_URL");
}
#[tokio::test]
async fn test_tilde_expansion() {
let uri = "~/foo.lance";
write_to_file(&format!("{uri}/test_file"), "TILDE").unwrap();
let (store, path) = ObjectStore::from_uri(uri).await.unwrap();
let contents = read_from_store(store.as_ref(), &path.child("test_file"))
.await
.unwrap();
assert_eq!(contents, "TILDE");
}
#[tokio::test]
async fn test_read_directory() {
let path = TempStdDir::default();
create_dir_all(path.join("foo").join("bar")).unwrap();
create_dir_all(path.join("foo").join("zoo")).unwrap();
create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
write_to_file(
path.join("foo").join("test_file").to_str().unwrap(),
"read_dir",
)
.unwrap();
let (store, base) = ObjectStore::from_uri(path.to_str().unwrap()).await.unwrap();
let sub_dirs = store.read_dir(base.child("foo")).await.unwrap();
assert_eq!(sub_dirs, vec!["bar", "zoo", "test_file"]);
}
#[tokio::test]
async fn test_delete_directory_local_store() {
test_delete_directory("").await;
}
#[tokio::test]
async fn test_delete_directory_file_object_store() {
test_delete_directory("file-object-store").await;
}
async fn test_delete_directory(scheme: &str) {
let path = TempStdDir::default();
create_dir_all(path.join("foo").join("bar")).unwrap();
create_dir_all(path.join("foo").join("zoo")).unwrap();
create_dir_all(path.join("foo").join("zoo").join("abc")).unwrap();
write_to_file(
path.join("foo")
.join("bar")
.join("test_file")
.to_str()
.unwrap(),
"delete",
)
.unwrap();
let file_url = Url::from_directory_path(&path).unwrap();
let url = if scheme.is_empty() {
file_url
} else {
let mut url = Url::parse(&format!("{scheme}:///")).unwrap();
url.set_path(file_url.path());
url
};
let (store, base) = ObjectStore::from_uri(url.as_ref()).await.unwrap();
store.remove_dir_all(base.child("foo")).await.unwrap();
assert!(!path.join("foo").exists());
}
#[derive(Debug)]
struct TestWrapper {
called: AtomicBool,
return_value: Arc<dyn OSObjectStore>,
}
impl WrappingObjectStore for TestWrapper {
fn wrap(
&self,
_store_prefix: &str,
_original: Arc<dyn OSObjectStore>,
) -> Arc<dyn OSObjectStore> {
self.called.store(true, Ordering::Relaxed);
self.return_value.clone()
}
}
impl TestWrapper {
fn called(&self) -> bool {
self.called.load(Ordering::Relaxed)
}
}
#[tokio::test]
async fn test_wrapping_object_store_option_is_used() {
let mock_inner_store: Arc<dyn OSObjectStore> = Arc::new(InMemory::new());
let registry = Arc::new(ObjectStoreRegistry::default());
assert_eq!(Arc::strong_count(&mock_inner_store), 1);
let wrapper = Arc::new(TestWrapper {
called: AtomicBool::new(false),
return_value: mock_inner_store.clone(),
});
let params = ObjectStoreParams {
object_store_wrapper: Some(wrapper.clone()),
..ObjectStoreParams::default()
};
assert!(!wrapper.called());
let _ = ObjectStore::from_uri_and_params(registry, "memory:///", ¶ms)
.await
.unwrap();
assert!(wrapper.called());
assert_eq!(Arc::strong_count(&mock_inner_store), 2);
}
#[tokio::test]
async fn test_local_paths() {
let file_path = TempStdFile::default();
let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
writer.write_all(b"LOCAL").await.unwrap();
Writer::shutdown(&mut writer).await.unwrap();
let reader = ObjectStore::open_local(&file_path).await.unwrap();
let buf = reader.get_range(0..5).await.unwrap();
assert_eq!(buf.as_ref(), b"LOCAL");
}
#[tokio::test]
async fn test_read_one() {
let file_path = TempStdFile::default();
let mut writer = ObjectStore::create_local_writer(&file_path).await.unwrap();
writer.write_all(b"LOCAL").await.unwrap();
Writer::shutdown(&mut writer).await.unwrap();
let file_path_os = object_store::path::Path::parse(file_path.to_str().unwrap()).unwrap();
let obj_store = ObjectStore::local();
let buf = obj_store.read_one_all(&file_path_os).await.unwrap();
assert_eq!(buf.as_ref(), b"LOCAL");
let buf = obj_store.read_one_range(&file_path_os, 0..5).await.unwrap();
assert_eq!(buf.as_ref(), b"LOCAL");
}
#[tokio::test]
#[cfg(windows)]
async fn test_windows_paths() {
use std::path::Component;
use std::path::Prefix;
use std::path::Prefix::*;
fn get_path_prefix(path: &StdPath) -> Prefix {
match path.components().next().unwrap() {
Component::Prefix(prefix_component) => prefix_component.kind(),
_ => panic!(),
}
}
fn get_drive_letter(prefix: Prefix) -> String {
match prefix {
Disk(bytes) => String::from_utf8(vec![bytes]).unwrap(),
_ => panic!(),
}
}
let tmp_path = TempStdFile::default();
let prefix = get_path_prefix(&tmp_path);
let drive_letter = get_drive_letter(prefix);
        write_to_file(
            &format!("{drive_letter}:/test_folder/test.lance/test_file"),
            "WINDOWS",
        )
        .unwrap();
for uri in &[
format!("{drive_letter}:/test_folder/test.lance"),
format!("{drive_letter}:\\test_folder\\test.lance"),
] {
let (store, base) = ObjectStore::from_uri(uri).await.unwrap();
let contents = read_from_store(store.as_ref(), &base.child("test_file"))
.await
.unwrap();
assert_eq!(contents, "WINDOWS");
}
}
#[tokio::test]
async fn test_cross_filesystem_copy() {
let source_dir = TempStdDir::default();
let dest_dir = TempStdDir::default();
let source_file_name = "test_file.txt";
let source_file = source_dir.join(source_file_name);
std::fs::write(&source_file, b"test content").unwrap();
let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
.await
.unwrap();
let from_path = base_path.child(source_file_name);
let dest_file = dest_dir.join("copied_file.txt");
let dest_str = dest_file.to_str().unwrap();
let to_path = object_store::path::Path::parse(dest_str).unwrap();
store.copy(&from_path, &to_path).await.unwrap();
assert!(dest_file.exists());
let copied_content = std::fs::read(&dest_file).unwrap();
assert_eq!(copied_content, b"test content");
}
#[tokio::test]
async fn test_copy_creates_parent_directories() {
let source_dir = TempStdDir::default();
let dest_dir = TempStdDir::default();
let source_file_name = "test_file.txt";
let source_file = source_dir.join(source_file_name);
std::fs::write(&source_file, b"test content").unwrap();
let (store, base_path) = ObjectStore::from_uri(source_dir.to_str().unwrap())
.await
.unwrap();
let from_path = base_path.child(source_file_name);
let dest_file = dest_dir.join("nested").join("dirs").join("copied_file.txt");
let dest_str = dest_file.to_str().unwrap();
let to_path = object_store::path::Path::parse(dest_str).unwrap();
store.copy(&from_path, &to_path).await.unwrap();
assert!(dest_file.exists());
assert!(dest_file.parent().unwrap().exists());
let copied_content = std::fs::read(&dest_file).unwrap();
assert_eq!(copied_content, b"test content");
}
#[test]
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
fn test_client_options_extracts_headers() {
let opts = StorageOptions(HashMap::from([
("headers.x-custom-foo".to_string(), "bar".to_string()),
("headers.x-ms-version".to_string(), "2023-11-03".to_string()),
("region".to_string(), "us-west-2".to_string()),
]));
let client_options = opts.client_options().unwrap();
let opts_no_headers = StorageOptions(HashMap::from([(
"region".to_string(),
"us-west-2".to_string(),
)]));
opts_no_headers.client_options().unwrap();
#[cfg(feature = "gcp")]
{
use object_store::gcp::GoogleCloudStorageBuilder;
let _builder = GoogleCloudStorageBuilder::new()
.with_client_options(client_options)
.with_url("gs://test-bucket");
}
}
#[test]
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
fn test_client_options_rejects_invalid_header_name() {
let opts = StorageOptions(HashMap::from([(
"headers.bad header".to_string(),
"value".to_string(),
)]));
let err = opts.client_options().unwrap_err();
assert!(err.to_string().contains("invalid header name"));
}
#[test]
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
fn test_client_options_rejects_invalid_header_value() {
let opts = StorageOptions(HashMap::from([(
"headers.x-good-name".to_string(),
"bad\x01value".to_string(),
)]));
let err = opts.client_options().unwrap_err();
assert!(err.to_string().contains("invalid header value"));
}
#[test]
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp"))]
fn test_client_options_empty_when_no_header_keys() {
let opts = StorageOptions(HashMap::from([
("region".to_string(), "us-east-1".to_string()),
("access_key_id".to_string(), "AKID".to_string()),
]));
opts.client_options().unwrap();
}
}