use std::collections::HashMap;
use std::ffi::{CStr, CString};
use std::fmt::Write;
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use std::string::String;
use std::sync::{Arc, RwLock};
use lazy_static::lazy_static;
use libc::{c_char, c_int, c_short, c_void, time_t};
use log::info;
use url::Url;
pub use crate::err::HdfsErr;
use crate::native::*;
// File-open flags passed straight to hdfsOpenFile. The values mirror the
// classic POSIX open(2) flags (O_APPEND == 0o2000) that libhdfs expects.
const O_RDONLY: c_int = 0;
const O_WRONLY: c_int = 1;
const O_APPEND: c_int = 1024;
lazy_static! {
    // Process-wide cache of namenode connections, keyed by namenode URI.
    static ref HDFS_MANAGER: HdfsManager = HdfsManager::new();
}
pub fn get_hdfs_by_full_path(path: &str) -> Result<Arc<HdfsFs>, HdfsErr> {
HDFS_MANAGER.get_hdfs_by_full_path(path)
}
pub fn get_hdfs() -> Result<Arc<HdfsFs>, HdfsErr> {
HDFS_MANAGER.get_hdfs_by_full_path("default")
}
pub fn unload_hdfs_cache_by_full_path(
path: &str,
) -> Result<Option<Arc<HdfsFs>>, HdfsErr> {
HDFS_MANAGER.remove_hdfs_by_full_path(path)
}
pub fn unload_hdfs_cache(hdfs: Arc<HdfsFs>) -> Result<Option<Arc<HdfsFs>>, HdfsErr> {
HDFS_MANAGER.remove_hdfs(hdfs)
}
/// Owns the process-wide map from namenode URI to connected `HdfsFs`.
struct HdfsManager {
    // RwLock so concurrent lookups of already-cached connections take
    // only the shared (read) lock.
    hdfs_cache: Arc<RwLock<HashMap<String, Arc<HdfsFs>>>>,
}
impl HdfsManager {
    /// Creates a manager with an empty connection cache.
    fn new() -> Self {
        Self {
            hdfs_cache: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Returns the cached connection for the namenode serving `path`,
    /// connecting to the namenode (and caching the handle) on first use.
    fn get_hdfs_by_full_path(&self, path: &str) -> Result<Arc<HdfsFs>, HdfsErr> {
        // "default" is passed through verbatim so libhdfs resolves the
        // namenode from the Hadoop configuration.
        let namenode_uri = match path {
            "default" => "default".to_owned(),
            _ => get_namenode_uri(path)?,
        };
        // Fast path under the shared lock. The read guard lives only for
        // the inner block, so taking the write lock below cannot deadlock.
        if let Some(hdfs_fs) = {
            let cache = self.hdfs_cache.read().unwrap();
            cache.get(&namenode_uri).cloned()
        } {
            return Ok(hdfs_fs);
        }
        // Slow path: take the write lock and re-check (double-checked
        // locking) in case another thread connected in the meantime.
        let mut cache = self.hdfs_cache.write().unwrap();
        let ret = if let Some(hdfs_fs) = cache.get(&namenode_uri) {
            hdfs_fs.clone()
        } else {
            let hdfs_fs = unsafe {
                // NOTE(review): assumes hdfsBuilderConnect takes ownership
                // of the builder (no separate free) — confirm against the
                // libhdfs version in use.
                let hdfs_builder = hdfsNewBuilder();
                let cstr_uri = CString::new(namenode_uri.as_bytes()).unwrap();
                hdfsBuilderSetNameNode(hdfs_builder, cstr_uri.as_ptr());
                info!("Connecting to Namenode ({})", &namenode_uri);
                hdfsBuilderConnect(hdfs_builder)
            };
            // A null filesystem handle is libhdfs' connection-failure signal.
            if hdfs_fs.is_null() {
                return Err(HdfsErr::CannotConnectToNameNode(namenode_uri.clone()));
            }
            let hdfs_fs = Arc::new(HdfsFs {
                url: namenode_uri.clone(),
                raw: hdfs_fs,
                _marker: PhantomData,
            });
            cache.insert(namenode_uri.clone(), hdfs_fs.clone());
            hdfs_fs
        };
        Ok(ret)
    }

    /// Removes the cache entry for the namenode serving `path`.
    fn remove_hdfs_by_full_path(
        &self,
        path: &str,
    ) -> Result<Option<Arc<HdfsFs>>, HdfsErr> {
        let namenode_uri = match path {
            "default" => String::from("default"),
            _ => get_namenode_uri(path)?,
        };
        self.remove_hdfs_inner(&namenode_uri)
    }

    /// Removes the cache entry keyed by the handle's namenode URL.
    fn remove_hdfs(&self, hdfs: Arc<HdfsFs>) -> Result<Option<Arc<HdfsFs>>, HdfsErr> {
        self.remove_hdfs_inner(hdfs.url())
    }

    /// Shared eviction helper; returns the removed entry, if any.
    fn remove_hdfs_inner(&self, hdfs_key: &str) -> Result<Option<Arc<HdfsFs>>, HdfsErr> {
        let mut cache = self.hdfs_cache.write().unwrap();
        Ok(cache.remove(hdfs_key))
    }
}
/// A thin, cloneable wrapper around a raw libhdfs `hdfsFS` handle.
#[derive(Clone)]
pub struct HdfsFs {
    // Namenode URI this handle is connected to (the cache key).
    url: String,
    // Raw libhdfs filesystem handle. There is no Drop impl here, so the
    // handle is never disconnected by this type; clones share it freely.
    raw: hdfsFS,
    _marker: PhantomData<()>,
}
impl Debug for HdfsFs {
    /// Formats as `HdfsFs { url: ... }`, deliberately omitting the raw
    /// FFI handle.
    fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
        let mut builder = formatter.debug_struct("HdfsFs");
        builder.field("url", &self.url);
        builder.finish()
    }
}
impl HdfsFs {
    /// Returns the namenode URI this handle is connected to.
    #[inline]
    pub fn url(&self) -> &str {
        &self.url
    }

