#![allow(clippy::module_inception)]
use std::cmp::min;
use std::collections::VecDeque;
use std::fmt::{self, Debug};
use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::SystemTime;
use super::{Handle, Options};
use base::lru::{CountMeter, Lru, PinChecker};
use base::Time;
use content::{
ChunkMap, Content, ContentReader, StoreRef, Writer as StoreWriter,
};
use error::{Error, Result};
use trans::cow::{Cow, CowCache, CowRef, CowWeakRef, Cowable, IntoCow};
use trans::{Eid, Id, TxMgrRef, Txid};
use volume::VolumeRef;
/// Size argument passed to `Lru::new` when a node's `sub_nodes` cache of
/// child weak references is created (see `Fnode::default_sub_nodes`).
// NOTE(review): presumably the maximum number of cached children — confirm
// against `base::lru::Lru::new`.
const SUB_NODES_CNT: usize = 8;
/// Kind of a file system node: a regular file or a directory.
#[derive(Debug, Copy, Clone, PartialEq, Deserialize, Serialize)]
pub enum FileType {
/// Regular file. This is the variant returned by `FileType::default()`.
File,
/// Directory.
Dir,
}
impl FileType {
    /// Returns `true` when the value is `FileType::File`.
    pub fn is_file(self) -> bool {
        match self {
            FileType::File => true,
            FileType::Dir => false,
        }
    }

    /// Returns `true` when the value is `FileType::Dir`.
    pub fn is_dir(self) -> bool {
        match self {
            FileType::Dir => true,
            FileType::File => false,
        }
    }
}
/// A `FileType` defaults to a regular file.
impl Default for FileType {
fn default() -> Self {
FileType::File
}
}
/// Numeric code of a file type: `File` is `0`, `Dir` is `1`.
///
/// Implemented as `From` rather than a hand-written `Into`: the standard
/// blanket impl still provides `Into<i32> for FileType`, so existing
/// `ftype.into()` call sites keep working, and `i32::from(ftype)` becomes
/// available as well.
impl From<FileType> for i32 {
    fn from(ftype: FileType) -> i32 {
        match ftype {
            FileType::File => 0,
            FileType::Dir => 1,
        }
    }
}
/// Display name of a file type: `"File"` or `"Dir"`.
///
/// Implemented as `From` rather than a hand-written `Into`: the standard
/// blanket impl still provides `Into<String> for FileType`, so existing
/// `ftype.into()` call sites keep working, and `String::from(ftype)` becomes
/// available as well.
impl From<FileType> for String {
    fn from(ftype: FileType) -> String {
        match ftype {
            FileType::File => String::from("File"),
            FileType::Dir => String::from("Dir"),
        }
    }
}
/// Serialized record a directory keeps for each of its children.
#[derive(Debug, Clone, Deserialize, Serialize)]
struct ChildEntry {
/// Entity id of the child fnode; used to load it from the cache.
id: Eid,
/// File type of the child at the time it was added.
ftype: FileType,
/// Name of the child inside its parent directory.
name: String,
}
impl ChildEntry {
    /// Builds an entry that owns copies of the given id and name.
    fn new(id: &Eid, ftype: FileType, name: &str) -> Self {
        let id = id.clone();
        let name = String::from(name);
        ChildEntry { id, ftype, name }
    }
}
/// One entry in a file's version history.
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
pub struct Version {
/// Version number; `Fnode::add_version` assigns current number + 1.
num: usize,
/// Id of the content this version refers to in the store.
content_id: Eid,
/// Length of the content, as reported by `Content::len` when the version
/// was created.
content_len: usize,
/// Time this version record was created.
ctime: Time,
}
impl Version {
    /// Creates a version record stamped with the current time.
    fn new(num: usize, content_id: &Eid, len: usize) -> Self {
        let content_id = content_id.clone();
        let ctime = Time::now();
        Version {
            num,
            content_id,
            content_len: len,
            ctime,
        }
    }

