use fnv::{FnvHashMap, FnvHashSet};
use libipld::Cid;
use std::{
fmt::Debug,
ops::{Deref, DerefMut},
sync::Arc,
time::{Duration, Instant},
};
mod async_tracker;
mod sqlite_tracker;
pub use async_tracker::{AsyncCacheTracker, Spawner};
use parking_lot::Mutex;
pub use sqlite_tracker::SqliteCacheTracker;
#[cfg(test)]
mod tests;
/// Descriptor of a single block: a numeric id, the block's cid and a length.
///
/// The type is `Copy`, so it is passed around by value.
#[derive(Debug, Clone, Copy)]
pub struct BlockInfo {
/// Numeric id of the block; this is the key trackers in this module use.
/// NOTE(review): presumably the id assigned by the backing store — confirm.
id: i64,
/// Content identifier of the block.
cid: Cid,
/// Length of the block as exposed by `block_len`.
/// NOTE(review): presumably the size in bytes — confirm against the caller.
len: usize,
}
impl BlockInfo {
pub fn new(id: i64, cid: &Cid, len: usize) -> Self {
Self { id, cid: *cid, len }
}
pub fn id(&self) -> i64 {
self.id
}
pub fn cid(&self) -> &Cid {
&self.cid
}
pub fn block_len(&self) -> usize {
self.len
}
}
/// A `BlockInfo` together with a `block_exists` flag, used to describe a written block.
#[derive(Debug, Clone, Copy)]
pub struct WriteInfo {
/// The block this write refers to.
block: BlockInfo,
/// Flag returned by `block_exists()`.
/// NOTE(review): presumably true when the block was already present before the write — confirm.
block_exists: bool,
}
impl WriteInfo {
pub fn new(block: BlockInfo, block_exists: bool) -> Self {
Self {
block,
block_exists,
}
}
pub fn block_exists(&self) -> bool {
self.block_exists
}
}
impl Deref for WriteInfo {
type Target = BlockInfo;
fn deref(&self) -> &Self::Target {
&self.block
}
}
/// Receives notifications about blocks and can reorder block ids.
///
/// Every method except `has_persistent_state` has an empty default body, so an
/// implementor only overrides the notifications it cares about.
/// Implementations must be `Debug + Send + Sync`.
// The default bodies are empty and therefore never touch their parameters.
#[allow(unused_variables)]
pub trait CacheTracker: Debug + Send + Sync {
/// Called with blocks that were accessed. Does nothing by default.
fn blocks_accessed(&self, blocks: Vec<BlockInfo>) {}
/// Called with blocks that were written. Does nothing by default.
fn blocks_written(&self, blocks: Vec<WriteInfo>) {}
/// Called with blocks that were deleted. Does nothing by default.
fn blocks_deleted(&self, blocks: Vec<BlockInfo>) {}
/// Reorders `ids` in place. The default leaves the order untouched.
/// NOTE(review): presumably ids that sort first are the first candidates for
/// eviction — confirm against the caller.
fn sort_ids(&self, ids: &mut [i64]) {}
/// Reports whether the tracker keeps state that persists; required, no default.
/// NOTE(review): presumably "persists across restarts" — confirm.
fn has_persistent_state(&self) -> bool;
/// Called with a set of ids to keep. Does nothing by default.
/// NOTE(review): presumably state for all other ids may be dropped — confirm.
fn retain_ids(&self, ids: &[i64]) {}
}
/// Allows a shared `Arc<dyn CacheTracker>` to be used as a `CacheTracker` itself.
///
/// Every method, including the ones that have defaults in the trait, is
/// forwarded to the wrapped trait object.
impl CacheTracker for Arc<dyn CacheTracker> {
    fn blocks_accessed(&self, blocks: Vec<BlockInfo>) {
        let inner: &dyn CacheTracker = &**self;
        inner.blocks_accessed(blocks)
    }

    fn blocks_written(&self, blocks: Vec<WriteInfo>) {
        let inner: &dyn CacheTracker = &**self;
        inner.blocks_written(blocks)
    }

    fn blocks_deleted(&self, blocks: Vec<BlockInfo>) {
        let inner: &dyn CacheTracker = &**self;
        inner.blocks_deleted(blocks)
    }

    fn sort_ids(&self, ids: &mut [i64]) {
        let inner: &dyn CacheTracker = &**self;
        inner.sort_ids(ids)
    }

    fn has_persistent_state(&self) -> bool {
        let inner: &dyn CacheTracker = &**self;
        inner.has_persistent_state()
    }

    fn retain_ids(&self, ids: &[i64]) {
        let inner: &dyn CacheTracker = &**self;
        inner.retain_ids(ids)
    }
}
/// Tracker that does nothing: it keeps the trait's empty default methods and
/// reports that it has no persistent state.
#[derive(Debug)]
pub struct NoopCacheTracker;
impl CacheTracker for NoopCacheTracker {
/// Always `false`.
fn has_persistent_state(&self) -> bool {
false
}
}
/// Tracker that keeps no state and orders ids purely by their numeric value.
#[derive(Debug)]
pub struct SortByIdCacheTracker;

impl CacheTracker for SortByIdCacheTracker {
    /// Always `false`.
    fn has_persistent_state(&self) -> bool {
        false
    }

