use std::future::Future;
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::sync::{Arc, Mutex};
use bytes::Bytes;
use futures::StreamExt;
use futures::stream::BoxStream;
use tokio::runtime::Runtime;
use crate::acl::{AclEntry, AclStatus};
use crate::client::{self, ContentSummary, FileStatus, WriteOptions};
use crate::file::{FileReader as AsyncFileReader, FileWriter as AsyncFileWriter};
use crate::{Result, client::IORuntime};
/// Wraps a crate-level [`crate::HdfsError`] in a [`std::io::Error`].
///
/// The result is produced by `io::Error::other`, so the original error is
/// carried along as the payload of the returned `io::Error`.
fn io_error(error: crate::HdfsError) -> io::Error {
    io::Error::other(error)
}
/// Builder for the blocking [`Client`].
///
/// It is a thin wrapper around the async `client::ClientBuilder`.
#[derive(Default)]
pub struct ClientBuilder {
/// Wrapped async builder that receives every configuration value.
inner: client::ClientBuilder,
}
impl ClientBuilder {
    /// Creates a builder in its default state (equivalent to `ClientBuilder::default()`).
    pub fn new() -> Self {
        Default::default()
    }

    /// Passes `url` to the wrapped async builder's `with_url` and returns the updated builder.
    pub fn with_url(self, url: impl Into<String>) -> Self {
        Self {
            inner: self.inner.with_url(url),
        }
    }

    /// Passes the `config` key/value pairs to the wrapped async builder's `with_config`.
    pub fn with_config(
        self,
        config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
    ) -> Self {
        Self {
            inner: self.inner.with_config(config),
        }
    }

    /// Passes `config_dir` to the wrapped async builder's `with_config_dir`.
    pub fn with_config_dir(self, config_dir: impl Into<String>) -> Self {
        Self {
            inner: self.inner.with_config_dir(config_dir),
        }
    }

    /// Passes `user` to the wrapped async builder's `with_user`.
    pub fn with_user(self, user: impl Into<String>) -> Self {
        Self {
            inner: self.inner.with_user(user),
        }
    }

    /// Builds the blocking [`Client`].
    ///
    /// A fresh Tokio [`Runtime`] is created for the client; a clone of its
    /// handle is given to the async builder as the I/O runtime, and the
    /// runtime itself is stored in the returned client.
    ///
    /// # Errors
    ///
    /// Returns an error if the runtime cannot be created or if the wrapped
    /// async builder fails to build.
    pub fn build(self) -> Result<Client> {
        let rt = Arc::new(Runtime::new()?);
        let io_runtime = IORuntime::from(rt.handle().clone());
        let inner = self.inner.with_io_runtime(io_runtime).build()?;
        Ok(Client { inner, rt })
    }
}
/// Blocking client that pairs the async `client::Client` with the Tokio
/// runtime used to drive its futures to completion.
#[derive(Clone, Debug)]
pub struct Client {
/// Async client that every method delegates to.
inner: client::Client,
/// Shared runtime; a clone of this `Arc` is handed to each reader, writer,
/// and listing iterator created by this client.
rt: Arc<Runtime>,
}
impl Client {
    /// Drives `future` to completion on this client's runtime and returns its output.
    ///
    /// NOTE: Tokio documents that `Runtime::block_on` panics when invoked from
    /// within an asynchronous execution context, so the blocking methods of
    /// this type are meant to be called from synchronous code.
    fn block_on<F>(&self, future: F) -> F::Output
    where
        F: Future,
    {
        self.rt.block_on(future)
    }

    /// Blocks on the async client's `get_file_info` for `path`.
    pub fn get_file_info(&self, path: &str) -> Result<FileStatus> {
        let request = self.inner.get_file_info(path);
        self.block_on(request)
    }

