use std::collections::HashMap;
use std::ffi::CStr;
use std::fmt::Formatter;
use std::rc::Rc;
use lazy_static::lazy_static;
use libc::{c_int, c_short, c_void};
use log::*;
use std::sync::RwLock;
use std::{ffi::CString, marker::PhantomData};
use crate::err::HdfsErr;
use crate::*;
// POSIX-style open(2) flag values passed to libhdfs' hdfsOpenFile().
// Redeclared locally (rather than taken from `libc`) so the values are fixed
// regardless of target platform.
const O_RDONLY: c_int = 0;
const O_WRONLY: c_int = 1;
// NOTE(review): 1024 is the Linux O_APPEND value — presumably what the linked
// libhdfs expects; confirm for non-Linux targets.
const O_APPEND: c_int = 1024;
/// Parameters identifying one HDFS NameNode connection.
///
/// Used as the key of the process-wide connection cache, hence the
/// `PartialEq`/`Eq`/`Hash` derives.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ConnectionProperties {
    /// NameNode host name or address.
    pub namenode_host: String,
    /// NameNode RPC port.
    pub namenode_port: u16,
    /// Optional user name to connect as; libhdfs picks its default when `None`.
    pub namenode_user: Option<String>,
    /// Optional path to a Kerberos ticket cache for authenticated clusters.
    pub kerberos_ticket_cache_path: Option<String>,
}
// SAFETY: `HdfsFs` holds only a raw `hdfsFS` handle plus owned connection
// metadata. These impls assume the libhdfs filesystem handle may be used from
// multiple threads — NOTE(review): confirm against the thread-safety
// guarantees of the linked libhdfs version.
unsafe impl Send for HdfsFs {}
unsafe impl Sync for HdfsFs {}
lazy_static! {
    // Process-wide cache of established connections, keyed by connection
    // properties, so repeated `HdfsFs::new` calls reuse one libhdfs handle
    // instead of reconnecting.
    static ref HDFS_CACHE: RwLock<HashMap<ConnectionProperties, HdfsFs>> =
        RwLock::new(HashMap::new());
}
/// Handle to an HDFS filesystem connection.
///
/// Cloning is cheap: clones share the same underlying `hdfsFS` handle. The
/// handle itself is owned by the global connection cache.
#[derive(Clone)]
pub struct HdfsFs {
    // The properties this connection was created from (also the cache key).
    connection_properties: ConnectionProperties,
    // Raw libhdfs filesystem handle; never null once constructed.
    raw: hdfsFS,
    _marker: PhantomData<()>,
}
impl std::fmt::Debug for HdfsFs {
    /// Formats the filesystem handle by its connection properties; the raw
    /// libhdfs pointer is deliberately omitted.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HdfsFs")
            // Label the field by its actual name: it was previously printed
            // as "url", inconsistent with `HdfsFile`'s Debug output.
            .field("connection_properties", &self.connection_properties)
            .finish()
    }
}
impl HdfsFs {
    /// Connects to (or reuses a cached connection for) the HDFS cluster
    /// described by `connection_properties`.
    pub fn new(connection_properties: ConnectionProperties) -> Result<HdfsFs, HdfsErr> {
        HdfsFs::new_with_hdfs_params(connection_properties, HashMap::new())
    }

