use std::io::{Read, Write, Seek, SeekFrom, Result as IoResult, Error as IoError, ErrorKind as IoErrorKind};
use std::convert::TryInto;
use std::time::Duration;
use std::cell::RefCell;
use std::rc::Rc;
use http::Uri;
use tokio::runtime::{Builder, Runtime};
use futures::{Future, Stream, stream::StreamExt};
use bytes::Bytes;
use crate::error::*;
use crate::datatypes::*;
use crate::async_client::*;
use crate::natmap::NatMap;
use crate::https::HttpsSettings;
pub use crate::op::*;
/// Builds a current-thread Tokio runtime with every available driver enabled.
///
/// # Errors
/// Propagates the runtime construction failure, converted into the crate error type.
#[inline]
fn single_threaded_runtime() -> Result<Runtime> {
    let runtime = Builder::new_current_thread().enable_all().build()?;
    Ok(runtime)
}
/// Blocking facade over the asynchronous [`HdfsClient`].
///
/// The async client and the runtime sit behind `Rc`, so clones share them; the
/// `fostate` value is cloned along with the struct, so each clone tracks its own.
#[derive(Clone)]
pub struct SyncHdfsClient {
    /// Shared handle to the asynchronous client that issues the requests.
    acx: Rc<HdfsClient>,
    /// Shared runtime on which the async client's futures are driven to completion.
    rt: Rc<RefCell<Runtime>>,
    /// State passed to every request and refreshed from its result
    /// (presumably the fail-over state; see `FOState`).
    fostate: FOState,
}
/// Builder for [`SyncHdfsClient`]; it wraps an [`HdfsClientBuilder`] and forwards
/// every option to it.
pub struct SyncHdfsClientBuilder {
    /// Wrapped async-client builder that accumulates the settings.
    a: HdfsClientBuilder,
}
impl SyncHdfsClientBuilder {
pub fn new(entrypoint: Uri) -> Self {
Self { a: HdfsClientBuilder::new(entrypoint) }
}
pub fn from_config() -> Self {
Self { a: HdfsClientBuilder::from_config() }
}
pub fn from_config_opt() -> Option<Self> {
HdfsClientBuilder::from_config_opt().map(|a| Self { a })
}
pub fn alt_entrypoint(self, alt_entrypoint: Uri) -> Self {
Self { a: self.a.alt_entrypoint(alt_entrypoint), ..self }
}
pub fn https_settings(self, https_settings: HttpsSettings) -> Self {
Self { a: self.a.https_settings(https_settings), ..self }
}
pub fn natmap(self, natmap: NatMap) -> Self {
Self { a: self.a.natmap(natmap), ..self }
}
pub fn default_timeout(self, timeout: Duration) -> Self {
Self { a: self.a.default_timeout(timeout), ..self }
}
pub fn user_name(self, user_name: String) -> Self {
Self { a: self.a.user_name(user_name), ..self }
}
pub fn doas(self, doas: String) -> Self {
Self { a: self.a.doas(doas), ..self }
}
pub fn delegation_token(self, dt: String) -> Self {
Self { a: self.a.delegation_token(dt), ..self }
}
pub fn build(self) -> Result<SyncHdfsClient> {
Ok(SyncHdfsClient {
acx: Rc::new(self.a.build()),
rt: Rc::new(RefCell::new(single_threaded_runtime()?)),
fostate: FOState::PRIMARY
})
}
}
impl SyncHdfsClient {
pub fn from_async(acx: HdfsClient)-> Result<Self> {
Ok(Self {
acx: Rc::new(acx),
rt: Rc::new(RefCell::new(single_threaded_runtime()?)),
fostate: FOState::PRIMARY
})
}
pub fn fostate(&self) -> FOState { self.fostate }
pub fn with_fostate(self, fostate: FOState) -> Self { Self { fostate, ..self } }
#[inline]
fn exec<R, E>(&self, f: impl Future<Output=FOStdResult<R, E>>) -> FOStdResult<R, E> where E: From<tokio::time::error::Elapsed>{
async fn with_timeout<R, E>(f: impl Future<Output=FOStdResult<R, E>>, fostate: FOState, timeout: Duration)
-> FOStdResult<R, E>
where E: From<tokio::time::error::Elapsed> {
Ok(tokio::time::timeout(timeout, f).await.map_err(|e| (e.into(), fostate))??)
}
self.rt.borrow_mut().block_on(with_timeout(f, self.fostate, self.acx.default_timeout().clone()))
}
#[inline]
fn exec0<R>(&self, f: impl Future<Output=R>) -> Result<R> {
async fn with_timeout<R>(f: impl Future<Output=R>, timeout: Duration) -> Result<R> {
Ok(tokio::time::timeout(timeout, f).await?)
}
self.rt.borrow_mut().block_on(with_timeout(f, self.acx.default_timeout().clone()))
}
#[inline]
fn foresult<T, E>(&mut self, r: FOStdResult<T, E>) -> StdResult<T, E> {
let (r, fostate) = FOR::split(r);
self.fostate = fostate;
r
}
pub fn open(&mut self, path: &str, open_options: OpenOptions) -> Result<Box<dyn Stream<Item=Result<Bytes>>+Unpin>> {
let fs = self.acx.open(self.fostate, path, open_options);
let r = self.exec0(fs)?;
self.foresult(r)
}
pub fn append(&mut self, path: &str, data: Data, append_options: AppendOptions) -> DResult<()> {
let f = self.acx.append(self.fostate, path, data, append_options);
let r = self.exec(f);
self.foresult(r)
}
pub fn create(&mut self, path: &str, data: Data, opts: CreateOptions) -> DResult<()> {
let f = self.acx.create(self.fostate, path, data, opts);
let r = self.exec(f);
self.foresult(r)
}
fn save_stream<W: Write>(&self, input: impl Stream<Item=Result<Bytes>>, output: &mut W) -> Result<()> {
fn write_bytes<W: Write>(b: &Bytes, w: &mut W) -> Result<()> {
if w.write(&b)? != b.len() {
Err(app_error!(generic "Short write"))
} else {
Ok(())
}
}
let mut input = Box::pin(input);
loop {
let f = input.into_future();
let (ob, input2) = self.exec0(f)?;
match ob {
Some(Ok(bytes)) => write_bytes(&bytes, output)?,
Some(Err(e)) => break Err(e),
None => break Ok(())
}
input = input2;
}
}
#[inline]
pub fn get_file<W: Write>(&mut self, input: &str, output: &mut W) -> Result<()> {
let s = self.open(input, OpenOptions::new())?;
self.save_stream(s, output)
}
pub fn dir(&mut self, path: &str) -> Result<ListStatusResponse> {
let r = self.acx.dir(self.fostate, path);
let r = self.exec(r);
self.foresult(r)
}
pub fn stat(&mut self, path: &str) -> Result<FileStatusResponse> {
let r = self.acx.stat(self.fostate, path);
let r = self.exec(r);
self.foresult(r)
}
pub fn concat(&mut self, path: &str, paths: Vec<String>) -> Result<()> {
let r = self.acx.concat(self.fostate, path, paths);
let r = self.exec(r);
self.foresult(r)
}
pub fn mkdirs(&mut self, path: &str, opts: MkdirsOptions) -> Result<bool> {
let r = self.acx.mkdirs(self.fostate, path, opts);
let r = self.exec(r);
self.foresult(r)
}
pub fn rename(&mut self, path: &str, destination: String) -> Result<bool> {
let r = self.acx.rename(self.fostate, path, destination);
let r = self.exec(r);
self.foresult(r)
}
pub fn create_symlink(&mut self, path: &str, destination: String, opts: CreateSymlinkOptions) -> Result<()> {
let r = self.acx.create_symlink(self.fostate, path, destination, opts);
let r = self.exec(r);
self.foresult(r)
}
pub fn delete(&mut self, path: &str, opts: DeleteOptions) -> Result<bool> {
let r = self.acx.delete(self.fostate, path, opts);
let r = self.exec(r);
self.foresult(r)
}
}
/// Read-only, seekable view of a remote file, backed by a [`SyncHdfsClient`].
pub struct ReadHdfsFile {
    /// Client used to issue the read requests.
    cx: SyncHdfsClient,
    /// Remote path of the file.
    path: String,
    /// File length recorded when the value was constructed.
    len: i64,
    /// Current read offset within the file.
    pos: i64,
}
impl ReadHdfsFile {
    /// Stats `path` to learn its length and returns a reader positioned at offset 0.
    ///
    /// # Errors
    /// Propagates the failure of the stat request.
    pub fn open(mut cx: SyncHdfsClient, path: String) -> Result<ReadHdfsFile> {
        let status = cx.stat(&path)?;
        let length = status.file_status.length;
        Ok(Self::new(cx, path, length, 0))
    }

