use std::{
collections::HashMap,
sync::{Arc, OnceLock},
};
use dashmap::DashMap;
use object_store::{DynObjectStore, path::Path};
use url::Url;
use super::{DeltaIOStorageBackend, LogStore, ObjectStoreRef, StorageConfig, default_logstore};
use crate::{DeltaResult, DeltaTableError};
/// Concurrent registry mapping a bare `scheme://` URL (e.g. `memory://`, `file://`) to the
/// [`ObjectStoreFactory`] that builds stores for that scheme.
///
/// The process-wide instance is returned by [`object_store_factories`].
pub type ObjectStoreFactoryRegistry = Arc<DashMap<Url, Arc<dyn ObjectStoreFactory>>>;
/// Builds an object store for a URL from storage configuration.
///
/// Implementations are registered in an [`ObjectStoreFactoryRegistry`] under the URL scheme
/// they handle; they must be `Send + Sync` because the registry is shared globally.
pub trait ObjectStoreFactory: Send + Sync {
    /// Parse `url` together with `config` into an object store and the path inside that store.
    ///
    /// # Errors
    ///
    /// Implementation-defined; e.g. the default factory rejects schemes it does not support.
    fn parse_url_opts(
        &self,
        url: &Url,
        config: &StorageConfig,
    ) -> DeltaResult<(ObjectStoreRef, Path)>;
}
/// Object store factory for the schemes supported out of the box (`memory` and `file`).
#[derive(Clone, Debug, Default)]
pub(crate) struct DefaultObjectStoreFactory {}

impl ObjectStoreFactory for DefaultObjectStoreFactory {
    /// Builds the store from the raw options in `config.raw`.
    ///
    /// When `config.runtime` is set, the store is wrapped in a [`DeltaIOStorageBackend`]
    /// bound to a clone of that runtime; otherwise the plain store is returned.
    fn parse_url_opts(
        &self,
        url: &Url,
        config: &StorageConfig,
    ) -> DeltaResult<(ObjectStoreRef, Path)> {
        let (base, prefix) = default_parse_url_opts(url, &config.raw)?;
        let wrapped = match config.runtime.as_ref() {
            Some(rt) => {
                Arc::new(DeltaIOStorageBackend::new(base, rt.clone())) as Arc<DynObjectStore>
            }
            None => base,
        };
        Ok((wrapped, prefix))
    }
}
/// Build a store for the schemes supported without any extra backend: `memory` and `file`.
///
/// Delegates to `object_store::parse_url_opts` and returns the store together with the path
/// inside it.
///
/// # Errors
///
/// * [`DeltaTableError::InvalidTableLocation`] for any scheme other than `memory` / `file`.
/// * Any error from `object_store::parse_url_opts`, converted via `?`.
fn default_parse_url_opts(
    url: &Url,
    options: &HashMap<String, String>,
) -> DeltaResult<(ObjectStoreRef, Path)> {
    match url.scheme() {
        "memory" | "file" => {
            let (store, path) = object_store::parse_url_opts(url, options)?;
            tracing::debug!("building store with:\n\tParsed URL: {url}\n\tPath in store: {path}");
            // `object_store::parse_url_opts` hands back a boxed trait object. `Arc::from` moves
            // it straight into an `Arc<dyn ObjectStore>`. `Arc::new` would instead produce an
            // `Arc<Box<dyn ObjectStore>>`, adding a second pointer hop and vtable dispatch to
            // every store call.
            let store: ObjectStoreRef = Arc::from(store);
            Ok((store, path))
        }
        _ => Err(DeltaTableError::InvalidTableLocation(url.clone().into())),
    }
}
pub fn object_store_factories() -> ObjectStoreFactoryRegistry {
static REGISTRY: OnceLock<ObjectStoreFactoryRegistry> = OnceLock::new();
REGISTRY
.get_or_init(|| {
let factory = Arc::new(DefaultObjectStoreFactory::default());
let registry = ObjectStoreFactoryRegistry::default();
registry.insert(Url::parse("memory://").unwrap(), factory.clone());
registry.insert(Url::parse("file://").unwrap(), factory);
registry
})
.clone()
}
/// Build an [`ObjectStoreRef`] for `url` using the factory registered for its scheme.
///
/// The steps are:
/// * `options` are parsed into a [`StorageConfig`].
/// * The config is handed to the [`ObjectStoreFactory`] registered for `url`'s scheme.
/// * The resulting store is passed through `StorageConfig::decorate_store`.
///
/// The path prefix returned by the factory is discarded.
///
/// # Errors
///
/// * [`DeltaTableError::InvalidTableLocation`] if no factory is registered for the URL's
///   scheme. This includes special schemes such as `http` / `https`: their bare `scheme://`
///   form is not a parseable URL, so they can never be registry keys.
/// * Any error from parsing `options`, from the factory, or from decorating the store.
pub fn store_for<K, V, I>(url: &Url, options: I) -> DeltaResult<ObjectStoreRef>
where
    I: IntoIterator<Item = (K, V)>,
    K: AsRef<str> + Into<String>,
    V: AsRef<str> + Into<String>,
{
    // Registry keys are bare `scheme://` URLs. Special schemes that require a host
    // (`http`, `https`, `ws`, `wss`, `ftp`) fail to parse in that form (empty host). Report
    // that as an unsupported location instead of panicking.
    let scheme = Url::parse(&format!("{}://", url.scheme()))
        .map_err(|_| DeltaTableError::InvalidTableLocation(url.clone().into()))?;
    let storage_config = StorageConfig::parse_options(options)?;
    // Clone the factory handle out of the map so the DashMap read guard is released before
    // the factory runs. Otherwise the guard would be held for the whole store construction.
    let factory = object_store_factories()
        .get(&scheme)
        .map(|entry| Arc::clone(entry.value()))
        .ok_or_else(|| DeltaTableError::InvalidTableLocation(url.clone().into()))?;
    let (store, _prefix) = factory.parse_url_opts(url, &storage_config)?;
    let store = storage_config.decorate_store(store, url)?;
    Ok(Arc::new(store))
}
/// Concurrent registry mapping a bare `scheme://` URL (e.g. `memory://`, `file://`) to the
/// [`LogStoreFactory`] used for that scheme.
///
/// The process-wide instance is returned by [`logstore_factories`].
pub type LogStoreFactoryRegistry = Arc<DashMap<Url, Arc<dyn LogStoreFactory>>>;
/// Creates a [`LogStore`] for a table location.
///
/// Implementations are registered in a [`LogStoreFactoryRegistry`] under the URL scheme they
/// handle; they must be `Send + Sync` because the registry is shared globally.
pub trait LogStoreFactory: Send + Sync {
    /// Build a log store for the table at `location`.
    ///
    /// * `prefixed_store` – NOTE(review): presumably a store scoped to the table path — confirm.
    /// * `root_store` – NOTE(review): presumably the store at the storage root — confirm.
    /// * `location` – the table URL.
    /// * `options` – parsed storage configuration.
    fn with_options(
        &self,
        prefixed_store: ObjectStoreRef,
        root_store: ObjectStoreRef,
        location: &Url,
        options: &StorageConfig,
    ) -> DeltaResult<Arc<dyn LogStore>>;
}
/// Log store factory that delegates to [`default_logstore`].
#[derive(Clone, Debug, Default)]
struct DefaultLogStoreFactory {}

