use crate::err::HdfsErr;
use crate::hdfs;
use crate::hdfs::{get_uri, HdfsFs};
use crate::minidfs::MiniDFS;
use crate::native::{hdfsCopy, hdfsMove};
use std::ffi::CString;
use std::sync::Arc;
pub struct HdfsUtil;
impl HdfsUtil {
pub fn copy_file_to_hdfs(
dfs: Arc<MiniDFS>,
src_path: &str,
dst_path: &str,
) -> Result<bool, HdfsErr> {
let src_uri = get_uri(src_path)?;
let src_fs = hdfs::get_hdfs_by_full_path(&src_uri)?;
let dst_fs = dfs.get_hdfs()?;
HdfsUtil::copy(&src_fs, src_path, &dst_fs, dst_path)
}
pub fn copy_file_from_hdfs(
dfs: Arc<MiniDFS>,
src_path: &str,
dst_path: &str,
) -> Result<bool, HdfsErr> {
let src_fs = dfs.get_hdfs()?;
let dst_uri = get_uri(dst_path)?;
let dst_fs = hdfs::get_hdfs_by_full_path(&dst_uri)?;
HdfsUtil::copy(&src_fs, src_path, &dst_fs, dst_path)
}
pub fn mv_file_to_hdfs(
dfs: Arc<MiniDFS>,
src_path: &str,
dst_path: &str,
) -> Result<bool, HdfsErr> {
let src_uri = get_uri(src_path)?;
let src_fs = hdfs::get_hdfs_by_full_path(&src_uri)?;
let dst_fs = dfs.get_hdfs()?;
HdfsUtil::mv(&src_fs, src_path, &dst_fs, dst_path)
}
pub fn mv_file_from_hdfs(
dfs: Arc<MiniDFS>,
src_path: &str,
dst_path: &str,
) -> Result<bool, HdfsErr> {
let src_fs = dfs.get_hdfs()?;
let dst_uri = get_uri(dst_path)?;
let dst_fs = hdfs::get_hdfs_by_full_path(&dst_uri)?;
HdfsUtil::mv(&src_fs, src_path, &dst_fs, dst_path)
}
pub fn copy(
src_fs: &HdfsFs,
src: &str,
dst_fs: &HdfsFs,
dst: &str,
) -> Result<bool, HdfsErr> {
let res = unsafe {
let cstr_src = CString::new(src).unwrap();
let cstr_dst = CString::new(dst).unwrap();
hdfsCopy(
src_fs.raw(),
cstr_src.as_ptr(),
dst_fs.raw(),
cstr_dst.as_ptr(),
)
};
if res == 0 {
Ok(true)
} else {
Err(HdfsErr::Generic(format!(
"Fail to copy file from {} to {}",
src, dst
)))
}
}
pub fn mv(
src_fs: &HdfsFs,
src: &str,
dst_fs: &HdfsFs,
dst: &str,
) -> Result<bool, HdfsErr> {
let res = unsafe {
let cstr_src = CString::new(src).unwrap();
let cstr_dst = CString::new(dst).unwrap();
hdfsMove(
src_fs.raw(),
cstr_src.as_ptr(),
dst_fs.raw(),
cstr_dst.as_ptr(),
)
};
if res == 0 {
Ok(true)
} else {
Err(HdfsErr::Generic(format!(
"Fail to move file from {} to {}",
src, dst
)))
}
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::minidfs::get_dfs;
use std::path::Path;
use tempfile::tempdir;
use uuid::Uuid;
#[test]
fn test_from_local() {
let temp_file = tempfile::Builder::new().tempfile().unwrap();
let src_path = temp_file.path();
let src_file_name = src_path.file_name().unwrap().to_str().unwrap();
let src_file = src_path.to_str().unwrap();
let dfs = get_dfs();
{
let src_file_uri = format!("file://{}", src_path.to_str().unwrap());
let src_fs = hdfs::get_hdfs_by_full_path(&src_file_uri).ok().unwrap();
let dst_file = format!("/{}", src_file_name);
let dst_fs = dfs.get_hdfs().ok().unwrap();
{
assert!(
HdfsUtil::copy_file_to_hdfs(dfs.clone(), src_file, &dst_file).is_ok()
);
assert!(dst_fs.exist(&dst_file));
assert!(src_fs.exist(src_file));
assert!(Path::new(src_file).exists());
}
assert!(dst_fs.delete(&dst_file, false).is_ok());
{
assert!(HdfsUtil::mv_file_to_hdfs(dfs, src_file, &dst_file).is_ok());
assert!(dst_fs.exist(&dst_file));
assert!(!src_fs.exist(src_file));
assert!(!Path::new(src_file).exists());
}
assert!(dst_fs.delete(&dst_file, false).is_ok());
}
}
#[test]
fn test_to_local() {
let uuid = Uuid::new_v4().to_string();
let file_name = uuid.as_str();
let temp_dir = tempdir().unwrap();
let dst_path = temp_dir.path().join(file_name);
let dst_file = dst_path.to_str().unwrap();
let dfs = get_dfs();
{
let src_file = format!("/{}", file_name);
let src_fs = dfs.get_hdfs().ok().unwrap();
src_fs.create(&src_file).ok().unwrap();
let dst_file_uri = format!("file://{}", dst_file);
let dst_fs = hdfs::get_hdfs_by_full_path(&dst_file_uri).ok().unwrap();
{
assert!(
HdfsUtil::copy_file_from_hdfs(dfs.clone(), &src_file, dst_file)
.is_ok()
);
assert!(src_fs.exist(&src_file));
assert!(dst_fs.exist(dst_file));
assert!(Path::new(dst_file).exists());
}
assert!(dst_fs.delete(dst_file, false).is_ok());
{
assert!(HdfsUtil::mv_file_from_hdfs(dfs, &src_file, dst_file).is_ok());
assert!(!src_fs.exist(&src_file));
assert!(dst_fs.exist(dst_file));
assert!(Path::new(dst_file).exists());
}
}
}
}