use std::sync::Arc;
use hdfs_native_object_store::HdfsObjectStoreBuilder;
use object_store::Error as ObjStoreError;
use object_store::http::HttpBuilder;
use object_store::local::LocalFileSystem;
use object_store::{ObjectStore, path::Path as ObjPath};
use url::Url;
use crate::io::datafusion::input_uri::InputUri;
/// A resolved input: the object store that serves it and the object's path inside that store.
#[derive(Clone)]
pub struct ResolvedObject {
    /// Store the object is read from.
    pub store: Arc<dyn ObjectStore>,
    /// Location of the object relative to `store`.
    pub path: ObjPath,
}

impl std::fmt::Debug for ResolvedObject {
    /// Prints only `path`; `store` is left out and the output is marked non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("ResolvedObject");
        builder.field("path", &self.path);
        builder.finish_non_exhaustive()
    }
}
/// Errors produced while turning an input URI into a [`ResolvedObject`].
#[derive(Debug)]
pub enum UriStoreError {
    /// The input is not a usable URI; the payload describes the offending input.
    InvalidUri(String),
    /// No resolver exists for the URI scheme; the payload is the scheme.
    UnsupportedScheme(String),
    /// An `hdfs://` URI had no host; the payload is the full URI.
    MissingAuthority(String),
    /// An error returned by the `object_store` crate.
    ObjectStore(ObjStoreError),
    /// Building the HDFS store failed; the payload is the rendered builder error.
    HdfsBuild(String),
}

impl std::fmt::Display for UriStoreError {
    /// Renders every variant as `<prefix>: <detail>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (prefix, detail): (&str, &dyn std::fmt::Display) = match self {
            Self::InvalidUri(raw) => ("invalid URI", raw),
            Self::UnsupportedScheme(scheme) => ("unsupported URI scheme", scheme),
            Self::MissingAuthority(raw) => ("HDFS URI missing authority (host)", raw),
            Self::ObjectStore(inner) => ("object store error", inner),
            Self::HdfsBuild(msg) => ("HDFS store build error", msg),
        };
        write!(f, "{prefix}: {detail}")
    }
}

impl std::error::Error for UriStoreError {
    /// Only the `ObjectStore` variant wraps an underlying error value.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::ObjectStore(inner) => Some(inner),
            Self::InvalidUri(_)
            | Self::UnsupportedScheme(_)
            | Self::MissingAuthority(_)
            | Self::HdfsBuild(_) => None,
        }
    }
}

impl From<ObjStoreError> for UriStoreError {
    /// Lets `?` lift `object_store` errors into [`UriStoreError::ObjectStore`].
    fn from(source: ObjStoreError) -> Self {
        Self::ObjectStore(source)
    }
}
pub fn resolve_input_uri(uri: &InputUri) -> Result<ResolvedObject, UriStoreError> {
let url = uri
.parse()
.map_err(|_| UriStoreError::InvalidUri(uri.0.clone()))?;
resolve_url(&url)
}
pub fn resolve_url(url: &Url) -> Result<ResolvedObject, UriStoreError> {
match url.scheme() {
"file" => resolve_file(url),
"http" | "https" => resolve_http(url),
"hdfs" => resolve_hdfs(url),
other => Err(UriStoreError::UnsupportedScheme(other.to_string())),
}
}
fn resolve_file(url: &Url) -> Result<ResolvedObject, UriStoreError> {
let store = Arc::new(LocalFileSystem::new_with_prefix("/")?);
let rel = url.path().trim_start_matches('/');
Ok(ResolvedObject {
store,
path: ObjPath::from(rel),
})
}
/// Resolves an `http://` or `https://` URL to a store produced by [`HttpBuilder`].
///
/// NOTE(review): the builder is never given a base URL (there is no `with_url`
/// call), so `build()` fails and this function currently always returns
/// [`UriStoreError::ObjectStore`]. The tests in this module assert exactly that
/// (`MissingUrl`). Making HTTP usable would need `with_url(<origin>)`, with the
/// object path made relative to that origin instead of the full URL text.
fn resolve_http(url: &Url) -> Result<ResolvedObject, UriStoreError> {
    let http_store = HttpBuilder::new().build()?;
    let full_url_path = ObjPath::from(url.as_str());
    Ok(ResolvedObject {
        store: Arc::new(http_store),
        path: full_url_path,
    })
}
/// Resolves an `hdfs://host[:port]/path` URL to an HDFS-backed object store.
///
/// The store is built for `hdfs://host[:port]`; user info, query and fragment are
/// not forwarded. The object path is the URL path without its leading `/`, with
/// percent-escapes decoded.
///
/// # Errors
///
/// - [`UriStoreError::MissingAuthority`] if the URL has no host or an empty host.
/// - [`UriStoreError::InvalidUri`] if the decoded path is not a valid object path.
/// - [`UriStoreError::HdfsBuild`] if the HDFS store builder fails.
fn resolve_hdfs(url: &Url) -> Result<ResolvedObject, UriStoreError> {
    let host = url
        .host_str()
        .filter(|host| !host.is_empty())
        .ok_or_else(|| UriStoreError::MissingAuthority(url.as_str().to_string()))?;
    let base = match url.port() {
        Some(port) => format!("hdfs://{host}:{port}"),
        None => format!("hdfs://{host}"),
    };

    // `Url::path()` is percent-encoded; `ObjPath::from` would encode it again
    // (`%20` -> `%2520`), so decode with `from_url_path`. Empty segments are
    // dropped first because `from_url_path` rejects them. The path is validated
    // before building the store so a bad path fails without creating a client.
    let joined = url
        .path()
        .split('/')
        .filter(|segment| !segment.is_empty())
        .collect::<Vec<_>>()
        .join("/");
    let path = ObjPath::from_url_path(&joined)
        .map_err(|e| UriStoreError::InvalidUri(format!("{url}: {e}")))?;

    let hdfs_store = HdfsObjectStoreBuilder::new()
        .with_url(base)
        .build()
        .map_err(|e| UriStoreError::HdfsBuild(e.to_string()))?;

    Ok(ResolvedObject {
        store: Arc::new(hdfs_store),
        path,
    })
}
#[cfg(test)]
mod datafusion_storage_tests {
    use super::*;