    /// Returns the raw libhdfs filesystem handle.
    #[inline]
    pub fn raw(&self) -> hdfsFS {
        self.raw
    }

    /// Wraps a raw `hdfsFile` in an `HdfsFile`, mapping a null handle
    /// (libhdfs' open-failure signal) to an error.
    fn new_hdfs_file(&self, path: &str, file: hdfsFile) -> Result<HdfsFile, HdfsErr> {
        if file.is_null() {
            Err(HdfsErr::Generic(format!(
                "Fail to create/open file {}",
                path
            )))
        } else {
            Ok(HdfsFile {
                fs: self.clone(),
                path: path.to_owned(),
                file,
                _marker: PhantomData,
            })
        }
    }

    /// Opens `path` for reading with the default I/O buffer size.
    #[inline]
    pub fn open(&self, path: &str) -> Result<HdfsFile, HdfsErr> {
        self.open_with_buf_size(path, 0)
    }

    /// Opens `path` for reading with an explicit I/O buffer size
    /// (0 lets libhdfs choose its default).
    pub fn open_with_buf_size(
        &self,
        path: &str,
        buf_size: i32,
    ) -> Result<HdfsFile, HdfsErr> {
        let file = unsafe {
            let cstr_path = CString::new(path).unwrap();
            // Replication and block size are write-side parameters; pass 0
            // for a read-only open.
            hdfsOpenFile(
                self.raw,
                cstr_path.as_ptr(),
                O_RDONLY,
                buf_size as c_int,
                0,
                0,
            )
        };
        self.new_hdfs_file(path, file)
    }

    /// Fetches the status (kind, size, times, ownership) of `path`.
    pub fn get_file_status(&self, path: &str) -> Result<FileStatus, HdfsErr> {
        let ptr = unsafe {
            let cstr_path = CString::new(path).unwrap();
            hdfsGetPathInfo(self.raw, cstr_path.as_ptr())
        };
        if ptr.is_null() {
            Err(HdfsErr::Generic(format!(
                "Fail to get file status for {}",
                path
            )))
        } else {
            // FileStatus takes ownership; the record is freed when the
            // last clone is dropped (see HdfsFileInfoPtr::drop).
            Ok(FileStatus::new(ptr))
        }
    }

    /// Lists the entries of directory `path`.
    ///
    /// NOTE(review): a null return from hdfsListDirectory is mapped to an
    /// empty Vec, so a listing error is indistinguishable from an empty
    /// directory here — confirm this is intended.
    pub fn list_status(&self, path: &str) -> Result<Vec<FileStatus>, HdfsErr> {
        let mut entry_num: c_int = 0;
        let ptr = unsafe {
            let cstr_path = CString::new(path).unwrap();
            hdfsListDirectory(self.raw, cstr_path.as_ptr(), &mut entry_num)
        };
        let mut list = Vec::new();
        if ptr.is_null() {
            return Ok(list);
        }
        // All statuses share one allocation; it is freed once the last
        // Arc<HdfsFileInfoPtr> is dropped.
        let shared_ptr = Arc::new(HdfsFileInfoPtr::new_array(ptr, entry_num));
        for idx in 0..entry_num {
            list.push(FileStatus::from_array(shared_ptr.clone(), idx as u32));
        }
        Ok(list)
    }