    /// Version number.
    pub fn num(&self) -> usize {
        let Version { num, .. } = *self;
        num
    }

    /// Length of the content referenced by this version.
    pub fn content_len(&self) -> usize {
        let Version { content_len, .. } = *self;
        content_len
    }

    /// Creation time of this version as a `SystemTime`.
    pub fn created_at(&self) -> SystemTime {
        let created = &self.ctime;
        created.to_system_time()
    }
}
/// Snapshot of a node's attributes, as produced by `Fnode::metadata`.
#[derive(Debug, Copy, Clone)]
pub struct Metadata {
/// File or directory.
ftype: FileType,
/// Length of the current content; `Fnode::metadata` reports 0 for
/// directories.
content_len: usize,
/// Number of the current version.
curr_version: usize,
/// Creation time of the node.
ctime: Time,
/// Last modification time of the node.
mtime: Time,
}
impl Metadata {
pub fn file_type(&self) -> FileType {
self.ftype
}
pub fn is_dir(&self) -> bool {
self.ftype == FileType::Dir
}
pub fn is_file(&self) -> bool {
self.ftype == FileType::File
}
pub fn content_len(&self) -> usize {
self.content_len
}
pub fn curr_version(&self) -> usize {
self.curr_version
}
pub fn created_at(&self) -> SystemTime {
self.ctime.to_system_time()
}
pub fn modified_at(&self) -> SystemTime {
self.mtime.to_system_time()
}
}
/// One child returned by `Fnode::read_dir`.
#[derive(Debug)]
pub struct DirEntry {
/// Path of the child: the directory path joined with the child's name.
path: PathBuf,
/// Name of the child inside the directory.
name: String,
/// Metadata of the child, captured when the directory was read.
metadata: Metadata,
}
impl DirEntry {
pub fn path(&self) -> &Path {
self.path.as_path()
}
pub fn file_name(&self) -> &str {
&self.name
}
pub fn metadata(&self) -> Metadata {
self.metadata
}
}
/// LRU map from child name to a weak reference of the loaded child fnode.
// NOTE(review): `CountMeter` / `PinChecker` presumably make the LRU count
// entries and refuse to evict entries that are still referenced — confirm
// against `base::lru`.
type SubNodes = Lru<
String,
FnodeWeakRef,
CountMeter<FnodeWeakRef>,
PinChecker<FnodeWeakRef>,
>;
/// State of one file system node (file or directory).
///
/// `parent`, `sub_nodes` and `store` are skipped by serde; within this file
/// they are filled in again by `load_root`, `load_child` and `add_child`.
#[derive(Default, Clone, Deserialize, Serialize)]
pub struct Fnode {
/// Whether this node is a file or a directory.
ftype: FileType,
/// Options the node was created with; this file reads `dedup_chunk` and
/// `version_limit` from it.
opts: Options,
/// Creation time.
ctime: Time,
/// Last modification time; updated when a child or a version is added.
mtime: Time,
/// Entries of the children of this node.
kids: Vec<ChildEntry>,
/// Content versions; new versions are pushed to the back and the oldest
/// one is retired from the front.
vers: VecDeque<Version>,
/// Chunk map handed to writers and replaced when a write finishes.
chk_map: ChunkMap,
/// Parent node, `None` until the node is attached to one; `is_root`
/// treats `None` as "this is the root". Not serialized.
#[serde(skip_serializing, skip_deserializing, default)]
parent: Option<FnodeRef>,
/// Cache of weak references to loaded children, keyed by name.
/// Not serialized.
#[serde(
skip_serializing,
skip_deserializing,
default = "Fnode::default_sub_nodes"
)]
sub_nodes: SubNodes,
/// Content store used to look up, dedup and release content.
/// Not serialized.
#[serde(skip_serializing, skip_deserializing, default)]
store: StoreRef,
}
impl Fnode {
/// Creates a node of the given type with no parent, children or versions.
///
/// `store` is cloned into the node, and the chunk map is created from
/// `opts.dedup_chunk`. Creation and modification time are set to the same
/// instant.
pub fn new(ftype: FileType, opts: Options, store: &StoreRef) -> Self {
    // Sample the clock once: two separate `Time::now()` calls could land on
    // different instants and leave a brand-new node with `mtime != ctime`.
    let now = Time::now();
    Fnode {
        ftype,
        opts,
        ctime: now,
        mtime: now,
        kids: Vec::new(),
        vers: VecDeque::new(),
        chk_map: ChunkMap::new(opts.dedup_chunk),
        parent: None,
        sub_nodes: Self::default_sub_nodes(),
        store: store.clone(),
    }
}
/// Creates a new node named `name` under the directory `parent`.
///
/// A new file node gets an initial version with empty content. Returns
/// `Error::NotDir` when `parent` is not a directory.
pub fn new_under(
    parent: &FnodeRef,
    name: &str,
    ftype: FileType,
    opts: Options,
    txmgr: &TxMgrRef,
) -> Result<FnodeRef> {
    // The parent's write lock is only held inside this block; it must be
    // released before `add_child` below takes it again.
    let child_ref = {
        let mut parent_guard = parent.write().unwrap();
        let dir = parent_guard.make_mut()?;
        if !dir.is_dir() {
            return Err(Error::NotDir);
        }

        let mut node = Fnode::new(ftype, opts, &dir.store);
        if node.is_file() {
            node.add_version(Content::new())?;
        }
        node.into_cow(txmgr)?
    };

    Fnode::add_child(parent, &child_ref, name)?;
    Ok(child_ref)
}
#[inline]
fn default_sub_nodes() -> SubNodes {
Lru::new(SUB_NODES_CNT)
}
/// Returns `true` when this node is a regular file.
#[inline]
pub fn is_file(&self) -> bool {
    match self.ftype {
        FileType::File => true,
        FileType::Dir => false,
    }
}
/// Returns `true` when this node is a directory.
#[inline]
pub fn is_dir(&self) -> bool {
    match self.ftype {
        FileType::Dir => true,
        FileType::File => false,
    }
}
/// Returns `true` when this node has no parent link.
#[inline]
pub fn is_root(&self) -> bool {
    match self.parent {
        None => true,
        Some(_) => false,
    }
}
/// Captures the node's current attributes in a `Metadata` value.
pub fn metadata(&self) -> Metadata {
    let content_len = self.curr_len();
    let curr_version = self.curr_ver_num();
    Metadata {
        ftype: self.ftype,
        content_len,
        curr_version,
        ctime: self.ctime,
        mtime: self.mtime,
    }
}
/// Length of the current content.
///
/// Directories always report 0. A file reports the content length of its
/// latest version, or 0 when it has no version yet.
#[inline]
pub fn curr_len(&self) -> usize {
    match self.ftype {
        // A file built by `Fnode::new` or `Default` has an empty version
        // list until `add_version` runs. Going through `curr_ver()` would
        // panic there, while `curr_ver_num()` already answers 0 for the
        // same state, so report an empty length instead of unwinding.
        FileType::File => self.vers.back().map_or(0, |ver| ver.content_len),
        FileType::Dir => 0,
    }
}
/// Returns a copy of all versions, in the order they are stored
/// (front of the queue first).
#[inline]
pub fn history(&self) -> Vec<Version> {
    self.vers.iter().cloned().collect()
}
/// Returns this node's options by value.
#[inline]
pub fn get_opts(&self) -> Options {
self.opts
}
/// Loads the root node identified by `root_id` and attaches `store` to it.
///
/// The store link is not serialized, so it has to be set again after
/// loading.
pub fn load_root(
    root_id: &Eid,
    txmgr: &TxMgrRef,
    store: &StoreRef,
    vol: &VolumeRef,
) -> Result<FnodeRef> {
    let root_ref = Cow::<Fnode>::load(root_id, txmgr, vol)?;
    {
        // Scope the write guard so it is released before the reference is
        // handed back to the caller.
        let mut guard = root_ref.write().unwrap();
        let node = guard.make_mut_naive();
        node.store = store.clone();
    }
    Ok(root_ref)
}
fn load_child(
&mut self,
name: &str,
self_ref: FnodeRef,
cache: &Cache,
vol: &VolumeRef,
) -> Result<FnodeRef> {
if let Some(fnode) = self
.sub_nodes
.get_refresh(name)
.and_then(|sub| sub.upgrade())
{
return Ok(fnode);
}
self.kids
.iter()
.find(|ref c| c.name == name)
.ok_or(Error::NotFound)
.and_then(|child| cache.get(&child.id, vol).map_err(Error::from))
.and_then(|child| {
{
let mut child_cow = child.write().unwrap();
let c = child_cow.make_mut_naive();
c.parent = Some(self_ref);
c.store = self.store.clone();
}
self.sub_nodes
.insert(name.to_string(), Arc::downgrade(&child));
Ok(child)
})
}
#[inline]
pub fn has_child(&self, name: &str) -> bool {
self.kids.iter().any(|ref c| c.name == name)
}
/// Number of child entries recorded for this node.
#[inline]
pub fn children_cnt(&self) -> usize {
self.kids.len()
}
/// Looks up the child called `name` under `parent`.
///
/// Takes the parent's write lock for the duration of the lookup, because
/// loading a child updates the parent's child cache.
pub fn child(
    parent: &FnodeRef,
    name: &str,
    cache: &Cache,
    vol: &VolumeRef,
) -> Result<FnodeRef> {
    let mut parent_guard = parent.write().unwrap();
    let dir = parent_guard.make_mut_naive();
    dir.load_child(name, parent.clone(), cache, vol)
}
fn children_names(&self) -> Vec<String> {
self.kids.iter().map(|ref k| k.name.clone()).collect()
}
/// Lists the children of the directory `parent`, located at `path`.
///
/// Every child is loaded (through the child cache or `cache`) so that its
/// metadata can be captured. Returns `Error::NotDir` when `parent` is not a
/// directory, and stops at the first child that fails to load.
pub fn read_dir(
    parent: FnodeRef,
    path: &Path,
    cache: &Cache,
    vol: &VolumeRef,
) -> Result<Vec<DirEntry>> {
    let mut parent_guard = parent.write().unwrap();
    let dir = parent_guard.make_mut_naive();
    if !dir.is_dir() {
        return Err(Error::NotDir);
    }

    // On Windows the directory path gets a trailing "/" before child names
    // are joined onto it; elsewhere the path is used as given.
    let parent_path = {
        #[cfg(windows)]
        {
            let mut path_str = path.to_str().unwrap().to_string();
            if !path_str.ends_with("/") {
                path_str.push_str("/");
            }
            PathBuf::from(path_str)
        }
        #[cfg(not(windows))]
        {
            path
        }
    };

    // Names are copied out first because loading a child needs `dir`
    // mutably.
    let names = dir.children_names();
    let mut entries = Vec::new();
    for name in &names {
        let child_ref = dir.load_child(name, parent.clone(), cache, vol)?;
        let metadata = child_ref.read().unwrap().metadata();
        entries.push(DirEntry {
            path: parent_path.join(name),
            name: name.clone(),
            metadata,
        });
    }
    Ok(entries)
}
/// Attaches `child` to `parent` under `name`.
///
/// Records a child entry in the parent, points the child's `parent` link at
/// it, caches a weak reference to the child, and updates the parent's
/// modification time. The parent is locked before the child.
pub fn add_child(
    parent: &FnodeRef,
    child: &FnodeRef,
    name: &str,
) -> Result<()> {
    let mut parent_guard = parent.write().unwrap();
    let dir = parent_guard.make_mut()?;
    let mut child_guard = child.write().unwrap();

    let entry = ChildEntry::new(child_guard.id(), child_guard.ftype, name);
    dir.kids.push(entry);
    child_guard.make_mut()?.parent = Some(parent.clone());

    dir.sub_nodes
        .insert(name.to_string(), Arc::downgrade(child));
    dir.mtime = Time::now();
    Ok(())
}
pub fn remove_from_parent(fnode: &FnodeRef) -> Result<()> {
let child = fnode.read().unwrap();
match child.parent {
Some(ref parent) => {
let mut par = parent.write().unwrap();
let par = par.make_mut()?;
let child_idx = par
.kids
.iter()
.position(|ref c| c.id == *child.id())
.ok_or(Error::NotFound)?;
{
let name = &par.kids[child_idx].name;
par.sub_nodes.remove(name);
}
par.kids.remove(child_idx);
Ok(())
}
None => Err(Error::IsRoot),
}
}
/// Finds the version with number `ver_num`, if it is still in the history.
fn ver(&self, ver_num: usize) -> Option<&Version> {
self.vers.iter().find(|v| v.num == ver_num)
}
/// Returns the latest version (the back of the version queue).
///
/// # Panics
///
/// Panics when the node has no versions.
fn curr_ver(&self) -> &Version {
self.vers.back().unwrap()
}
/// Number of the latest version, or 0 when the node has no versions.
pub fn curr_ver_num(&self) -> usize {
    self.vers.back().map_or(0, |ver| ver.num)
}
/// Removes version `ver_num` from the history and releases its content.
///
/// The version record is removed first. The store is then asked to
/// dereference the content; when it hands the content back, the content is
/// unlinked from this node's chunk map and marked for deletion.
/// Returns `Error::NoVersion` when no such version exists.
fn remove_ver(&mut self, ver_num: usize) -> Result<()> {
    let pos = self
        .vers
        .iter()
        .position(|v| v.num == ver_num)
        .ok_or(Error::NoVersion)?;
    let retired = self.vers.remove(pos).unwrap();

    // Keep the store's write lock only for the dereference itself.
    let released = {
        let mut store_guard = self.store.write().unwrap();
        store_guard.make_mut()?.deref_content(&retired.content_id)?
    };

    if let Some(ctn) = released {
        let mut content = ctn.write().unwrap();
        content.unlink(&mut self.chk_map, &self.store)?;
        content.make_del()?;
    }
    Ok(())
}
/// Removes every version of this node, front of the queue first.
///
/// Stops and returns the error of the first removal that fails.
pub fn clear_vers(&mut self) -> Result<()> {
    // Snapshot the numbers up front: `remove_ver` mutates `self.vers`.
    let pending = self.vers.iter().map(|v| v.num).collect::<Vec<usize>>();
    for num in pending {
        self.remove_ver(num)?;
    }
    Ok(())
}
/// Appends a new version that refers to `content`.
///
/// The store is asked to dedup the content first. The new version gets the
/// next version number and its creation time becomes the node's
/// modification time. When the history then exceeds
/// `opts.version_limit`, the oldest version is removed.
///
/// Returns `None` when the store reported the content as deduplicated,
/// otherwise hands `content` back to the caller.
///
/// # Panics
///
/// Panics when the node is not a file.
fn add_version(&mut self, content: Content) -> Result<Option<Content>> {
    assert!(self.is_file());

    let (is_deduped, deduped_id) = {
        let mut store_guard = self.store.write().unwrap();
        store_guard.make_mut()?.dedup_content(&content)?
    };

    let next_num = self.curr_ver_num() + 1;
    let ver = Version::new(next_num, &deduped_id, content.len());
    self.mtime = ver.ctime;
    self.vers.push_back(ver);

    let limit = self.opts.version_limit as usize;
    if self.vers.len() > limit {
        let oldest = self.vers.front().unwrap().num;
        self.remove_ver(oldest)?;
    }

    Ok(if is_deduped { None } else { Some(content) })
}
pub fn version_reader(&self, ver_num: usize) -> Result<ContentReader> {
let ver = self.ver(ver_num).ok_or(Error::NoVersion)?;
let content = {
let st = self.store.read().unwrap();
let ctn_ref = st.get_content(&ver.content_id)?;
let ctn = ctn_ref.read().unwrap();
ctn.clone()
};
Ok(ContentReader::new(content, &self.store))
}
/// Returns a clone of the content of the latest version.
///
/// # Panics
///
/// Panics (through `curr_ver`) when the node has no versions.
fn clone_current_content(&self) -> Result<Content> {
    let store_guard = self.store.read().unwrap();
    let curr_id = &self.curr_ver().content_id;
    let ctn_ref = store_guard.get_content(curr_id)?;
    let ctn_guard = ctn_ref.read().unwrap();
    Ok(ctn_guard.clone())
}
/// Resizes the file behind `handle` to exactly `len` bytes.
///
/// * Growing appends zero bytes through a `Writer` starting at the current
///   end, then finishes the writer, which records a new version.
/// * Shrinking truncates a clone of the current content and records it as
///   a new version.
/// * An unchanged length is a no-op.
///
/// # Errors
///
/// Propagates any error from seeking, writing, truncating or adding the
/// version.
pub fn set_len(handle: Handle, len: usize, txid: Txid) -> Result<()> {
    let curr_len = {
        let fnode = handle.fnode.read().unwrap();
        fnode.curr_len()
    };

    if curr_len < len {
        // Extend: append zeros in chunks of at most 16 KiB.
        let mut remaining = len - curr_len;
        let zeros = vec![0u8; min(remaining, 16 * 1024)];
        let mut wtr = Writer::new(handle.clone(), txid);
        wtr.seek(SeekFrom::Start(curr_len as u64))?;
        while remaining > 0 {
            let chunk_len = min(remaining, zeros.len());
            // `write_all` retries short writes and reports `WriteZero` if
            // the writer stops accepting data. A bare `write` returning 0
            // would leave `remaining` unchanged and loop forever.
            wtr.write_all(&zeros[..chunk_len])?;
            remaining -= chunk_len;
        }
        wtr.finish()?;
    } else if curr_len > len {
        // Truncate: cut a clone of the current content and add it as a new
        // version.
        let mut fnode_cow = handle.fnode.write().unwrap();
        let new_ctn = {
            let mut ctn = fnode_cow.clone_current_content()?;
            ctn.truncate(len, &handle.store)?;
            ctn
        };
        let fnode = fnode_cow.make_mut()?;
        // The content is only handed back when it was not deduplicated; in
        // that case it still has to be linked into the store.
        if let Some(content) = fnode.add_version(new_ctn)? {
            content.link(&handle.store)?;
        }
    }
    Ok(())
}
}
/// Debug output lists the node's own state; the `parent` and `store` links
/// are left out.
impl Debug for Fnode {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Fnode");
        out.field("ftype", &self.ftype);
        out.field("opts", &self.opts);
        out.field("ctime", &self.ctime);
        out.field("mtime", &self.mtime);
        out.field("kids", &self.kids);
        out.field("vers", &self.vers);
        out.field("chk_map", &self.chk_map);
        out.field("sub_nodes", &self.sub_nodes);
        out.finish()
    }
}
// Empty impls: `Fnode` overrides nothing and relies on whatever these traits
// provide by default.
impl Cowable for Fnode {}
impl<'de> IntoCow<'de> for Fnode {}
/// Shared reference to an `Fnode` wrapped by the `trans::cow` machinery;
/// accessed in this file through `.read()` / `.write()`.
pub type FnodeRef = CowRef<Fnode>;
/// Weak counterpart of `FnodeRef`, obtained with `Arc::downgrade` and turned
/// back into a strong reference with `upgrade()`.
pub type FnodeWeakRef = CowWeakRef<Fnode>;
/// Reader over the content of one specific version of a file.
#[derive(Debug)]
pub struct Reader {
/// Number of the version being read.
ver: usize,
/// Underlying content reader all reads and seeks are delegated to.
rdr: ContentReader,
}
impl Reader {
    /// Opens a reader on version `ver` of `fnode`.
    ///
    /// Fails with the error of `Fnode::version_reader`, e.g. when the
    /// version does not exist.
    pub fn new(fnode: FnodeRef, ver: usize) -> Result<Self> {
        let guard = fnode.read().unwrap();
        let rdr = guard.version_reader(ver)?;
        let reader = Reader { ver, rdr };
        Ok(reader)
    }