    /// Sorts `ids` in place in ascending numeric order.
    fn sort_ids(&self, ids: &mut [i64]) {
        ids.sort_unstable_by(|lhs, rhs| lhs.cmp(rhs));
    }
}
/// Tracker that keeps one entry of type `T` per block id in an in-memory map.
///
/// Entries are produced by the user-supplied closure `F`.
pub struct InMemCacheTracker<T, F> {
/// Map from block id to its entry, guarded by a mutex.
cache: Arc<Mutex<FnvHashMap<i64, T>>>,
/// Closure called for every accessed block with the time elapsed since `created`;
/// `Some(entry)` stores the entry for the block id, `None` removes any existing one.
mk_cache_entry: F,
/// Instant captured when the tracker was constructed; access times are measured from it.
created: Instant,
}
impl<T, F> InMemCacheTracker<T, F>
where
    T: Ord + Clone + Debug,
    F: Fn(Duration, BlockInfo) -> Option<T>,
{
    /// Creates a tracker with an empty map.
    ///
    /// `mk_cache_entry` is stored and later used to turn an accessed block into
    /// an entry. The creation instant is recorded here.
    pub fn new(mk_cache_entry: F) -> Self {
        let cache = Arc::new(Mutex::new(FnvHashMap::default()));
        let created = Instant::now();
        Self {
            cache,
            mk_cache_entry,
            created,
        }
    }
}
/// Sort key for a block id.
///
/// The derived ordering compares fields in declaration order: `time` first,
/// then `id` as a tie breaker. Because `None < Some(_)` for `Option`, a key
/// without a `time` sorts before every key that has one, whatever the ids are.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct SortKey<T>
where
    T: Ord,
{
    time: Option<T>,
    id: i64,
}

impl<T> SortKey<T>
where
    T: Ord,
{
    /// Builds a key from an optional `time` and the block id.
    pub fn new(time: Option<T>, id: i64) -> Self {
        SortKey { time, id }
    }
}
/// Builds the sort key for `id`: the entry stored for `id` in `cache` (cloned),
/// or `None` when the map has no entry for it.
fn get_key<T>(
    cache: &mut impl DerefMut<Target = FnvHashMap<i64, T>>,
    id: i64,
) -> SortKey<T>
where
    T: Ord + Clone,
{
    let map: &FnvHashMap<i64, T> = &**cache;
    let time = map.get(&id).cloned();
    SortKey::new(time, id)
}
/// `CacheTracker` backed by the in-memory map of `InMemCacheTracker`.
///
/// `blocks_written` is not overridden, so it keeps the trait's default no-op.
impl<T, F> CacheTracker for InMemCacheTracker<T, F>
where
    T: Ord + Clone + Debug + Send + Sync,
    F: Fn(Duration, BlockInfo) -> Option<T> + Send + Sync,
{
    /// Updates the entry of every accessed block.
    ///
    /// `mk_cache_entry` is called with the time elapsed since the tracker was
    /// created. `Some(entry)` replaces the stored entry for the block id;
    /// `None` removes it.
    fn blocks_accessed(&self, blocks: Vec<BlockInfo>) {
        // Saturate to zero instead of panicking if the clock is ever observed
        // to be earlier than `created`.
        let now = Instant::now().saturating_duration_since(self.created);
        let mut cache = self.cache.lock();
        for block in blocks {
            match (self.mk_cache_entry)(now, block) {
                Some(value) => {
                    cache.insert(block.id, value);
                }
                None => {
                    cache.remove(&block.id);
                }
            }
        }
    }

    /// Drops the entries of all deleted blocks.
    fn blocks_deleted(&self, blocks: Vec<BlockInfo>) {
        let mut cache = self.cache.lock();
        for block in blocks {
            cache.remove(&block.id);
        }
    }

    /// Drops every entry whose id is not contained in `ids`.
    fn retain_ids(&self, ids: &[i64]) {
        // Build the lookup set before taking the lock.
        let ids = ids.iter().copied().collect::<FnvHashSet<_>>();
        let mut cache = self.cache.lock();
        cache.retain(|id, _| ids.contains(id));
    }

    /// Sorts `ids` in place by `(entry, id)`; ids without an entry come first.
    fn sort_ids(&self, ids: &mut [i64]) {
        let mut cache = self.cache.lock();
        // Each key needs a map lookup and a clone, so compute it once per id
        // instead of once per comparison. The key contains the id itself, so
        // elements with equal keys are identical and the resulting order is
        // the same as with an unstable sort.
        ids.sort_by_cached_key(|id| get_key(&mut cache, *id));
    }

    /// Always `false`: the map is held in memory only.
    fn has_persistent_state(&self) -> bool {
        false
    }
}
/// Debug output shows only the `cache` map; the closure and the creation
/// instant are omitted.
///
/// Formatting acquires the cache lock for the duration of the call.
impl<T: Debug, F> std::fmt::Debug for InMemCacheTracker<T, F> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Use the actual type name so debug output matches the struct.
        f.debug_struct("InMemCacheTracker")
            .field("cache", &self.cache.lock())
            .finish()
    }
}
/// A key without a time must sort before a key with a time, even when its id
/// is the largest possible and the other id is the smallest possible.
#[cfg(test)]
#[test]
fn sort_key_sort_order() {
    let without_time: SortKey<Duration> = SortKey::new(None, i64::max_value());
    let with_time = SortKey::new(Some(Duration::default()), i64::min_value());
    assert!(without_time < with_time);
}