    /// Returns the filesystem's default block size in bytes.
    pub fn default_blocksize(&self) -> Result<usize, HdfsErr> {
        let block_sz = unsafe { hdfsGetDefaultBlockSize(self.raw) };
        if block_sz > 0 {
            Ok(block_sz as usize)
        } else {
            Err(HdfsErr::Generic(
                "Fail to get default block size".to_owned(),
            ))
        }
    }

    /// Returns the block size configured at `path`, in bytes.
    pub fn block_size(&self, path: &str) -> Result<usize, HdfsErr> {
        let block_sz = unsafe {
            let cstr_path = CString::new(path).unwrap();
            hdfsGetDefaultBlockSizeAtPath(self.raw, cstr_path.as_ptr())
        };
        if block_sz > 0 {
            Ok(block_sz as usize)
        } else {
            Err(HdfsErr::Generic(format!(
                "Fail to get block size for file {}",
                path
            )))
        }
    }

    /// Returns the raw capacity of the filesystem in bytes.
    pub fn capacity(&self) -> Result<usize, HdfsErr> {
        let block_sz = unsafe { hdfsGetCapacity(self.raw) };
        if block_sz > 0 {
            Ok(block_sz as usize)
        } else {
            Err(HdfsErr::Generic("Fail to get capacity".to_owned()))
        }
    }

    /// Returns the number of bytes currently used on the filesystem.
    ///
    /// NOTE(review): the `> 0` check also reports an error for a
    /// legitimately empty filesystem (0 bytes used) — confirm whether
    /// `>= 0` was intended.
    pub fn used(&self) -> Result<usize, HdfsErr> {
        let block_sz = unsafe { hdfsGetUsed(self.raw) };
        if block_sz > 0 {
            Ok(block_sz as usize)
        } else {
            Err(HdfsErr::Generic("Fail to get used size".to_owned()))
        }
    }

    /// Returns true if `path` exists (hdfsExists returns 0 on success).
    pub fn exist(&self, path: &str) -> bool {
        (unsafe {
            let cstr_path = CString::new(path).unwrap();
            hdfsExists(self.raw, cstr_path.as_ptr())
        } == 0)
    }

    /// Returns the hosts storing the blocks of `path` that overlap the
    /// byte range `[start, start + length)`.
    pub fn get_hosts(
        &self,
        path: &str,
        start: usize,
        length: usize,
    ) -> Result<BlockHosts, HdfsErr> {
        let ptr = unsafe {
            let cstr_path = CString::new(path).unwrap();
            hdfsGetHosts(
                self.raw,
                cstr_path.as_ptr(),
                start as tOffset,
                length as tOffset,
            )
        };
        if !ptr.is_null() {
            // BlockHosts owns the allocation and frees it on drop.
            Ok(BlockHosts { ptr })
        } else {
            Err(HdfsErr::Generic(format!(
                "Fail to get block hosts for file {} from {} with length {}",
                path, start, length
            )))
        }
    }

    /// Creates `path` for writing with all defaults; fails if the file
    /// already exists.
    #[inline]
    pub fn create(&self, path: &str) -> Result<HdfsFile, HdfsErr> {
        self.create_with_params(path, false, 0, 0, 0)
    }

    /// Creates `path` for writing, optionally truncating an existing file.
    #[inline]
    pub fn create_with_overwrite(
        &self,
        path: &str,
        overwrite: bool,
    ) -> Result<HdfsFile, HdfsErr> {
        self.create_with_params(path, overwrite, 0, 0, 0)
    }

    /// Creates `path` for writing. Zero values for `buf_size`,
    /// `replica_num` and `block_size` let libhdfs use its defaults.
    pub fn create_with_params(
        &self,
        path: &str,
        overwrite: bool,
        buf_size: i32,
        replica_num: i16,
        block_size: i32,
    ) -> Result<HdfsFile, HdfsErr> {
        // NOTE(review): exist-then-open is racy (TOCTOU) — another writer
        // may create the file between these two calls.
        if !overwrite && self.exist(path) {
            return Err(HdfsErr::FileAlreadyExists(path.to_owned()));
        }
        let file = unsafe {
            let cstr_path = CString::new(path).unwrap();
            hdfsOpenFile(
                self.raw,
                cstr_path.as_ptr(),
                O_WRONLY,
                buf_size as c_int,
                replica_num as c_short,
                block_size as tSize,
            )
        };
        self.new_hdfs_file(path, file)
    }

    /// Sets the permission bits of `path`; returns true on success.
    pub fn chmod(&self, path: &str, mode: i16) -> bool {
        (unsafe {
            let cstr_path = CString::new(path).unwrap();
            hdfsChmod(self.raw, cstr_path.as_ptr(), mode as c_short)
        }) == 0
    }

