use std::ffi::CString;
use std::io::{Error, Result};
use hdfs_sys::*;
use log::debug;
use crate::File;
/// Options and flags that configure how a file is opened on an HDFS
/// filesystem.
///
/// The boolean flags are combined into `libc::O_*` open flags when
/// `open` is called; the `fs` handle is passed through to `hdfsOpenFile`.
#[derive(Debug, Clone)]
pub struct OpenOptions {
/// Filesystem handle forwarded to `hdfsOpenFile` and to the resulting `File`.
fs: hdfsFS,
/// Request read access.
read: bool,
/// Request write access.
write: bool,
/// Request append mode (adds `O_APPEND` to the access mode).
append: bool,
/// Request truncation (adds `O_TRUNC` to the creation mode).
truncate: bool,
/// Request creation of the file (adds `O_CREAT` to the creation mode).
create: bool,
/// Request exclusive creation (`O_CREAT | O_EXCL`); takes precedence over
/// `create` and `truncate` when the creation mode is computed.
create_new: bool,
}
// Manually assert that `OpenOptions` may be moved to and shared between
// threads.
// NOTE(review): soundness rests on the `hdfsFS` handle being safe to use from
// multiple threads — confirm against libhdfs's thread-safety guarantees.
unsafe impl Send for OpenOptions {}
unsafe impl Sync for OpenOptions {}
impl OpenOptions {
    /// Creates a blank set of options bound to the filesystem handle `fs`.
    ///
    /// Every flag is initially `false`.
    pub(crate) fn new(fs: hdfsFS) -> Self {
        OpenOptions {
            fs,
            read: false,
            write: false,
            append: false,
            truncate: false,
            create: false,
            create_new: false,
        }
    }

    /// Sets the option for read access.
    pub fn read(&mut self, read: bool) -> &mut Self {
        self.read = read;
        self
    }

    /// Sets the option for write access.
    pub fn write(&mut self, write: bool) -> &mut Self {
        self.write = write;
        self
    }

    /// Sets the option for append mode.
    ///
    /// When set, the access mode includes `O_APPEND` and implies write
    /// access even if `write(true)` was not requested.
    pub fn append(&mut self, append: bool) -> &mut Self {
        self.append = append;
        self
    }

    /// Sets the option for truncating the file (`O_TRUNC`).
    ///
    /// Requires write access; combining it with append mode is rejected
    /// with `EINVAL` unless `create_new` is also set.
    pub fn truncate(&mut self, truncate: bool) -> &mut Self {
        self.truncate = truncate;
        self
    }

    /// Sets the option for creating the file (`O_CREAT`).
    ///
    /// Requires write or append access.
    pub fn create(&mut self, create: bool) -> &mut Self {
        self.create = create;
        self
    }

    /// Sets the option for exclusive creation (`O_CREAT | O_EXCL`).
    ///
    /// When set, the `create` and `truncate` options are ignored while
    /// computing the creation flags. Requires write or append access.
    ///
    /// NOTE(review): whether libhdfs actually enforces `O_EXCL` should be
    /// confirmed against its documentation.
    pub fn create_new(&mut self, create_new: bool) -> &mut Self {
        // Store into the dedicated flag. Writing to `create` instead would
        // make the `O_EXCL` path unreachable and would let
        // `create_new(false)` silently undo an earlier `create(true)`.
        self.create_new = create_new;
        self
    }

    /// Translates the `read` / `write` / `append` flags into an access mode.
    ///
    /// # Errors
    ///
    /// Returns `EINVAL` when none of read, write or append is requested.
    fn get_access_mode(&self) -> Result<libc::c_int> {
        match (self.read, self.write, self.append) {
            (true, false, false) => Ok(libc::O_RDONLY),
            (false, true, false) => Ok(libc::O_WRONLY),
            (true, true, false) => Ok(libc::O_RDWR),
            // Append implies writing, so the `write` flag is irrelevant here.
            (false, _, true) => Ok(libc::O_WRONLY | libc::O_APPEND),
            (true, _, true) => Ok(libc::O_RDWR | libc::O_APPEND),
            (false, false, false) => Err(Error::from_raw_os_error(libc::EINVAL)),
        }
    }

    /// Translates the `create` / `truncate` / `create_new` flags into
    /// creation flags, after validating them against the access flags.
    ///
    /// # Errors
    ///
    /// Returns `EINVAL` when:
    /// - any creation flag is set without write or append access, or
    /// - `truncate` is combined with `append` and `create_new` is not set.
    fn get_creation_mode(&self) -> Result<libc::c_int> {
        match (self.write, self.append) {
            (true, false) => {}
            (false, false) => {
                // Creating or truncating makes no sense on a read-only open.
                if self.truncate || self.create || self.create_new {
                    return Err(Error::from_raw_os_error(libc::EINVAL));
                }
            }
            (_, true) => {
                // Truncate + append is contradictory, except that
                // `create_new` overrides `truncate` below anyway.
                if self.truncate && !self.create_new {
                    return Err(Error::from_raw_os_error(libc::EINVAL));
                }
            }
        }

        Ok(match (self.create, self.truncate, self.create_new) {
            (false, false, false) => 0,
            (true, false, false) => libc::O_CREAT,
            (false, true, false) => libc::O_TRUNC,
            (true, true, false) => libc::O_CREAT | libc::O_TRUNC,
            // Exclusive creation wins over `create` and `truncate`.
            (_, _, true) => libc::O_CREAT | libc::O_EXCL,
        })
    }

    /// Opens the file at `path` with the options configured on `self`.
    ///
    /// # Errors
    ///
    /// - `EINVAL` if the configured flags are inconsistent (see the access
    ///   and creation mode rules).
    /// - An error converted from `NulError` if `path` contains an interior
    ///   NUL byte.
    /// - The last OS error if `hdfsOpenFile` returns a null handle.
    pub fn open(&self, path: &str) -> Result<File> {
        let flags = libc::O_CLOEXEC | self.get_access_mode()? | self.get_creation_mode()?;

        debug!("open file {} with flags {}", path, flags);
        let c_path = CString::new(path)?;
        // The three trailing zeros are presumably buffer size, replication
        // and block size, with 0 selecting defaults — confirm against the
        // libhdfs `hdfsOpenFile` documentation.
        //
        // SAFETY: `c_path` is a valid NUL-terminated string that outlives
        // the call. `self.fs` is assumed to be a live handle, as it is only
        // supplied through the crate-private constructor.
        let handle = unsafe { hdfsOpenFile(self.fs, c_path.as_ptr(), flags, 0, 0, 0) };

        if handle.is_null() {
            return Err(Error::last_os_error());
        }

        debug!("file {} with flags {} opened", path, flags);
        Ok(File::new(self.fs, handle, path))
    }

    /// Asynchronous variant of `open`.
    ///
    /// Clones the options and copies `path` so the blocking `open` call can
    /// run inside `blocking::unblock`, then wraps the resulting file in an
    /// `AsyncFile`.
    #[cfg(feature = "async_file")]
    pub async fn async_open(&self, path: &str) -> Result<super::AsyncFile> {
        // Owned copies are needed because the closure is moved to another
        // execution context.
        let opt = self.clone();
        let path = path.to_string();

        let file = blocking::unblock(move || opt.open(&path)).await?;

        Ok(super::AsyncFile::new(file, false))
    }
}