use std::fmt::{self, Debug};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use linked_hash_map::LinkedHashMap;
use rmp_serde::{Deserializer, Serializer};
use serde::{Deserialize, Serialize};
use super::super::http_client::{CacheControl, HttpClient};
use super::{CacheBackend, CacheType, DummyBackend};
use base::crypto::{Crypto, Key};
use base::IntoRef;
use error::{Error, Result};
#[derive(Debug, Clone, Deserialize, Serialize)]
struct CacheItem {
len: usize,
is_pinned: bool,
}
impl CacheItem {
#[inline]
fn new(len: usize, is_pinned: bool) -> Self {
CacheItem { len, is_pinned }
}
}
#[derive(Debug, Default, Deserialize, Serialize)]
struct CacheMeta {
cache_type: CacheType,
capacity: usize,
used: usize,
useq: u64,
lru: LinkedHashMap<PathBuf, CacheItem>,
}
pub struct LocalCache {
meta: CacheMeta,
backend: Box<dyn CacheBackend>,
is_changed: bool,
client: HttpClient,
crypto: Crypto,
key: Key,
}
impl LocalCache {
const META_FILE_NAME: &'static str = "cache_meta";
pub fn new(
cache_type: CacheType,
capacity_in_mb: usize,
base: &Path,
repo_id: &str,
access_key: &str,
) -> Result<Self> {
let capacity = capacity_in_mb * 1024 * 1024; let client = HttpClient::new(repo_id, access_key)?;
let mut meta = CacheMeta::default();
meta.cache_type = cache_type;
meta.capacity = capacity;
let backend: Box<dyn CacheBackend> = match cache_type {
CacheType::Mem => Box::new(super::mem::MemBackend::new()),
CacheType::File => {
#[cfg(target_arch = "wasm32")]
{
let _ = base;
Box::new(super::browser::WasmBackend::new())
}
#[cfg(not(target_arch = "wasm32"))]
{
let base = base.join(repo_id);
Box::new(super::file::FileBackend::new(&base))
}
}
};
Ok(LocalCache {
meta,
backend,
is_changed: false,
client,
crypto: Crypto::default(),
key: Key::new_empty(),
})
}
#[inline]
pub fn set_crypto_ctx(&mut self, crypto: Crypto, key: Key) {
self.crypto = crypto;
self.key = key;
}
#[inline]
pub fn repo_exists(&self) -> Result<bool> {
self.client.repo_exists()
}
#[inline]
pub fn connect(&mut self, force: bool) -> Result<()> {
self.client.open_session(force)?;
Ok(())
}
fn evict(&mut self, to_evict: &[(PathBuf, usize)]) -> Result<()> {
for item in to_evict {
self.backend.remove(&item.0)?;
self.meta.lru.remove(&item.0);
self.meta.used -= item.1;
}
Ok(())
}
fn reserve_place(&mut self, len: usize) -> Result<()> {
if self.meta.used + len <= self.meta.capacity {
return Ok(());
}
let need_len = self.meta.used + len - self.meta.capacity;
let mut accum_len = 0;
let mut to_evict: Vec<(PathBuf, usize)> = Vec::new();
for ent in self.meta.lru.entries().filter(|ent| !ent.get().is_pinned) {
accum_len += ent.get().len;
to_evict.push((ent.key().clone(), ent.get().len));
if accum_len >= need_len {
break;
}
}
if accum_len < need_len {
for ent in self.meta.lru.entries().filter(|ent| ent.get().is_pinned)
{
accum_len += ent.get().len;
to_evict.push((ent.key().clone(), ent.get().len));
if accum_len >= need_len {
break;
}
}
}
assert!(accum_len >= need_len);
self.evict(&to_evict)
}
fn ensure_in_local(
&mut self,
rel_path: &Path,
is_pinned: bool,
) -> Result<()> {
self.is_changed = true;
if self.backend.contains(rel_path) {
let _ = self.meta.lru.get_refresh(rel_path);
return Ok(());
}
let remote = {
let obj =
self.client.get(rel_path, CacheControl::from(is_pinned))?;
self.reserve_place(obj.len())?;
obj
};
self.backend.insert(rel_path, &remote)?;
self.meta.lru.insert(
rel_path.to_path_buf(),
CacheItem::new(remote.len(), is_pinned),
);
self.meta.used += remote.len();
Ok(())
}
fn load_meta(&mut self) -> Result<CacheMeta> {
let path = Path::new(Self::META_FILE_NAME);
let buf = self
.backend
.get(&path)
.and_then(|buf| self.crypto.decrypt(&buf, &self.key))?;
let mut de = Deserializer::new(&buf[..]);
let meta: CacheMeta = Deserialize::deserialize(&mut de)?;
Ok(meta)
}
fn save_meta(&mut self) -> Result<()> {
self.meta.useq = self.client.get_update_seq();
let mut buf = Vec::new();
self.meta.serialize(&mut Serializer::new(&mut buf))?;
let path = Path::new(Self::META_FILE_NAME);
self.crypto
.encrypt(&buf, &self.key)
.and_then(|buf| self.backend.insert(&path, &buf))
}
#[inline]
pub fn init(&mut self) -> Result<()> {
self.backend.clear().and_then(|_| self.save_meta())
}
pub fn open(&mut self) -> Result<()> {
if self.meta.cache_type == CacheType::Mem {
return Ok(());
}
match self.load_meta() {
Ok(meta) => {
if self.meta.cache_type != meta.cache_type {
return Err(Error::InvalidUri);
}
let remote_useq = self.client.get_update_seq();
if meta.useq == remote_useq {
self.meta.useq = remote_useq;
self.meta.used = meta.used;
self.meta.lru = meta.lru;
return Ok(());
}
warn!(
"update seq not match, local: {}, remote: {}, \
clear local cache",
meta.useq, remote_useq,
);
self.backend.clear()?;
self.meta.used = 0;
self.meta.lru.clear();
self.save_meta()
}
Err(ref err) if *err == Error::NotFound => self.save_meta(),
Err(err) => Err(err),
}
}
#[inline]
pub fn get_to(
&mut self,
rel_path: &Path,
offset: usize,
dst: &mut [u8],
) -> Result<()> {
self.ensure_in_local(rel_path, false)?;
self.backend.get_exact(rel_path, offset, dst)
}
#[inline]
pub fn get(&mut self, rel_path: &Path) -> Result<Vec<u8>> {
self.ensure_in_local(rel_path, true)?;
self.backend.get(rel_path)
}
fn do_put(
&mut self,
rel_path: &Path,
offset: usize,
obj: &[u8],
is_pinned: bool,
) -> Result<()> {
self.del_local(rel_path)?;
let cache_ctl = CacheControl::from(is_pinned);
self.client.put(rel_path, offset, cache_ctl, obj)?;
if offset == 0 {
self.reserve_place(obj.len())?;
self.backend.insert(rel_path, obj)?;
self.meta.lru.insert(
rel_path.to_path_buf(),
CacheItem::new(obj.len(), is_pinned),
);
self.meta.used += obj.len();
}
Ok(())
}
#[inline]
pub fn put(
&mut self,
rel_path: &Path,
offset: usize,
obj: &[u8],
) -> Result<()> {
self.do_put(rel_path, offset, obj, false)
}
#[inline]
pub fn put_pinned(&mut self, rel_path: &Path, obj: &[u8]) -> Result<()> {
self.do_put(rel_path, 0, obj, true)
}
fn del_local(&mut self, rel_path: &Path) -> Result<()> {
self.backend.remove(rel_path)?;
self.is_changed = true;
if let Some(cache_obj) = self.meta.lru.remove(rel_path) {
self.meta.used -= cache_obj.len;
}
Ok(())
}
#[inline]
pub fn del(&mut self, rel_path: &Path) -> Result<()> {
self.del_local(rel_path)
.and_then(|_| self.client.del(rel_path))
}
pub fn flush(&mut self) -> Result<()> {
if self.is_changed {
self.save_meta()?;
self.client.flush()?;
self.is_changed = false;
}
Ok(())
}
pub fn destroy_repo(&mut self) -> Result<()> {
self.client
.destroy_repo()
.and_then(|_| self.backend.clear())
}
}
impl Default for LocalCache {
#[inline]
fn default() -> Self {
LocalCache {
meta: CacheMeta::default(),
backend: Box::new(DummyBackend::default()),
is_changed: false,
client: HttpClient::default(),
crypto: Crypto::default(),
key: Key::new_empty(),
}
}
}
impl Debug for LocalCache {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("LocalCache")
.field("meta", &self.meta)
.field("is_changed", &self.is_changed)
.finish()
}
}
impl IntoRef for LocalCache {}
pub type LocalCacheRef = Arc<RwLock<LocalCache>>;
#[cfg(test)]
mod tests {
extern crate tempdir;
use self::tempdir::TempDir;
use super::*;
use base::init_env;
fn test_local_cache(cache_type: CacheType, base: &Path) {
init_env();
let repo_id = "repo456";
let access_key = "accessKey456";
let mut cache =
LocalCache::new(cache_type, 1, base, &repo_id, &access_key)
.unwrap();
let k300 = 300 * 1000;
let k400 = 400 * 1000;
let k500 = 500 * 1000;
let rel_path = Path::new("data/aa/bb/k300");
let mut obj = vec![1u8; k300];
obj[0] = 0;
obj[1] = 1;
obj[2] = 2;
let rel_path2 = Path::new("data/aa/bb/k400");
let mut obj2 = vec![2u8; k400];
obj2[0] = 0;
obj2[1] = 1;
obj2[2] = 2;
let rel_path3 = Path::new("data/aa/bb/k500");
let mut obj3 = vec![3u8; k500];
obj3[0] = 0;
obj3[1] = 1;
obj3[2] = 2;
let not_exists = Path::new("not_exists");
assert!(!cache.repo_exists().unwrap());
cache.connect().unwrap();
cache.init().unwrap();
assert_eq!(cache.meta.lru.len(), 0);
cache.put(&rel_path, 0, &obj).unwrap();
cache.put(&rel_path2, 0, &obj2).unwrap();
assert_eq!(cache.meta.lru.len(), 2);
assert_eq!(cache.meta.used, k300 + k400);
cache.put(&rel_path3, 0, &obj3).unwrap();
assert_eq!(cache.meta.lru.len(), 2);
assert_eq!(cache.meta.used, k400 + k500);
cache.put(&rel_path3, 0, &obj3).unwrap();
assert_eq!(cache.meta.lru.len(), 2);
assert_eq!(cache.meta.used, k400 + k500);
let mut tgt = vec![0u8; obj.len()];
cache.get_to(&rel_path, 0, &mut tgt).unwrap();
assert_eq!(&tgt.len(), &obj.len());
assert_eq!(&tgt[..5], &obj[..5]);
assert_eq!(cache.meta.lru.len(), 2);
assert_eq!(cache.meta.used, k500 + k300);
let mut tgt = vec![0u8; obj.len()];
cache.get_to(&rel_path, 0, &mut tgt).unwrap();
assert_eq!(tgt.len(), obj.len());
assert_eq!(&tgt[..5], &obj[..5]);
assert_eq!(cache.meta.lru.len(), 2);
assert_eq!(cache.meta.used, k500 + k300);
let result = cache.get(¬_exists).unwrap_err();
assert_eq!(result, Error::NotFound);
cache.del(&rel_path).unwrap();
assert_eq!(cache.meta.lru.len(), 1);
cache.del(&rel_path).unwrap();
cache.del(&rel_path2).unwrap();
assert_eq!(cache.meta.lru.len(), 1);
cache.flush().unwrap();
assert_eq!(cache.meta.lru.len(), 1);
drop(cache);
let mut cache =
LocalCache::new(cache_type, 2, base, &repo_id, &access_key)
.unwrap();
cache.connect().unwrap();
cache.open().unwrap();
cache.del(¬_exists).unwrap();
cache.put(&rel_path, 0, &obj).unwrap();
cache.put(&rel_path2, 0, &obj2).unwrap();
cache.put(&rel_path3, 0, &obj3).unwrap();
cache.flush().unwrap();
drop(cache);
let mut cache =
LocalCache::new(cache_type, 1, base, &repo_id, &access_key)
.unwrap();
cache.connect().unwrap();
cache.open().unwrap();
if cache_type == CacheType::File {
assert_eq!(cache.meta.lru.len(), 3);
}
cache.put(&rel_path, 50, &obj).unwrap();
if cache_type == CacheType::File {
assert_eq!(cache.meta.lru.len(), 2);
}
}
#[test]
fn local_cache_mem() {
test_local_cache(CacheType::Mem, Path::new(""));
}
#[test]
fn local_cache_file() {
let tmpdir = TempDir::new("zbox_test").expect("Create temp dir failed");
let base = tmpdir.path().to_path_buf();
test_local_cache(CacheType::File, &base);
}
}