    /// Sets the owner and group of `path`; returns true on success.
    pub fn chown(&self, path: &str, owner: &str, group: &str) -> bool {
        (unsafe {
            let cstr_path = CString::new(path).unwrap();
            let cstr_owner = CString::new(owner).unwrap();
            let cstr_group = CString::new(group).unwrap();
            hdfsChown(
                self.raw,
                cstr_path.as_ptr(),
                cstr_owner.as_ptr(),
                cstr_group.as_ptr(),
            )
        }) == 0
    }

    /// Opens an existing file `path` for appending.
    pub fn append(&self, path: &str) -> Result<HdfsFile, HdfsErr> {
        if !self.exist(path) {
            return Err(HdfsErr::FileNotFound(path.to_owned()));
        }
        let file = unsafe {
            let cstr_path = CString::new(path).unwrap();
            hdfsOpenFile(self.raw, cstr_path.as_ptr(), O_APPEND, 0, 0, 0)
        };
        self.new_hdfs_file(path, file)
    }

    /// Creates directory `path` (and, presumably, missing parents —
    /// TODO(review): confirm hdfsCreateDirectory semantics).
    pub fn mkdir(&self, path: &str) -> Result<bool, HdfsErr> {
        if unsafe {
            let cstr_path = CString::new(path).unwrap();
            hdfsCreateDirectory(self.raw, cstr_path.as_ptr())
        } == 0
        {
            Ok(true)
        } else {
            Err(HdfsErr::Generic(format!(
                "Fail to create directory for {}",
                path
            )))
        }
    }

    /// Renames `old_path` to `new_path`.
    pub fn rename(&self, old_path: &str, new_path: &str) -> Result<bool, HdfsErr> {
        if unsafe {
            let cstr_old_path = CString::new(old_path).unwrap();
            let cstr_new_path = CString::new(new_path).unwrap();
            hdfsRename(self.raw, cstr_old_path.as_ptr(), cstr_new_path.as_ptr())
        } == 0
        {
            Ok(true)
        } else {
            Err(HdfsErr::Generic(format!(
                "Fail to rename {} to {}",
                old_path, new_path
            )))
        }
    }

    /// Sets the replication factor of `path` to `num`.
    pub fn set_replication(&self, path: &str, num: i16) -> Result<bool, HdfsErr> {
        if unsafe {
            let cstr_path = CString::new(path).unwrap();
            hdfsSetReplication(self.raw, cstr_path.as_ptr(), num as i16)
        } == 0
        {
            Ok(true)
        } else {
            Err(HdfsErr::Generic(format!(
                "Fail to set replication {} for {}",
                num, path
            )))
        }
    }

    /// Deletes `path`; `recursive` must be true for non-empty directories.
    pub fn delete(&self, path: &str, recursive: bool) -> Result<bool, HdfsErr> {
        if unsafe {
            let cstr_path = CString::new(path).unwrap();
            hdfsDelete(self.raw, cstr_path.as_ptr(), recursive as c_int)
        } == 0
        {
            Ok(true)
        } else {
            Err(HdfsErr::Generic(format!(
                "Fail to delete {} with recursive mode {}",
                path, recursive
            )))
        }
    }
}
// SAFETY: HdfsFs only holds an immutable URL and a raw libhdfs handle.
// This assumes the libhdfs client is safe to call from multiple threads —
// TODO(review): confirm against the libhdfs build in use.
unsafe impl Send for HdfsFs {}
unsafe impl Sync for HdfsFs {}
/// An open file on an `HdfsFs`, wrapping a raw libhdfs `hdfsFile` handle.
#[derive(Clone)]
pub struct HdfsFile {
    // Owning filesystem handle (cloned, shares the raw hdfsFS).
    fs: HdfsFs,
    // Path the file was opened with, kept for error messages.
    path: String,
    // Raw libhdfs file handle; released only via an explicit close()
    // call (no Drop impl), so clones must not double-close.
    file: hdfsFile,
    _marker: PhantomData<()>,
}
impl Debug for HdfsFile {
    /// Formats as `HdfsFile { url: ..., path: ... }`, omitting the raw
    /// FFI handle.
    fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
        let mut builder = formatter.debug_struct("HdfsFile");
        builder.field("url", &self.fs.url);
        builder.field("path", &self.path);
        builder.finish()
    }
}
impl HdfsFile {
    /// Returns the filesystem this file belongs to.
    #[inline]
    pub fn fs(&self) -> &HdfsFs {
        &self.fs
    }

    /// Returns the path this file was opened with.
    #[inline]
    pub fn path(&self) -> &str {
        &self.path
    }

