use std::{
io::{self, BufReader, BufWriter, Read, Write},
net::{TcpStream, ToSocketAddrs},
path::Path,
sync::Arc,
};
use crate::{hrpc::HRpc, HDFSError};
use hdfs_types::hdfs::DatanodeIdProto;
/// Prefix of every client name; `HDFS::new` appends `_` and a random v4 UUID to it.
const CLIENT_NAME: &str = "hdfs-rust-client";
mod writer;
pub use writer::{FileWriter, WriterOptions};
mod reader;
pub use reader::{FileReader, ReaderOptions};
/// Basic connection settings: name node, port and user.
///
/// Field order is significant: the derived `PartialOrd`/`Ord` compare `name_node` first,
/// then `port`, then `user`.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct FSConfig {
    /// Name node host (presumably without the port, which is stored separately).
    pub name_node: String,
    /// Name node port.
    pub port: u16,
    /// User name.
    pub user: String,
}
/// Settings used by `HDFS::connect_with` to reach the name node(s) and data nodes over TCP.
#[derive(Clone, Debug)]
pub struct ClientConfig {
    /// Real user passed to the RPC handshake.
    pub real_user: Option<String>,
    /// Effective user passed to the RPC handshake.
    pub effective_user: Option<String>,
    /// Name node addresses (e.g. `"host:port"`), tried in order until one connects.
    pub name_node: Vec<String>,
    /// TCP connect timeout, in seconds.
    pub connection_timeout: u64,
    /// Address data nodes by host name (`true`) or by IP address (`false`).
    pub use_hostname: bool,
    /// Intended capacity of each stream's write buffer, in bytes.
    pub write_buf_size: usize,
    /// Intended capacity of each stream's read buffer, in bytes.
    pub read_buf_size: usize,
    /// TCP keepalive idle time in seconds; `None` leaves the socket's keepalive untouched.
    pub tcp_keepalived: Option<u64>,
    /// Whether to set `TCP_NODELAY` on every socket.
    pub no_delay: bool,
}

impl Default for ClientConfig {
    /// Targets `127.0.0.1:9000` with no explicit users, a 30 s connect timeout, a 30 s
    /// keepalive, `TCP_NODELAY` enabled, host-name addressing and 8 KiB buffers.
    fn default() -> Self {
        let buf_size = 8192;
        ClientConfig {
            name_node: vec![String::from("127.0.0.1:9000")],
            real_user: None,
            effective_user: None,
            connection_timeout: 30,
            tcp_keepalived: Some(30),
            no_delay: true,
            use_hostname: true,
            read_buf_size: buf_size,
            write_buf_size: buf_size,
        }
    }
}
/// A buffered, bidirectional stream over a single `S`.
///
/// Reads go through a [`BufReader`] and writes through a [`BufWriter`]. Buffered writes only
/// reach `S` on [`Write::flush`] or when the write buffer fills. Reading does not flush
/// pending writes first.
pub struct BufStream<S: Read + Write>(pub BufReader<Wrapped<S>>);

impl<S> BufStream<S>
where
    S: Read + Write,
{
    /// Wraps `stream` using the default `BufReader` / `BufWriter` capacities.
    pub fn new(stream: S) -> Self {
        let writer = BufWriter::new(stream);
        let reader = BufReader::new(Wrapped(writer));
        BufStream(reader)
    }

    /// Wraps `stream` with a `read_buf`-byte read buffer and a `write_buf`-byte write buffer.
    pub fn with(stream: S, read_buf: usize, write_buf: usize) -> Self {
        let writer = BufWriter::with_capacity(write_buf, stream);
        let reader = BufReader::with_capacity(read_buf, Wrapped(writer));
        BufStream(reader)
    }
}

impl<S> Read for BufStream<S>
where
    S: Read + Write,
{
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let BufStream(reader) = self;
        reader.read(buf)
    }
}

impl<S> Write for BufStream<S>
where
    S: Read + Write,
{
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let BufStream(reader) = self;
        reader.get_mut().write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        let BufStream(reader) = self;
        reader.get_mut().flush()
    }
}

/// Adapter exposing a [`BufWriter`] as both `Read` and `Write`.
///
/// Writes are buffered by the inner `BufWriter`. Reads bypass the write buffer and go
/// straight to the underlying stream, without flushing pending writes.
pub struct Wrapped<S: Read + Write>(pub BufWriter<S>);

impl<S> Write for Wrapped<S>
where
    S: Read + Write,
{
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let Wrapped(writer) = self;
        writer.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        let Wrapped(writer) = self;
        writer.flush()
    }
}

