use std::ffi::CString;
use std::io;
use std::mem::MaybeUninit;
use errno::{set_errno, Errno};
use hdfs_sys::*;
use log::debug;
use crate::metadata::Metadata;
use crate::{OpenOptions, Readdir};
/// Handle to an HDFS filesystem, wrapping the raw `hdfsFS` value from `hdfs_sys`.
///
/// Instances are produced by [`ClientBuilder::connect`], which stores the value
/// returned by `hdfsBuilderConnect` after checking it is non-null.
///
/// NOTE(review): nothing in this file releases the handle (no `Drop` impl and no
/// disconnect call is visible here) — confirm that this is intentional.
#[derive(Debug)]
pub struct Client {
// Raw filesystem handle passed as the first argument to every libhdfs call
// made by this type.
fs: hdfsFS,
}
/// Collects the settings used to open an HDFS connection.
///
/// Start with [`ClientBuilder::new`], optionally chain the `with_*` setters, and
/// finish with [`ClientBuilder::connect`].
///
/// The type derives `Debug` and `Clone` so a configuration can be logged and
/// reused for several connection attempts.
#[derive(Debug, Clone)]
pub struct ClientBuilder {
    /// Name node to connect to; handed to `hdfsBuilderSetNameNode`.
    name_node: String,
    /// Optional user name; handed to `hdfsBuilderSetUserName` when set.
    user: Option<String>,
    /// Optional Kerberos ticket cache path; handed to
    /// `hdfsBuilderSetKerbTicketCachePath` when set.
    kerberos_ticket_cache_path: Option<String>,
}
impl ClientBuilder {
    /// Creates a builder targeting `name_node`.
    ///
    /// Nothing is contacted until [`ClientBuilder::connect`] is called.
    pub fn new(name_node: &str) -> ClientBuilder {
        ClientBuilder {
            name_node: name_node.to_string(),
            user: None,
            kerberos_ticket_cache_path: None,
        }
    }

    /// Sets the user name to connect as.
    pub fn with_user(mut self, user: &str) -> ClientBuilder {
        self.user = Some(user.to_string());
        self
    }

    /// Sets the Kerberos ticket cache path to use for the connection.
    pub fn with_kerberos_ticket_cache_path(
        mut self,
        kerberos_ticket_cache_path: &str,
    ) -> ClientBuilder {
        self.kerberos_ticket_cache_path = Some(kerberos_ticket_cache_path.to_string());
        self
    }

