use std::sync::Arc;
use std::collections::VecDeque;
use std::io::{Read, Write, Result as IoResult, Seek, SeekFrom};
use std::fmt::{self, Debug};
use std::time::SystemTime;
use std::path::{Path, PathBuf};
use std::cmp::min;
use error::{Error, Result};
use base::Time;
use base::lru::{Lru, CountMeter, PinChecker};
use trans::{Eid, Id, CloneNew, Txid, TxMgrRef};
use trans::cow::{Cow, CowRef, IntoCow, CowWeakRef, CowCache};
use volume::{VolumeRef, Persistable};
use content::{StoreRef, Writer as StoreWriter, Content, ContentRef,
ContentReader, ChunkMap};
use super::Handle;
/// Size argument handed to `Lru::new` when an fnode's `sub_nodes` cache is built.
const SUB_NODES_CNT: usize = 8;
/// Kind of node held in the file tree.
#[derive(Debug, Copy, Clone, PartialEq, Deserialize, Serialize)]
pub enum FileType {
/// A file node; this is the variant produced by `FileType::default()`.
File,
/// A directory node.
Dir,
}
impl FileType {
    /// Returns `true` when this value is `FileType::File`.
    pub fn is_file(&self) -> bool {
        match *self {
            FileType::File => true,
            FileType::Dir => false,
        }
    }

    /// Returns `true` when this value is `FileType::Dir`.
    pub fn is_dir(&self) -> bool {
        match *self {
            FileType::Dir => true,
            FileType::File => false,
        }
    }
}
impl Default for FileType {
    /// A default-constructed `FileType` is `FileType::File`.
    fn default() -> FileType {
        FileType::File
    }
}
/// Serialized record of one child kept in a parent fnode's `kids` list.
#[derive(Debug, Clone, Deserialize, Serialize)]
struct ChildEntry {
// Id of the child fnode; used to fetch it from the cache in `load_child`.
id: Eid,
// File type of the child, copied from the child when it is added.
ftype: FileType,
// Name the child is stored under in its parent.
name: String,
}
impl ChildEntry {
    /// Builds an entry holding a clone of `id`, the given `ftype` and an
    /// owned copy of `name`.
    fn new(id: &Eid, ftype: FileType, name: &str) -> Self {
        let id = id.clone();
        let name = String::from(name);
        ChildEntry { id, ftype, name }
    }
}
/// One recorded version of a file.
///
/// `num` is the version number, `content_id` is the id of the content the
/// version points at, `len` is the content length recorded when the version
/// was created and `ctime` is the creation time of the version.
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
pub struct Version {
num: usize, content_id: Eid, len: usize,
ctime: Time,
}
impl Version {
    /// Creates a version numbered `num` that refers to a clone of
    /// `content_id`, records `len` and stamps it with the current time.
    fn new(num: usize, content_id: &Eid, len: usize) -> Self {
        let content_id = content_id.clone();
        let ctime = Time::now();
        Version {
            num,
            content_id,
            len,
            ctime,
        }
    }

    /// Version number.
    pub fn num(&self) -> usize {
        let Version { num, .. } = *self;
        num
    }

    /// Content length recorded for this version.
    pub fn len(&self) -> usize {
        let Version { len, .. } = *self;
        len
    }