impl LogStoreFactory for DefaultLogStoreFactory {
    /// Forwards every argument unchanged to [`default_logstore`]; never returns an error.
    fn with_options(
        &self,
        prefixed_store: ObjectStoreRef,
        root_store: ObjectStoreRef,
        location: &Url,
        options: &StorageConfig,
    ) -> DeltaResult<Arc<dyn LogStore>> {
        let log_store: Arc<dyn LogStore> =
            default_logstore(prefixed_store, root_store, location, options);
        Ok(log_store)
    }
}
/// Returns the process-wide registry of log store factories.
///
/// On first call the registry is created with a [`DefaultLogStoreFactory`] registered under
/// `memory://` and `file://`. Every call returns a clone of the same `Arc`, so entries
/// inserted through any handle are visible to all callers.
pub fn logstore_factories() -> LogStoreFactoryRegistry {
    static REGISTRY: OnceLock<LogStoreFactoryRegistry> = OnceLock::new();

    let registry = REGISTRY.get_or_init(|| {
        let map = LogStoreFactoryRegistry::default();
        for scheme in ["memory://", "file://"] {
            // Each scheme gets its own default factory instance.
            map.insert(
                Url::parse(scheme).unwrap(),
                Arc::new(DefaultLogStoreFactory::default()),
            );
        }
        map
    });
    Arc::clone(registry)
}
// Unit tests for the factory registries in this module.
// NOTE(review): currently empty. Consider covering the default `memory://` / `file://`
// registrations and the `InvalidTableLocation` error returned by `default_parse_url_opts`
// for unsupported schemes.
#[cfg(test)]
mod tests {}