impl<S> Read for Wrapped<S>
where
    S: Read + Write,
{
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let inner: &mut S = self.0.get_mut();
        inner.read(buf)
    }
}
/// The kind of I/O a data-node connection is requested for.
///
/// This value is passed to the data-node connector stored in `HDFS`.
#[derive(Clone, Copy, Debug)]
pub enum IOType {
    /// Connection used for reading.
    Read,
    /// Connection used for writing.
    Write,
    /// Connection used for appending.
    Append,
}
/// An HDFS client.
///
/// It holds one primary name-node RPC connection plus factories for opening further RPC
/// connections and data-node streams. `S` is the name-node RPC transport; `D` is the
/// data-node transport.
pub struct HDFS<S, D>
where
    S: Read + Write,
    D: Read + Write,
{
    /// Client name generated by `HDFS::new`.
    client_name: String,
    /// Primary RPC connection to the name node.
    ipc: HRpc<S>,
    /// Factory that opens a new name-node RPC connection (used by `HDFS::new_rpc`).
    create_ipc: Box<dyn Fn() -> io::Result<HRpc<S>>>,
    /// Opens a stream to a data node for the given kind of I/O.
    /// It is kept in an `Arc`, presumably so it can be shared with readers/writers.
    connect_data_node: Arc<dyn Fn(&DatanodeIdProto, IOType) -> io::Result<D> + 'static>,
}
/// Conversion into a list of name node addresses (e.g. `"host:port"`) for `HDFS::connect`.
///
/// `HDFS::connect_with` tries the addresses in the order returned.
pub trait ToNameNodes {
    /// Consumes `self` and returns the name node addresses.
    fn to_name_nodes(self) -> Vec<String>;
}

/// A single address.
impl ToNameNodes for &str {
    fn to_name_nodes(self) -> Vec<String> {
        vec![self.to_string()]
    }
}

/// A single borrowed owned address, so `connect(&addr, ..)` works for `addr: String`.
impl ToNameNodes for &String {
    fn to_name_nodes(self) -> Vec<String> {
        self.as_str().to_name_nodes()
    }
}

/// An already-built list, returned as-is.
impl ToNameNodes for Vec<String> {
    fn to_name_nodes(self) -> Vec<String> {
        self
    }
}

/// A borrowed vector of addresses, so `connect(&list, ..)` works without slicing.
impl<S: ToString> ToNameNodes for &Vec<S> {
    fn to_name_nodes(self) -> Vec<String> {
        self.as_slice().to_name_nodes()
    }
}

/// Any slice of addresses, converted with `ToString`.
impl<S: ToString> ToNameNodes for &[S] {
    fn to_name_nodes(self) -> Vec<String> {
        self.iter().map(|s| s.to_string()).collect()
    }
}