    /// Creation time of this version as a `SystemTime`.
    pub fn created(&self) -> SystemTime {
        let ctime = &self.ctime;
        ctime.to_system_time()
    }
}
/// Snapshot of an fnode's attributes, as produced by `Fnode::metadata`.
#[derive(Debug, Copy, Clone)]
pub struct Metadata {
// File type of the fnode.
ftype: FileType,
// Length of the current version for files; 0 for directories.
len: usize,
// Number of the current version (0 when the fnode has no versions).
curr_version: usize,
// Creation time of the fnode.
ctime: Time,
// Last modification time of the fnode.
mtime: Time,
}
impl Metadata {
pub fn file_type(&self) -> FileType {
self.ftype
}
pub fn is_dir(&self) -> bool {
self.ftype == FileType::Dir
}
pub fn is_file(&self) -> bool {
self.ftype == FileType::File
}
pub fn len(&self) -> usize {
self.len
}
pub fn curr_version(&self) -> usize {
self.curr_version
}
pub fn created(&self) -> SystemTime {
self.ctime.to_system_time()
}
pub fn modified(&self) -> SystemTime {
self.mtime.to_system_time()
}
}
/// One entry of a directory listing, as returned by `Fnode::read_dir`.
#[derive(Debug)]
pub struct DirEntry {
// Path of the entry: the directory path joined with the entry name.
path: PathBuf,
// Metadata snapshot of the child fnode taken while listing.
metadata: Metadata,
// Name of the entry inside its parent directory.
name: String,
}
impl DirEntry {
    /// Path of this entry.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Copy of the metadata captured for this entry.
    pub fn metadata(&self) -> Metadata {
        let snapshot = self.metadata;
        snapshot
    }

    /// File type taken from the entry's metadata.
    pub fn file_type(&self) -> FileType {
        let snapshot = &self.metadata;
        snapshot.file_type()
    }