    /// Checks that the stream is usable by querying the number of bytes
    /// readable without blocking.
    ///
    /// hdfsAvailable returns the available byte count on success and -1
    /// on error, so any non-negative value means the file is available.
    /// (The previous `== 0` comparison wrongly reported an error for any
    /// file that actually had readable bytes.)
    pub fn available(&self) -> Result<bool, HdfsErr> {
        if unsafe { hdfsAvailable(self.fs.raw, self.file) } >= 0 {
            Ok(true)
        } else {
            Err(HdfsErr::Generic(format!(
                "File {} is not available",
                self.path()
            )))
        }
    }

    /// Closes the file handle; after this call the handle is invalid.
    pub fn close(&self) -> Result<bool, HdfsErr> {
        if unsafe { hdfsCloseFile(self.fs.raw, self.file) } == 0 {
            Ok(true)
        } else {
            Err(HdfsErr::Generic(format!(
                "Fail to close file {}",
                self.path()
            )))
        }
    }

    /// Flushes buffered client-side data; returns true on success.
    pub fn flush(&self) -> bool {
        (unsafe { hdfsFlush(self.fs.raw, self.file) }) == 0
    }

    /// Flushes data out to the datanodes; returns true on success.
    pub fn hflush(&self) -> bool {
        (unsafe { hdfsHFlush(self.fs.raw, self.file) }) == 0
    }

    /// Syncs data to disk on the datanodes; returns true on success.
    pub fn hsync(&self) -> bool {
        (unsafe { hdfsHSync(self.fs.raw, self.file) }) == 0
    }

    /// Returns true if the file was opened for reading.
    pub fn is_readable(&self) -> bool {
        (unsafe { hdfsFileIsOpenForRead(self.file) }) == 1
    }

    /// Returns true if the file was opened for writing/appending.
    pub fn is_writable(&self) -> bool {
        (unsafe { hdfsFileIsOpenForWrite(self.file) }) == 1
    }

    /// Fetches the current status of this file from the namenode.
    pub fn get_file_status(&self) -> Result<FileStatus, HdfsErr> {
        self.fs.get_file_status(self.path())
    }

    /// Returns the current read/write offset in bytes.
    ///
    /// hdfsTell returns -1 on error; 0 is a valid offset (start of
    /// file), so the check is `>= 0`. (The previous `> 0` comparison
    /// misreported position 0 as an error.)
    pub fn pos(&self) -> Result<u64, HdfsErr> {
        let pos = unsafe { hdfsTell(self.fs.raw, self.file) };
        if pos >= 0 {
            Ok(pos as u64)
        } else {
            Err(HdfsErr::Generic(format!(
                "Fail to get current offset of file {}",
                self.path()
            )))
        }
    }

    /// Reads up to `buf.len()` bytes from the current offset.
    ///
    /// Returns the number of bytes read; `Ok(0)` signals end-of-file
    /// (hdfsRead returns 0 at EOF and -1 on error, so EOF is not an
    /// error — the previous `> 0` check conflated the two). The pointer
    /// is taken via `as_mut_ptr` because libhdfs writes into the buffer;
    /// the old `as_ptr()` cast wrote through a const-derived pointer.
    pub fn read(&self, buf: &mut [u8]) -> Result<i32, HdfsErr> {
        let read_len = unsafe {
            hdfsRead(
                self.fs.raw,
                self.file,
                buf.as_mut_ptr() as *mut c_void,
                buf.len() as tSize,
            )
        };
        if read_len >= 0 {
            Ok(read_len as i32)
        } else {
            Err(HdfsErr::Generic(format!(
                "Fail to read contents from {} with return code {}",
                self.path(),
                read_len
            )))
        }
    }

    /// Reads up to `buf.len()` bytes starting at absolute offset `pos`,
    /// without moving the file's current offset.
    ///
    /// Returns the number of bytes read; `Ok(0)` signals end-of-file
    /// (same -1-is-error convention as `read`).
    pub fn read_with_pos(&self, pos: i64, buf: &mut [u8]) -> Result<i32, HdfsErr> {
        let read_len = unsafe {
            hdfsPread(
                self.fs.raw,
                self.file,
                pos as tOffset,
                buf.as_mut_ptr() as *mut c_void,
                buf.len() as tSize,
            )
        };
        if read_len >= 0 {
            Ok(read_len as i32)
        } else {
            Err(HdfsErr::Generic(format!(
                "Fail to read contents from {} with offset {} and return code {}",
                self.path(),
                pos,
                read_len
            )))
        }
    }

    /// Seeks to the absolute byte `offset`; returns true on success.
    pub fn seek(&self, offset: u64) -> bool {
        (unsafe { hdfsSeek(self.fs.raw, self.file, offset as tOffset) }) == 0
    }