    /// Connects to the configured name node and returns a [`Client`].
    ///
    /// # Errors
    ///
    /// * An `io::Error` converted from `NulError` if the name node, user name or
    ///   ticket cache path contains an interior NUL byte. This is detected before
    ///   any libhdfs resource is allocated.
    /// * `io::Error::last_os_error()` if `hdfsNewBuilder` or `hdfsBuilderConnect`
    ///   returns a null pointer.
    pub fn connect(self) -> io::Result<Client> {
        debug!("connect name node {}", &self.name_node);

        // Convert every string before touching libhdfs: a failing conversion then
        // returns early without leaving a native builder behind. Holding the
        // optional values as `Option<CString>` (rather than `MaybeUninit`) also
        // guarantees they are dropped when this function returns.
        let name_node = CString::new(self.name_node.as_bytes())?;
        let user = self.user.map(CString::new).transpose()?;
        let ticket_cache_path = self
            .kerberos_ticket_cache_path
            .map(CString::new)
            .transpose()?;

        // Clear errno so a failure below is not reported with a stale error code.
        set_errno(Errno(0));

        // SAFETY: `hdfsNewBuilder` takes no arguments; its result is checked for
        // null before use.
        let builder = unsafe { hdfsNewBuilder() };
        if builder.is_null() {
            return Err(io::Error::last_os_error());
        }

        // SAFETY: `builder` is non-null, and every pointer handed over points into
        // a `CString` that stays alive until after `hdfsBuilderConnect` returns.
        unsafe {
            hdfsBuilderSetNameNode(builder, name_node.as_ptr());
            if let Some(user) = &user {
                hdfsBuilderSetUserName(builder, user.as_ptr());
            }
            if let Some(path) = &ticket_cache_path {
                hdfsBuilderSetKerbTicketCachePath(builder, path.as_ptr());
            }
        }

        // SAFETY: `builder` is a valid builder configured above. Per the libhdfs
        // header, `hdfsBuilderConnect` releases the builder whether or not the
        // connection succeeds, so it is not freed here.
        let fs = unsafe { hdfsBuilderConnect(builder) };
        if fs.is_null() {
            return Err(io::Error::last_os_error());
        }

        debug!("name node {} connected", self.name_node);
        Ok(Client::new(fs))
    }
}
// `Client`'s only field is the `hdfsFS` handle, so these impls assert that the
// handle may be moved to, and shared between, threads.
// NOTE(review): soundness depends on libhdfs allowing a single `hdfsFS` to be
// used from several threads at once — confirm against the libhdfs documentation.
unsafe impl Send for Client {}
unsafe impl Sync for Client {}
impl Client {
pub(crate) fn new(fs: hdfsFS) -> Self {
Self { fs }
}
pub fn open_file(&self) -> OpenOptions {
OpenOptions::new(self.fs)
}
pub fn remove_file(&self, path: &str) -> io::Result<()> {
debug!("remove file {}", path);
let n = unsafe {
let p = CString::new(path)?;
hdfsDelete(self.fs, p.as_ptr(), false.into())
};
if n == -1 {
return Err(io::Error::last_os_error());
}
debug!("delete file {} finished", path);
Ok(())
}
pub fn rename_file(&self, old_path: &str, new_path: &str) -> io::Result<()> {
debug!("rename file {} -> {}", old_path, new_path);
let n = {
let old_path = CString::new(old_path)?;
let new_path = CString::new(new_path)?;
unsafe { hdfsRename(self.fs, old_path.as_ptr(), new_path.as_ptr()) }
};
if n == -1 {
return Err(io::Error::last_os_error());
}
debug!("rename file {} -> {} finished", old_path, new_path);
Ok(())
}
pub fn remove_dir(&self, path: &str) -> io::Result<()> {
debug!("remove dir {}", path);
let n = unsafe {
let p = CString::new(path)?;
hdfsDelete(self.fs, p.as_ptr(), false.into())
};
if n == -1 {
return Err(io::Error::last_os_error());
}
debug!("delete dir {} finished", path);
Ok(())
}
pub fn remove_dir_all(&self, path: &str) -> io::Result<()> {
debug!("remove dir all {}", path);
let n = unsafe {
let p = CString::new(path)?;
hdfsDelete(self.fs, p.as_ptr(), true.into())
};
if n == -1 {
return Err(io::Error::last_os_error());
}
debug!("delete dir all {} finished", path);
Ok(())
}
pub fn metadata(&self, path: &str) -> io::Result<Metadata> {
set_errno(Errno(0));
let hfi = unsafe {
let p = CString::new(path)?;
hdfsGetPathInfo(self.fs, p.as_ptr())
};
if hfi.is_null() {
return Err(io::Error::last_os_error());
}
let fi = unsafe { Metadata::from(*hfi) };
unsafe { hdfsFreeFileInfo(hfi, 1) };
Ok(fi)
}
pub fn read_dir(&self, path: &str) -> io::Result<Readdir> {
set_errno(Errno(0));
let mut entries = 0;
let hfis = unsafe {
let p = CString::new(path)?;
hdfsListDirectory(self.fs, p.as_ptr(), &mut entries)
};
if hfis.is_null() {
let e = io::Error::last_os_error();
return match e.raw_os_error() {
None => Ok(Vec::new().into()),
Some(0) => Ok(Vec::new().into()),
Some(_) => Err(e),
};
}
let mut fis = Vec::with_capacity(entries as usize);
for i in 0..entries {
let m = unsafe { Metadata::from(*hfis.offset(i as isize)) };
fis.push(m)
}
unsafe { hdfsFreeFileInfo(hfis, entries) };
Ok(fis.into())
}
pub fn create_dir(&self, path: &str) -> io::Result<()> {
let n = unsafe {
let p = CString::new(path)?;
hdfsCreateDirectory(self.fs, p.as_ptr())
};
if n == -1 {
return Err(io::Error::last_os_error());
}
Ok(())
}
}
// Integration tests: each one connects to the "default" name node, so they need
// a reachable HDFS configuration to pass.
#[cfg(test)]
mod tests {
    use std::io;

    use log::debug;

    use crate::client::ClientBuilder;

    /// Connecting yields a non-null filesystem handle.
    #[test]
    fn test_client_connect() {
        let _ = env_logger::try_init();

        let client = ClientBuilder::new("default").connect().expect("init success");

        assert!(!client.fs.is_null())
    }

    /// Opening a random path for reading must not crash; the result is ignored.
    #[test]
    fn test_client_open() {
        let _ = env_logger::try_init();

        let client = ClientBuilder::new("default").connect().expect("init success");

        let path = uuid::Uuid::new_v4().to_string();
        let target = format!("/tmp/{path}");
        let _ = client.open_file().read(true).open(&target);
    }

    /// Looking up a random, non-existent path reports `NotFound`.
    #[test]
    fn test_client_stat() {
        let _ = env_logger::try_init();

        let client = ClientBuilder::new("default").connect().expect("init success");
        debug!("Client: {:?}", client);

        let path = uuid::Uuid::new_v4().to_string();
        let target = format!("/tmp/{path}");
        let outcome = client.metadata(&target);

        assert!(outcome.is_err());
        assert_eq!(outcome.unwrap_err().kind(), io::ErrorKind::NotFound);
    }

    /// Listing `/tmp` succeeds and returns at least one entry.
    #[test]
    fn test_client_readdir() {
        let _ = env_logger::try_init();

        let client = ClientBuilder::new("default").connect().expect("init success");
        debug!("Client: {:?}", client);

        let listing = client.read_dir("/tmp").expect("open file success");
        debug!("Metadata: {:?}", listing);

        assert!(listing.len() > 0)
    }

    /// Creating a directory that already exists is expected to succeed.
    #[test]
    fn test_client_mkdir() {
        let _ = env_logger::try_init();

        let client = ClientBuilder::new("default").connect().expect("init success");
        debug!("Client: {:?}", client);

        client
            .create_dir("/tmp")
            .expect("mkdir on exist dir should succeed");
    }
}