use std::error::Error as StdError;
use std::io::{self, Read, Write, Error as IoError, ErrorKind, Seek, SeekFrom};
use std::fmt::{self, Debug};
use trans::{TxMgr, TxHandle};
use fs::Handle;
use fs::fnode::{Fnode, Version, Metadata, Reader as FnodeReader,
Writer as FnodeWriter};
use super::{Result, Error};
/// Reader over one specific version of a file's content.
///
/// Instances are created through `VersionReader::new` and expose the
/// wrapped fnode reader through the `Read` and `Seek` implementations.
#[derive(Debug)]
pub struct VersionReader<'a> {
// Borrowed handle of the originating file. It is stored but not read by the
// code visible here; holding the borrow ties this reader's lifetime `'a`
// to that handle.
handle: &'a Handle,
// Underlying fnode reader that all `read`/`seek` calls are forwarded to.
rdr: FnodeReader,
}
impl<'a> VersionReader<'a> {
    /// Builds a reader for version number `ver` of the fnode behind `handle`.
    ///
    /// # Errors
    /// Propagates any error returned by `FnodeReader::new`.
    fn new(handle: &'a Handle, ver: usize) -> Result<Self> {
        let fnode = handle.fnode.clone();
        let rdr = FnodeReader::new(fnode, ver)?;
        Ok(VersionReader {
            handle: handle,
            rdr: rdr,
        })
    }
}
impl<'a> Read for VersionReader<'a> {
    /// Fills `buf` by forwarding the call to the wrapped fnode reader and
    /// returns whatever that reader reports.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let inner = &mut self.rdr;
        inner.read(buf)
    }
}
impl<'a> Seek for VersionReader<'a> {
    /// Repositions the wrapped fnode reader to `pos` and returns the result
    /// of that reader's own `seek`.
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        let inner = &mut self.rdr;
        inner.seek(pos)
    }
}
/// A file object supporting reads, transactional writes and seeking.
///
/// A write session starts lazily on the first write (see `begin_write`) and
/// lasts until `finish` is called; while it is open, `seek`, `set_len` and
/// `write_once` are rejected with `Error::NotFinish`.
pub struct File {
// Project handle; the code in this file uses its `fnode` and `txmgr` fields.
handle: Handle,
// `pos`: position used to seed a newly created reader or writer.
// `rdr`: lazily created reader, `None` until the first `read`.
pos: SeekFrom, rdr: Option<FnodeReader>,
// Writer of the active write session, `None` when no write is in progress.
wtr: Option<FnodeWriter>,
// Transaction backing the active write session; set together with `wtr`.
tx_handle: Option<TxHandle>,
// Whether read operations are permitted on this file.
can_read: bool,
// Whether write operations are permitted on this file.
can_write: bool,
}
impl File {
pub(super) fn new(
handle: Handle,
pos: SeekFrom,
can_read: bool,
can_write: bool,
) -> Self {
File {
handle,
pos,
rdr: None,
wtr: None,
tx_handle: None,
can_read,
can_write,
}
}
pub fn metadata(&self) -> Metadata {
let fnode = self.handle.fnode.read().unwrap();
fnode.metadata()
}
pub fn history(&self) -> Vec<Version> {
let fnode = self.handle.fnode.read().unwrap();
fnode.history()
}
pub fn curr_version(&self) -> usize {
let fnode = self.handle.fnode.read().unwrap();
fnode.curr_ver_num()
}
fn curr_len(&self) -> usize {
let fnode = self.handle.fnode.read().unwrap();
fnode.curr_len()
}
pub fn version_reader(&self, ver_num: usize) -> Result<VersionReader> {
if !self.can_read {
return Err(Error::CannotRead);
}
VersionReader::new(&self.handle, ver_num)
}
fn seek_pos(&self, pos: SeekFrom) -> SeekFrom {
let curr_len = self.curr_len();
let pos: i64 = match pos {
SeekFrom::Start(p) => p as i64,
SeekFrom::End(p) => curr_len as i64 + p,
SeekFrom::Current(p) => {
match self.pos {
SeekFrom::Start(q) => p + q as i64,
SeekFrom::End(q) => curr_len as i64 + p + q,
SeekFrom::Current(_) => unreachable!(),
}
}
};
SeekFrom::Start(pos as u64)
}
fn begin_write(&mut self) -> Result<()> {
if self.wtr.is_some() {
return Err(Error::NotFinish);
}
if !self.can_write {
return Err(Error::CannotWrite);
}
assert!(self.tx_handle.is_none());
let curr_len = self.curr_len();
match self.pos {
SeekFrom::Start(pos) => {
let pos = pos as usize;
if pos > curr_len {
self.set_len(pos)?;
self.pos = self.seek_pos(SeekFrom::End(0));
}
}
_ => unreachable!(),
}
let tx_handle = TxMgr::begin_trans(&self.handle.txmgr)?;
tx_handle.run(|| {
let mut wtr =
FnodeWriter::new(self.handle.clone(), tx_handle.txid)?;
wtr.seek(self.seek_pos(self.pos))?;
self.wtr = Some(wtr);
Ok(())
})?;
self.tx_handle = Some(tx_handle);
Ok(())
}
pub fn finish(&mut self) -> Result<()> {
match self.wtr.take() {
Some(wtr) => {
let tx_handle = self.tx_handle.take().unwrap();
tx_handle.run(|| wtr.finish())?;
tx_handle.commit()?;
self.pos = SeekFrom::Start(0);
Ok(())
}
None => Err(Error::NotWrite),
}
}
pub fn write_once(&mut self, buf: &[u8]) -> Result<()> {
match self.wtr {
Some(_) => Err(Error::NotFinish),
None => {
self.begin_write()?;
match self.wtr {
Some(ref mut wtr) => {
match self.tx_handle {
Some(ref tx_handle) => {
tx_handle.run(|| {
wtr.write_all(buf)?;
Ok(())
})?;
}
None => unreachable!(),
}
}
None => unreachable!(),
}
self.finish()
}
}
}
pub fn set_len(&mut self, len: usize) -> Result<()> {
if self.wtr.is_some() {
return Err(Error::NotFinish);
}
if !self.can_write {
return Err(Error::CannotWrite);
}
let tx_handle = TxMgr::begin_trans(&self.handle.txmgr)?;
tx_handle.run_all(|| {
Fnode::set_len(self.handle.clone(), len, tx_handle.txid)
})?;
Ok(())
}
}
impl Read for File {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if !self.can_read {
return Err(IoError::new(
ErrorKind::Other,
Error::CannotRead.description(),
));
}
if self.rdr.is_none() {
let mut rdr = map_io_err!(
FnodeReader::new_current(self.handle.fnode.clone())
)?;
rdr.seek(self.pos)?;
self.rdr = Some(rdr);
}
match self.rdr {
Some(ref mut rdr) => rdr.read(buf),
None => unreachable!(),
}
}
}
impl Write for File {
    /// Writes `buf` through the active write session, opening one first if
    /// none exists, and returns the byte count reported by the writer.
    ///
    /// The write runs inside the session's transaction. Call `finish` to
    /// complete the session.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if self.wtr.is_none() {
            map_io_err!(self.begin_write())?;
        }

        match (self.wtr.as_mut(), self.tx_handle.as_ref()) {
            (Some(wtr), Some(tx_handle)) => {
                let mut written = 0;
                map_io_err!(tx_handle.run(|| {
                    written = wtr.write(buf)?;
                    Ok(())
                }))?;
                Ok(written)
            }
            // A writer and its transaction are always set together.
            _ => unreachable!(),
        }
    }

    /// Flushes the active writer inside its transaction.
    ///
    /// # Errors
    /// Returns an `ErrorKind::PermissionDenied` error if no write session is
    /// open; otherwise propagates errors from the flush.
    fn flush(&mut self) -> io::Result<()> {
        match (self.wtr.as_mut(), self.tx_handle.as_ref()) {
            (Some(wtr), Some(tx_handle)) => {
                map_io_err!(tx_handle.run(|| {
                    wtr.flush()?;
                    Ok(())
                }))?;
                Ok(())
            }
            (Some(_), None) => unreachable!(),
            (None, _) => Err(IoError::new(
                ErrorKind::PermissionDenied,
                Error::CannotWrite.description(),
            )),
        }
    }
}
impl Seek for File {
    /// Moves the file position to `pos` and returns the new absolute offset
    /// from the start of the file.
    ///
    /// If a reader has already been created, the seek is delegated to it so
    /// that `Current` offsets are relative to the reader's own position.
    /// Otherwise the target is resolved with `seek_pos` and stored for the
    /// next reader or writer to pick up.
    ///
    /// # Errors
    /// * `ErrorKind::Other` if a write session is open.
    /// * `ErrorKind::InvalidInput` if, without a reader, the target resolves
    ///   to a position before the start of the file.
    /// * Any error returned by the underlying reader's `seek`.
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        if self.wtr.is_some() {
            return Err(IoError::new(
                ErrorKind::Other,
                Error::NotFinish.description(),
            ));
        }