/// Exactly two addresses: the first element, then the second.
impl<A: ToString, B: ToString> ToNameNodes for (A, B) {
    fn to_name_nodes(self) -> Vec<String> {
        vec![self.0.to_string(), self.1.to_string()]
    }
}
impl HDFS<BufStream<TcpStream>, BufStream<TcpStream>> {
pub fn connect<S: ToString>(
name_node: impl ToNameNodes,
user: impl Into<Option<S>>,
) -> io::Result<Self> {
let config = ClientConfig {
name_node: name_node.to_name_nodes(),
effective_user: user.into().map(|s| s.to_string()),
..Default::default()
};
Self::connect_with(config)
}
pub fn connect_with(config: ClientConfig) -> io::Result<Self> {
let ClientConfig {
real_user,
effective_user,
name_node,
connection_timeout,
use_hostname,
write_buf_size,
read_buf_size,
tcp_keepalived,
no_delay,
} = config;
let timeout = std::time::Duration::from_secs(connection_timeout);
HDFS::new(
move || {
let stream = name_node
.iter()
.filter_map(|addr| addr.to_socket_addrs().ok())
.flatten()
.find_map(|addr| {
tracing::debug!(message="connect name node", addr=?addr);
TcpStream::connect_timeout(&addr, timeout).ok()
});
let stream = stream.ok_or(io::Error::new(
io::ErrorKind::Other,
"no available name node",
))?;
let sk_ref = socket2::SockRef::from(&stream);
if let Some(keep) = tcp_keepalived {
let keepalive = socket2::TcpKeepalive::new()
.with_time(std::time::Duration::from_secs(keep));
sk_ref.set_tcp_keepalive(&keepalive)?;
}
sk_ref.set_nodelay(no_delay)?;
let stream = BufStream::with(stream, write_buf_size, read_buf_size);
let ipc = HRpc::connect(
stream,
effective_user.clone(),
real_user.clone(),
None,
None,
)?;
Ok(ipc)
},
move |datanode, _| {
let mut addrs = if use_hostname {
tracing::debug!(
message = "connect data node",
hostname = datanode.host_name,
port = datanode.xfer_port
);
((datanode.host_name.as_str(), datanode.xfer_port as u16))
.to_socket_addrs()
.map_err(|_| io::Error::new(io::ErrorKind::Other, "invalid address"))?
} else {
tracing::debug!(
message = "connect data node",
ip = datanode.ip_addr,
port = datanode.xfer_port
);
((datanode.ip_addr.as_str(), datanode.xfer_port as u16))
.to_socket_addrs()
.map_err(|_| io::Error::new(io::ErrorKind::Other, "invalid ip addr"))?
};
let addr = addrs
.next()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "invalid ip address"))?;
let stream = TcpStream::connect_timeout(&addr, timeout)?;
let sk_ref = socket2::SockRef::from(&stream);
if let Some(keep) = tcp_keepalived {
let keepalive = socket2::TcpKeepalive::new()
.with_time(std::time::Duration::from_secs(keep));
sk_ref.set_tcp_keepalive(&keepalive)?;
}
sk_ref.set_nodelay(no_delay)?;
let stream = BufStream::with(stream, write_buf_size, read_buf_size);
Ok(stream)
},
)
}
}
impl<S: Read + Write, D: Read + Write> HDFS<S, D> {
    /// Builds a client from a name-node RPC factory and a data-node connector.
    ///
    /// `create_ipc` is called once here to open the primary RPC connection. It is kept so
    /// that [`HDFS::new_rpc`] can open more connections later. `connect_datanode` is stored
    /// for opening data-node streams. The client name is `CLIENT_NAME`, then `_`, then a
    /// random v4 UUID.
    ///
    /// # Errors
    /// Returns the error from the initial `create_ipc()` call.
    pub fn new(
        create_ipc: impl Fn() -> io::Result<HRpc<S>> + 'static,
        connect_datanode: impl Fn(&DatanodeIdProto, IOType) -> io::Result<D> + 'static,
    ) -> io::Result<Self> {
        let client_name = format!("{}_{}", CLIENT_NAME, uuid::Uuid::new_v4());
        let ipc = create_ipc()?;
        Ok(Self {
            client_name,
            ipc,
            create_ipc: Box::new(create_ipc),
            connect_data_node: Arc::new(connect_datanode),
        })
    }

    /// Returns the primary name-node RPC connection.
    pub fn get_rpc(&mut self) -> &mut HRpc<S> {
        &mut self.ipc
    }

    /// Opens a new, independent RPC connection using the factory given to [`HDFS::new`].
    pub fn new_rpc(&mut self) -> Result<HRpc<S>, io::Error> {
        (self.create_ipc)()
    }

    /// Opens `path` for reading with default [`ReaderOptions`].
    pub fn open(&mut self, path: impl AsRef<Path>) -> Result<FileReader<D>, HDFSError> {
        ReaderOptions::default().open(path, self)
    }

    /// Returns default [`ReaderOptions`] for customising how a file is opened.
    pub fn reader_options(&mut self) -> ReaderOptions {
        ReaderOptions::default()
    }

    /// Creates `path` for writing with default [`WriterOptions`].
    pub fn create(&mut self, path: impl AsRef<Path>) -> Result<FileWriter<S, D>, HDFSError> {
        WriterOptions::default().create(path, self)
    }

    /// Opens `path` for appending with default [`WriterOptions`].
    pub fn append(&mut self, path: impl AsRef<Path>) -> Result<FileWriter<S, D>, HDFSError> {
        WriterOptions::default().append(path, self)
    }

    /// Returns default [`WriterOptions`] for customising how a file is created or appended.
    pub fn writer_options(&mut self) -> WriterOptions {
        WriterOptions::default()
    }

    /// Creates the directory `path`. Missing parent directories are not created.
    ///
    /// # Errors
    /// Returns an error if the RPC fails or the name node answers `result == false`.
    pub fn create_dir(&mut self, path: impl AsRef<Path>) -> Result<(), HDFSError> {
        self.mkdirs_checked(path.as_ref(), false)
    }

    /// Creates the directory `path`, creating missing parent directories as well.
    ///
    /// # Errors
    /// Returns an error if the RPC fails or the name node answers `result == false`.
    pub fn create_dir_all(&mut self, path: impl AsRef<Path>) -> Result<(), HDFSError> {
        self.mkdirs_checked(path.as_ref(), true)
    }