    /// Like [`HdfsFs::new`], but applies `hdfs_params` as extra libhdfs
    /// configuration when a fresh connection must be established.
    ///
    /// Connections are cached process-wide by `connection_properties`; on a
    /// cache hit `hdfs_params` is ignored and the cached handle is cloned.
    pub fn new_with_hdfs_params(
        connection_properties: ConnectionProperties,
        hdfs_params: HashMap<String, String>,
    ) -> Result<HdfsFs, HdfsErr> {
        // Fast path: shared read lock for the common cache-hit case.
        {
            let cache = HDFS_CACHE
                .read()
                .expect("Could not acquire read lock on HDFS cache");
            if let Some(hdfs_fs) = cache.get(&connection_properties) {
                return Ok(hdfs_fs.clone());
            }
        }
        let mut cache = HDFS_CACHE
            .write()
            .expect("Could not acquire write lock on HDFS cache");
        // Re-check under the write lock: another thread may have inserted an
        // entry between the read lock being released and this lock acquired.
        if let Some(hdfs_fs) = cache.get(&connection_properties) {
            return Ok(hdfs_fs.clone());
        }
        // Propagate connection failures to the caller instead of panicking
        // (the previous code `expect`ed inside `or_insert_with`).
        let raw = create_hdfs_fs(connection_properties.clone(), hdfs_params)?;
        let hdfs_fs = HdfsFs {
            connection_properties: connection_properties.clone(),
            raw,
            _marker: PhantomData,
        };
        cache.insert(connection_properties, hdfs_fs.clone());
        Ok(hdfs_fs)
    }

    /// Opens an existing file for appending.
    ///
    /// Returns `HdfsErr::FileNotFound` when `path` does not exist.
    pub fn append(&self, path: &str) -> Result<HdfsFile, HdfsErr> {
        if !self.exist(path) {
            return Err(HdfsErr::FileNotFound(path.to_owned()));
        }
        let file = unsafe {
            let cstr_path = CString::new(path).unwrap();
            // libhdfs requires O_WRONLY | O_APPEND for appending: the access
            // mode bits of a bare O_APPEND equal O_RDONLY, which would open
            // the file for reading instead.
            hdfsOpenFile(self.raw, cstr_path.as_ptr(), O_WRONLY | O_APPEND, 0, 0, 0)
        };
        self.new_hdfs_file(path, file)
    }

    /// Creates a new file for writing, failing if it already exists.
    #[inline]
    pub fn create(&self, path: &str) -> Result<HdfsFile, HdfsErr> {
        self.create_with_params(path, false, 0, 0, 0)
    }

    /// Creates a new file for writing, optionally overwriting an existing one.
    #[inline]
    pub fn create_with_overwrite(&self, path: &str, overwrite: bool) -> Result<HdfsFile, HdfsErr> {
        self.create_with_params(path, overwrite, 0, 0, 0)
    }

    /// Creates a new file for writing with explicit buffer size, replication
    /// factor, and block size (0 selects the server default for each).
    ///
    /// Returns `HdfsErr::FileAlreadyExists` when `overwrite` is false and the
    /// file already exists.
    pub fn create_with_params(
        &self,
        path: &str,
        overwrite: bool,
        buf_size: i32,
        replica_num: i16,
        block_size: i64,
    ) -> Result<HdfsFile, HdfsErr> {
        if !overwrite && self.exist(path) {
            return Err(HdfsErr::FileAlreadyExists(path.to_owned()));
        }
        let file = unsafe {
            let cstr_path = CString::new(path).unwrap();
            hdfsOpenFile(
                self.raw,
                cstr_path.as_ptr(),
                O_WRONLY,
                buf_size as c_int,
                replica_num as c_short,
                block_size as tOffset,
            )
        };
        self.new_hdfs_file(path, file)
    }

    /// Fetches the status (size, kind, owner, timestamps, ...) of `path`.
    pub fn get_file_status(&self, path: &str) -> Result<FileStatus, HdfsErr> {
        let ptr = unsafe {
            let cstr_path = CString::new(path).unwrap();
            hdfsGetPathInfo(self.raw, cstr_path.as_ptr())
        };
        if ptr.is_null() {
            Err(HdfsErr::Miscellaneous(format!(
                "Could not get file status for {}",
                path
            )))
        } else {
            Ok(FileStatus::new(ptr))
        }
    }

