use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, OnceLock};
use futures::stream::BoxStream;
use futures::{StreamExt, stream};
use tokio::runtime::{Handle, Runtime};
use url::Url;
use crate::acl::{AclEntry, AclStatus};
use crate::common::config::{self, Configuration};
use crate::ec::resolve_ec_policy;
use crate::error::{HdfsError, Result};
use crate::file::{FileReader, FileWriter};
use crate::hdfs::protocol::NamenodeProtocol;
use crate::hdfs::proxy::NameServiceProxy;
use crate::proto::hdfs::hdfs_file_status_proto::FileType;
use crate::security::user::User;
use crate::glob::{GlobPattern, expand_glob, get_path_components, unescape_component};
use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};
/// Options controlling how a new file is created by `Client::create`.
#[derive(Clone)]
pub struct WriteOptions {
    /// Block size for the new file; left unset when `None`.
    pub block_size: Option<u64>,
    /// Replication factor for the new file; left unset when `None`.
    pub replication: Option<u32>,
    /// Unix permission bits the file is created with.
    pub permission: u32,
    /// Whether an existing file at the target path should be overwritten.
    pub overwrite: bool,
    /// Whether missing parent directories should be created.
    pub create_parent: bool,
}
impl Default for WriteOptions {
fn default() -> Self {
Self {
block_size: None,
replication: None,
permission: 0o644,
overwrite: false,
create_parent: true,
}
}
}
impl AsRef<WriteOptions> for WriteOptions {
    // Identity impl so APIs taking `impl AsRef<WriteOptions>` accept either an
    // owned value or a reference.
    fn as_ref(&self) -> &WriteOptions {
        self
    }
}
impl WriteOptions {
    /// Sets an explicit block size for the new file.
    pub fn block_size(self, block_size: u64) -> Self {
        Self {
            block_size: Some(block_size),
            ..self
        }
    }

    /// Sets an explicit replication factor for the new file.
    pub fn replication(self, replication: u32) -> Self {
        Self {
            replication: Some(replication),
            ..self
        }
    }

    /// Sets the permission bits used when creating the file.
    pub fn permission(self, permission: u32) -> Self {
        Self { permission, ..self }
    }

    /// Sets whether an existing file at the path should be overwritten.
    pub fn overwrite(self, overwrite: bool) -> Self {
        Self { overwrite, ..self }
    }

