use std::collections::HashMap;
use std::sync::{Arc, LazyLock, RwLock};
use url::Url;
use crate::object_store::path::Path;
use crate::object_store::{self, Error, ObjectStore};
use crate::Error as DeltaError;
/// What a URL handler produces: the constructed store together with an object-store `Path`
/// (presumably the location inside the store that the URL points at — callers here discard it).
type ClosureReturn = Result<(Box<dyn ObjectStore>, Path), Error>;

/// A registered URL handler: given the URL and its options as owned key/value strings, build
/// the store. Shared via `Arc`; `Send + Sync` is required so the registry can live in a static.
type HandlerClosure = Arc<dyn Fn(&Url, HashMap<String, String>) -> ClosureReturn + Send + Sync>;

/// Registry contents, keyed by URL scheme.
type Handlers = HashMap<String, HandlerClosure>;

/// Process-wide registry of custom URL handlers; starts empty and is created on first access.
static URL_REGISTRY: LazyLock<RwLock<Handlers>> =
    LazyLock::new(|| RwLock::new(Handlers::new()));
/// Registers `handler_closure` as the store factory for URLs whose scheme is `scheme`.
///
/// After registration, [`store_from_url`] and [`store_from_url_opts`] route URLs with this
/// scheme to the handler instead of `object_store::parse_url_opts`. Registering a scheme that
/// already has a handler replaces the previous one.
///
/// The scheme is lower-cased before being stored: [`Url::scheme`] always reports a lower-case
/// scheme, so a handler registered verbatim as e.g. `"HDFS"` would otherwise never be found.
///
/// # Errors
///
/// Returns a generic [`DeltaError`] if the registry lock is poisoned.
pub fn insert_url_handler(
    scheme: impl AsRef<str>,
    handler_closure: HandlerClosure,
) -> Result<(), DeltaError> {
    // Normalize outside the critical section so the write lock is held as briefly as possible.
    let scheme = scheme.as_ref().to_ascii_lowercase();
    let Ok(mut registry) = URL_REGISTRY.write() else {
        return Err(DeltaError::generic(
            "failed to acquire lock for adding a URL handler!",
        ));
    };
    registry.insert(scheme, handler_closure);
    Ok(())
}
/// Builds an [`ObjectStore`] for `url` with no additional configuration options.
///
/// This is [`store_from_url_opts`] with an empty option list, so any handler registered for
/// the URL's scheme is still used.
pub fn store_from_url(url: &Url) -> crate::DeltaResult<Arc<dyn ObjectStore>> {
    let no_options: Vec<(&str, &str)> = Vec::new();
    store_from_url_opts(url, no_options)
}
/// Builds an [`ObjectStore`] for `url`, passing `options` to whichever factory handles it.
///
/// If a handler has been registered for the URL's scheme via [`insert_url_handler`], it is
/// called with the options collected into a `HashMap<String, String>`. Otherwise — including
/// when the registry lock is poisoned — the URL and options go to
/// `object_store::parse_url_opts`. The `Path` returned alongside the store is discarded.
///
/// # Errors
///
/// Propagates any error from the chosen factory, converted into the crate's error type.
pub fn store_from_url_opts<I, K, V>(
    url: &Url,
    options: I,
) -> crate::DeltaResult<Arc<dyn ObjectStore>>
where
    I: IntoIterator<Item = (K, V)>,
    K: AsRef<str>,
    V: Into<String>,
{
    // Clone the handler's `Arc` out of the registry so the read guard is released before the
    // handler runs. Invoking user code under the guard would block every registration while it
    // runs, and a handler that itself calls `insert_url_handler` would request the write lock
    // on a thread already holding the read lock (which std's RwLock may answer with a panic or
    // a deadlock). A poisoned lock is treated like "no handler", keeping the original fallback.
    let handler = URL_REGISTRY
        .read()
        .ok()
        .and_then(|handlers| handlers.get(url.scheme()).cloned());

    let (store, _path) = match handler {
        Some(handler) => {
            let options = options
                .into_iter()
                .map(|(k, v)| (k.as_ref().to_string(), v.into()))
                .collect();
            handler(url, options)?
        }
        None => object_store::parse_url_opts(url, options)?,
    };
    Ok(Arc::new(store))
}
// Tests for the URL-handler registry.
// NOTE(review): the feature gate below presumably exists because the
// `hdfs-native-object-store` dev-dependency only works with these arrow versions — confirm.
#[cfg(any(not(feature = "arrow-57"), feature = "arrow-58"))]
#[cfg(test)]
mod tests {
    use hdfs_native_object_store::HdfsObjectStoreBuilder;
    use super::*;
    use crate::object_store::path::Path;
    use crate::object_store::{self};

    /// Test URL handler: builds an `HdfsObjectStore` for `url`, forwarding `options` as builder
    /// configuration, and returns it together with the URL's path parsed as a [`Path`].
    ///
    /// Registered as the `"hdfs"` handler in `test_add_hdfs_scheme`.
    fn parse_url_opts_hdfs_native<I, K, V>(
        url: &Url,
        options: I,
    ) -> Result<(Box<dyn ObjectStore>, Path), Error>
    where
        I: IntoIterator<Item = (K, V)>,
        K: AsRef<str>,
        V: Into<String>,
    {
        // Convert each option into an owned (String, String) pair for the builder.
        let options_map = options
            .into_iter()
            .map(|(k, v)| (k.as_ref().to_string(), v.into()));
        let store = HdfsObjectStoreBuilder::new()
            .with_url(url.as_str())
            .with_config(options_map)
            .build()?;
        let path = Path::parse(url.path())?;
        Ok((Box::new(store), path))
    }

    /// Checks that registering a handler for `"hdfs"` makes `store_from_url_opts` dispatch to it.
    ///
    /// The registry is a process-wide static, so this test assumes no other test in the same
    /// binary registers `"hdfs"` (the first assertion would fail otherwise).
    #[test]
    fn test_add_hdfs_scheme() {
        let scheme = "hdfs";
        // Precondition: no handler for the scheme yet.
        if let Ok(handlers) = URL_REGISTRY.read() {
            assert!(handlers.get(scheme).is_none());
        } else {
            panic!("Failed to read the RwLock for the registry");
        }
        insert_url_handler(scheme, Arc::new(parse_url_opts_hdfs_native))
            .expect("Failed to add new URL scheme handler");
        // The handler must now be visible in the registry.
        if let Ok(handlers) = URL_REGISTRY.read() {
            assert!(handlers.get(scheme).is_some());
        } else {
            panic!("Failed to read the RwLock for the registry");
        }
        // Building the store is expected to fail (presumably nothing is listening at
        // hdfs://example); a Generic error tagged "HdfsObjectStore" shows the registered handler,
        // not the default `parse_url_opts`, handled the URL.
        let url: Url = Url::parse("hdfs://example").expect("Failed to parse URL");
        let options: HashMap<String, String> = HashMap::default();
        match store_from_url_opts(&url, options) {
            Err(crate::Error::ObjectStore(object_store::Error::Generic { store, source: _ })) => {
                assert_eq!(store, "HdfsObjectStore");
            }
            Err(unexpected) => panic!("Unexpected error happened: {unexpected:?}"),
            Ok(_) => {
                panic!("Expected to get an error when constructing an HdfsObjectStore, but something didn't work as expected! Either the parse_url_opts_hdfs_native function didn't get called, or the hdfs-native-object-store no longer errors when it cannot connect to HDFS");
            }
        }
    }
}