use dashmap::DashMap;
use datafusion_common::{DataFusionError, Result};
use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
use std::sync::Arc;
use url::Url;
/// A wrapper around a [`Url`] that identifies an object store.
///
/// Instances created through `ObjectStoreUrl::parse` carry only a scheme and
/// an authority; their path is always set to `/`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ObjectStoreUrl {
// The underlying URL; `parse` sets its path to `/` before storing it.
url: Url,
}
impl ObjectStoreUrl {
    /// Parses `s` into an [`ObjectStoreUrl`].
    ///
    /// The input may contain only a scheme and an authority, optionally
    /// followed by a single `/`. On success the stored URL always has its
    /// path set to `/`.
    ///
    /// # Errors
    ///
    /// * `DataFusionError::External` wrapping the URL parser's error when `s`
    ///   is not a valid URL.
    /// * `DataFusionError::Execution` when anything other than `/` follows the
    ///   authority (a path, query, or fragment).
    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
        let mut parsed = match Url::parse(s.as_ref()) {
            Ok(url) => url,
            Err(e) => return Err(DataFusionError::External(Box::new(e))),
        };

        // Everything from the start of the path onwards must be empty or a bare "/".
        match &parsed[url::Position::BeforePath..] {
            "" | "/" => {}
            remaining => {
                return Err(DataFusionError::Execution(format!(
                    "ObjectStoreUrl must only contain scheme and authority, got: {remaining}"
                )));
            }
        }

        // Force the path to "/" so equivalent inputs serialise identically.
        parsed.set_path("/");
        Ok(Self { url: parsed })
    }

    /// Returns the [`ObjectStoreUrl`] obtained by parsing `"file://"`.
    ///
    /// The `unwrap` would panic only if that fixed literal failed to parse.
    pub fn local_filesystem() -> Self {
        let local = Self::parse("file://");
        local.unwrap()
    }

    /// Returns the serialised URL as a string slice.
    pub fn as_str(&self) -> &str {
        self.url.as_str()
    }
}
impl AsRef<str> for ObjectStoreUrl {
    /// Borrows the serialised form of the wrapped URL.
    fn as_ref(&self) -> &str {
        self.url.as_str()
    }
}
impl AsRef<Url> for ObjectStoreUrl {
    /// Borrows the wrapped [`Url`].
    fn as_ref(&self) -> &Url {
        let Self { url } = self;
        url
    }
}
impl std::fmt::Display for ObjectStoreUrl {
    /// Writes the URL's string form, delegating to `str`'s `Display`
    /// implementation so formatter flags are handled the same way.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let text: &str = self.as_str();
        std::fmt::Display::fmt(text, f)
    }
}
/// A registry that associates URLs with [`ObjectStore`] instances.
///
/// Implementations must be `Send + Sync + Debug + 'static`.
pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
/// Registers `store` for `url`.
///
/// Returns an optional store. NOTE(review): presumably the store that was
/// previously registered for the same key, if any; confirm against each
/// implementation.
fn register_store(
&self,
url: &Url,
store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore>>;
/// Looks up the store registered for `url`.
///
/// # Errors
///
/// Returns an error when the implementation has no store for `url`.
fn get_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>>;
}
/// An [`ObjectStoreRegistry`] backed by a concurrent map.
///
/// Stores are keyed by the string built by `get_url_key`, i.e. the URL's
/// scheme followed by `://` and its host-through-port portion. `new`
/// pre-registers a [`LocalFileSystem`] under the key `file://`.
pub struct DefaultObjectStoreRegistry {
// Map from `scheme://host[:port]` keys to the store registered for them.
object_stores: DashMap<String, Arc<dyn ObjectStore>>,
}
impl std::fmt::Debug for DefaultObjectStoreRegistry {
    /// Formats the registry as a struct with a single `schemes` field that
    /// lists the keys currently registered.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Copy the keys out; the stores themselves are not printed.
        let mut registered_keys: Vec<String> = Vec::new();
        for entry in self.object_stores.iter() {
            registered_keys.push(entry.key().clone());
        }