    /// Deletes `path`; with `recursive`, also deletes directory contents.
    pub fn delete(&self, path: &str, recursive: bool) -> Result<bool, HdfsErr> {
        let res = unsafe {
            let cstr_path = CString::new(path).unwrap();
            hdfsDelete(self.raw, cstr_path.as_ptr(), recursive as c_int)
        };
        if res == 0 {
            Ok(true)
        } else {
            Err(HdfsErr::Miscellaneous(format!(
                "Could not delete path: {}",
                path
            )))
        }
    }

    /// Returns true when `path` exists on the filesystem.
    pub fn exist(&self, path: &str) -> bool {
        // hdfsExists returns 0 when the path exists.
        (unsafe {
            let cstr_path = CString::new(path).unwrap();
            hdfsExists(self.raw, cstr_path.as_ptr())
        } == 0)
    }

    /// Lists the entries of the directory at `path`.
    ///
    /// The returned `FileStatus` values share one libhdfs array, freed when
    /// the last of them is dropped.
    pub fn list_status(&self, path: &str) -> Result<Vec<FileStatus>, HdfsErr> {
        let mut entry_num: c_int = 0;
        let ptr = unsafe {
            let cstr_path = CString::new(path).unwrap();
            hdfsListDirectory(self.raw, cstr_path.as_ptr(), &mut entry_num)
        };
        if ptr.is_null() {
            Err(HdfsErr::Miscellaneous(format!(
                "Could not list content of path: {}",
                path
            )))
        } else {
            let shared_ptr = Rc::new(HdfsFileInfoPtr::new_array(ptr, entry_num));
            let list = (0..entry_num)
                .map(|idx| FileStatus::from_array(shared_ptr.clone(), idx as u32))
                .collect::<Vec<FileStatus>>();
            Ok(list)
        }
    }

    /// Creates the directory at `path` (including missing parents, per
    /// libhdfs semantics).
    pub fn mkdir(&self, path: &str) -> Result<bool, HdfsErr> {
        let res = unsafe {
            let cstr_path = CString::new(path).unwrap();
            hdfsCreateDirectory(self.raw, cstr_path.as_ptr())
        };
        if res == 0 {
            Ok(true)
        } else {
            Err(HdfsErr::Miscellaneous(format!(
                "Could not create directory at path: {}",
                path
            )))
        }
    }

    /// Opens an existing file for reading with the default buffer size.
    #[inline]
    pub fn open(&self, path: &str) -> Result<HdfsFile, HdfsErr> {
        self.open_with_buf_size(path, 0)
    }

    /// Opens an existing file for reading with an explicit buffer size
    /// (0 selects the server default).
    pub fn open_with_buf_size(&self, path: &str, buf_size: i32) -> Result<HdfsFile, HdfsErr> {
        let file = unsafe {
            let cstr_path = CString::new(path).unwrap();
            hdfsOpenFile(
                self.raw,
                cstr_path.as_ptr(),
                O_RDONLY,
                buf_size as c_int,
                0,
                0,
            )
        };
        self.new_hdfs_file(path, file)
    }

    /// Opens `path` for writing, creating it or truncating existing content
    /// (libhdfs O_WRONLY semantics).
    pub fn open_for_writing(&self, path: &str) -> Result<HdfsFile, HdfsErr> {
        let file = unsafe {
            let cstr_path = CString::new(path).unwrap();
            hdfsOpenFile(self.raw, cstr_path.as_ptr(), O_WRONLY, 0, 0, 0)
        };
        self.new_hdfs_file(path, file)
    }

    /// Wraps a raw `hdfsFile` handle, turning a null handle into an error.
    fn new_hdfs_file(&self, path: &str, file: hdfsFile) -> Result<HdfsFile, HdfsErr> {
        if file.is_null() {
            Err(HdfsErr::Miscellaneous(format!(
                "Could not open HDFS file at path {}",
                path
            )))
        } else {
            Ok(HdfsFile {
                fs: self.clone(),
                path: path.to_owned(),
                file,
                _market: PhantomData,
            })
        }
    }