    /// Writes `buf` at the current offset, returning the number of bytes
    /// written.
    ///
    /// hdfsWrite returns -1 on error, so a zero-length write of an empty
    /// buffer is a success (the previous `> 0` check reported it as an
    /// error). The buffer is only read by libhdfs; the `*mut` cast exists
    /// solely to satisfy the FFI signature.
    pub fn write(&self, buf: &[u8]) -> Result<i32, HdfsErr> {
        let written_len = unsafe {
            hdfsWrite(
                self.fs.raw,
                self.file,
                buf.as_ptr() as *mut c_void,
                buf.len() as tSize,
            )
        };
        if written_len >= 0 {
            Ok(written_len)
        } else {
            Err(HdfsErr::Generic(format!(
                "Fail to write contents to file {}",
                self.path()
            )))
        }
    }
}
// SAFETY: HdfsFile holds no thread-affine state of its own; this assumes
// the underlying libhdfs file handle may be used from any thread —
// TODO(review): confirm against the libhdfs build in use.
unsafe impl Send for HdfsFile {}
unsafe impl Sync for HdfsFile {}
/// One file or directory status entry, backed by a shared (possibly
/// array-allocated) `hdfsFileInfo` buffer.
#[derive(Clone)]
pub struct FileStatus {
    // Shared owner of the underlying hdfsFileInfo allocation.
    raw: Arc<HdfsFileInfoPtr>,
    // Index of this entry within the shared array (0 for single lookups).
    idx: u32,
    _marker: PhantomData<()>,
}
impl FileStatus {
    /// Wraps a single `hdfsFileInfo` record (index 0 of a 1-element array).
    #[inline]
    fn new(ptr: *const hdfsFileInfo) -> FileStatus {
        FileStatus {
            raw: Arc::new(HdfsFileInfoPtr::new(ptr)),
            idx: 0,
            _marker: PhantomData,
        }
    }

    /// Wraps entry `idx` of a shared `hdfsFileInfo` array.
    #[inline]
    fn from_array(raw: Arc<HdfsFileInfoPtr>, idx: u32) -> FileStatus {
        FileStatus {
            raw,
            idx,
            _marker: PhantomData,
        }
    }

    /// Pointer to this entry's record within the shared array.
    #[inline]
    fn ptr(&self) -> *const hdfsFileInfo {
        unsafe { self.raw.ptr.offset(self.idx as isize) }
    }

    /// Full path/name of the entry.
    ///
    /// NOTE(review): panics if the name is not valid UTF-8 (unwrap on
    /// from_utf8); the same applies to owner() and group().
    #[inline]
    pub fn name(&self) -> &str {
        let slice = unsafe { CStr::from_ptr((*self.ptr()).mName) }.to_bytes();
        std::str::from_utf8(slice).unwrap()
    }

    /// True if the entry is a regular file.
    #[inline]
    pub fn is_file(&self) -> bool {
        match unsafe { &*self.ptr() }.mKind {
            tObjectKind::kObjectKindFile => true,
            tObjectKind::kObjectKindDirectory => false,
        }
    }

    /// True if the entry is a directory.
    #[inline]
    pub fn is_directory(&self) -> bool {
        match unsafe { &*self.ptr() }.mKind {
            tObjectKind::kObjectKindFile => false,
            tObjectKind::kObjectKindDirectory => true,
        }
    }

    /// Owner user name of the entry.
    #[inline]
    pub fn owner(&self) -> &str {
        let slice = unsafe { CStr::from_ptr((*self.ptr()).mOwner) }.to_bytes();
        std::str::from_utf8(slice).unwrap()
    }

    /// Owner group name of the entry.
    #[inline]
    pub fn group(&self) -> &str {
        let slice = unsafe { CStr::from_ptr((*self.ptr()).mGroup) }.to_bytes();
        std::str::from_utf8(slice).unwrap()
    }

    /// Permission bits of the entry.
    #[inline]
    pub fn permission(&self) -> i16 {
        unsafe { &*self.ptr() }.mPermissions as i16
    }

    /// Size of the file in bytes (directories report their mSize field
    /// as-is).
    #[allow(clippy::len_without_is_empty)]
    #[inline]
    pub fn len(&self) -> usize {
        unsafe { &*self.ptr() }.mSize as usize
    }

    /// Block size of the file in bytes.
    #[inline]
    pub fn block_size(&self) -> usize {
        unsafe { &*self.ptr() }.mBlockSize as usize
    }

    /// Replication factor of the file.
    #[inline]
    pub fn replica_count(&self) -> i16 {
        unsafe { &*self.ptr() }.mReplication as i16
    }

