mod builder;
pub use builder::CacheBuilder;
use crate::{ForcepError, MetaDb, Metadata, Result, mem_cache::MemCache};
use bytes::Bytes;
use sled::Db;
use std::io;
use std::path;
use std::result;
use tokio::fs as afs;
/// Opens a temporary file inside `dir` for writing.
///
/// The file name comes from `crate::tmp::tmppath_in(dir)`. The file is created if it does not
/// exist and truncated if it does. On success the open handle is returned together with the
/// path it was opened at, so the caller can later rename or delete it.
///
/// # Errors
///
/// Any failure to open the file is reported as `ForcepError::Io`.
async fn tempfile(dir: &path::Path) -> Result<(afs::File, path::PathBuf)> {
    let location = crate::tmp::tmppath_in(dir);

    let mut how = afs::OpenOptions::new();
    how.write(true).create(true).truncate(true);

    let handle = how.open(&location).await.map_err(ForcepError::Io)?;
    Ok((handle, location))
}
/// Settings consumed by [`Cache`].
#[derive(Debug, Clone)]
struct Options {
/// Root directory of the cache: entry files, temporary files and the `index` metadata store
/// all live underneath it.
path: path::PathBuf,
/// Number of two-hex-character directory levels placed between `path` and an entry's file.
dir_depth: u8,
/// When `true`, reads record an access for the key in the metadata store.
track_access: bool,
/// Size handed to `MemCache::new` when the cache is created.
lru_size: usize,
/// Capacity of the `BufReader` used when reading an entry from disk.
rbuff_sz: usize,
/// Capacity of the `BufWriter` used when writing an entry to disk.
wbuff_sz: usize,
}
/// A file-backed cache with a metadata index and an in-memory layer in front of the disk.
///
/// Instances are configured through [`CacheBuilder`]; see [`Cache::new`].
#[derive(Debug)]
pub struct Cache {
/// Metadata store, opened at `<path>/index` when the cache is created.
meta: MetaDb,
/// In-memory cache consulted by `read` before falling back to the file system.
mem: MemCache,
/// Settings this cache was created with.
opts: Options,
}
impl Cache {
/// Starts building a cache rooted at `path`.
///
/// This does not construct a `Cache` directly; it returns a [`CacheBuilder`] created with
/// `CacheBuilder::new(path)`.
#[inline]
// `new` intentionally returns the builder rather than `Self`.
#[allow(clippy::new_ret_no_self)]
pub fn new<P>(path: P) -> CacheBuilder
where
    P: AsRef<path::Path>,
{
    CacheBuilder::new(path)
}
/// Builds a `Cache` from fully resolved `opts`.
///
/// Ensures the root directory exists, opens the metadata store at `<path>/index`, and sets up
/// the in-memory cache with `opts.lru_size`.
///
/// # Errors
///
/// Returns `ForcepError::Io` if the root directory cannot be created, and propagates any
/// error from `MetaDb::new`.
async fn create(opts: Options) -> Result<Self> {
    afs::create_dir_all(&opts.path)
        .await
        .map_err(ForcepError::Io)?;

    let index_path = opts.path.join("index");
    let meta = MetaDb::new(&index_path)?;
    let mem = MemCache::new(opts.lru_size);

    Ok(Self { meta, mem, opts })
}
/// Returns a reference to the `sled::Db` exposed by the metadata store.
pub fn get_meta_db_ref(&self) -> &Db {
    let store = &self.meta;
    store.get_db_ref()
}
/// Computes the on-disk location of the entry for `key`.
///
/// The key is hex-encoded. Starting from the cache root, one directory level is added per
/// `dir_depth`: level `i` uses hex characters `2*i..2*i + 2`, but only when that range ends
/// strictly before the end of the hex string; otherwise the placeholder `"__"` is used.
/// (As a consequence the final two hex characters are never used as a directory name.)
/// The full hex string is the file name.
fn path_from_key(&self, key: &[u8]) -> path::PathBuf {
    let encoded = hex::encode(key);
    let mut location = self.opts.path.clone();

    for level in 0..usize::from(self.opts.dir_depth) {
        let start = level * 2;
        let end = start + 2;
        let segment = if end < encoded.len() {
            &encoded[start..end]
        } else {
            "__"
        };
        location.push(segment);
    }

    location.push(&encoded);
    location
}
/// Records an access to `k` in the metadata store, if access tracking is enabled.
///
/// When `track_access` is off this is a no-op that returns `Ok(())`. Otherwise `meta` is
/// forwarded to `MetaDb::track_access_for` and any error it reports is propagated.
#[inline]
fn track_access_for(&self, k: &[u8], meta: Metadata) -> Result<()> {
    if !self.opts.track_access {
        return Ok(());
    }
    self.meta.track_access_for(k, Some(meta))?;
    Ok(())
}
/// Returns the bytes stored under `key`.
///
/// The metadata record is fetched first, so a failing metadata lookup fails the read before
/// any data is touched. The in-memory cache is consulted next; on a miss the entry's file is
/// read from disk and the result is placed into the in-memory cache. In both cases the access
/// is recorded when access tracking is enabled.
///
/// # Errors
///
/// * Propagates any error from the metadata lookup or from access tracking.
/// * `ForcepError::NotFound` if the entry's file does not exist.
/// * `ForcepError::Io` for any other failure while opening or reading the file.
pub async fn read<K: AsRef<[u8]>>(&self, key: K) -> Result<Bytes> {
    use tokio::io::AsyncReadExt;

    let k = key.as_ref();
    let meta = self.meta.get_metadata(k)?;

    // Fast path: served straight from memory.
    if let Some(cached) = self.mem.get(k) {
        self.track_access_for(k, meta)?;
        return Ok(cached);
    }

    // Slow path: open the backing file, translating "missing file" into `NotFound`.
    let entry_path = self.path_from_key(k);
    let mut open_opts = afs::OpenOptions::new();
    open_opts.read(true);
    let file = open_opts.open(&entry_path).await.map_err(|err| {
        if err.kind() == io::ErrorKind::NotFound {
            ForcepError::NotFound
        } else {
            ForcepError::Io(err)
        }
    })?;

    // The recorded size is used only as a capacity hint for the read buffer.
    let mut contents = Vec::with_capacity(meta.get_size() as usize);
    {
        let mut reader = tokio::io::BufReader::with_capacity(self.opts.rbuff_sz, file);
        reader
            .read_to_end(&mut contents)
            .await
            .map_err(ForcepError::Io)?;
    }

    self.track_access_for(k, meta)?;

    let bytes = Bytes::from(contents);
    self.mem.put(k, bytes.clone());
    Ok(bytes)
}
pub async fn write<K: AsRef<[u8]>, V: AsRef<[u8]>>(
&self,
key: K,
value: V,
) -> Result<Metadata> {
use tokio::io::AsyncWriteExt;
let key = key.as_ref();
let value = value.as_ref();
let (tmp, tmp_path) = tempfile(&self.opts.path).await?;
{
let mut writer = tokio::io::BufWriter::with_capacity(self.opts.wbuff_sz, tmp);
writer.write_all(value).await.map_err(ForcepError::Io)?;
writer.flush().await.map_err(ForcepError::Io)?;
}
let final_path = self.path_from_key(key);
if let Some(parent) = final_path.parent() {
afs::create_dir_all(parent).await.map_err(ForcepError::Io)?;
}
afs::rename(&tmp_path, &final_path)
.await
.map_err(ForcepError::Io)?;
if !self.mem.is_nil() {
self.mem.put(key, Bytes::from(Vec::from(value)));
}
self.meta.insert_metadata_for(key, value)
}
/// Deletes the entry stored under `key` and returns the result of removing its metadata.
///
/// The entry's file is first renamed to a temporary path inside the cache root and then
/// deleted from there; finally the metadata record is removed.
///
/// NOTE(review): this method does not touch the in-memory cache. Presumably a stale
/// in-memory value is masked because `read` consults the metadata store first — confirm.
///
/// # Errors
///
/// * `ForcepError::NotFound` if the rename fails because the entry's file does not exist.
/// * `ForcepError::Io` for any other rename failure or if deleting the renamed file fails.
/// * Propagates any error from `MetaDb::remove_metadata_for`.
pub async fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Metadata> {
    let key = key.as_ref();
    let entry_path = self.path_from_key(key);
    let doomed_path = crate::tmp::tmppath_in(&self.opts.path);

    // Move the file out of its final location before deleting it.
    let moved = afs::rename(&entry_path, &doomed_path).await;
    moved.map_err(|err| {
        if matches!(err.kind(), io::ErrorKind::NotFound) {
            ForcepError::NotFound
        } else {
            ForcepError::Io(err)
        }
    })?;

    afs::remove_file(&doomed_path)
        .await
        .map_err(ForcepError::Io)?;

    self.meta.remove_metadata_for(key)
}
/// Looks up the metadata record for `key` without reading the entry's data.
///
/// This forwards to `MetaDb::get_metadata` and returns its result unchanged.
#[inline]
pub fn read_metadata<K>(&self, key: K) -> Result<Metadata>
where
    K: AsRef<[u8]>,
{
    let raw_key = key.as_ref();
    self.meta.get_metadata(raw_key)
}
/// Iterates over the metadata store, yielding `(key, metadata)` pairs.
///
/// Each item is a `Result`, so individual entries may report an error. The iterator itself
/// is whatever `MetaDb::metadata_iter` returns.
#[inline]
pub fn metadata_iter(&self) -> impl Iterator<Item = Result<(Vec<u8>, Metadata)>> {
    let index = &self.meta;
    index.metadata_iter()
}
/// Runs `evictor` against this cache and returns its result.
///
/// The call is forwarded to `Evictor::evict(self)`; both the `u64` on success and the error
/// type come from the evictor implementation.
#[inline]
pub async fn evict_with<E: crate::evictors::Evictor>(
    &self,
    evictor: E,
) -> result::Result<u64, E::Err> {
    evictor.evict(self).await
}
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::CacheBuilder;