    /// Renames (moves) `old_path` to `new_path`.
    pub fn rename(&self, old_path: &str, new_path: &str) -> Result<bool, HdfsErr> {
        let ret = unsafe {
            let cstr_old_path = CString::new(old_path).unwrap();
            let cstr_new_path = CString::new(new_path).unwrap();
            hdfsRename(self.raw, cstr_old_path.as_ptr(), cstr_new_path.as_ptr())
        };
        if ret == 0 {
            Ok(true)
        } else {
            // Fixed typo in the error message ("reanme" -> "rename").
            Err(HdfsErr::Miscellaneous(format!(
                "Could not rename {} to {}",
                old_path, new_path
            )))
        }
    }
}
/// Owning wrapper for one or more `hdfsFileInfo` records returned by libhdfs;
/// frees them via `hdfsFreeFileInfo` on drop.
struct HdfsFileInfoPtr {
    // Start of the record array.
    pub ptr: *const hdfsFileInfo,
    // Number of records at `ptr` (passed back to hdfsFreeFileInfo).
    pub len: i32,
}
impl Drop for HdfsFileInfoPtr {
    fn drop(&mut self) {
        // SAFETY: `ptr`/`len` describe exactly the array handed out by
        // libhdfs (hdfsGetPathInfo or hdfsListDirectory), and this wrapper is
        // the sole owner, so it is freed exactly once here.
        unsafe { hdfsFreeFileInfo(self.ptr as *mut hdfsFileInfo, self.len) };
    }
}
impl HdfsFileInfoPtr {
fn new(ptr: *const hdfsFileInfo) -> HdfsFileInfoPtr {
HdfsFileInfoPtr { ptr, len: 1 }
}
pub fn new_array(ptr: *const hdfsFileInfo, len: i32) -> HdfsFileInfoPtr {
HdfsFileInfoPtr { ptr, len }
}
}
/// Status of one HDFS path (size, kind, owner, timestamps, ...).
///
/// Borrows entry `idx` from a reference-counted `hdfsFileInfo` array, so
/// statuses produced by one directory listing share a single allocation.
pub struct FileStatus {
    raw: Rc<HdfsFileInfoPtr>,
    // Index of this entry within the shared array (0 for single lookups).
    idx: u32,
    _marker: PhantomData<()>,
}
impl FileStatus {
    /// Wraps a single `hdfsFileInfo` record.
    #[inline]
    fn new(ptr: *const hdfsFileInfo) -> FileStatus {
        Self::from_array(Rc::new(HdfsFileInfoPtr::new(ptr)), 0)
    }

    /// Refers to entry `idx` of a shared `hdfsFileInfo` array.
    #[inline]
    fn from_array(raw: Rc<HdfsFileInfoPtr>, idx: u32) -> FileStatus {
        FileStatus {
            raw,
            idx,
            _marker: PhantomData,
        }
    }

    /// Raw pointer to the record backing this status.
    #[inline]
    fn ptr(&self) -> *const hdfsFileInfo {
        unsafe { self.raw.ptr.add(self.idx as usize) }
    }

    /// Shared borrow of the record backing this status.
    #[inline]
    fn info(&self) -> &hdfsFileInfo {
        unsafe { &*self.ptr() }
    }

    /// Path name as reported by libhdfs.
    #[inline]
    pub fn name(&self) -> &str {
        unsafe { CStr::from_ptr(self.info().mName) }.to_str().unwrap()
    }

    /// True when this entry is a regular file.
    #[inline]
    pub fn is_file(&self) -> bool {
        matches!(self.info().mKind, tObjectKind::kObjectKindFile)
    }

    /// True when this entry is a directory.
    #[inline]
    pub fn is_directory(&self) -> bool {
        matches!(self.info().mKind, tObjectKind::kObjectKindDirectory)
    }

    /// Owning user of this entry.
    #[inline]
    pub fn owner(&self) -> &str {
        unsafe { CStr::from_ptr(self.info().mOwner) }.to_str().unwrap()
    }