    /// Name of this entry inside its parent directory.
    pub fn file_name(&self) -> &str {
        self.name.as_str()
    }
}
/// `Lru` instantiation used for an fnode's `sub_nodes` field: keys are child
/// names (`String`) and values are weak references to the child fnodes.
type SubNodes = Lru<
String,
FnodeWeakRef,
CountMeter<FnodeWeakRef>,
PinChecker<FnodeWeakRef>,
>;
/// A node of the file tree: either a file with a list of versions or a
/// directory with a list of children.
///
/// The `parent`, `sub_nodes` and `store` fields are skipped by serde and are
/// filled in at runtime (see `load_root`, `load_child` and `add_child`).
#[derive(Default, Clone, Deserialize, Serialize)]
pub struct Fnode {
// Entity id, exposed through the `Id` trait.
id: Eid,
// Whether this node is a file or a directory.
ftype: FileType,
// Upper bound on `vers.len()` enforced by `add_ver`; set to 0 for directories by `new`.
version_limit: u8,
// Creation time, set in `new`.
ctime: Time,
// Modification time; updated when a version is added or a child is added.
mtime: Time,
// Child entries of a directory.
kids: Vec<ChildEntry>,
// Versions, oldest at the front and newest at the back.
vers: VecDeque<Version>,
// Chunk map handed to `StoreWriter::new` and `Content::unlink`.
chk_map: ChunkMap,
// Parent fnode; `None` is what `is_root` tests for.
#[serde(skip_serializing, skip_deserializing, default)]
parent: Option<FnodeRef>,
// Runtime cache from child name to a weak reference of the loaded child.
#[serde(skip_serializing, skip_deserializing,
default = "Fnode::default_sub_nodes")]
sub_nodes: SubNodes,
// Store reference; re-attached after loading because it is not serialized.
#[serde(skip_serializing, skip_deserializing, default)]
store: StoreRef,
}
impl Fnode {
/// Default version limit. Presumably meant as the default for the
/// `version_limit` argument of `new`/`new_under`; it is not referenced in
/// this part of the file, so confirm against callers.
pub const DEFAULT_VERSION_LIMIT: u8 = 10;
/// Creates a detached fnode of the given type with a fresh id, no parent,
/// no children and no versions.
///
/// `version_limit` is kept for files and forced to 0 for directories.
/// `store` is cloned into the new node.
pub fn new(ftype: FileType, version_limit: u8, store: &StoreRef) -> Self {
    let version_limit = if ftype == FileType::Dir {
        0
    } else {
        version_limit
    };

    Fnode {
        id: Eid::new(),
        ftype,
        version_limit,
        ctime: Time::now(),
        mtime: Time::now(),
        kids: Vec::new(),
        vers: VecDeque::new(),
        chk_map: ChunkMap::new(),
        parent: None,
        sub_nodes: Self::default_sub_nodes(),
        store: store.clone(),
    }
}
/// Creates a new fnode named `name` under `parent` and returns it.
///
/// A new file gets an initial version pointing at a fresh `Content`; a new
/// directory gets none. The node is then registered with `add_child`.
///
/// # Errors
///
/// Returns `Error::NotDir` when `parent` is not a directory, and forwards
/// any error from making the parent mutable, adding the initial version,
/// wrapping the node with `into_cow`, or `add_child`.
pub fn new_under(
    parent: &FnodeRef,
    name: &str,
    ftype: FileType,
    version_limit: u8,
    txmgr: &TxMgrRef,
) -> Result<FnodeRef> {
    let kid = {
        // The parent's write lock is only held while the child is built;
        // `add_child` below takes the lock again on its own.
        let mut parent_guard = parent.write().unwrap();
        let dir = parent_guard.make_mut()?;
        if !dir.is_dir() {
            return Err(Error::NotDir);
        }

        let mut node = Fnode::new(ftype, version_limit, &dir.store);
        match ftype {
            FileType::File => {
                node.add_ver(Content::new().into_cow(txmgr)?)?;
            }
            FileType::Dir => {}
        }
        node.into_cow(txmgr)?
    };

    Fnode::add_child(parent, &kid, name)?;
    Ok(kid)
}
/// Builds an empty `sub_nodes` cache sized by `SUB_NODES_CNT`; also used as
/// the serde default for the `sub_nodes` field.
fn default_sub_nodes() -> SubNodes {
Lru::new(SUB_NODES_CNT)
}
pub fn is_file(&self) -> bool {
self.ftype == FileType::File
}
pub fn is_dir(&self) -> bool {
self.ftype == FileType::Dir
}
/// Returns `true` when this fnode has no parent set.
pub fn is_root(&self) -> bool {
    match self.parent {
        None => true,
        Some(_) => false,
    }
}
/// Takes a `Metadata` snapshot of this fnode: its type, the current length
/// (0 for directories), the current version number and both timestamps.
pub fn metadata(&self) -> Metadata {
    let len = self.curr_len();
    let curr_version = self.curr_ver_num();
    Metadata {
        ftype: self.ftype,
        len,
        curr_version,
        ctime: self.ctime,
        mtime: self.mtime,
    }
}
/// Length of the newest version for a file, or 0 for a directory.
///
/// A file with no versions reports 0 instead of panicking. That state is
/// reachable (for example via `Fnode::default()` or after `clear_vers`),
/// and `curr_ver_num` already reports 0 for it.
pub fn curr_len(&self) -> usize {
    match self.ftype {
        FileType::File => self.vers.back().map_or(0, |ver| ver.len),
        FileType::Dir => 0,
    }
}
/// Returns clones of all versions, in stored order (oldest first).
pub fn history(&self) -> Vec<Version> {
    self.vers.iter().cloned().collect()
}
/// Loads the fnode with id `root_id` through `Cow::load_cow` and attaches a
/// clone of `store` to it, since the store reference is not serialized.
///
/// # Errors
///
/// Forwards errors from `load_cow` and `make_mut_naive`.
pub fn load_root(
    root_id: &Eid,
    txmgr: &TxMgrRef,
    store: &StoreRef,
    vol: &VolumeRef,
) -> Result<FnodeRef> {
    let root_ref = Cow::<Fnode>::load_cow(root_id, txmgr, vol)?;
    {
        // Scope the write lock so it is released before the ref is returned.
        let mut guard = root_ref.write().unwrap();
        let node = guard.make_mut_naive()?;
        node.store = store.clone();
    }
    Ok(root_ref)
}
fn load_child(
&mut self,
name: &str,
self_ref: FnodeRef,
cache: &Cache,
vol: &VolumeRef,
) -> Result<FnodeRef> {
if let Some(fnode) = self.sub_nodes.get_refresh(name).and_then(
|sub| {
sub.upgrade()
},
)
{
return Ok(fnode);
}
self.kids
.iter()
.find(|ref c| c.name == name)
.ok_or(Error::NotFound)
.and_then(|child| {
cache.get(&child.id, vol).map_err(|e| Error::from(e))
})
.and_then(|child| {
{
let mut child_cow = child.write().unwrap();
let c = child_cow.make_mut_naive()?;
c.parent = Some(self_ref);
c.store = self.store.clone();
}
self.sub_nodes.insert(
name.to_string(),
Arc::downgrade(&child),
);
Ok(child)
})
}
pub fn has_child(&self, name: &str) -> bool {
self.kids.iter().position(|ref c| c.name == name).is_some()
}
/// Number of child entries recorded in `kids`.
pub fn children_cnt(&self) -> usize {
self.kids.len()
}
/// Looks up the child named `name` under `parent`.
///
/// Takes the parent's write lock and delegates to `load_child`, passing a
/// clone of `parent` as the child's parent reference.
///
/// # Errors
///
/// Forwards errors from `make_mut_naive` and `load_child`.
pub fn child(
    parent: FnodeRef,
    name: &str,
    cache: &Cache,
    vol: &VolumeRef,
) -> Result<FnodeRef> {
    let mut guard = parent.write().unwrap();
    let dir = guard.make_mut_naive()?;
    dir.load_child(name, parent.clone(), cache, vol)
}
fn children_names(&self) -> Vec<String> {
self.kids.iter().map(|ref k| k.name.clone()).collect()
}
/// Lists the children of the directory `parent`, whose path is `path`.
///
/// Every child is loaded through `load_child` and reported as a `DirEntry`
/// holding the joined path, a metadata snapshot and the child's name.
///
/// # Errors
///
/// Returns `Error::NotDir` when `parent` is not a directory and forwards
/// errors from `make_mut_naive` and `load_child`.
///
/// # Panics
///
/// On Windows, panics if `path` is not valid UTF-8.
pub fn read_dir(
    parent: FnodeRef,
    path: &Path,
    cache: &Cache,
    vol: &VolumeRef,
) -> Result<Vec<DirEntry>> {
    let mut parent_guard = parent.write().unwrap();
    let par = parent_guard.make_mut_naive()?;
    if !par.is_dir() {
        return Err(Error::NotDir);
    }

    let parent_path = {
        // On Windows the directory path is forced to end with '/' before
        // the entry names are joined onto it.
        #[cfg(windows)]
        {
            let mut path_str = path.to_str().unwrap().to_string();
            if !path_str.ends_with("/") {
                path_str.push_str("/");
            }
            PathBuf::from(path_str)
        }
        // Every other target uses the path unchanged. `not(windows)` rather
        // than `unix`, so targets that are neither still compile.
        #[cfg(not(windows))]
        {
            path
        }
    };

    let child_names = par.children_names();
    let mut ret = Vec::with_capacity(child_names.len());
    for name in child_names {
        let child_ref = par.load_child(&name, parent.clone(), cache, vol)?;
        let metadata = child_ref.read().unwrap().metadata();
        ret.push(DirEntry {
            path: parent_path.join(&name),
            metadata,
            name,
        });
    }
    Ok(ret)
}
/// Registers `child` under `parent` with the given `name`.
///
/// Appends a `ChildEntry` to the parent's `kids`, sets the child's `parent`
/// to a clone of `parent`, caches a weak reference to the child in the
/// parent's `sub_nodes`, and updates the parent's `mtime`.
///
/// # Errors
///
/// Forwards errors from `make_mut` on the parent or the child.
pub fn add_child(
    parent: &FnodeRef,
    child: &FnodeRef,
    name: &str,
) -> Result<()> {
    let mut parent_guard = parent.write().unwrap();
    let dir = parent_guard.make_mut()?;
    let mut child_guard = child.write().unwrap();

    let entry = ChildEntry::new(child_guard.id(), child_guard.ftype, name);
    dir.kids.push(entry);
    child_guard.make_mut()?.parent = Some(parent.clone());

    let key = name.to_string();
    let weak_child = Arc::downgrade(child);
    dir.sub_nodes.insert(key, weak_child);

    dir.mtime = Time::now();
    Ok(())
}
pub fn unlink(fnode: &FnodeRef) -> Result<()> {
let child = fnode.read().unwrap();
match child.parent {
Some(ref parent) => {
let mut par = parent.write().unwrap();
let par = par.make_mut()?;
let child_idx = par.kids
.iter()
.position(|ref c| c.id == *child.id())
.ok_or(Error::NotFound)?;
{
let name = &par.kids[child_idx].name;
par.sub_nodes.remove(name);
}
par.kids.remove(child_idx);
Ok(())
}
None => return Err(Error::IsRoot),
}
}
/// Finds the first version whose number equals `ver_num`, if any.
fn ver(&self, ver_num: usize) -> Option<&Version> {
    for candidate in &self.vers {
        if candidate.num == ver_num {
            return Some(candidate);
        }
    }
    None
}
/// Returns the newest version (the back of `vers`).
///
/// Panics if the fnode has no versions.
fn curr_ver(&self) -> &Version {
self.vers.back().unwrap()
}
/// Number of the newest version, or 0 when there are no versions.
pub fn curr_ver_num(&self) -> usize {
    self.vers.back().map_or(0, |newest| newest.num)
}
fn remove_ver(&mut self, ver_num: usize) -> Result<()> {
let idx = self.vers.iter().position(|v| v.num == ver_num).ok_or(
Error::NoVersion,
)?;
let ver = self.vers.remove(idx).unwrap();
if let Some(ctn) = {
let mut store = self.store.write().unwrap();
store.make_mut()?.deref_content(&ver.content_id)?
}
{
Content::unlink(&ctn, &mut self.chk_map, &self.store)?;
}
Ok(())
}
/// Removes every version, oldest first, stopping at the first error.
///
/// # Errors
///
/// Forwards the first error returned by `remove_ver`.
pub fn clear_vers(&mut self) -> Result<()> {
    loop {
        let oldest = match self.vers.front() {
            Some(ver) => ver.num,
            None => break,
        };
        self.remove_ver(oldest)?;
    }
    Ok(())
}
/// Appends a new version that points at `content`, after asking the store
/// to dedup it, and updates `mtime` to the new version's creation time.
///
/// If the version count then exceeds `version_limit`, the oldest version
/// is removed. Returns `Some(content)` when the store mapped the content
/// to a different id (the version then points at that id), else `None`.
///
/// # Errors
///
/// Forwards errors from the store and from `remove_ver`.
fn add_ver(&mut self, content: ContentRef) -> Result<Option<ContentRef>> {
    let is_deduped = {
        let ctn = content.read().unwrap();
        let mut store_guard = self.store.write().unwrap();
        let store = store_guard.make_mut()?;
        let deduped_id = store.dedup_content(ctn.id(), ctn.hash())?;

        let next_num = self.curr_ver_num() + 1;
        let ver = Version::new(next_num, &deduped_id, ctn.len());
        self.mtime = ver.ctime;
        self.vers.push_back(ver);

        deduped_id != *ctn.id()
    };

    let limit = self.version_limit as usize;
    if self.vers.len() > limit {
        let retire = self.vers.front().unwrap().num;
        self.remove_ver(retire)?;
    }

    Ok(if is_deduped { Some(content) } else { None })
}
pub fn version_reader(&self, ver_num: usize) -> Result<ContentReader> {
let ver = self.ver(ver_num).ok_or(Error::NoVersion)?;
let content = {
let st = self.store.read().unwrap();
st.get_content(&ver.content_id)?
};
Ok(ContentReader::new(&content, &self.store))
}
/// Fetches the content of the current version from the store and returns
/// a `clone_new` copy of it wrapped with `into_cow`.
///
/// Panics if the fnode has no versions (via `curr_ver`).
///
/// # Errors
///
/// Forwards errors from `get_content` and `into_cow`.
fn clone_current_content(&self, txmgr: &TxMgrRef) -> Result<ContentRef> {
    let current = {
        let store = self.store.read().unwrap();
        store.get_content(&self.curr_ver().content_id)?
    };
    let guard = current.read().unwrap();
    let copy = guard.clone_new();
    copy.into_cow(txmgr)
}
/// Changes the length of the file behind `handle` to `len`.
///
/// * Growing: zero bytes are written from the current end up to `len`
///   through a `Writer`, which records a new version on `finish`.
/// * Shrinking: the current content is cloned, truncated to `len`, linked
///   and added as a new version; if `add_ver` returns a content, it is
///   unlinked.
/// * Equal length: nothing is done.
///
/// # Errors
///
/// Forwards errors from the writer, the content operations and `add_ver`.
/// A write that accepts no bytes is reported as an error (`write_all`)
/// instead of being retried forever.
pub fn set_len(handle: Handle, len: usize, txid: Txid) -> Result<()> {
    let curr_len = {
        let fnode = handle.fnode.read().unwrap();
        fnode.curr_len()
    };

    if curr_len < len {
        // Extend: append zeros in chunks of at most 16 KiB.
        let mut remaining = len - curr_len;
        let buf = vec![0u8; min(remaining, 16 * 1024)];
        let mut wtr = Writer::new(handle, txid)?;
        wtr.seek(SeekFrom::Start(curr_len as u64))?;
        while remaining > 0 {
            let write_len = min(remaining, buf.len());
            // `write_all` fails on a zero-length write, so this loop
            // always makes progress or returns.
            wtr.write_all(&buf[..write_len])?;
            remaining -= write_len;
        }
        wtr.finish()?;
    } else if curr_len > len {
        // Truncate: work on a copy of the current content and record it
        // as a new version.
        let mut fnode_cow = handle.fnode.write().unwrap();
        let ctn = {
            let new_ctn = fnode_cow.clone_current_content(&handle.txmgr)?;
            Content::truncate(&new_ctn, len, &handle.store)?;
            new_ctn
        };
        Content::link(ctn.clone(), &handle.store)?;
        let fnode = fnode_cow.make_mut()?;
        if let Some(ctn) = fnode.add_ver(ctn)? {
            Content::unlink(&ctn, &mut fnode.chk_map, &handle.store)?;
        }
    }

    Ok(())
}
}
// Hand-written `Debug`: prints the serialized fields except `chk_map`, plus
// `sub_nodes`; `parent` and `store` are left out.
impl Debug for Fnode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Fnode");
        out.field("id", &self.id);
        out.field("ftype", &self.ftype);
        out.field("version_limit", &self.version_limit);
        out.field("ctime", &self.ctime);
        out.field("mtime", &self.mtime);
        out.field("kids", &self.kids);
        out.field("vers", &self.vers);
        out.field("sub_nodes", &self.sub_nodes);
        out.finish()
    }
}
// `Id` implementation: exposes the `id` field by shared and mutable reference.
impl Id for Fnode {
// Shared access to the fnode's id.
#[inline]
fn id(&self) -> &Eid {
&self.id
}
// Mutable access to the fnode's id.
#[inline]
fn id_mut(&mut self) -> &mut Eid {
&mut self.id
}
}
// Marker-style impls: the bodies are empty, so `Fnode` uses whatever
// default methods these traits provide (`clone_new`, `into_cow`, ...).
impl CloneNew for Fnode {}
impl<'de> IntoCow<'de> for Fnode {}
impl<'de> Persistable<'de> for Fnode {}
/// Shared reference to an fnode; locked with `read()`/`write()` throughout this file.
pub type FnodeRef = CowRef<Fnode>;
/// Weak counterpart of `FnodeRef`, obtained with `Arc::downgrade` and turned
/// back into an `FnodeRef` with `upgrade()`.
pub type FnodeWeakRef = CowWeakRef<Fnode>;
/// Reader over the content of one version of an fnode.
#[derive(Debug)]
pub struct Reader {
// Underlying content reader that all reads and seeks are delegated to.
rdr: ContentReader,
}
impl Reader {
    /// Opens a reader on version `ver` of `fnode`.
    ///
    /// # Errors
    ///
    /// Forwards errors from `Fnode::version_reader`, including
    /// `Error::NoVersion` when the version does not exist.
    pub fn new(fnode: FnodeRef, ver: usize) -> Result<Self> {
        let guard = fnode.read().unwrap();
        guard.version_reader(ver).map(|rdr| Reader { rdr })
    }

