use crate::traits::BlockStore;
use async_trait::async_trait;
use ipfrs_core::{Block, Cid, Result};
use lru::LruCache;
use parking_lot::Mutex;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
pub hits: u64,
pub misses: u64,
pub size: usize,
pub capacity: usize,
}
impl CacheStats {
pub fn hit_rate(&self) -> f64 {
let total = self.hits + self.misses;
if total == 0 {
0.0
} else {
self.hits as f64 / total as f64
}
}
pub fn miss_rate(&self) -> f64 {
1.0 - self.hit_rate()
}
}
pub struct BlockCache {
cache: Arc<Mutex<LruCache<Cid, Block>>>,
capacity: usize,
hits: Arc<AtomicU64>,
misses: Arc<AtomicU64>,
}
impl BlockCache {
pub fn new(capacity: usize) -> Self {
let cap_val = capacity;
let capacity = NonZeroUsize::new(capacity).unwrap_or(NonZeroUsize::new(1000).unwrap());
Self {
cache: Arc::new(Mutex::new(LruCache::new(capacity))),
capacity: cap_val,
hits: Arc::new(AtomicU64::new(0)),
misses: Arc::new(AtomicU64::new(0)),
}
}
#[inline]
pub fn get(&self, cid: &Cid) -> Option<Block> {
let result = self.cache.lock().get(cid).cloned();
if result.is_some() {
self.hits.fetch_add(1, Ordering::Relaxed);
} else {
self.misses.fetch_add(1, Ordering::Relaxed);
}
result
}
#[inline]
pub fn put(&self, block: Block) {
self.cache.lock().put(*block.cid(), block);
}
pub fn remove(&self, cid: &Cid) {
self.cache.lock().pop(cid);
}
pub fn clear(&self) {
self.cache.lock().clear();
self.hits.store(0, Ordering::Relaxed);
self.misses.store(0, Ordering::Relaxed);
}
pub fn stats(&self) -> CacheStats {
CacheStats {
hits: self.hits.load(Ordering::Relaxed),
misses: self.misses.load(Ordering::Relaxed),
size: self.cache.lock().len(),
capacity: self.capacity,
}
}
pub fn len(&self) -> usize {
self.cache.lock().len()
}
pub fn is_empty(&self) -> bool {
self.cache.lock().is_empty()
}
}
pub struct CachedBlockStore<S: BlockStore> {
store: S,
cache: BlockCache,
}
impl<S: BlockStore> CachedBlockStore<S> {
pub fn new(store: S, cache_capacity: usize) -> Self {
Self {
store,
cache: BlockCache::new(cache_capacity),
}
}
pub fn store(&self) -> &S {
&self.store
}
pub fn cache(&self) -> &BlockCache {
&self.cache
}
pub fn cache_stats(&self) -> CacheStats {
self.cache.stats()
}
}
#[async_trait]
impl<S: BlockStore> BlockStore for CachedBlockStore<S> {
async fn put(&self, block: &Block) -> Result<()> {
self.cache.put(block.clone());
self.store.put(block).await
}
async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
if let Some(block) = self.cache.get(cid) {
return Ok(Some(block));
}
if let Some(block) = self.store.get(cid).await? {
self.cache.put(block.clone());
Ok(Some(block))
} else {
Ok(None)
}
}
async fn has(&self, cid: &Cid) -> Result<bool> {
if self.cache.get(cid).is_some() {
return Ok(true);
}
self.store.has(cid).await
}
async fn delete(&self, cid: &Cid) -> Result<()> {
self.cache.remove(cid);
self.store.delete(cid).await
}
fn list_cids(&self) -> Result<Vec<Cid>> {
self.store.list_cids()
}
fn len(&self) -> usize {
self.store.len()
}
fn is_empty(&self) -> bool {
self.store.is_empty()
}
async fn flush(&self) -> Result<()> {
self.store.flush().await
}
async fn close(&self) -> Result<()> {
self.cache.clear();
self.store.close().await
}
async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Block>>> {
let mut results = Vec::with_capacity(cids.len());
let mut cache_misses = Vec::new();
let mut miss_indices = Vec::new();
{
let cache = self.cache.cache.lock();
for (i, cid) in cids.iter().enumerate() {
if let Some(block) = cache.peek(cid) {
results.push(Some(block.clone()));
} else {
results.push(None);
cache_misses.push(*cid);
miss_indices.push(i);
}
}
}
if !cache_misses.is_empty() {
let fetched = self.store.get_many(&cache_misses).await?;
{
let mut cache = self.cache.cache.lock();
for (idx, block_opt) in miss_indices.iter().zip(fetched.iter()) {
if let Some(block) = block_opt {
cache.put(*block.cid(), block.clone());
results[*idx] = Some(block.clone());
}
}
}
}
Ok(results)
}
async fn put_many(&self, blocks: &[Block]) -> Result<()> {
{
let mut cache = self.cache.cache.lock();
for block in blocks {
cache.put(*block.cid(), block.clone());
}
}
self.store.put_many(blocks).await
}
async fn has_many(&self, cids: &[Cid]) -> Result<Vec<bool>> {
let mut results = Vec::with_capacity(cids.len());
let mut cache_misses = Vec::new();
let mut miss_indices = Vec::new();
{
let cache = self.cache.cache.lock();
for (i, cid) in cids.iter().enumerate() {
if cache.contains(cid) {
results.push(true);
} else {
results.push(false);
cache_misses.push(*cid);
miss_indices.push(i);
}
}
}
if !cache_misses.is_empty() {
let store_results = self.store.has_many(&cache_misses).await?;
for (idx, &exists) in miss_indices.iter().zip(store_results.iter()) {
results[*idx] = exists;
}
}
Ok(results)
}
async fn delete_many(&self, cids: &[Cid]) -> Result<()> {
{
let mut cache = self.cache.cache.lock();
for cid in cids {
cache.pop(cid);
}
}
self.store.delete_many(cids).await
}
}
pub struct TieredBlockCache {
l1_cache: Arc<Mutex<LruCache<Cid, Block>>>,
l2_cache: Arc<Mutex<LruCache<Cid, Block>>>,
l1_capacity: usize,
l2_capacity: usize,
l1_hits: Arc<AtomicU64>,
l2_hits: Arc<AtomicU64>,
misses: Arc<AtomicU64>,
}
impl TieredBlockCache {
pub fn new(l1_capacity: usize, l2_capacity: usize) -> Self {
let l1_cap = NonZeroUsize::new(l1_capacity).unwrap_or(NonZeroUsize::new(100).unwrap());
let l2_cap = NonZeroUsize::new(l2_capacity).unwrap_or(NonZeroUsize::new(1000).unwrap());
Self {
l1_cache: Arc::new(Mutex::new(LruCache::new(l1_cap))),
l2_cache: Arc::new(Mutex::new(LruCache::new(l2_cap))),
l1_capacity,
l2_capacity,
l1_hits: Arc::new(AtomicU64::new(0)),
l2_hits: Arc::new(AtomicU64::new(0)),
misses: Arc::new(AtomicU64::new(0)),
}
}
#[inline]
pub fn get(&self, cid: &Cid) -> Option<Block> {
if let Some(block) = self.l1_cache.lock().get(cid) {
self.l1_hits.fetch_add(1, Ordering::Relaxed);
return Some(block.clone());
}
if let Some(block) = self.l2_cache.lock().get(cid) {
self.l2_hits.fetch_add(1, Ordering::Relaxed);
let block_clone = block.clone();
self.l1_cache.lock().put(*cid, block_clone.clone());
return Some(block_clone);
}
self.misses.fetch_add(1, Ordering::Relaxed);
None
}
#[inline]
pub fn put(&self, block: Block) {
let cid = *block.cid();
if let Some(evicted) = self.l1_cache.lock().push(cid, block.clone()) {
self.l2_cache.lock().put(evicted.0, evicted.1);
}
}
pub fn remove(&self, cid: &Cid) {
self.l1_cache.lock().pop(cid);
self.l2_cache.lock().pop(cid);
}
pub fn clear(&self) {
self.l1_cache.lock().clear();
self.l2_cache.lock().clear();
self.l1_hits.store(0, Ordering::Relaxed);
self.l2_hits.store(0, Ordering::Relaxed);
self.misses.store(0, Ordering::Relaxed);
}
pub fn stats(&self) -> TieredCacheStats {
TieredCacheStats {
l1_size: self.l1_cache.lock().len(),
l1_capacity: self.l1_capacity,
l2_size: self.l2_cache.lock().len(),
l2_capacity: self.l2_capacity,
l1_hits: self.l1_hits.load(Ordering::Relaxed),
l2_hits: self.l2_hits.load(Ordering::Relaxed),
misses: self.misses.load(Ordering::Relaxed),
}
}
}
#[derive(Debug, Clone)]
pub struct TieredCacheStats {
pub l1_size: usize,
pub l1_capacity: usize,
pub l2_size: usize,
pub l2_capacity: usize,
pub l1_hits: u64,
pub l2_hits: u64,
pub misses: u64,
}
impl TieredCacheStats {
pub fn hit_rate(&self) -> f64 {
let total_hits = self.l1_hits + self.l2_hits;
let total = total_hits + self.misses;
if total == 0 {
0.0
} else {
total_hits as f64 / total as f64
}
}
pub fn l1_hit_rate(&self) -> f64 {
let total = self.l1_hits + self.l2_hits + self.misses;
if total == 0 {
0.0
} else {
self.l1_hits as f64 / total as f64
}
}
pub fn l2_hit_rate(&self) -> f64 {
let total = self.l1_hits + self.l2_hits + self.misses;
if total == 0 {
0.0
} else {
self.l2_hits as f64 / total as f64
}
}
pub fn miss_rate(&self) -> f64 {
1.0 - self.hit_rate()
}
}
pub struct TieredCachedBlockStore<S: BlockStore> {
store: S,
cache: TieredBlockCache,
}
impl<S: BlockStore> TieredCachedBlockStore<S> {
pub fn new(store: S, l1_capacity: usize, l2_capacity: usize) -> Self {
Self {
store,
cache: TieredBlockCache::new(l1_capacity, l2_capacity),
}
}
pub fn store(&self) -> &S {
&self.store
}
pub fn cache_stats(&self) -> TieredCacheStats {
self.cache.stats()
}
}
#[async_trait]
impl<S: BlockStore> BlockStore for TieredCachedBlockStore<S> {
async fn put(&self, block: &Block) -> Result<()> {
self.cache.put(block.clone());
self.store.put(block).await
}
async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
if let Some(block) = self.cache.get(cid) {
return Ok(Some(block));
}
if let Some(block) = self.store.get(cid).await? {
self.cache.put(block.clone());
Ok(Some(block))
} else {
Ok(None)
}
}
async fn has(&self, cid: &Cid) -> Result<bool> {
if self.cache.get(cid).is_some() {
return Ok(true);
}
self.store.has(cid).await
}
async fn delete(&self, cid: &Cid) -> Result<()> {
self.cache.remove(cid);
self.store.delete(cid).await
}
fn list_cids(&self) -> Result<Vec<Cid>> {
self.store.list_cids()
}
fn len(&self) -> usize {
self.store.len()
}
fn is_empty(&self) -> bool {
self.store.is_empty()
}
async fn flush(&self) -> Result<()> {
self.store.flush().await
}
async fn close(&self) -> Result<()> {
self.cache.clear();
self.store.close().await
}
}