    /// Last-modification time (epoch seconds, per `time_t`).
    #[inline]
    pub fn last_modified(&self) -> time_t {
        unsafe { &*self.ptr() }.mLastMod
    }

    /// Last-access time (epoch seconds, per `time_t`).
    #[inline]
    pub fn last_access(&self) -> time_t {
        unsafe { &*self.ptr() }.mLastAccess
    }
}
/// Owning wrapper for a libhdfs-allocated `hdfsFileInfo` array; frees
/// the allocation exactly once, on drop.
struct HdfsFileInfoPtr {
    pub ptr: *const hdfsFileInfo,
    // Number of entries in the array, passed back to hdfsFreeFileInfo.
    pub len: i32,
}
impl Drop for HdfsFileInfoPtr {
    // Releases the array through the libhdfs allocator that created it.
    fn drop(&mut self) {
        unsafe { hdfsFreeFileInfo(self.ptr as *mut hdfsFileInfo, self.len) };
    }
}
impl HdfsFileInfoPtr {
    /// Wraps a pointer to a single `hdfsFileInfo` record.
    fn new(ptr: *const hdfsFileInfo) -> HdfsFileInfoPtr {
        Self::new_array(ptr, 1)
    }

    /// Wraps a pointer to an array of `len` `hdfsFileInfo` records.
    pub fn new_array(ptr: *const hdfsFileInfo, len: i32) -> HdfsFileInfoPtr {
        HdfsFileInfoPtr { ptr, len }
    }
}
// SAFETY: the wrapped hdfsFileInfo array is never mutated through this
// type after construction and is freed exactly once in Drop — assumes
// libhdfs does not retain or mutate the allocation; TODO(review): confirm.
unsafe impl Send for HdfsFileInfoPtr {}
unsafe impl Sync for HdfsFileInfoPtr {}
/// Owning wrapper for the host list returned by `hdfsGetHosts`; the
/// allocation is released on drop via `hdfsFreeHosts`.
pub struct BlockHosts {
    // Triple pointer to C strings, allocated and owned by libhdfs.
    ptr: *mut *mut *mut c_char,
}
impl Drop for BlockHosts {
    // Releases the host list through the libhdfs allocator that created it.
    fn drop(&mut self) {
        unsafe { hdfsFreeHosts(self.ptr) };
    }
}
/// URL scheme for the local filesystem.
pub const LOCAL_FS_SCHEME: &str = "file";
/// URL scheme for HDFS proper.
pub const HDFS_FS_SCHEME: &str = "hdfs";
/// URL scheme for federated "viewfs" namespaces.
pub const VIEW_FS_SCHEME: &str = "viewfs";
/// Derives the namenode URI (e.g. `hdfs://host:port`) from a full file
/// URL. `file://` paths all map to `file:///`; any unparsable URL,
/// unknown scheme, or host-less hdfs/viewfs URL yields
/// `HdfsErr::InvalidUrl`.
#[inline]
fn get_namenode_uri(path: &str) -> Result<String, HdfsErr> {
    let url = Url::parse(path).map_err(|_| HdfsErr::InvalidUrl(path.to_string()))?;
    match url.scheme() {
        LOCAL_FS_SCHEME => Ok("file:///".to_string()),
        HDFS_FS_SCHEME | VIEW_FS_SCHEME => match url.host() {
            Some(host) => {
                // Rebuild just scheme://host[:port], dropping any path part.
                let mut namenode = format!("{}://{}", url.scheme(), host);
                if let Some(port) = url.port() {
                    write!(&mut namenode, ":{}", port).unwrap();
                }
                Ok(namenode)
            }
            None => Err(HdfsErr::InvalidUrl(path.to_string())),
        },
        _ => Err(HdfsErr::InvalidUrl(path.to_string())),
    }
}
/// Normalizes `path` into a canonical URL string. A bare absolute path
/// (leading '/') is treated as local and prefixed with `file://`; only
/// the `file`, `hdfs` and `viewfs` schemes are accepted.
#[inline]
pub fn get_uri(path: &str) -> Result<String, HdfsErr> {
    let normalized = if path.starts_with('/') {
        format!("{}://{}", LOCAL_FS_SCHEME, path)
    } else {
        path.to_string()
    };
    match Url::parse(&normalized) {
        Ok(url) => match url.scheme() {
            LOCAL_FS_SCHEME | HDFS_FS_SCHEME | VIEW_FS_SCHEME => Ok(url.to_string()),
            _ => Err(HdfsErr::InvalidUrl(normalized)),
        },
        Err(_) => Err(HdfsErr::InvalidUrl(normalized)),
    }
}
#[cfg(test)]
mod test {
    use uuid::Uuid;
    use crate::minidfs::get_dfs;