    /// Opens a reader on the current version of `fnode`.
    ///
    /// # Errors
    ///
    /// Forwards errors from `Fnode::version_reader`.
    pub fn new_current(fnode: FnodeRef) -> Result<Self> {
        let guard = fnode.read().unwrap();
        let current = guard.curr_ver_num();
        guard.version_reader(current).map(|rdr| Reader { rdr })
    }
}
// `Read` is forwarded unchanged to the wrapped `ContentReader`.
impl Read for Reader {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
self.rdr.read(buf)
}
}
// `Seek` is forwarded unchanged to the wrapped `ContentReader`.
impl Seek for Reader {
fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
self.rdr.seek(pos)
}
}
/// Writer that stages data through a `StoreWriter` and, on `finish`, records
/// the result as a new version of the fnode behind `handle`.
#[derive(Debug)]
pub struct Writer {
// Store writer that receives all writes and seeks.
inner: StoreWriter,
// Handle of the file being written; provides the fnode, store and txmgr.
handle: Handle,
}
impl Writer {
    /// Creates a writer for the file behind `handle` in transaction `txid`.
    ///
    /// The fnode's chunk map is cloned under a short read lock and handed
    /// to the underlying `StoreWriter`.
    ///
    /// # Errors
    ///
    /// Forwards errors from `StoreWriter::new`.
    pub fn new(handle: Handle, txid: Txid) -> Result<Self> {
        let chk_map = handle.fnode.read().unwrap().chk_map.clone();
        let inner =
            StoreWriter::new(chk_map, &handle.txmgr, &handle.store, txid)?;
        Ok(Writer { inner, handle })
    }