    /// Opens a reader on the latest version of `fnode`.
    ///
    /// The version number is resolved under the same read lock that creates
    /// the reader.
    pub fn new_current(fnode: FnodeRef) -> Result<Self> {
        let guard = fnode.read().unwrap();
        let ver = guard.curr_ver_num();
        let rdr = guard.version_reader(ver)?;
        let reader = Reader { ver, rdr };
        Ok(reader)
    }
}
/// Reading is delegated unchanged to the wrapped content reader.
impl Read for Reader {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
self.rdr.read(buf)
}
}
/// Seeking is delegated unchanged to the wrapped content reader.
impl Seek for Reader {
fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
self.rdr.seek(pos)
}
}
/// Writer that stages new content for a file and, on `finish`, records it as
/// a new version of the fnode behind `handle`.
#[derive(Debug)]
pub struct Writer {
/// Store-level writer that receives all writes and seeks.
inner: StoreWriter,
/// Handle of the file being written; gives access to its fnode and store.
handle: Handle,
}
impl Writer {
    /// Creates a writer for the file behind `handle` in transaction `txid`.
    ///
    /// The store writer starts from a clone of the fnode's chunk map.
    pub fn new(handle: Handle, txid: Txid) -> Self {
        let chk_map = handle.fnode.read().unwrap().chk_map.clone();
        let inner =
            StoreWriter::new(chk_map, &handle.txmgr, &handle.store, txid);
        Writer { inner, handle }
    }