    /// Sets whether missing parent directories should be created.
    pub fn create_parent(self, create_parent: bool) -> Self {
        Self {
            create_parent,
            ..self
        }
    }
}
/// A single viewfs mount entry mapping a viewfs path prefix to a path on a
/// specific namenode.
#[derive(Debug, Clone)]
struct MountLink {
    // Mount point prefix as seen by callers; stored without a trailing slash.
    viewfs_path: String,
    // Target path on the backing cluster; stored without a trailing slash.
    hdfs_path: String,
    protocol: Arc<NamenodeProtocol>,
}
impl MountLink {
    /// Creates a link, normalizing both paths by stripping trailing slashes.
    fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
        let viewfs_path = viewfs_path.trim_end_matches('/').to_string();
        let hdfs_path = hdfs_path.trim_end_matches('/').to_string();
        Self {
            viewfs_path,
            hdfs_path,
            protocol,
        }
    }

    /// Maps a viewfs path onto this link's cluster path, or `None` when the
    /// path is not under this mount point.
    fn resolve(&self, path: &str) -> Option<String> {
        if path == self.viewfs_path {
            return Some(self.hdfs_path.clone());
        }
        let relative = path.strip_prefix(&format!("{}/", self.viewfs_path))?;
        Some(format!("{}/{}", self.hdfs_path, relative))
    }
}
/// Set of mount links plus a fallback consulted when no link matches.
#[derive(Debug)]
struct MountTable {
    // Checked in order; built sorted with the deepest prefixes first.
    mounts: Vec<MountLink>,
    fallback: MountLink,
    // Absolute directory used to anchor relative paths.
    home_dir: String,
}
impl MountTable {
    /// Resolves `src` to the mount link owning it plus the absolute path on
    /// that link's cluster. Relative paths are anchored at `home_dir`.
    fn resolve(&self, src: &str) -> (&MountLink, String) {
        let path = if src.starts_with('/') {
            src.to_string()
        } else {
            format!("{}/{}", self.home_dir, src)
        };
        let matched = self
            .mounts
            .iter()
            .find_map(|link| Some((link, link.resolve(&path)?)));
        match matched {
            Some(hit) => hit,
            // The fallback is rooted at "/", so it resolves every absolute path.
            None => (&self.fallback, self.fallback.resolve(&path).unwrap()),
        }
    }
}
/// Computes the user's home directory for the given scheme and host.
///
/// The prefix comes from `dfs.user.home.dir.prefix` for `hdfs`, or from the
/// per-mounttable `fs.viewfs.mounttable.<host>.homedir` setting for `viewfs`,
/// defaulting to `/user` when unset.
fn build_home_dir(
    scheme: &str,
    host: Option<&str>,
    config: &Configuration,
    username: &str,
) -> String {
    let configured = match scheme {
        "hdfs" => config.get("dfs.user.home.dir.prefix"),
        "viewfs" => {
            host.and_then(|h| config.get(&format!("fs.viewfs.mounttable.{h}.homedir")))
        }
        _ => None,
    };
    // A prefix of "/" trims down to empty; avoid emitting a double slash.
    match configured.unwrap_or("/user").trim_end_matches('/') {
        "" => format!("/{username}"),
        prefix => format!("{prefix}/{username}"),
    }
}
/// Tokio runtime used for IO: either owned by the client or a handle to a
/// caller-managed runtime.
#[derive(Debug)]
pub enum IORuntime {
    Runtime(Runtime),
    Handle(Handle),
}
impl From<Runtime> for IORuntime {
    // Allows passing an owned runtime directly to `with_io_runtime`.
    fn from(value: Runtime) -> Self {
        Self::Runtime(value)
    }
}
impl From<Handle> for IORuntime {
    // Allows passing a handle to an externally managed runtime instead.
    fn from(value: Handle) -> Self {
        Self::Handle(value)
    }
}
impl IORuntime {
fn handle(&self) -> Handle {
match self {
Self::Runtime(runtime) => runtime.handle().clone(),
Self::Handle(handle) => handle.clone(),
}
}
}
/// Builder for `Client`. Every field is optional; see the `with_*` methods.
#[derive(Default)]
pub struct ClientBuilder {
    url: Option<String>,
    config: Option<HashMap<String, String>>,
    config_dir: Option<String>,
    runtime: Option<IORuntime>,
}
impl ClientBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn with_url(mut self, url: impl Into<String>) -> Self {
self.url = Some(url.into());
self
}
pub fn with_config(
mut self,
config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
) -> Self {
self.config = Some(
config
.into_iter()
.map(|(k, v)| (k.into(), v.into()))
.collect(),
);
self
}
pub fn with_config_dir(mut self, config_dir: impl Into<String>) -> Self {
self.config_dir = Some(config_dir.into());
self
}
pub fn with_io_runtime(mut self, runtime: impl Into<IORuntime>) -> Self {
self.runtime = Some(runtime.into());
self
}
pub fn build(self) -> Result<Client> {
let config = Configuration::new(self.config_dir, self.config)?;
let url = if let Some(url) = self.url {
Url::parse(&url)?
} else {
Client::default_fs(&config)?
};
Client::build(&url, config, self.runtime)
}
}
/// Source of the tokio handle used for IO: a caller-provided runtime, or a
/// default one created lazily only when needed.
#[derive(Clone, Debug)]
enum RuntimeHolder {
    Custom(Arc<IORuntime>),
    // Initialized on first use when no ambient runtime is available.
    Default(Arc<OnceLock<Runtime>>),
}
impl RuntimeHolder {
    /// Wraps a caller-provided runtime, or prepares a lazily-created default.
    fn new(rt: Option<IORuntime>) -> Self {
        match rt {
            Some(rt) => Self::Custom(Arc::new(rt)),
            None => Self::Default(Arc::new(OnceLock::new())),
        }
    }