    /// Assembles a reader from its parts without contacting the server.
    fn new(cx: SyncHdfsClient, path: String, len: i64, pos: i64) -> Self {
        Self { cx, path, len, pos }
    }

    /// File length as recorded at construction time.
    pub fn len(&self) -> u64 {
        self.len as u64
    }

    /// Decomposes the reader into the client, the path and a `(pos, len)` pair.
    pub fn into_parts(self) -> (SyncHdfsClient, String, (i64, i64)) {
        let Self { cx, path, len, pos } = self;
        (cx, path, (pos, len))
    }
}
impl Read for ReadHdfsFile {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
if self.pos == self.len {
return Ok(0);
}
let buf_len: i64 = buf.len().try_into().map_err(|_| IoError::new(IoErrorKind::InvalidInput, "buffer too big"))?;
let s = self.cx.open(&self.path, OpenOptions::new().offset(self.pos).length(buf_len))?;
let mut pos: usize = 0;
let mut s = Box::pin(s);
loop {
let f = s.into_future();
match self.cx.exec0(f)? {
(Some(Ok(chunk)), s1) => {
s = s1;
self.pos += chunk.len() as i64;
let bcount = (&mut buf[pos..]).write(&chunk)?;
pos += bcount;
}
(Some(Err(e)), _) => {
break Err(e.into())
}
(None, _) => {
break Ok(pos)
}
}
}
}
}
impl Seek for ReadHdfsFile {
    /// Moves the read position and returns the new position.
    ///
    /// A target past the recorded end of file is clamped to the end of file, whatever
    /// the `SeekFrom` variant, so a following `read` reports EOF. (Previously such a
    /// seek fell back to the base position, which made `SeekFrom::Start(n)` with
    /// `n > len` silently rewind to offset 0.)
    ///
    /// # Errors
    /// Returns `InvalidInput` when the target is before byte 0, or when a
    /// `SeekFrom::Start` offset does not fit in `i64`.
    fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
        /// Resolves `pos + offset` against a file of length `len`.
        fn offset(pos: i64, offset: i64, len: i64) -> IoResult<i64> {
            let before_start = || IoError::new(IoErrorKind::InvalidInput, "attempt to seek before start");
            match pos.checked_add(offset) {
                Some(p) if p < 0 => Err(before_start()),
                Some(p) if p <= len => Ok(p),
                // Overflow towards negative infinity is still "before start".
                None if offset < 0 => Err(before_start()),
                // Past EOF (or positive overflow): clamp to EOF.
                _ => Ok(len),
            }
        }
        self.pos = match pos {
            SeekFrom::Current(0) => Ok(self.pos),
            SeekFrom::Current(o) => offset(self.pos, o, self.len),
            SeekFrom::Start(0) => Ok(0),
            SeekFrom::Start(o) => offset(
                0,
                o.try_into().map_err(|_| IoError::new(IoErrorKind::InvalidInput, "offset too big"))?,
                self.len,
            ),
            SeekFrom::End(0) => Ok(self.len),
            SeekFrom::End(o) => offset(self.len, o, self.len),
        }?;
        Ok(self.pos as u64)
    }
}
/// Append-only writer for a remote file, backed by a [`SyncHdfsClient`].
pub struct WriteHdfsFile {
    /// Client used to issue the append requests.
    cx: SyncHdfsClient,
    /// Remote path of the file being written.
    path: String,
    /// Options cloned into every append request.
    opts: AppendOptions,
}
impl WriteHdfsFile {
    /// Creates the remote file with empty content (using `c_opts`) and returns a writer
    /// that appends to it with `a_opts`.
    ///
    /// # Errors
    /// Propagates the create failure after passing it through `ErrorD::drop`.
    pub fn create(mut cx: SyncHdfsClient, path: String, c_opts: CreateOptions, a_opts: AppendOptions) -> Result<WriteHdfsFile> {
        cx.create(&path, crate::rest_client::data_empty(), c_opts)
            .map_err(ErrorD::drop)?;
        Ok(Self { cx, path, opts: a_opts })
    }