    /// Completes the write and records the result as a new version.
    ///
    /// The staged content is merged into a clone of the current content and
    /// added as a version. When the merged content is handed back (it was
    /// not deduplicated) it is linked into the store; otherwise the staged
    /// content is weakly unlinked from the fnode's chunk map. In both cases
    /// the fnode's chunk map is then replaced by the writer's.
    ///
    /// Returns the end offset of the staged content.
    pub fn finish(self) -> Result<usize> {
        let Writer { inner, handle } = self;
        let (stg_ctn, chk_map) = inner.finish()?;

        let mut fnode_guard = handle.fnode.write().unwrap();
        let merged = {
            let mut ctn = fnode_guard.clone_current_content()?;
            ctn.merge_from(&stg_ctn, &handle.store)?;
            ctn
        };

        let fnode = fnode_guard.make_mut()?;
        if let Some(content) = fnode.add_version(merged)? {
            content.link(&handle.store)?;
        } else {
            stg_ctn.unlink_weak(&mut fnode.chk_map, &handle.store)?;
        }

        // Must happen after the branch above, which may still use the old
        // chunk map.
        fnode.chk_map = chk_map;
        Ok(stg_ctn.end_offset())
    }
}
/// Writes and flushes are delegated to the inner store writer. Within this
/// file the fnode only gains a new version in `Writer::finish`.
impl Write for Writer {
#[inline]
fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
self.inner.write(buf)
}
#[inline]
fn flush(&mut self) -> IoResult<()> {
self.inner.flush()
}
}
/// Seeking is delegated unchanged to the inner store writer.
impl Seek for Writer {
#[inline]
fn seek(&mut self, pos: SeekFrom) -> IoResult<u64> {
self.inner.seek(pos)
}
}
/// Cache of fnodes; children are fetched from it by entity id
/// (`cache.get(&id, vol)` in `Fnode::load_child`).
pub type Cache = CowCache<Fnode>;