        let mut output = f.debug_struct("DefaultObjectStoreRegistry");
        output.field("schemes", &registered_keys);
        output.finish()
    }
}
impl Default for DefaultObjectStoreRegistry {
fn default() -> Self {
Self::new()
}
}
impl DefaultObjectStoreRegistry {
    /// Creates a registry with a [`LocalFileSystem`] store already registered
    /// under the key `file://`.
    pub fn new() -> Self {
        let registry = Self {
            object_stores: DashMap::new(),
        };
        let local_store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        registry
            .object_stores
            .insert("file://".to_string(), local_store);
        registry
    }
}
impl ObjectStoreRegistry for DefaultObjectStoreRegistry {
    /// Registers `store` under the key derived from `url` by `get_url_key`,
    /// returning whatever the map's `insert` yields for that key.
    fn register_store(
        &self,
        url: &Url,
        store: Arc<dyn ObjectStore>,
    ) -> Option<Arc<dyn ObjectStore>> {
        self.object_stores.insert(get_url_key(url), store)
    }

    /// Returns the store registered under the key derived from `url`.
    ///
    /// # Errors
    ///
    /// Returns `DataFusionError::Internal` when no store is registered for
    /// that key.
    fn get_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
        let key = get_url_key(url);
        match self.object_stores.get(&key) {
            Some(entry) => Ok(entry.value().clone()),
            None => Err(DataFusionError::Internal(format!(
                "No suitable object store found for {url}"
            ))),
        }
    }
}
/// Builds the registry key for `url`: its scheme, then `://`, then the slice
/// of the URL running from the start of the host to the end of the port.
///
/// User-info, path, query, and fragment are not part of the key.
fn get_url_key(url: &Url) -> String {
    let scheme = url.scheme();
    let host_and_port = &url[url::Position::BeforeHost..url::Position::AfterPort];
    format!("{}://{}", scheme, host_and_port)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks both the normalised output of accepted URLs and the exact error
    /// text of rejected ones.
    #[test]
    fn test_object_store_url() {
        // Accepted inputs paired with their serialised form (path forced to "/").
        let accepted = [
            ("file://", "file:///"),
            ("s3://bucket", "s3://bucket/"),
            (
                "s3://username:password@host:123",
                "s3://username:password@host:123/",
            ),
        ];
        for (input, expected) in accepted {
            let parsed = ObjectStoreUrl::parse(input).unwrap();
            assert_eq!(parsed.as_str(), expected);
        }

        // Rejected inputs paired with the full error message they produce.
        // The first fails inside the URL parser; the rest carry a query or path.
        let rejected = [
            ("s3://bucket:invalid", "External error: invalid port number"),
            (
                "s3://bucket?",
                "Execution error: ObjectStoreUrl must only contain scheme and authority, got: ?",
            ),
            (
                "s3://bucket?foo=bar",
                "Execution error: ObjectStoreUrl must only contain scheme and authority, got: ?foo=bar",
            ),
            (
                "s3://host:123/foo",
                "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo",
            ),
            (
                "s3://username:password@host:123/foo",
                "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo",
            ),
        ];
        for (input, message) in rejected {
            let err = ObjectStoreUrl::parse(input).unwrap_err();
            assert_eq!(err.to_string(), message);
        }
    }

    /// Checks that registry keys keep scheme, host, and port but drop user-info.
    #[test]
    fn test_get_url_key() {
        let cases = [
            ("file://", "file://"),
            ("s3://bucket", "s3://bucket"),
            ("s3://username:password@host:123", "s3://host:123"),
        ];
        for (input, expected_key) in cases {
            let parsed = ObjectStoreUrl::parse(input).unwrap();
            let key = get_url_key(&parsed.url);
            assert_eq!(key.as_str(), expected_key);
        }
    }
}