    /// Parses a URL literal that the test itself guarantees is well-formed.
    fn url(s: &str) -> Url {
        Url::parse(s).expect("test URL must parse")
    }

    /// Resolves `raw` and checks that it fails with an `ObjectStore` error whose
    /// debug form mentions both `HTTP` and `MissingUrl`.
    fn assert_http_missing_url(raw: &str) {
        let err = resolve_url(&url(raw)).unwrap_err();
        match err {
            UriStoreError::ObjectStore(inner) => {
                let s = format!("{inner:?}");
                assert!(s.contains("HTTP"), "expected HTTP store error, got: {s}");
                assert!(s.contains("MissingUrl"), "expected MissingUrl, got: {s}");
            }
            other => panic!("expected ObjectStore error, got: {other:?}"),
        }
    }

    #[test]
    fn resolve_input_uri_invalid_uri_maps_to_invalid_uri() {
        let input = InputUri("not a uri".to_string());
        let err = resolve_input_uri(&input).unwrap_err();
        match err {
            UriStoreError::InvalidUri(raw) => assert_eq!(raw, "not a uri"),
            other => panic!("expected InvalidUri, got: {other:?}"),
        }
    }

    #[test]
    fn resolve_url_unsupported_scheme() {
        let err = resolve_url(&url("s3://bucket/key")).unwrap_err();
        match err {
            UriStoreError::UnsupportedScheme(scheme) => assert_eq!(scheme, "s3"),
            other => panic!("expected UnsupportedScheme, got: {other:?}"),
        }
    }

    #[test]
    fn resolve_file_makes_absolute_fs_path_relative() {
        let resolved =
            resolve_url(&url("file:///tmp/test.parquet")).expect("file:// should resolve");
        assert_eq!(resolved.path.as_ref(), "tmp/test.parquet");
    }

    #[test]
    fn resolve_file_root_path_is_empty_relative_path() {
        let resolved = resolve_url(&url("file:///")).expect("file:/// should resolve");
        assert_eq!(resolved.path.as_ref(), "");
    }

    #[test]
    fn resolve_hdfs_missing_authority_is_error() {
        let err = resolve_url(&url("hdfs:///data/file.parquet")).unwrap_err();
        match err {
            UriStoreError::MissingAuthority(raw) => {
                assert_eq!(raw, "hdfs:///data/file.parquet");
            }
            other => panic!("expected MissingAuthority, got: {other:?}"),
        }
    }

    #[test]
    fn resolve_hdfs_returns_relative_path_without_leading_slash() {
        let target = url("hdfs://localhost:9870/some/path.parquet");
        let resolved =
            resolve_url(&target).expect("builder may succeed without contacting HDFS");
        assert_eq!(resolved.path.as_ref(), "some/path.parquet");
    }

    #[test]
    fn resolve_http_without_base_url_returns_object_store_error() {
        assert_http_missing_url("http://example.com/data.parquet");
    }

    #[test]
    fn resolve_https_without_base_url_returns_object_store_error() {
        assert_http_missing_url("https://example.com/a/b/c");
    }
}