    /// Reads the whole file at `path` into memory.
    ///
    /// # Errors
    /// Returns an error if the file cannot be opened or read.
    pub fn read(&mut self, path: impl AsRef<Path>) -> Result<Vec<u8>, HDFSError> {
        let mut fd = self.open(path)?;
        // `read_to_end` appends, so only reserve capacity. Pre-filling with zeros would
        // prefix the contents with `length` zero bytes. The length is just a capacity hint.
        let mut buf = Vec::with_capacity(fd.metadata().length as usize);
        fd.read_to_end(&mut buf)?;
        Ok(buf)
    }

    /// Deletes `path` non-recursively.
    ///
    /// The name is a historical misspelling; prefer [`HDFS::remove_dir`].
    ///
    /// # Errors
    /// Returns an error if the RPC fails or the name node answers `result == false`
    /// (for example when `path` does not exist).
    pub fn remote_dir(&mut self, path: impl AsRef<Path>) -> Result<(), HDFSError> {
        self.delete_checked(path.as_ref(), false)
    }

    /// Deletes `path` recursively.
    ///
    /// The name is a historical misspelling; prefer [`HDFS::remove_dir_all`].
    ///
    /// # Errors
    /// Returns an error if the RPC fails or the name node answers `result == false`.
    pub fn remote_dir_all(&mut self, path: impl AsRef<Path>) -> Result<(), HDFSError> {
        self.delete_checked(path.as_ref(), true)
    }

    /// Deletes the directory `path` non-recursively.
    ///
    /// # Errors
    /// Returns an error if the RPC fails or the name node answers `result == false`.
    pub fn remove_dir(&mut self, path: impl AsRef<Path>) -> Result<(), HDFSError> {
        self.delete_checked(path.as_ref(), false)
    }

    /// Deletes `path` and everything below it.
    ///
    /// # Errors
    /// Returns an error if the RPC fails or the name node answers `result == false`.
    pub fn remove_dir_all(&mut self, path: impl AsRef<Path>) -> Result<(), HDFSError> {
        self.delete_checked(path.as_ref(), true)
    }

    /// Deletes the file `path` (non-recursive delete).
    ///
    /// # Errors
    /// Returns an error if the RPC fails or the name node answers `result == false`.
    pub fn remove_file(&mut self, path: impl AsRef<Path>) -> Result<(), HDFSError> {
        self.delete_checked(path.as_ref(), false)
    }

    /// Renames `from` to `to`, overwriting `to` if it exists (`overwrite_dest: true`).
    ///
    /// # Errors
    /// Returns an error if the `rename2` RPC fails.
    pub fn rename(
        &mut self,
        from: impl AsRef<Path>,
        to: impl AsRef<Path>,
    ) -> Result<(), HDFSError> {
        let req = hdfs_types::hdfs::Rename2RequestProto {
            src: from.as_ref().to_string_lossy().to_string(),
            dst: to.as_ref().to_string_lossy().to_string(),
            overwrite_dest: true,
            // NOTE(review): every rename is flagged as a move to trash, even though this is an
            // ordinary rename. Confirm this is intended.
            move_to_trash: Some(true),
        };
        self.ipc.rename2(req)?;
        Ok(())
    }

    /// Sends a `mkdirs` request for `path` and turns `result == false` into an error.
    fn mkdirs_checked(&mut self, path: &Path, create_parent: bool) -> Result<(), HDFSError> {
        let req = hdfs_types::hdfs::MkdirsRequestProto {
            src: path.to_string_lossy().into_owned(),
            // 0o755 (rwxr-xr-x). HDFS requires the execute bit on a directory to access its
            // children, so the former 0o666 produced directories that could not be traversed.
            masked: hdfs_types::hdfs::FsPermissionProto { perm: 0o755 },
            create_parent,
            unmasked: None,
        };
        let (_, resp) = self.ipc.mkdirs(req)?;
        if resp.result {
            Ok(())
        } else {
            Err(Self::rejected("mkdirs", path))
        }
    }

    /// Sends a `delete` request for `path` and turns `result == false` into an error.
    fn delete_checked(&mut self, path: &Path, recursive: bool) -> Result<(), HDFSError> {
        let req = hdfs_types::hdfs::DeleteRequestProto {
            src: path.to_string_lossy().into_owned(),
            recursive,
        };
        let (_, resp) = self.ipc.delete(req)?;
        if resp.result {
            Ok(())
        } else {
            Err(Self::rejected("delete", path))
        }
    }

    /// Builds the error returned when the name node answers `op` on `path` with `false`.
    fn rejected(op: &str, path: &Path) -> HDFSError {
        io::Error::new(
            io::ErrorKind::Other,
            format!("{} {} failed: name node returned false", op, path.display()),
        )
        .into()
    }
}