    /// Owning group of this entry.
    #[inline]
    pub fn group(&self) -> &str {
        unsafe { CStr::from_ptr(self.info().mGroup) }.to_str().unwrap()
    }

    /// Permission bits of this entry.
    #[inline]
    pub fn permission(&self) -> i16 {
        self.info().mPermissions as i16
    }

    /// Size of the file in bytes.
    #[allow(clippy::len_without_is_empty)]
    #[inline]
    pub fn len(&self) -> usize {
        self.info().mSize as usize
    }

    /// HDFS block size of the file in bytes.
    #[inline]
    pub fn block_size(&self) -> usize {
        self.info().mBlockSize as usize
    }

    /// Replication factor of the file.
    #[inline]
    pub fn replica_count(&self) -> i16 {
        self.info().mReplication as i16
    }

    /// Last-modification time.
    #[inline]
    pub fn last_modified(&self) -> time_t {
        self.info().mLastMod
    }

    /// Last-access time.
    #[inline]
    pub fn last_access(&self) -> time_t {
        self.info().mLastAccess
    }
}
/// An open HDFS file handle tied to the filesystem it was opened on.
///
/// NOTE(review): `Clone` duplicates the raw `hdfsFile` handle; closing one
/// clone invalidates the others' handle too.
#[derive(Clone)]
pub struct HdfsFile {
    // Owning filesystem connection (keeps the hdfsFS handle reachable).
    fs: HdfsFs,
    // Path the file was opened with, kept for error messages and re-stat.
    path: String,
    // Raw libhdfs file handle; never null once constructed.
    file: hdfsFile,
    // sic: typo for `_marker`, kept as-is to avoid a coordinated rename.
    _market: PhantomData<()>,
}
impl std::fmt::Debug for HdfsFile {
    /// Formats the file by its connection properties and path; the raw
    /// libhdfs handle is deliberately omitted.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("HdfsFile");
        dbg.field("connection_properties", &self.fs.connection_properties);
        dbg.field("path", &self.path);
        dbg.finish()
    }
}
impl HdfsFile {
    /// The filesystem this file was opened on.
    #[inline]
    pub fn fs(&self) -> &HdfsFs {
        &self.fs
    }

    /// The path this file was opened with.
    #[inline]
    pub fn path(&self) -> &str {
        &self.path
    }

    /// Number of bytes that can be read from this file without blocking.
    pub fn available(&self) -> Result<i32, HdfsErr> {
        let ret = unsafe { hdfsAvailable(self.fs.raw, self.file) };
        if ret < 0 {
            Err(HdfsErr::Miscellaneous(format!(
                "Could not determine HDFS availability for {}",
                self.path
            )))
        } else {
            Ok(ret)
        }
    }

    /// Closes the file handle, flushing pending writes.
    pub fn close(&self) -> Result<bool, HdfsErr> {
        if unsafe { hdfsCloseFile(self.fs.raw, self.file) } == 0 {
            Ok(true)
        } else {
            Err(HdfsErr::Miscellaneous(format!(
                "Could not close {}",
                self.path
            )))
        }
    }

    /// Fetches the current status of this file's path.
    pub fn get_file_status(&self) -> Result<FileStatus, HdfsErr> {
        self.fs.get_file_status(self.path())
    }

    /// Reads up to `buf.len()` bytes into `buf`, returning the number of
    /// bytes read. Returns `Ok(0)` at end of file.
    pub fn read(&self, buf: &mut [u8]) -> Result<i32, HdfsErr> {
        let read_len = unsafe {
            hdfsRead(
                self.fs.raw,
                self.file,
                // Use the mutable pointer: hdfsRead writes into this buffer
                // (the previous `buf.as_ptr()` cast a const pointer to mut).
                buf.as_mut_ptr() as *mut c_void,
                buf.len() as tSize,
            )
        };
        // hdfsRead returns -1 on error and 0 at end of file; EOF is a
        // successful zero-byte read, not an error.
        if read_len >= 0 {
            Ok(read_len as i32)
        } else {
            Err(HdfsErr::Miscellaneous(format!(
                "Failed to read from {}",
                self.path
            )))
        }
    }

