use std::fmt::{self, Debug};
use std::io::{Read, Result as IoResult, Write};
use std::sync::{Arc, RwLock, Weak};
use super::allocator::AllocatorRef;
use super::storage::{self, Storage, StorageRef};
use super::super_block::SuperBlk;
use base::crypto::{Cipher, Cost, Salt};
use base::lz4::{
BlockMode, BlockSize, ContentChecksum, Decoder as Lz4Decoder,
Encoder as Lz4Encoder, EncoderBuilder as Lz4EncoderBuilder,
};
use base::{IntoRef, Time, Version};
use error::{Error, Result};
use fs::Config;
use trans::{Eid, Finish};
#[derive(Debug, Clone, Default)]
pub struct Info {
pub id: Eid,
pub ver: Version,
pub uri: String,
pub compress: bool,
pub cost: Cost,
pub cipher: Cipher,
pub ctime: Time,
}
#[derive(Debug, Default)]
pub struct Volume {
info: Info,
storage: StorageRef,
}
impl Volume {
pub fn new(uri: &str) -> Result<Self> {
let mut info = Info::default();
info.uri = uri.to_string();
let storage = Storage::new(uri)?.into_ref();
Ok(Volume { info, storage })
}
pub fn init(
&mut self,
pwd: &str,
cfg: &Config,
payload: &[u8],
) -> Result<()> {
let mut storage = self.storage.write().unwrap();
storage.connect(false)?;
storage.init(cfg.cost, cfg.cipher)?;
self.info.id = Eid::new();
self.info.ver = Version::repo_version();
self.info.compress = cfg.compress;
self.info.cost = cfg.cost;
self.info.cipher = cfg.cipher;
self.info.ctime = Time::now();
let mut super_blk = SuperBlk::default();
super_blk.head.salt = Salt::new();
super_blk.head.cost = cfg.cost;
super_blk.head.cipher = cfg.cipher;
super_blk.body.volume_id = self.info.id.clone();
super_blk.body.ver = self.info.ver.clone();
super_blk.body.key = storage.get_key().clone();
super_blk.body.uri = self.info.uri.clone();
super_blk.body.compress = cfg.compress;
super_blk.body.ctime = self.info.ctime;
super_blk.body.payload = payload.to_vec();
super_blk.save(pwd, &mut storage)?;
debug!("volume initialised");
Ok(())
}
pub fn open(&mut self, pwd: &str, force: bool) -> Result<Vec<u8>> {
let mut storage = self.storage.write().unwrap();
storage.connect(force)?;
let super_blk = SuperBlk::load(pwd, &mut storage)?;
if !super_blk.body.ver.match_repo_version() {
return Err(Error::WrongVersion);
}
storage.open(
super_blk.head.cost,
super_blk.head.cipher,
super_blk.body.key.clone(),
force,
)?;
self.info.id = super_blk.body.volume_id.clone();
self.info.ver = super_blk.body.ver;
self.info.compress = super_blk.body.compress;
self.info.cost = super_blk.head.cost;
self.info.cipher = super_blk.head.cipher;
self.info.ctime = super_blk.body.ctime;
debug!("volume opened: {}", *storage);
Ok(super_blk.body.payload.clone())
}
pub fn repair_super_block(&mut self, pwd: &str) -> Result<()> {
let mut storage = self.storage.write().unwrap();
storage.connect(false)?;
SuperBlk::repair(pwd, &mut storage)
}
pub fn exists(&self) -> Result<bool> {
let storage = self.storage.read().unwrap();
storage.exists()
}
pub fn reset_password(
&mut self,
old_pwd: &str,
new_pwd: &str,
cost: Cost,
) -> Result<()> {
let mut storage = self.storage.write().unwrap();
let mut super_blk = SuperBlk::load(old_pwd, &mut storage)?;
super_blk.head.cost = cost;
super_blk.save(new_pwd, &mut storage)?;
self.info.cost = cost;
Ok(())
}
#[inline]
pub fn info(&self) -> Info {
self.info.clone()
}
#[inline]
pub fn get_allocator(&self) -> AllocatorRef {
let storage = self.storage.read().unwrap();
storage.get_allocator()
}
#[inline]
pub fn del_wal(&mut self, id: &Eid) -> Result<()> {
let mut storage = self.storage.write().unwrap();
storage.del_wal(id)
}
#[inline]
pub fn del(&mut self, id: &Eid) -> Result<()> {
let mut storage = self.storage.write().unwrap();
storage.del(id)
}
#[inline]
pub fn flush(&mut self) -> Result<()> {
let mut storage = self.storage.write().unwrap();
storage.flush()
}
#[inline]
pub fn destroy(&mut self) -> Result<()> {
let mut storage = self.storage.write().unwrap();
storage.destroy()
}
}
impl IntoRef for Volume {}
pub type VolumeRef = Arc<RwLock<Volume>>;
pub type VolumeWeakRef = Weak<RwLock<Volume>>;
pub struct WalReader {
inner: storage::WalReader,
}
impl WalReader {
#[inline]
pub fn new(id: &Eid, vol: &VolumeRef) -> Self {
let vol = vol.read().unwrap();
WalReader {
inner: storage::WalReader::new(id, &vol.storage),
}
}
}
impl Read for WalReader {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
self.inner.read(buf)
}
}
pub struct Reader {
inner: Box<dyn Read>,
}
impl Reader {
pub fn new(id: &Eid, vol: &VolumeRef) -> Result<Self> {
let vol = vol.read().unwrap();
let rdr = storage::Reader::new(id, &vol.storage)?;
if vol.info.compress {
Ok(Reader {
inner: Box::new(Lz4Decoder::new(rdr).unwrap()),
})
} else {
Ok(Reader {
inner: Box::new(rdr),
})
}
}
}
impl Read for Reader {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
self.inner.read(buf)
}
}
impl Debug for Reader {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "VolReader")
}
}
pub struct WalWriter {
inner: storage::WalWriter,
}
impl WalWriter {
#[inline]
pub fn new(id: &Eid, vol: &VolumeRef) -> Self {
let vol = vol.read().unwrap();
WalWriter {
inner: storage::WalWriter::new(id, &vol.storage),
}
}
}
impl Write for WalWriter {
#[inline]
fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
self.inner.write(buf)
}
#[inline]
fn flush(&mut self) -> IoResult<()> {
self.inner.flush()
}
}
impl Finish for WalWriter {
#[inline]
fn finish(self) -> Result<()> {
self.inner.finish()
}
}
enum InnerWriter {
Compress(Lz4Encoder<storage::Writer>),
NoCompress(storage::Writer),
}
pub struct Writer {
inner: InnerWriter,
}
impl Writer {
pub fn new(id: &Eid, vol: &VolumeWeakRef) -> Result<Self> {
let vol = vol.upgrade().ok_or(Error::RepoClosed)?;
let vol = vol.read().unwrap();
let wtr = storage::Writer::new(id, &Arc::downgrade(&vol.storage))?;
let inner = if vol.info.compress {
let comp = Lz4EncoderBuilder::new()
.block_size(BlockSize::Default)
.block_mode(BlockMode::Linked)
.checksum(ContentChecksum::NoChecksum)
.level(0)
.auto_flush(true)
.build(wtr)?;
InnerWriter::Compress(comp)
} else {
InnerWriter::NoCompress(wtr)
};
Ok(Writer { inner })
}
}
impl Write for Writer {
fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
match self.inner {
InnerWriter::Compress(ref mut inner) => inner.write(buf),
InnerWriter::NoCompress(ref mut inner) => inner.write(buf),
}
}
fn flush(&mut self) -> IoResult<()> {
match self.inner {
InnerWriter::Compress(ref mut inner) => inner.flush(),
InnerWriter::NoCompress(ref mut inner) => inner.flush(),
}
}
}
impl Finish for Writer {
fn finish(self) -> Result<()> {
match self.inner {
InnerWriter::Compress(inner) => {
let (wtr, result) = inner.finish();
result.map_err(Error::from)?;
wtr.finish()
}
InnerWriter::NoCompress(inner) => inner.finish(),
}
}
}
impl Debug for Writer {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "VolumeWriter()")
}
}
#[cfg(test)]
mod tests {
extern crate tempdir;
use std::time::Instant;
#[cfg(feature = "storage-file")]
use self::tempdir::TempDir;
use super::*;
use base::crypto::{Crypto, RandomSeed, RANDOM_SEED_SIZE};
use base::init_env;
use base::utils::speed_str;
fn setup_mem_vol(loc: &str) -> VolumeRef {
init_env();
let uri = format!("mem://{}", loc);
let mut vol = Volume::new(&uri).unwrap();
vol.init("pwd", &Config::default(), &Vec::new()).unwrap();
vol.into_ref()
}
#[cfg(feature = "storage-file")]
fn setup_file_vol(pwd: &str, payload: &[u8]) -> (VolumeRef, TempDir) {
init_env();
let tmpdir = TempDir::new("zbox_test").expect("Create temp dir failed");
let dir = tmpdir.path().to_path_buf();
if dir.exists() {
std::fs::remove_dir_all(&dir).unwrap();
}
let uri = format!("file://{}", dir.display());
let mut vol = Volume::new(&uri).unwrap();
vol.init(pwd, &Config::default(), payload).unwrap();
(vol.into_ref(), tmpdir)
}
fn write_to_entity(id: &Eid, buf: &[u8], vol: &VolumeRef) {
let mut wtr = Writer::new(&id, &Arc::downgrade(vol)).unwrap();
wtr.write_all(&buf).unwrap();
wtr.finish().unwrap();
}
fn verify_entity(id: &Eid, buf: &[u8], vol: &VolumeRef) {
let mut dst = Vec::new();
let mut rdr = Reader::new(&id, &vol).unwrap();
rdr.read_to_end(&mut dst).unwrap();
assert_eq!(&dst[..], &buf[..]);
}
fn read_write_test(vol: &VolumeRef) {
let id = Eid::new();
let buf = [1, 2, 3];
let buf2 = [4, 5, 6];
write_to_entity(&id, &buf, &vol);
verify_entity(&id, &buf, &vol);
write_to_entity(&id, &buf2, &vol);
verify_entity(&id, &buf2, &vol);
{
let mut vol = vol.write().unwrap();
vol.del(&id).unwrap();
}
assert_eq!(Reader::new(&id, &vol).unwrap_err(), Error::NotFound);
}
#[cfg(any(feature = "storage-file", feature = "storage-zbox"))]
fn reopen_test(pwd: &str, payload: &[u8], vol: VolumeRef) {
let id = Eid::new();
let buf = [1, 2, 3];
read_write_test(&vol);
write_to_entity(&id, &buf, &vol);
{
let mut vol = vol.write().unwrap();
vol.flush().unwrap();
}
let (uri, _info, wmark) = {
let vol = vol.read().unwrap();
let storage = vol.storage.read().unwrap();
let allocator_ref = storage.get_allocator();
let allocator = allocator_ref.read().unwrap();
(vol.info.uri.clone(), vol.info(), allocator.block_wmark())
};
drop(vol);
let mut vol = Volume::new(&uri).unwrap();
let buf = vol.open(&pwd, false).unwrap();
assert_eq!(&buf[..], &payload[..]);
{
let storage = vol.storage.write().unwrap();
let allocator_ref = storage.get_allocator();
let mut allocator = allocator_ref.write().unwrap();
allocator.set_block_wmark(wmark);
}
let vol = vol.into_ref();
read_write_test(&vol);
verify_entity(&id, &buf, &vol);
}
#[test]
fn mem_volume() {
let vol = setup_mem_vol("mem_volume");
read_write_test(&vol);
}
#[cfg(feature = "storage-file")]
#[test]
fn file_volume() {
let pwd = "pwd";
let payload = [1, 2, 3];
let (vol, _tmpdir) = setup_file_vol(&pwd, &payload);
reopen_test(&pwd, &payload, vol);
}
#[cfg(feature = "storage-zbox")]
#[test]
fn zbox_volume() {
init_env();
let pwd = "pwd";
let payload = [1, 2, 3];
let uri = "zbox://accessKey456@repo456?cache_type=mem&cache_size=1mb";
let mut vol = Volume::new(&uri).unwrap();
vol.init(&pwd, &Config::default(), &payload).unwrap();
let vol = vol.into_ref();
reopen_test(&pwd, &payload, vol);
}
fn perf_test(vol: VolumeRef, prefix: &str) {
const DATA_LEN: usize = 36 * 1024 * 1024;
let mut buf = vec![0u8; DATA_LEN];
let seed = RandomSeed::from(&[0u8; RANDOM_SEED_SIZE]);
Crypto::random_buf_deterministic(&mut buf, &seed);
let id = Eid::new();
let now = Instant::now();
write_to_entity(&id, &buf, &vol);
let write_time = now.elapsed();
let now = Instant::now();
{
let mut rdr = Reader::new(&id, &vol).unwrap();
let mut dst = Vec::new();
rdr.read_to_end(&mut dst).unwrap();
}
let read_time = now.elapsed();
println!(
"{} perf: read: {}, write: {}",
prefix,
speed_str(&read_time, DATA_LEN),
speed_str(&write_time, DATA_LEN)
);
}
#[test]
fn mem_perf() {
let vol = setup_mem_vol("mem_perf");
perf_test(vol, "Memory volume");
}
#[cfg(feature = "storage-file")]
#[test]
fn file_perf() {
let (vol, _tmpdir) = setup_file_vol("pwd", &Vec::new());
perf_test(vol, "File volume");
}
}