    /// Finishes writing and records the result as a new version.
    ///
    /// The staged content from the inner writer replaces a clone of the
    /// current content, which is then added with `add_ver`; if `add_ver`
    /// returns a content, that content is unlinked.
    ///
    /// # Errors
    ///
    /// Forwards errors from the inner writer, the content operations,
    /// `make_mut` and `add_ver`.
    pub fn finish(self) -> Result<()> {
        let Writer { inner, handle } = self;
        let stg_ctn = inner.finish()?;

        let mut fnode_guard = handle.fnode.write().unwrap();
        let new_ctn = fnode_guard.clone_current_content(&handle.txmgr)?;
        Content::replace(&new_ctn, &stg_ctn, &handle.store)?;

        let fnode = fnode_guard.make_mut()?;
        if let Some(dup) = fnode.add_ver(new_ctn)? {
            Content::unlink(&dup, &mut fnode.chk_map, &handle.store)?;
        }
        Ok(())
    }
}
// `Write` is forwarded unchanged to the inner `StoreWriter`; nothing is
// recorded on the fnode until `Writer::finish` is called.
impl Write for Writer {
fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
self.inner.write(buf)
}
fn flush(&mut self) -> IoResult<()> {
self.inner.flush()
}
}
// `Seek` is forwarded unchanged to the inner `StoreWriter`.
impl Seek for Writer {
fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
self.inner.seek(pos)
}
}
/// Fnode cache type; `load_child` uses its `get` to fetch a child by id.
pub type Cache = CowCache<Fnode>;