use std::collections::HashMap;
use opendal::services::HdfsNativeConfig;
use opendal::Operator;
use url::Url;
use crate::error::Error;
use crate::Result;
pub(crate) fn hdfs_relative_path(path: &str) -> Result<&str> {
let after_scheme = path
.strip_prefix("hdfs://")
.ok_or_else(|| Error::ConfigInvalid {
message: format!("Invalid HDFS path: {path}, should start with hdfs://"),
})?;
match after_scheme.find('/') {
Some(pos) => Ok(&after_scheme[pos + 1..]),
None => Err(Error::ConfigInvalid {
message: format!("Invalid HDFS path: {path}, missing path component"),
}),
}
}
const HDFS_NAME_NODE: &str = "hdfs.name-node";
const HDFS_ENABLE_APPEND: &str = "hdfs.enable-append";
pub(crate) fn hdfs_config_parse(props: HashMap<String, String>) -> Result<HdfsNativeConfig> {
let mut cfg = HdfsNativeConfig::default();
cfg.name_node = props.get(HDFS_NAME_NODE).cloned();
if let Some(v) = props.get(HDFS_ENABLE_APPEND) {
if v.eq_ignore_ascii_case("true") {
cfg.enable_append = true;
}
}
Ok(cfg)
}
pub(crate) fn hdfs_config_build(cfg: &HdfsNativeConfig, path: &str) -> Result<Operator> {
let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
message: format!("Invalid HDFS url: {path}"),
})?;
let mut cfg = cfg.clone();
if cfg.name_node.is_none() {
let host = url.host_str().ok_or_else(|| Error::ConfigInvalid {
message: format!("Invalid HDFS url: {path}, missing name node host"),
})?;
let port_part = url.port().map(|p| format!(":{p}")).unwrap_or_default();
cfg.name_node = Some(format!("hdfs://{host}{port_part}"));
}
cfg.root = Some("/".to_string());
Ok(Operator::from_config(cfg)?.finish())
}
#[cfg(test)]
mod tests {
use super::*;
fn make_props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
pairs
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect()
}
#[test]
fn test_hdfs_config_parse_with_name_node() {
let props = make_props(&[("hdfs.name-node", "hdfs://namenode:8020")]);
let cfg = hdfs_config_parse(props).unwrap();
assert_eq!(cfg.name_node.as_deref(), Some("hdfs://namenode:8020"));
assert!(!cfg.enable_append);
}
#[test]
fn test_hdfs_config_parse_with_enable_append() {
let props = make_props(&[
("hdfs.name-node", "hdfs://namenode:8020"),
("hdfs.enable-append", "true"),
]);
let cfg = hdfs_config_parse(props).unwrap();
assert!(cfg.enable_append);
}
#[test]
fn test_hdfs_config_parse_empty_props() {
let cfg = hdfs_config_parse(HashMap::new()).unwrap();
assert!(cfg.name_node.is_none());
assert!(!cfg.enable_append);
}
#[test]
fn test_hdfs_config_build_extracts_name_node_from_path() {
let cfg = HdfsNativeConfig::default();
let op = hdfs_config_build(&cfg, "hdfs://namenode:8020/warehouse/db").unwrap();
assert_eq!(op.info().scheme().to_string(), "hdfs-native");
}
#[test]
fn test_hdfs_config_build_uses_config_name_node() {
let mut cfg = HdfsNativeConfig::default();
cfg.name_node = Some("hdfs://my-cluster:9000".to_string());
let op = hdfs_config_build(&cfg, "hdfs://my-cluster:9000/warehouse").unwrap();
assert_eq!(op.info().scheme().to_string(), "hdfs-native");
}
#[test]
fn test_hdfs_config_build_invalid_url() {
let cfg = HdfsNativeConfig::default();
let result = hdfs_config_build(&cfg, "not-a-valid-url");
assert!(result.is_err());
}
#[test]
fn test_hdfs_config_build_missing_host() {
let cfg = HdfsNativeConfig::default();
let result = hdfs_config_build(&cfg, "hdfs:///path/without/host");
assert!(result.is_err());
}
#[test]
fn test_hdfs_config_parse_enable_append_false() {
let props = make_props(&[
("hdfs.name-node", "hdfs://namenode:8020"),
("hdfs.enable-append", "false"),
]);
let cfg = hdfs_config_parse(props).unwrap();
assert!(!cfg.enable_append);
}
#[test]
fn test_hdfs_config_parse_unrelated_keys_ignored() {
let props = make_props(&[
("s3.endpoint", "https://s3.amazonaws.com"),
("fs.oss.endpoint", "https://oss.aliyuncs.com"),
("hdfs.name-node", "hdfs://namenode:8020"),
]);
let cfg = hdfs_config_parse(props).unwrap();
assert_eq!(cfg.name_node.as_deref(), Some("hdfs://namenode:8020"));
}
#[test]
fn test_hdfs_relative_path_normal() {
let result = hdfs_relative_path("hdfs://namenode:8020/warehouse/db/table");
assert_eq!(result.unwrap(), "warehouse/db/table");
}
#[test]
fn test_hdfs_relative_path_root_slash() {
let result = hdfs_relative_path("hdfs://namenode:8020/");
assert_eq!(result.unwrap(), "");
}
#[test]
fn test_hdfs_relative_path_no_port() {
let result = hdfs_relative_path("hdfs://nameservice1/warehouse/data");
assert_eq!(result.unwrap(), "warehouse/data");
}
#[test]
fn test_hdfs_relative_path_missing_path_component() {
let result = hdfs_relative_path("hdfs://namenode:8020");
assert!(result.is_err());
}
#[test]
fn test_hdfs_relative_path_wrong_scheme() {
let result = hdfs_relative_path("s3://bucket/key");
assert!(result.is_err());
}
}