    /// Blocks on the async client's `list_status` for `path`, collecting all entries.
    pub fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
        let request = self.inner.list_status(path, recursive);
        self.block_on(request)
    }

    /// Returns a blocking iterator wrapping the async client's `list_status_iter`.
    ///
    /// Nothing is awaited here; each call to `next` on the returned iterator
    /// blocks on the shared runtime.
    pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
        let inner = self.inner.list_status_iter(path, recursive);
        let rt = Arc::clone(&self.rt);
        ListStatusIterator { inner, rt }
    }

    /// Blocks on the async client's `read` for `path` and wraps the result in a blocking [`FileReader`].
    pub fn read(&self, path: &str) -> Result<FileReader> {
        let inner = self.block_on(self.inner.read(path))?;
        let rt = Arc::clone(&self.rt);
        Ok(FileReader { inner, rt })
    }

    /// Blocks on the async client's `create` for `src` and wraps the result in a blocking [`FileWriter`].
    pub fn create(&self, src: &str, write_options: impl AsRef<WriteOptions>) -> Result<FileWriter> {
        let inner = self.block_on(self.inner.create(src, write_options))?;
        let rt = Arc::clone(&self.rt);
        Ok(FileWriter { inner, rt })
    }

    /// Blocks on the async client's `append` for `src` and wraps the result in a blocking [`FileWriter`].
    pub fn append(&self, src: &str) -> Result<FileWriter> {
        let inner = self.block_on(self.inner.append(src))?;
        let rt = Arc::clone(&self.rt);
        Ok(FileWriter { inner, rt })
    }

    /// Blocks on the async client's `mkdirs` with the given `permission` and `create_parent` flag.
    pub fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
        let request = self.inner.mkdirs(path, permission, create_parent);
        self.block_on(request)
    }

    /// Blocks on the async client's `rename` from `src` to `dst`.
    pub fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
        let request = self.inner.rename(src, dst, overwrite);
        self.block_on(request)
    }

    /// Blocks on the async client's `delete` for `path`, returning its boolean result.
    pub fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
        let request = self.inner.delete(path, recursive);
        self.block_on(request)
    }

    /// Blocks on the async client's `trash` for `path`, returning its optional string result.
    pub fn trash(&self, path: &str) -> Result<Option<String>> {
        let request = self.inner.trash(path);
        self.block_on(request)
    }

    /// Blocks on the async client's `set_times` with the given `mtime` and `atime`.
    pub fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
        let request = self.inner.set_times(path, mtime, atime);
        self.block_on(request)
    }

    /// Blocks on the async client's `set_owner` with the optional `owner` and `group`.
    pub fn set_owner(&self, path: &str, owner: Option<&str>, group: Option<&str>) -> Result<()> {
        let request = self.inner.set_owner(path, owner, group);
        self.block_on(request)
    }

    /// Blocks on the async client's `set_permission` for `path`.
    pub fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
        let request = self.inner.set_permission(path, permission);
        self.block_on(request)
    }

    /// Blocks on the async client's `set_replication` for `path`, returning its boolean result.
    pub fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
        let request = self.inner.set_replication(path, replication);
        self.block_on(request)
    }

    /// Blocks on the async client's `get_content_summary` for `path`.
    pub fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
        let request = self.inner.get_content_summary(path);
        self.block_on(request)
    }

    /// Blocks on the async client's `modify_acl_entries` with `acl_spec`.
    pub fn modify_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
        let request = self.inner.modify_acl_entries(path, acl_spec);
        self.block_on(request)
    }

    /// Blocks on the async client's `remove_acl_entries` with `acl_spec`.
    pub fn remove_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
        let request = self.inner.remove_acl_entries(path, acl_spec);
        self.block_on(request)
    }

    /// Blocks on the async client's `remove_default_acl` for `path`.
    pub fn remove_default_acl(&self, path: &str) -> Result<()> {
        let request = self.inner.remove_default_acl(path);
        self.block_on(request)
    }

    /// Blocks on the async client's `remove_acl` for `path`.
    pub fn remove_acl(&self, path: &str) -> Result<()> {
        let request = self.inner.remove_acl(path);
        self.block_on(request)
    }

    /// Blocks on the async client's `set_acl` with `acl_spec`.
    pub fn set_acl(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
        let request = self.inner.set_acl(path, acl_spec);
        self.block_on(request)
    }

    /// Blocks on the async client's `get_acl_status` for `path`.
    pub fn get_acl_status(&self, path: &str) -> Result<AclStatus> {
        let request = self.inner.get_acl_status(path);
        self.block_on(request)
    }

    /// Blocks on the async client's `glob_status` for `pattern`.
    pub fn glob_status(&self, pattern: &str) -> Result<Vec<FileStatus>> {
        let request = self.inner.glob_status(pattern);
        self.block_on(request)
    }
}
impl Default for Client {
    /// Builds a client from an unconfigured [`ClientBuilder`].
    ///
    /// # Panics
    ///
    /// Panics with "Failed to create default client" if `ClientBuilder::build`
    /// returns an error.
    fn default() -> Self {
        let builder = ClientBuilder::new();
        builder.build().expect("Failed to create default client")
    }
}
/// Blocking iterator over directory listing results, backed by the async
/// `client::ListStatusIterator`.
pub struct ListStatusIterator {
/// Async iterator that produces the entries.
inner: client::ListStatusIterator,
/// Runtime used to block on each `next` call.
rt: Arc<Runtime>,
}
impl Iterator for ListStatusIterator {
    type Item = Result<FileStatus>;