    /// Returns a handle for IO work. Prefers, in order: the custom runtime,
    /// the ambient tokio runtime, then a lazily created default runtime.
    fn get_handle(&self) -> Handle {
        match self {
            // `IORuntime::handle` already returns an owned `Handle`; the extra
            // `.clone()` previously applied here was redundant.
            Self::Custom(rt) => rt.handle(),
            Self::Default(rt) => match Handle::try_current() {
                Ok(handle) => handle,
                Err(_) => rt
                    .get_or_init(|| Runtime::new().expect("Failed to create tokio runtime"))
                    .handle()
                    .clone(),
            },
        }
    }
}
/// HDFS client supporting `hdfs://` and `viewfs://` filesystems.
#[derive(Clone, Debug)]
pub struct Client {
    mount_table: Arc<MountTable>,
    config: Arc<Configuration>,
    rt_holder: RuntimeHolder,
}
impl Client {
/// Creates a client for the given URL using default configuration.
#[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
pub fn new(url: &str) -> Result<Self> {
    let parsed_url = Url::parse(url)?;
    Self::build(&parsed_url, Configuration::new(None, None)?, None)
}
/// Creates a client for the given URL with explicit configuration overrides.
#[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
pub fn new_with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
    let parsed_url = Url::parse(url)?;
    Self::build(&parsed_url, Configuration::new(None, Some(config))?, None)
}
/// Creates a client for the configured default filesystem with overrides.
#[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
pub fn default_with_config(config: HashMap<String, String>) -> Result<Self> {
    let config = Configuration::new(None, Some(config))?;
    Self::build(&Self::default_fs(&config)?, config, None)
}
/// Reads the default filesystem URL (`fs.defaultFS`) from the configuration.
fn default_fs(config: &Configuration) -> Result<Url> {
    let url = config
        .get(config::DEFAULT_FS)
        // `ok_or_else` defers building the error message until the setting is
        // actually missing, unlike the previous eager `ok_or(format!(...))`.
        .ok_or_else(|| {
            HdfsError::InvalidArgument(format!("No {} setting found", config::DEFAULT_FS))
        })?;
    Ok(Url::parse(url)?)
}
/// Constructs a client for the given URL: validates the scheme, derives the
/// home directory, and builds the mount table (a single pass-through link for
/// `hdfs`, or one driven by the configuration for `viewfs`).
fn build(url: &Url, config: Configuration, rt: Option<IORuntime>) -> Result<Self> {
    // A host-less URL (e.g. "hdfs://") falls back to fs.defaultFS, which must
    // then share the scheme and actually carry a host.
    let resolved_url = if !url.has_host() {
        let default_url = Self::default_fs(&config)?;
        if url.scheme() != default_url.scheme() || !default_url.has_host() {
            return Err(HdfsError::InvalidArgument(
                "URL must contain a host".to_string(),
            ));
        }
        default_url
    } else {
        url.clone()
    };
    let config = Arc::new(config);
    let rt_holder = RuntimeHolder::new(rt);
    let user_info = User::get_user_info();
    let username = user_info
        .effective_user
        .as_deref()
        .or(user_info.real_user.as_deref())
        .expect("User info must include a username");
    let home_dir = build_home_dir(
        resolved_url.scheme(),
        resolved_url.host_str(),
        config.as_ref(),
        username,
    );
    let mount_table = match url.scheme() {
        "hdfs" => {
            // Plain HDFS: a single fallback link maps "/" straight through.
            let proxy = NameServiceProxy::new(
                &resolved_url,
                Arc::clone(&config),
                rt_holder.get_handle(),
            )?;
            let protocol = Arc::new(NamenodeProtocol::new(proxy, rt_holder.get_handle()));
            MountTable {
                mounts: Vec::new(),
                fallback: MountLink::new("/", "/", protocol),
                home_dir,
            }
        }
        "viewfs" => Self::build_mount_table(
            resolved_url.host_str().expect("URL must have a host"),
            Arc::clone(&config),
            rt_holder.get_handle(),
            home_dir,
        )?,
        _ => {
            return Err(HdfsError::InvalidArgument(
                "Only `hdfs` and `viewfs` schemes are supported".to_string(),
            ));
        }
    };
    Ok(Self {
        mount_table: Arc::new(mount_table),
        config,
        rt_holder,
    })
}
/// Builds the viewfs mount table for `host` from the configuration. Every
/// mount target must be an `hdfs` URL with a host, and exactly one fallback
/// (prefix-less) mount is required.
fn build_mount_table(
    host: &str,
    config: Arc<Configuration>,
    handle: Handle,
    home_dir: String,
) -> Result<MountTable> {
    let mut mounts: Vec<MountLink> = Vec::new();
    let mut fallback: Option<MountLink> = None;
    for (viewfs_path, hdfs_url) in config.get_mount_table(host).iter() {
        let url = Url::parse(hdfs_url)?;
        if !url.has_host() {
            return Err(HdfsError::InvalidArgument(
                "URL must contain a host".to_string(),
            ));
        }
        if url.scheme() != "hdfs" {
            return Err(HdfsError::InvalidArgument(
                "Only hdfs mounts are supported for viewfs".to_string(),
            ));
        }
        let proxy = NameServiceProxy::new(&url, Arc::clone(&config), handle.clone())?;
        let protocol = Arc::new(NamenodeProtocol::new(proxy, handle.clone()));
        // An entry without a viewfs prefix is the fallback link.
        if let Some(prefix) = viewfs_path {
            mounts.push(MountLink::new(prefix, url.path(), protocol));
        } else {
            if fallback.is_some() {
                return Err(HdfsError::InvalidArgument(
                    "Multiple viewfs fallback links found".to_string(),
                ));
            }
            fallback = Some(MountLink::new("/", url.path(), protocol));
        }
    }
    if let Some(fallback) = fallback {
        // Sort the deepest mount points first so the most specific prefix wins
        // during resolution.
        mounts.sort_by_key(|m| m.viewfs_path.chars().filter(|c| *c == '/').count());
        mounts.reverse();
        Ok(MountTable {
            mounts,
            fallback,
            home_dir,
        })
    } else {
        Err(HdfsError::InvalidArgument(
            "No viewfs fallback mount found".to_string(),
        ))
    }
}
/// Fetches the status of the file or directory at `path`.
pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
    let (link, resolved_path) = self.mount_table.resolve(path);
    let response = link.protocol.get_file_info(&resolved_path).await?;
    response
        .fs
        .map(|status| FileStatus::from(status, path))
        .ok_or_else(|| HdfsError::FileNotFound(path.to_string()))
}
/// Lists all entries under `path`, optionally recursing into subdirectories.
/// Fails with the first error encountered while walking the listing.
pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
    self.list_status_iter(path, recursive)
        .into_stream()
        .collect::<Vec<Result<FileStatus>>>()
        .await
        .into_iter()
        // Fallible collect short-circuits on the first Err, replacing the
        // previous manual push loop (and its misspelled local).
        .collect()
}
/// Returns a lazy iterator over the entries under `path`, optionally
/// recursing into subdirectories.
pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
    ListStatusIterator::new(path.to_string(), Arc::clone(&self.mount_table), recursive)
}
/// Opens a reader for the file at `path`.
///
/// Fails with `FileNotFound` when the path has no block locations, and with
/// `UnsupportedFeature` for encrypted files.
pub async fn read(&self, path: &str) -> Result<FileReader> {
    let (link, resolved_path) = self.mount_table.resolve(path);
    let located_info = link
        .protocol
        .get_block_locations(&resolved_path, 0, i64::MAX as u64)
        .await?;
    let Some(locations) = located_info.locations else {
        return Err(HdfsError::FileNotFound(path.to_string()));
    };
    // Erasure-coded files need their schema resolved to read block groups.
    let ec_schema = match locations.ec_policy.as_ref() {
        Some(ec_policy) => Some(resolve_ec_policy(ec_policy)?),
        None => None,
    };
    if locations.file_encryption_info.is_some() {
        return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
    }
    Ok(FileReader::new(
        Arc::clone(&link.protocol),
        locations,
        ec_schema,
        Arc::clone(&self.config),
        self.rt_holder.get_handle(),
    ))
}
/// Creates a new file at `src` and returns a writer for it.
pub async fn create(
    &self,
    src: &str,
    write_options: impl AsRef<WriteOptions>,
) -> Result<FileWriter> {
    let write_options = write_options.as_ref();
    let (link, resolved_path) = self.mount_table.resolve(src);
    let create_response = link
        .protocol
        .create(
            &resolved_path,
            write_options.permission,
            write_options.overwrite,
            write_options.create_parent,
            write_options.replication,
            write_options.block_size,
        )
        .await?;
    match create_response.fs {
        Some(status) => {
            // Encrypted files aren't supported for writing; best-effort delete
            // the file we just created before surfacing the error.
            if status.file_encryption_info.is_some() {
                let _ = self.delete(src, false).await;
                return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
            }
            Ok(FileWriter::new(
                Arc::clone(&link.protocol),
                resolved_path,
                status,
                Arc::clone(&self.config),
                self.rt_holder.get_handle(),
                // Freshly created file: no existing block to resume from.
                None,
            ))
        }
        None => Err(HdfsError::FileNotFound(src.to_string())),
    }
}
/// True when the namenode rejected a plain append with an
/// `UnsupportedOperationException` mentioning NEW_BLOCK, meaning the append
/// should be retried requesting a new block.
fn needs_new_block(class: &str, msg: &str) -> bool {
    let is_unsupported_op = class == "java.lang.UnsupportedOperationException";
    is_unsupported_op && msg.contains("NEW_BLOCK")
}
/// Opens a writer that appends to the existing file at `src`.
pub async fn append(&self, src: &str) -> Result<FileWriter> {
    let (link, resolved_path) = self.mount_table.resolve(src);
    let append_response = match link.protocol.append(&resolved_path, false).await {
        // Some clusters only support appending into a new block; retry with
        // the new-block flag when the namenode tells us so.
        Err(HdfsError::RPCError(class, msg)) if Self::needs_new_block(&class, &msg) => {
            link.protocol.append(&resolved_path, true).await?
        }
        resp => resp?,
    };
    match append_response.stat {
        Some(status) => {
            if status.file_encryption_info.is_some() {
                // Best-effort close the file we just opened for append before
                // bailing out. Bug fix: this RPC must use the mount-resolved
                // path (like every other call on this link), not the caller's
                // original path, which may be a viewfs path.
                let _ = link
                    .protocol
                    .complete(
                        &resolved_path,
                        append_response.block.map(|b| b.b),
                        status.file_id,
                    )
                    .await;
                return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
            }
            Ok(FileWriter::new(
                Arc::clone(&link.protocol),
                resolved_path,
                status,
                Arc::clone(&self.config),
                self.rt_holder.get_handle(),
                append_response.block,
            ))
        }
        None => Err(HdfsError::FileNotFound(src.to_string())),
    }
}
/// Creates a directory at `path` with the given permission bits, optionally
/// creating missing parents.
pub async fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
    let (link, resolved_path) = self.mount_table.resolve(path);
    link.protocol
        .mkdirs(&resolved_path, permission, create_parent)
        .await?;
    Ok(())
}
/// Renames `src` to `dst`. Both paths must resolve to the same mount, since a
/// rename cannot cross name services.
pub async fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
    let (src_link, src_resolved_path) = self.mount_table.resolve(src);
    let (dst_link, dst_resolved_path) = self.mount_table.resolve(dst);
    if src_link.viewfs_path != dst_link.viewfs_path {
        return Err(HdfsError::InvalidArgument(
            "Cannot rename across different name services".to_string(),
        ));
    }
    src_link
        .protocol
        .rename(&src_resolved_path, &dst_resolved_path, overwrite)
        .await?;
    Ok(())
}
/// Deletes the file or directory at `path`; returns whether anything was
/// actually deleted.
pub async fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
    let (link, resolved_path) = self.mount_table.resolve(path);
    let response = link.protocol.delete(&resolved_path, recursive).await?;
    Ok(response.result)
}
/// Sets the modification and access times of `path`.
pub async fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
    let (link, resolved_path) = self.mount_table.resolve(path);
    link.protocol
        .set_times(&resolved_path, mtime, atime)
        .await?;
    Ok(())
}
/// Sets the owner and/or group of `path`.
pub async fn set_owner(
    &self,
    path: &str,
    owner: Option<&str>,
    group: Option<&str>,
) -> Result<()> {
    let (link, resolved_path) = self.mount_table.resolve(path);
    link.protocol
        .set_owner(&resolved_path, owner, group)
        .await?;
    Ok(())
}
/// Sets the permission bits of `path`.
pub async fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
    let (link, resolved_path) = self.mount_table.resolve(path);
    link.protocol
        .set_permission(&resolved_path, permission)
        .await?;
    Ok(())
}
/// Changes the replication factor of `path`; returns the server's result flag.
pub async fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
    let (link, resolved_path) = self.mount_table.resolve(path);
    let result = link
        .protocol
        .set_replication(&resolved_path, replication)
        .await?
        .result;
    Ok(result)
}
/// Returns aggregate size and quota information for the tree rooted at `path`.
pub async fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
    let (link, resolved_path) = self.mount_table.resolve(path);
    let result = link
        .protocol
        .get_content_summary(&resolved_path)
        .await?
        .summary;
    Ok(result.into())
}
/// Adds or updates entries in the ACL of `path`.
pub async fn modify_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
    let (link, resolved_path) = self.mount_table.resolve(path);
    link.protocol
        .modify_acl_entries(&resolved_path, acl_spec)
        .await?;
    Ok(())
}
/// Removes the given entries from the ACL of `path`.
pub async fn remove_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
    let (link, resolved_path) = self.mount_table.resolve(path);
    link.protocol
        .remove_acl_entries(&resolved_path, acl_spec)
        .await?;
    Ok(())
}
/// Removes the default ACL from `path`.
pub async fn remove_default_acl(&self, path: &str) -> Result<()> {
    let (link, resolved_path) = self.mount_table.resolve(path);
    link.protocol.remove_default_acl(&resolved_path).await?;
    Ok(())
}
/// Removes all ACL entries from `path`.
pub async fn remove_acl(&self, path: &str) -> Result<()> {
    let (link, resolved_path) = self.mount_table.resolve(path);
    link.protocol.remove_acl(&resolved_path).await?;
    Ok(())
}
/// Replaces the ACL of `path` with the given entries.
pub async fn set_acl(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
    let (link, resolved_path) = self.mount_table.resolve(path);
    link.protocol.set_acl(&resolved_path, acl_spec).await?;
    Ok(())
}
/// Returns the ACL status of `path`.
pub async fn get_acl_status(&self, path: &str) -> Result<AclStatus> {
    let (link, resolved_path) = self.mount_table.resolve(path);
    Ok(link
        .protocol
        .get_acl_status(&resolved_path)
        .await?
        .result
        .into())
}
/// Returns the statuses of all paths matching the glob `pattern`.
///
/// Brace alternation is expanded first; then each flattened pattern's
/// components are walked left to right, only listing directories on the
/// namenode where a wildcard component requires it.
pub async fn glob_status(&self, pattern: &str) -> Result<Vec<FileStatus>> {
    let flattened = expand_glob(pattern.to_string())?;
    let mut results: Vec<FileStatus> = Vec::new();
    for flat in flattened.into_iter() {
        if flat.is_empty() {
            continue;
        }
        let components = get_path_components(&flat);
        // A path that matched the pattern prefix so far; `status` is cached
        // when it was already learned from a listing or probe.
        #[derive(Clone, Debug)]
        struct Candidate {
            path: String,
            status: Option<FileStatus>,
        }
        let mut candidates: Vec<Candidate> = vec![Candidate {
            path: "/".to_string(),
            status: None,
        }];
        for (idx, comp) in components.iter().enumerate() {
            if candidates.is_empty() {
                break;
            }
            let is_last = idx == components.len() - 1;
            let unescaped = unescape_component(comp);
            let glob_pat = GlobPattern::new(comp)?;
            // Literal, non-final components are appended textually with no
            // round trip to the namenode.
            if !is_last && !glob_pat.has_wildcard() {
                for cand in candidates.iter_mut() {
                    if !cand.path.ends_with('/') {
                        cand.path.push('/');
                    }
                    cand.path.push_str(&unescaped);
                }
                continue;
            }
            let mut new_candidates: Vec<Candidate> = Vec::new();
            for cand in candidates.into_iter() {
                if glob_pat.has_wildcard() {
                    // Wildcard component: list the candidate directory and
                    // keep the children whose names match the pattern.
                    let listing = match self.list_status(&cand.path, false).await {
                        Ok(listing) => listing,
                        Err(HdfsError::FileNotFound(_)) => continue,
                        Err(e) => return Err(e),
                    };
                    // Listing a plain file returns only itself; it has no
                    // children, so this candidate is a dead end.
                    if listing.len() == 1 && listing[0].path == cand.path {
                        continue;
                    }
                    for child in listing.into_iter() {
                        // Intermediate components must be directories.
                        if !is_last && !child.isdir {
                            continue;
                        }
                        let name = child
                            .path
                            .rsplit_once('/')
                            .map(|(_, n)| n)
                            .unwrap_or(child.path.as_str());
                        if glob_pat.matches(name) {
                            new_candidates.push(Candidate {
                                path: child.path.clone(),
                                status: Some(child),
                            });
                        }
                    }
                } else {
                    // Literal final component: probe the exact path directly.
                    let mut next_path = cand.path.clone();
                    if !next_path.ends_with('/') {
                        next_path.push('/');
                    }
                    next_path.push_str(&unescaped);
                    match self.get_file_info(&next_path).await {
                        Ok(status) => {
                            if is_last || status.isdir {
                                new_candidates.push(Candidate {
                                    path: status.path.clone(),
                                    status: Some(status),
                                });
                            }
                        }
                        Err(HdfsError::FileNotFound(_)) => continue,
                        Err(e) => return Err(e),
                    }
                }
            }
            candidates = new_candidates;
        }
        for cand in candidates.into_iter() {
            // Candidates built purely textually never hit the namenode; fetch
            // their status now and drop the ones that don't exist.
            let status = if let Some(s) = cand.status {
                s
            } else {
                match self.get_file_info(&cand.path).await {
                    Ok(s) => s,
                    Err(HdfsError::FileNotFound(_)) => continue,
                    Err(e) => return Err(e),
                }
            };
            results.push(status);
        }
    }
    Ok(results)
}
}
impl Default for Client {
    /// Builds a client from the default configuration.
    ///
    /// # Panics
    /// Panics if the default configuration cannot produce a client.
    fn default() -> Self {
        ClientBuilder::new()
            .build()
            .expect("Failed to create default client")
    }
}
/// Paged iterator over a single directory's entries.
pub(crate) struct DirListingIterator {
    // Path as the caller specified it; used when building returned statuses.
    path: String,
    // Path after mount-table resolution; used for RPCs.
    resolved_path: String,
    link: MountLink,
    // When true, directory entries are filtered out of each batch.
    files_only: bool,
    // Entries fetched from the namenode but not yet returned.
    partial_listing: VecDeque<HdfsFileStatusProto>,
    // Server-reported count of entries still to fetch; non-zero means more pages.
    remaining: u32,
    // Name (raw bytes) of the last entry seen; the paging cursor.
    last_seen: Vec<u8>,
}
impl DirListingIterator {
    /// Creates an iterator over the direct children of `path`.
    fn new(path: String, mount_table: &Arc<MountTable>, files_only: bool) -> Self {
        let (link, resolved_path) = mount_table.resolve(&path);
        DirListingIterator {
            path,
            resolved_path,
            link: link.clone(),
            files_only,
            partial_listing: VecDeque::new(),
            // Non-zero so the first call to next() triggers a fetch.
            remaining: 1,
            last_seen: Vec::new(),
        }
    }
    /// Fetches the next page of the listing. Returns whether any entries
    /// (after the files_only filter) were received.
    async fn get_next_batch(&mut self) -> Result<bool> {
        let listing = self
            .link
            .protocol
            .get_listing(&self.resolved_path, self.last_seen.clone(), false)
            .await?;
        if let Some(dir_list) = listing.dir_list {
            // Remember the last name returned so the next page resumes after it.
            self.last_seen = dir_list
                .partial_listing
                .last()
                .map(|p| p.path.clone())
                .unwrap_or(Vec::new());
            self.remaining = dir_list.remaining_entries;
            self.partial_listing = dir_list
                .partial_listing
                .into_iter()
                .filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
                .collect();
            Ok(!self.partial_listing.is_empty())
        } else {
            // No dir_list in the response means the path doesn't exist.
            Err(HdfsError::FileNotFound(self.path.clone()))
        }
    }
    /// Returns the next entry, pulling another page when the buffer is empty.
    pub async fn next(&mut self) -> Option<Result<FileStatus>> {
        if self.partial_listing.is_empty()
            && self.remaining > 0
            && let Err(error) = self.get_next_batch().await
        {
            // Stop iterating after an error and report it exactly once.
            self.remaining = 0;
            return Some(Err(error));
        }
        if let Some(next) = self.partial_listing.pop_front() {
            Some(Ok(FileStatus::from(next, &self.path)))
        } else {
            None
        }
    }
}
/// Iterator over a directory tree, optionally recursing into subdirectories.
pub struct ListStatusIterator {
    mount_table: Arc<MountTable>,
    recursive: bool,
    // Stack of per-directory iterators; an async Mutex because `next` is
    // called (and held) across awaits.
    iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
}
impl ListStatusIterator {
    fn new(path: String, mount_table: Arc<MountTable>, recursive: bool) -> Self {
        let initial = DirListingIterator::new(path.clone(), &mount_table, false);
        ListStatusIterator {
            mount_table,
            recursive,
            iters: Arc::new(tokio::sync::Mutex::new(vec![initial])),
        }
    }
    /// Returns the next status in depth-first order, or `None` when exhausted.
    pub async fn next(&self) -> Option<Result<FileStatus>> {
        let mut next_file: Option<Result<FileStatus>> = None;
        let mut iters = self.iters.lock().await;
        while next_file.is_none() {
            if let Some(iter) = iters.last_mut() {
                if let Some(file_result) = iter.next().await {
                    if let Ok(file) = file_result {
                        // Push a child iterator to descend into directories
                        // when listing recursively.
                        if file.isdir && self.recursive {
                            iters.push(DirListingIterator::new(
                                file.path.clone(),
                                &self.mount_table,
                                false,
                            ))
                        }
                        next_file = Some(Ok(file));
                    } else {
                        next_file = Some(file_result)
                    }
                } else {
                    // Current directory exhausted; pop back to its parent.
                    iters.pop();
                }
            } else {
                // Stack empty: the whole tree has been visited.
                break;
            }
        }
        next_file
    }
    /// Adapts the iterator into an async `Stream` of results.
    pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
        let listing = stream::unfold(self, |state| async move {
            let next = state.next().await;
            next.map(|n| (n, state))
        });
        Box::pin(listing)
    }
}
/// Metadata for a file or directory, with `path` rebased onto the path the
/// caller asked about.
#[derive(Debug, Clone)]
pub struct FileStatus {
    pub path: String,
    pub length: usize,
    pub isdir: bool,
    pub permission: u16,
    pub owner: String,
    pub group: String,
    pub modification_time: u64,
    pub access_time: u64,
    pub replication: Option<u32>,
    pub blocksize: Option<u64>,
}
impl FileStatus {
    /// Builds a `FileStatus` from a protobuf status, joining the (possibly
    /// empty) relative name in `value.path` onto `base_path`.
    fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
        let mut path = base_path.trim_end_matches('/').to_string();
        // The entry name arrives as raw bytes; use a lossy conversion instead
        // of `from_utf8(...).unwrap()` so a non-UTF-8 name from the namenode
        // cannot panic the client.
        let relative_path = String::from_utf8_lossy(&value.path);
        if !relative_path.is_empty() {
            path.push('/');
            path.push_str(&relative_path);
        }
        // Asking about "/" itself leaves the joined path empty; restore root.
        if path.is_empty() {
            path.push('/');
        }
        FileStatus {
            isdir: value.file_type() == FileType::IsDir,
            path,
            length: value.length as usize,
            permission: value.permission.perm as u16,
            owner: value.owner,
            group: value.group,
            modification_time: value.modification_time,
            access_time: value.access_time,
            replication: value.block_replication,
            blocksize: value.blocksize,
        }
    }
}
/// Aggregate size and quota information for a directory tree.
#[derive(Debug)]
pub struct ContentSummary {
    pub length: u64,
    pub file_count: u64,
    pub directory_count: u64,
    pub quota: u64,
    pub space_consumed: u64,
    pub space_quota: u64,
}
impl From<ContentSummaryProto> for ContentSummary {
fn from(value: ContentSummaryProto) -> Self {
ContentSummary {
length: value.length,
file_count: value.file_count,
directory_count: value.directory_count,
quota: value.quota,
space_consumed: value.space_consumed,
space_quota: value.space_quota,
}
}
}
#[cfg(test)]
mod test {
use std::sync::{Arc, LazyLock};
use tokio::runtime::Runtime;
use url::Url;
use crate::{
client::ClientBuilder,
common::config::Configuration,
hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy},
};
use super::{MountLink, MountTable};
static RT: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());
// Builds a NamenodeProtocol pointed at `url` for mount-table tests; no
// connection is attempted here.
fn create_protocol(url: &str) -> Arc<NamenodeProtocol> {
    let proxy = NameServiceProxy::new(
        &Url::parse(url).unwrap(),
        Arc::new(Configuration::new(None, None).unwrap()),
        RT.handle().clone(),
    )
    .unwrap();
    Arc::new(NamenodeProtocol::new(proxy, RT.handle().clone()))
}
#[test]
fn test_default_fs() {
    // A valid fs.defaultFS with a host works with no explicit URL.
    assert!(
        ClientBuilder::new()
            .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
            .build()
            .is_ok()
    );
    // A default FS without a host is rejected.
    assert!(
        ClientBuilder::new()
            .with_config(vec![("fs.defaultFS", "hdfs://")])
            .build()
            .is_err()
    );
    // A host-less URL falls back to the configured default FS.
    assert!(
        ClientBuilder::new()
            .with_url("hdfs://")
            .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
            .build()
            .is_ok()
    );
    // ...unless the default FS also lacks a host.
    assert!(
        ClientBuilder::new()
            .with_url("hdfs://")
            .with_config(vec![("fs.defaultFS", "hdfs://")])
            .build()
            .is_err()
    );
    // A scheme mismatch between URL and default FS is rejected.
    assert!(
        ClientBuilder::new()
            .with_url("hdfs://")
            .with_config(vec![("fs.defaultFS", "viewfs://test")])
            .build()
            .is_err()
    );
}
#[test]
fn test_mount_link_resolve() {
    let protocol = create_protocol("hdfs://127.0.0.1:9000");
    let link = MountLink::new("/view", "/hdfs", protocol);
    assert_eq!(link.resolve("/view/dir/file").unwrap(), "/hdfs/dir/file");
    assert_eq!(link.resolve("/view").unwrap(), "/hdfs");
    // Paths outside the mount prefix don't resolve against this link.
    assert!(link.resolve("/hdfs/path").is_none());
}
#[test]
fn test_fallback_link() {
    let protocol = create_protocol("hdfs://127.0.0.1:9000");
    // An empty viewfs prefix acts as a catch-all fallback.
    let link = MountLink::new("", "/hdfs", Arc::clone(&protocol));
    assert_eq!(link.resolve("/path/to/file").unwrap(), "/hdfs/path/to/file");
    assert_eq!(link.resolve("/").unwrap(), "/hdfs/");
    assert_eq!(link.resolve("/hdfs/path").unwrap(), "/hdfs/hdfs/path");
    let link = MountLink::new("", "", protocol);
    assert_eq!(link.resolve("/").unwrap(), "/");
}
#[test]
fn test_mount_table_resolve() {
    let link1 = MountLink::new(
        "/mount1",
        "/path1/nested",
        create_protocol("hdfs://127.0.0.1:9000"),
    );
    let link2 = MountLink::new(
        "/mount2",
        "/path2",
        create_protocol("hdfs://127.0.0.1:9001"),
    );
    let link3 = MountLink::new(
        "/mount3/nested",
        "/path3",
        create_protocol("hdfs://127.0.0.1:9002"),
    );
    let fallback = MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003"));
    let mount_table = MountTable {
        mounts: vec![link1, link2, link3],
        fallback,
        home_dir: "/user/test".to_string(),
    };
    // An exact mount point resolves to the mount's target path.
    let (link, resolved) = mount_table.resolve("/mount1");
    assert_eq!(link.viewfs_path, "/mount1");
    assert_eq!(resolved, "/path1/nested");
    let (link, resolved) = mount_table.resolve("/mount1/");
    assert_eq!(link.viewfs_path, "/mount1");
    assert_eq!(resolved, "/path1/nested/");
    // A sibling that merely shares the prefix string goes to the fallback.
    let (link, resolved) = mount_table.resolve("/mount12");
    assert_eq!(link.viewfs_path, "");
    assert_eq!(resolved, "/path4/mount12");
    // Only "/mount3/nested" is mounted, not "/mount3" itself.
    let (link, resolved) = mount_table.resolve("/mount3/file");
    assert_eq!(link.viewfs_path, "");
    assert_eq!(resolved, "/path4/mount3/file");
    let (link, resolved) = mount_table.resolve("/mount3/nested/file");
    assert_eq!(link.viewfs_path, "/mount3/nested");
    assert_eq!(resolved, "/path3/file");
    // Relative paths are anchored at home_dir, served here by the fallback.
    let (link, resolved) = mount_table.resolve("file");
    assert_eq!(link.viewfs_path, "");
    assert_eq!(resolved, "/path4/user/test/file");
    let (link, resolved) = mount_table.resolve("dir/subdir");
    assert_eq!(link.viewfs_path, "");
    assert_eq!(resolved, "/path4/user/test/dir/subdir");
    let mount_table = MountTable {
        mounts: vec![
            MountLink::new(
                "/mount1",
                "/path1/nested",
                create_protocol("hdfs://127.0.0.1:9000"),
            ),
            MountLink::new(
                "/mount2",
                "/path2",
                create_protocol("hdfs://127.0.0.1:9001"),
            ),
        ],
        fallback: MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003")),
        home_dir: "/mount1/user".to_string(),
    };
    // When home_dir lies inside a mount, relative paths use that mount.
    let (link, resolved) = mount_table.resolve("file");
    assert_eq!(link.viewfs_path, "/mount1");
    assert_eq!(resolved, "/path1/nested/user/file");
    let (link, resolved) = mount_table.resolve("dir/subdir");
    assert_eq!(link.viewfs_path, "/mount1");
    assert_eq!(resolved, "/path1/nested/user/dir/subdir");
}
#[test]
fn test_io_runtime() {
    // An owned runtime can be handed to the client...
    assert!(
        ClientBuilder::new()
            .with_url("hdfs://127.0.0.1:9000")
            .with_io_runtime(Runtime::new().unwrap())
            .build()
            .is_ok()
    );
    // ...and so can a handle to an externally managed one.
    let rt = Runtime::new().unwrap();
    assert!(
        ClientBuilder::new()
            .with_url("hdfs://127.0.0.1:9000")
            .with_io_runtime(rt.handle().clone())
            .build()
            .is_ok()
    );
}
#[test]
fn test_set_conf_dir() {
    // Supplying a config directory path still builds a client successfully.
    assert!(
        ClientBuilder::new()
            .with_url("hdfs://127.0.0.1:9000")
            .with_config_dir("target/test")
            .build()
            .is_ok()
    )
}
}