        // Delegate to the reader when one exists; `None` means there is none.
        let mut sought = None;
        if let Some(ref mut rdr) = self.rdr {
            sought = Some(rdr.seek(pos)?);
        }

        let offset = match sought {
            Some(offset) => offset,
            None => {
                let offset = match self.seek_pos(pos) {
                    SeekFrom::Start(offset) => offset,
                    _ => unreachable!("seek_pos always yields SeekFrom::Start"),
                };
                // `seek_pos` computes in i64 and casts to u64, so a negative
                // target shows up as an offset above `i64::MAX`.
                if offset > i64::max_value() as u64 {
                    return Err(IoError::new(
                        ErrorKind::InvalidInput,
                        "invalid seek to a negative or overflowing position",
                    ));
                }
                offset
            }
        };

        // Always record and report the real new position. Previously `Ok(0)`
        // was returned whenever no reader existed, and a legitimate reader
        // result of 0 was recomputed from a possibly stale `self.pos`.
        self.pos = SeekFrom::Start(offset);
        Ok(offset)
    }
}
impl Debug for File {
    /// Formats the file as a `File { .. }` debug struct showing the
    /// position, reader, writer and access flags. The handle and the
    /// transaction handle are not included.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("File");
        out.field("pos", &self.pos);
        out.field("rdr", &self.rdr);
        out.field("wtr", &self.wtr);
        out.field("can_read", &self.can_read);
        out.field("can_write", &self.can_write);
        out.finish()
    }
}