    /// Blocks on the runtime until the wrapped async iterator produces its
    /// next item, and returns that item unchanged.
    fn next(&mut self) -> Option<Self::Item> {
        let pending = self.inner.next();
        self.rt.block_on(pending)
    }
}
/// Blocking file reader that wraps the async `FileReader` from `crate::file`.
pub struct FileReader {
/// Async reader that tracks the position and performs the reads.
inner: AsyncFileReader,
/// Runtime used to block on the reader's async operations.
rt: Arc<Runtime>,
}
impl FileReader {
    /// Returns the wrapped reader's `file_length()`.
    pub fn file_length(&self) -> usize {
        let reader = &self.inner;
        reader.file_length()
    }

    /// Returns the wrapped reader's `remaining()`.
    pub fn remaining(&self) -> usize {
        let reader = &self.inner;
        reader.remaining()
    }

    /// Forwards `pos` to the wrapped reader's `set_position`.
    ///
    /// No bounds check is performed at this level.
    pub fn set_position(&mut self, pos: usize) {
        let reader = &mut self.inner;
        reader.set_position(pos);
    }

    /// Returns the wrapped reader's `tell()`.
    pub fn tell(&self) -> usize {
        let reader = &self.inner;
        reader.tell()
    }

    /// Blocks on the wrapped reader's `read_bytes(len)`.
    pub fn read_bytes(&mut self, len: usize) -> Result<Bytes> {
        let pending = self.inner.read_bytes(len);
        self.rt.block_on(pending)
    }

    /// Blocks on the wrapped reader's `read_into(buf)` and returns the count it reports.
    pub fn read_into(&mut self, buf: &mut [u8]) -> Result<usize> {
        let pending = self.inner.read_into(buf);
        self.rt.block_on(pending)
    }

    /// Blocks on the wrapped reader's `read_range(offset, len)`.
    pub fn read_range(&self, offset: usize, len: usize) -> Result<Bytes> {
        let pending = self.inner.read_range(offset, len);
        self.rt.block_on(pending)
    }

    /// Blocks on the wrapped reader's `read_range_buf(buf, offset)`.
    pub fn read_range_buf(&self, buf: &mut [u8], offset: usize) -> Result<()> {
        let pending = self.inner.read_range_buf(buf, offset);
        self.rt.block_on(pending)
    }