    /// Seeks to an absolute `offset`; returns true on success.
    pub fn seek(&self, offset: u64) -> bool {
        (unsafe { hdfsSeek(self.fs.raw, self.file, offset as tOffset) }) == 0
    }

    /// Writes all of `buf`, returning the number of bytes written.
    pub fn write(&self, buf: &[u8]) -> Result<i32, HdfsErr> {
        let written_len = unsafe {
            hdfsWrite(
                self.fs.raw,
                self.file,
                buf.as_ptr() as *mut c_void,
                buf.len() as tSize,
            )
        };
        // hdfsWrite returns -1 on error; a zero-length write legitimately
        // returns 0 and must not be reported as failure.
        if written_len >= 0 {
            Ok(written_len)
        } else {
            Err(HdfsErr::Miscellaneous(format!(
                "Failed to write to {}",
                self.path
            )))
        }
    }
}
/// Establishes a fresh libhdfs connection from `connection_properties`,
/// applying any `hdfs_params` as builder configuration overrides.
///
/// Returns `HdfsErr::CannotConnectToNameNode` when the builder cannot be
/// allocated or the connection attempt fails.
fn create_hdfs_fs(
    connection_properties: ConnectionProperties,
    hdfs_params: HashMap<String, String>,
) -> Result<hdfsFS, HdfsErr> {
    let hdfs_fs = unsafe {
        let hdfs_builder = hdfsNewBuilder();
        // Guard against a null builder before handing it to the setter
        // calls below (previously dereferenced unconditionally).
        if hdfs_builder.is_null() {
            return Err(HdfsErr::CannotConnectToNameNode(format!(
                "{}:{}",
                connection_properties.namenode_host, connection_properties.namenode_port
            )));
        }
        let cstr_host = CString::new(connection_properties.namenode_host.as_bytes()).unwrap();
        for (k, v) in hdfs_params {
            let cstr_k = CString::new(k).unwrap();
            let cstr_v = CString::new(v).unwrap();
            hdfsBuilderConfSetStr(hdfs_builder, cstr_k.as_ptr(), cstr_v.as_ptr());
        }
        hdfsBuilderSetNameNode(hdfs_builder, cstr_host.as_ptr());
        hdfsBuilderSetNameNodePort(hdfs_builder, connection_properties.namenode_port);
        if let Some(user) = connection_properties.namenode_user.clone() {
            let cstr_user = CString::new(user.as_bytes()).unwrap();
            hdfsBuilderSetUserName(hdfs_builder, cstr_user.as_ptr());
        }
        if let Some(kerb_ticket_cache_path) =
            connection_properties.kerberos_ticket_cache_path.clone()
        {
            let cstr_kerb_ticket_cache_path =
                CString::new(kerb_ticket_cache_path.as_bytes()).unwrap();
            hdfsBuilderSetKerbTicketCachePath(hdfs_builder, cstr_kerb_ticket_cache_path.as_ptr());
        }
        info!(
            "Connecting to Namenode, host: {}, port: {}, user: {:?}, krb_ticket_cache: {:?}",
            connection_properties.namenode_host,
            connection_properties.namenode_port,
            connection_properties.namenode_user,
            connection_properties.kerberos_ticket_cache_path
        );
        // hdfsBuilderConnect consumes (frees) the builder whether or not the
        // connection succeeds, so no explicit cleanup is needed here.
        hdfsBuilderConnect(hdfs_builder)
    };
    if hdfs_fs.is_null() {
        Err(HdfsErr::CannotConnectToNameNode(format!(
            "{}:{}",
            connection_properties.namenode_host, connection_properties.namenode_port
        )))
    } else {
        Ok(hdfs_fs)
    }
}