    // Smoke test against an externally provisioned HDFS ("default"
    // configuration); only compiled with the use_existing_hdfs feature.
    #[cfg(feature = "use_existing_hdfs")]
    #[test]
    fn test_hdfs_default() {
        let fs = super::get_hdfs().ok().unwrap();
        // Random file name to avoid collisions between test runs.
        let uuid = Uuid::new_v4().to_string();
        let test_file = uuid.as_str();
        let created_file = match fs.create(test_file) {
            Ok(f) => f,
            Err(_) => panic!("Couldn't create a file"),
        };
        assert!(created_file.close().is_ok());
        assert!(fs.exist(test_file));
        assert!(fs.delete(test_file, false).is_ok());
        assert!(!fs.exist(test_file));
    }

    // End-to-end create/open/mkdir/list/delete against an in-process
    // mini-DFS cluster.
    #[test]
    fn test_hdfs() {
        let dfs = get_dfs();
        {
            let minidfs_addr = dfs.namenode_addr();
            let fs = dfs.get_hdfs().ok().unwrap();
            {
                // File round-trip: create, close, open, close, delete.
                let uuid = Uuid::new_v4().to_string();
                let test_file = uuid.as_str();
                let created_file = match fs.create(test_file) {
                    Ok(f) => f,
                    Err(_) => panic!("Couldn't create a file"),
                };
                assert!(created_file.close().is_ok());
                assert!(fs.exist(test_file));
                let opened_file = fs.open(test_file).ok().unwrap();
                assert!(opened_file.close().is_ok());
                assert!(fs.delete(test_file, false).is_ok());
            }
            {
                // Directory round-trip: mkdir, stat, list children, delete.
                let uuid = Uuid::new_v4().to_string();
                let test_dir = format!("/{}", uuid);
                match fs.mkdir(&test_dir) {
                    Ok(_) => println!("{} created", test_dir),
                    Err(_) => panic!("Couldn't create {} directory", test_dir),
                };
                let file_info = fs.get_file_status(&test_dir).ok().unwrap();
                // Status names come back as full URLs (namenode + path).
                let expected_path = format!("{}{}", minidfs_addr, test_dir);
                assert_eq!(&expected_path, file_info.name());
                assert!(!file_info.is_file());
                assert!(file_info.is_directory());
                let sub_dir_num = 3;
                let mut expected_list = Vec::new();
                for x in 0..sub_dir_num {
                    let filename = format!("{}/{}", test_dir, x);
                    expected_list.push(format!("{}{}/{}", minidfs_addr, test_dir, x));
                    match fs.mkdir(&filename) {
                        Ok(_) => println!("{} created", filename),
                        Err(_) => panic!("Couldn't create {} directory", filename),
                    };
                }
                let mut list = fs.list_status(&test_dir).ok().unwrap();
                assert_eq!(sub_dir_num, list.len());
                // Listing order is not guaranteed; sort before comparing.
                list.sort_by(|a, b| Ord::cmp(a.name(), b.name()));
                for (expected, name) in expected_list
                    .iter()
                    .zip(list.iter().map(|status| status.name()))
                {
                    assert_eq!(expected, name);
                }
                assert!(fs.delete(&test_dir, true).is_ok());
            }
        }
    }

    // list_status must distinguish a one-entry directory from an empty
    // one (and return Ok(vec![]) for the latter).
    #[test]
    fn test_list_status_with_empty_dir() {
        let dfs = get_dfs();
        {
            let fs = dfs.get_hdfs().ok().unwrap();
            {
                let uuid = Uuid::new_v4().to_string();
                let test_dir = format!("/{}", uuid);
                let empty_dir = format!("/{}", "_empty");
                match fs.mkdir(&test_dir) {
                    Ok(_) => println!("{} created", test_dir),
                    Err(_) => panic!("Couldn't create {} directory", test_dir),
                };
                match fs.mkdir(&empty_dir) {
                    Ok(_) => println!("{} created", test_dir),
                    Err(_) => panic!("Couldn't create {} directory", test_dir),
                };
                // NOTE(review): the created file handle is never closed and
                // empty_dir is never deleted — consider cleaning both up.
                let test_file = format!("{}/{}", &test_dir, "test.txt");
                match fs.create(&test_file) {
                    Ok(_) => println!("{} created", test_file),
                    Err(_) => panic!("Couldn't create {} ", test_file),
                }
                let file_info = fs.list_status(&test_dir).ok().unwrap();
                assert_eq!(file_info.len(), 1);
                let file_info = fs.list_status(&empty_dir).ok().unwrap();
                assert_eq!(file_info.len(), 0);
                assert!(fs.delete(&test_dir, true).is_ok());
            }
        }
    }
}