    /// Returns a writer that appends to `path`; no request is sent at this point.
    pub fn append(cx: SyncHdfsClient, path: String, opts: AppendOptions) -> Result<WriteHdfsFile> {
        Ok(Self { cx, path, opts })
    }

    /// Decomposes the writer into the client and the path.
    pub fn into_parts(self) -> (SyncHdfsClient, String) {
        let Self { cx, path, .. } = self;
        (cx, path)
    }

    /// Sends `buf` as one append request without copying it.
    #[cfg(feature = "zero-copy-on-write")]
    fn do_write(&mut self, buf: &[u8]) -> DResult<()> {
        // SAFETY: the lifetime of `buf` is extended to `'static` only so it can be handed
        // to `data_borrowed`. `SyncHdfsClient::append` blocks until the request future has
        // completed or timed out, so the borrow is expected not to outlive this call.
        // NOTE(review): this holds only if nothing downstream retains the slice after the
        // future is finished or dropped — confirm against the async client.
        let extended = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) };
        let payload = crate::rest_client::data_borrowed(extended);
        self.cx.append(&self.path, payload, self.opts.clone())
    }

    /// Copies `buf` and sends the copy as one append request.
    #[cfg(not(feature = "zero-copy-on-write"))]
    fn do_write(&mut self, buf: &[u8]) -> DResult<()> {
        let payload = crate::rest_client::data_owned(buf.to_vec());
        self.cx.append(&self.path, payload, self.opts.clone())
    }
}
impl Write for WriteHdfsFile {
    /// Sends the whole of `buf` as a single append request and reports it fully written.
    ///
    /// # Errors
    /// Returns the append failure, passed through `ErrorD::drop` and converted to an I/O error.
    fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
        match self.do_write(buf).map_err(ErrorD::drop) {
            Ok(()) => Ok(buf.len()),
            Err(e) => Err(e.into()),
        }
    }

    /// Does nothing: `write` already sends its data before returning, so there is nothing to flush.
    fn flush(&mut self) -> IoResult<()> {
        Ok(())
    }
}