    /// Key used by the round-trip tests.
    const KEY: &[u8; 9] = b"CACHE_KEY";
    /// Payload used by the round-trip tests.
    const VALUE: &[u8; 11] = b"Hello World";

    /// Builds a cache with the builder's default settings.
    async fn default_cache() -> Cache {
        CacheBuilder::default().build().await.unwrap()
    }

    /// Keys of one, two and three bytes must not make `path_from_key` panic.
    #[tokio::test]
    async fn short_path() {
        let cache = default_cache().await;
        let keys: [&[u8]; 3] = [&[0xAA], &[0xAA, 0xBB], &[0xAA, 0xBB, 0xCC]];
        for key in keys {
            let _ = cache.path_from_key(key);
        }
    }

    /// A written value can be read back unchanged and then removed.
    #[tokio::test]
    async fn write_read_remove() {
        let cache = default_cache().await;

        cache.write(&KEY, &VALUE).await.unwrap();
        let data = cache.read(&KEY).await.unwrap();
        assert_eq!(data.as_ref(), VALUE);

        cache.remove(&KEY).await.unwrap();
    }

    /// With access tracking on, every read is counted as a hit.
    #[tokio::test]
    async fn tracking_test() {
        let cache = CacheBuilder::default()
            .track_access(true)
            .build()
            .await
            .unwrap();

        cache.write(KEY, VALUE).await.unwrap();
        for _ in 0..100 {
            cache.read(KEY).await.unwrap();
        }

        let hits = cache.read_metadata(KEY).unwrap().get_hits();
        assert_eq!(hits, 100);
    }

    /// The recorded size matches the length of the written payload.
    #[tokio::test]
    async fn read_metadata() {
        let cache = default_cache().await;

        cache.write(&KEY, &VALUE).await.unwrap();
        let metadata = cache.read_metadata(&KEY).unwrap();
        assert_eq!(metadata.get_size(), VALUE.len() as u64);
    }
}