    /// Returns a blocking iterator over the wrapped reader's `read_range_stream(offset, len)`.
    ///
    /// Nothing is awaited here; the async stream is boxed and each `next` call
    /// on the returned [`FileReadStream`] blocks on the shared runtime.
    pub fn read_range_stream(&self, offset: usize, len: usize) -> FileReadStream {
        let stream = self.inner.read_range_stream(offset, len).boxed();
        let rt = Arc::clone(&self.rt);
        FileReadStream {
            inner: Mutex::new(stream),
            rt,
        }
    }
}
impl Read for FileReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.read_into(buf).map_err(io_error)
}
}
impl Seek for FileReader {
    /// Moves the read position and returns the new absolute offset.
    ///
    /// The target is computed in `i128` so that adding a signed offset to the
    /// current position or file length cannot overflow.
    ///
    /// # Errors
    ///
    /// Returns `io::ErrorKind::InvalidInput` when the resulting position is
    /// negative or greater than the file length; the position is left
    /// unchanged in that case.
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        let end = self.file_length() as i128;
        let cursor = self.tell() as i128;

        let target = match pos {
            SeekFrom::Start(absolute) => i128::from(absolute),
            SeekFrom::End(delta) => end + i128::from(delta),
            SeekFrom::Current(delta) => cursor + i128::from(delta),
        };

        if (0..=end).contains(&target) {
            // `target` lies in `0..=end`, and `end` came from a `usize`, so
            // the narrowing conversions below cannot truncate.
            self.inner.set_position(target as usize);
            Ok(target as u64)
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "cannot seek outside of file bounds",
            ))
        }
    }
}
/// Blocking iterator over the byte chunks of a ranged read.
pub struct FileReadStream {
/// Boxed async stream of chunks, wrapped in a `Mutex`.
/// NOTE(review): presumably the `Mutex` exists to make this type `Sync`; confirm.
inner: Mutex<BoxStream<'static, Result<Bytes>>>,
/// Runtime used to block on each `next` call.
rt: Arc<Runtime>,
}
impl Iterator for FileReadStream {
    type Item = Result<Bytes>;

    /// Blocks on the runtime until the wrapped stream yields its next chunk,
    /// and returns that item unchanged.
    fn next(&mut self) -> Option<Self::Item> {
        // `&mut self` already proves exclusive access, so borrow the stream
        // via `Mutex::get_mut` instead of acquiring the lock at runtime. No
        // guard is created, so a panic while polling can no longer poison the
        // mutex; a poison flag, should one ever be present, is ignored rather
        // than turned into a second panic.
        let stream = self
            .inner
            .get_mut()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        self.rt.block_on(stream.next())
    }
}
/// Blocking file writer that wraps the async `FileWriter` from `crate::file`.
pub struct FileWriter {
/// Async writer that performs the writes.
inner: AsyncFileWriter,
/// Runtime used to block on the writer's async operations.
rt: Arc<Runtime>,
}
impl FileWriter {
    /// Blocks on the wrapped writer's `write_bytes(buf)` and returns the count it reports.
    pub fn write_bytes(&mut self, buf: Bytes) -> Result<usize> {
        let pending = self.inner.write_bytes(buf);
        self.rt.block_on(pending)
    }

    /// Blocks on the wrapped writer's `close()`.
    pub fn close(&mut self) -> Result<()> {
        let pending = self.inner.close();
        self.rt.block_on(pending)
    }
}
impl Write for FileWriter {
    /// Copies `buf` into a new `Bytes` and passes it to
    /// [`FileWriter::write_bytes`], converting any crate error into an
    /// `io::Error` via `io_error`.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let payload = Bytes::copy_from_slice(buf);
        self.write_bytes(payload).map_err(io_error)
    }

    /// Does nothing and always succeeds.
    ///
    /// This wrapper keeps no buffer of its own; whether the wrapped async
    /// writer buffers data internally is not visible from here.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}