ipfrs_storage/
cache.rs

1//! In-memory block cache
2
3use crate::traits::BlockStore;
4use async_trait::async_trait;
5use ipfrs_core::{Block, Cid, Result};
6use lru::LruCache;
7use parking_lot::Mutex;
8use std::num::NonZeroUsize;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11
/// Cache statistics snapshot: hit/miss counters plus current size and capacity.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Number of cache hits
    pub hits: u64,
    /// Number of cache misses
    pub misses: u64,
    /// Current number of items in cache
    pub size: usize,
    /// Cache capacity
    pub capacity: usize,
}

impl CacheStats {
    /// Fraction of lookups served from cache, in `[0.0, 1.0]`.
    ///
    /// Returns `0.0` when no lookups have been recorded yet.
    pub fn hit_rate(&self) -> f64 {
        match self.hits + self.misses {
            0 => 0.0,
            total => self.hits as f64 / total as f64,
        }
    }

    /// Fraction of lookups that missed, in `[0.0, 1.0]`.
    ///
    /// Defined as the complement of `hit_rate`, so with zero recorded
    /// lookups this reports `1.0` (since `hit_rate` is `0.0`).
    pub fn miss_rate(&self) -> f64 {
        1.0 - self.hit_rate()
    }
}
41
/// In-memory LRU cache for blocks.
///
/// All methods take `&self`: the LRU map is guarded by a `parking_lot::Mutex`
/// and the hit/miss counters are lock-free atomics, so a shared reference is
/// enough for concurrent use.
pub struct BlockCache {
    /// LRU map from CID to cached block, guarded by a mutex.
    cache: Arc<Mutex<LruCache<Cid, Block>>>,
    /// Capacity as requested at construction and reported by `stats()`.
    /// NOTE(review): when `0` is requested, `new` falls back to an LRU bound
    /// of 1000 but this field still records `0` — see `BlockCache::new`.
    capacity: usize,
    /// Number of cache hits (updated with `Ordering::Relaxed`).
    hits: Arc<AtomicU64>,
    /// Number of cache misses (updated with `Ordering::Relaxed`).
    misses: Arc<AtomicU64>,
}
49
50impl BlockCache {
51    /// Create a new LRU cache with the given capacity (number of blocks)
52    pub fn new(capacity: usize) -> Self {
53        let cap_val = capacity;
54        let capacity = NonZeroUsize::new(capacity).unwrap_or(NonZeroUsize::new(1000).unwrap());
55        Self {
56            cache: Arc::new(Mutex::new(LruCache::new(capacity))),
57            capacity: cap_val,
58            hits: Arc::new(AtomicU64::new(0)),
59            misses: Arc::new(AtomicU64::new(0)),
60        }
61    }
62
63    /// Get a block from cache
64    #[inline]
65    pub fn get(&self, cid: &Cid) -> Option<Block> {
66        let result = self.cache.lock().get(cid).cloned();
67        if result.is_some() {
68            self.hits.fetch_add(1, Ordering::Relaxed);
69        } else {
70            self.misses.fetch_add(1, Ordering::Relaxed);
71        }
72        result
73    }
74
75    /// Put a block into cache
76    #[inline]
77    pub fn put(&self, block: Block) {
78        self.cache.lock().put(*block.cid(), block);
79    }
80
81    /// Remove a block from cache
82    pub fn remove(&self, cid: &Cid) {
83        self.cache.lock().pop(cid);
84    }
85
86    /// Clear the cache
87    pub fn clear(&self) {
88        self.cache.lock().clear();
89        self.hits.store(0, Ordering::Relaxed);
90        self.misses.store(0, Ordering::Relaxed);
91    }
92
93    /// Get cache statistics
94    pub fn stats(&self) -> CacheStats {
95        CacheStats {
96            hits: self.hits.load(Ordering::Relaxed),
97            misses: self.misses.load(Ordering::Relaxed),
98            size: self.cache.lock().len(),
99            capacity: self.capacity,
100        }
101    }
102
103    /// Get cache statistics (for backward compatibility)
104    pub fn len(&self) -> usize {
105        self.cache.lock().len()
106    }
107
108    /// Check if cache is empty
109    pub fn is_empty(&self) -> bool {
110        self.cache.lock().is_empty()
111    }
112}
113
/// Caching wrapper around a block store.
///
/// Implements a write-through policy: `put` writes to both the in-memory LRU
/// cache and the underlying store, while reads are served from the cache when
/// possible and fall back to the store on a miss.
pub struct CachedBlockStore<S: BlockStore> {
    /// Underlying block store that holds the authoritative data.
    store: S,
    /// In-memory LRU cache consulted before the store.
    cache: BlockCache,
}
119
120impl<S: BlockStore> CachedBlockStore<S> {
121    /// Create a new caching block store
122    pub fn new(store: S, cache_capacity: usize) -> Self {
123        Self {
124            store,
125            cache: BlockCache::new(cache_capacity),
126        }
127    }
128
129    /// Get reference to the underlying store
130    pub fn store(&self) -> &S {
131        &self.store
132    }
133
134    /// Get reference to the cache
135    pub fn cache(&self) -> &BlockCache {
136        &self.cache
137    }
138
139    /// Get cache statistics
140    pub fn cache_stats(&self) -> CacheStats {
141        self.cache.stats()
142    }
143}
144
145#[async_trait]
146impl<S: BlockStore> BlockStore for CachedBlockStore<S> {
147    async fn put(&self, block: &Block) -> Result<()> {
148        // Write-through: update both cache and store
149        self.cache.put(block.clone());
150        self.store.put(block).await
151    }
152
153    async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
154        // Check cache first
155        if let Some(block) = self.cache.get(cid) {
156            return Ok(Some(block));
157        }
158
159        // Cache miss: fetch from store
160        if let Some(block) = self.store.get(cid).await? {
161            self.cache.put(block.clone());
162            Ok(Some(block))
163        } else {
164            Ok(None)
165        }
166    }
167
168    async fn has(&self, cid: &Cid) -> Result<bool> {
169        // Check cache first
170        if self.cache.get(cid).is_some() {
171            return Ok(true);
172        }
173        self.store.has(cid).await
174    }
175
176    async fn delete(&self, cid: &Cid) -> Result<()> {
177        self.cache.remove(cid);
178        self.store.delete(cid).await
179    }
180
181    fn list_cids(&self) -> Result<Vec<Cid>> {
182        self.store.list_cids()
183    }
184
185    fn len(&self) -> usize {
186        self.store.len()
187    }
188
189    fn is_empty(&self) -> bool {
190        self.store.is_empty()
191    }
192
193    async fn flush(&self) -> Result<()> {
194        self.store.flush().await
195    }
196
197    async fn close(&self) -> Result<()> {
198        self.cache.clear();
199        self.store.close().await
200    }
201
202    // Optimized batch operations to reduce lock contention
203    async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Block>>> {
204        let mut results = Vec::with_capacity(cids.len());
205        let mut cache_misses = Vec::new();
206        let mut miss_indices = Vec::new();
207
208        // Single lock acquisition for all cache lookups
209        {
210            let cache = self.cache.cache.lock();
211            for (i, cid) in cids.iter().enumerate() {
212                if let Some(block) = cache.peek(cid) {
213                    results.push(Some(block.clone()));
214                } else {
215                    results.push(None);
216                    cache_misses.push(*cid);
217                    miss_indices.push(i);
218                }
219            }
220        }
221
222        // Fetch cache misses from store
223        if !cache_misses.is_empty() {
224            let fetched = self.store.get_many(&cache_misses).await?;
225
226            // Update cache and results in a single lock acquisition
227            {
228                let mut cache = self.cache.cache.lock();
229                for (idx, block_opt) in miss_indices.iter().zip(fetched.iter()) {
230                    if let Some(block) = block_opt {
231                        cache.put(*block.cid(), block.clone());
232                        results[*idx] = Some(block.clone());
233                    }
234                }
235            }
236        }
237
238        Ok(results)
239    }
240
241    async fn put_many(&self, blocks: &[Block]) -> Result<()> {
242        // Single lock acquisition for all cache updates
243        {
244            let mut cache = self.cache.cache.lock();
245            for block in blocks {
246                cache.put(*block.cid(), block.clone());
247            }
248        }
249
250        // Write to underlying store
251        self.store.put_many(blocks).await
252    }
253
254    async fn has_many(&self, cids: &[Cid]) -> Result<Vec<bool>> {
255        let mut results = Vec::with_capacity(cids.len());
256        let mut cache_misses = Vec::new();
257        let mut miss_indices = Vec::new();
258
259        // Single lock acquisition for all cache checks
260        {
261            let cache = self.cache.cache.lock();
262            for (i, cid) in cids.iter().enumerate() {
263                if cache.contains(cid) {
264                    results.push(true);
265                } else {
266                    results.push(false);
267                    cache_misses.push(*cid);
268                    miss_indices.push(i);
269                }
270            }
271        }
272
273        // Check cache misses in store
274        if !cache_misses.is_empty() {
275            let store_results = self.store.has_many(&cache_misses).await?;
276            for (idx, &exists) in miss_indices.iter().zip(store_results.iter()) {
277                results[*idx] = exists;
278            }
279        }
280
281        Ok(results)
282    }
283
284    async fn delete_many(&self, cids: &[Cid]) -> Result<()> {
285        // Single lock acquisition for all cache deletions
286        {
287            let mut cache = self.cache.cache.lock();
288            for cid in cids {
289                cache.pop(cid);
290            }
291        }
292
293        self.store.delete_many(cids).await
294    }
295}
296
/// Multi-level cache with hot (L1) and warm (L2) tiers
///
/// L1 is smaller and faster, L2 is larger but may have more contention.
/// Lookups check L1 first; an L2 hit is promoted into L1, and blocks
/// displaced from L1 by `put` are demoted into L2.
pub struct TieredBlockCache {
    /// L1 cache - hot blocks (small, fast)
    l1_cache: Arc<Mutex<LruCache<Cid, Block>>>,
    /// L2 cache - warm blocks (larger, slower)
    l2_cache: Arc<Mutex<LruCache<Cid, Block>>>,
    /// L1 capacity as requested at construction and reported by `stats()`.
    /// NOTE(review): `new` falls back to an LRU bound of 100 when 0 is
    /// requested, but this field still records the raw request — see `new`.
    l1_capacity: usize,
    /// L2 capacity as requested at construction and reported by `stats()`.
    /// NOTE(review): same caveat as `l1_capacity`, with fallback 1000.
    l2_capacity: usize,
    /// L1 hit counter (updated with `Ordering::Relaxed`).
    l1_hits: Arc<AtomicU64>,
    /// L2 hit counter (updated with `Ordering::Relaxed`).
    l2_hits: Arc<AtomicU64>,
    /// Total miss counter — neither tier held the block.
    misses: Arc<AtomicU64>,
}
316
317impl TieredBlockCache {
318    /// Create a new tiered cache
319    ///
320    /// # Arguments
321    /// * `l1_capacity` - Capacity of L1 (hot) cache in number of blocks
322    /// * `l2_capacity` - Capacity of L2 (warm) cache in number of blocks
323    pub fn new(l1_capacity: usize, l2_capacity: usize) -> Self {
324        let l1_cap = NonZeroUsize::new(l1_capacity).unwrap_or(NonZeroUsize::new(100).unwrap());
325        let l2_cap = NonZeroUsize::new(l2_capacity).unwrap_or(NonZeroUsize::new(1000).unwrap());
326
327        Self {
328            l1_cache: Arc::new(Mutex::new(LruCache::new(l1_cap))),
329            l2_cache: Arc::new(Mutex::new(LruCache::new(l2_cap))),
330            l1_capacity,
331            l2_capacity,
332            l1_hits: Arc::new(AtomicU64::new(0)),
333            l2_hits: Arc::new(AtomicU64::new(0)),
334            misses: Arc::new(AtomicU64::new(0)),
335        }
336    }
337
338    /// Get a block from cache (checks L1 first, then L2)
339    #[inline]
340    pub fn get(&self, cid: &Cid) -> Option<Block> {
341        // Try L1 first
342        if let Some(block) = self.l1_cache.lock().get(cid) {
343            self.l1_hits.fetch_add(1, Ordering::Relaxed);
344            return Some(block.clone());
345        }
346
347        // Try L2
348        if let Some(block) = self.l2_cache.lock().get(cid) {
349            self.l2_hits.fetch_add(1, Ordering::Relaxed);
350            let block_clone = block.clone();
351            // Promote to L1 on hit
352            self.l1_cache.lock().put(*cid, block_clone.clone());
353            return Some(block_clone);
354        }
355
356        self.misses.fetch_add(1, Ordering::Relaxed);
357        None
358    }
359
360    /// Put a block into cache (goes to L1)
361    #[inline]
362    pub fn put(&self, block: Block) {
363        let cid = *block.cid();
364
365        // If block is being evicted from L1, move it to L2
366        if let Some(evicted) = self.l1_cache.lock().push(cid, block.clone()) {
367            // evicted is (Cid, Block)
368            self.l2_cache.lock().put(evicted.0, evicted.1);
369        }
370    }
371
372    /// Remove a block from cache
373    pub fn remove(&self, cid: &Cid) {
374        self.l1_cache.lock().pop(cid);
375        self.l2_cache.lock().pop(cid);
376    }
377
378    /// Clear all caches
379    pub fn clear(&self) {
380        self.l1_cache.lock().clear();
381        self.l2_cache.lock().clear();
382        self.l1_hits.store(0, Ordering::Relaxed);
383        self.l2_hits.store(0, Ordering::Relaxed);
384        self.misses.store(0, Ordering::Relaxed);
385    }
386
387    /// Get cache statistics
388    pub fn stats(&self) -> TieredCacheStats {
389        TieredCacheStats {
390            l1_size: self.l1_cache.lock().len(),
391            l1_capacity: self.l1_capacity,
392            l2_size: self.l2_cache.lock().len(),
393            l2_capacity: self.l2_capacity,
394            l1_hits: self.l1_hits.load(Ordering::Relaxed),
395            l2_hits: self.l2_hits.load(Ordering::Relaxed),
396            misses: self.misses.load(Ordering::Relaxed),
397        }
398    }
399}
400
/// Statistics snapshot for a tiered (L1/L2) cache.
#[derive(Debug, Clone)]
pub struct TieredCacheStats {
    /// Current L1 cache size
    pub l1_size: usize,
    /// L1 cache capacity
    pub l1_capacity: usize,
    /// Current L2 cache size
    pub l2_size: usize,
    /// L2 cache capacity
    pub l2_capacity: usize,
    /// L1 cache hits
    pub l1_hits: u64,
    /// L2 cache hits
    pub l2_hits: u64,
    /// Total misses
    pub misses: u64,
}

impl TieredCacheStats {
    /// Total number of lookups recorded: hits at both tiers plus misses.
    fn total_lookups(&self) -> u64 {
        self.l1_hits + self.l2_hits + self.misses
    }

    /// Divide `part` by the total lookup count, returning 0.0 when no
    /// lookups have been recorded.
    fn ratio(&self, part: u64) -> f64 {
        match self.total_lookups() {
            0 => 0.0,
            total => part as f64 / total as f64,
        }
    }

    /// Overall hit rate across both tiers (0.0 to 1.0).
    pub fn hit_rate(&self) -> f64 {
        self.ratio(self.l1_hits + self.l2_hits)
    }

    /// L1 hit rate relative to all lookups (0.0 to 1.0).
    pub fn l1_hit_rate(&self) -> f64 {
        self.ratio(self.l1_hits)
    }

    /// L2 hit rate relative to all lookups (0.0 to 1.0).
    pub fn l2_hit_rate(&self) -> f64 {
        self.ratio(self.l2_hits)
    }

    /// Miss rate (0.0 to 1.0); the complement of `hit_rate`, so with zero
    /// recorded lookups this reports 1.0.
    pub fn miss_rate(&self) -> f64 {
        1.0 - self.hit_rate()
    }
}
457
/// Tiered caching wrapper around a block store.
///
/// Reads consult the two-level (hot/warm) in-memory cache before falling
/// back to the store; writes go through the cache to the store.
pub struct TieredCachedBlockStore<S: BlockStore> {
    /// Underlying block store that holds the authoritative data.
    store: S,
    /// Two-level (L1 hot / L2 warm) in-memory cache.
    cache: TieredBlockCache,
}
463
464impl<S: BlockStore> TieredCachedBlockStore<S> {
465    /// Create a new tiered caching block store
466    ///
467    /// # Arguments
468    /// * `store` - Underlying block store
469    /// * `l1_capacity` - L1 cache capacity (number of blocks)
470    /// * `l2_capacity` - L2 cache capacity (number of blocks)
471    pub fn new(store: S, l1_capacity: usize, l2_capacity: usize) -> Self {
472        Self {
473            store,
474            cache: TieredBlockCache::new(l1_capacity, l2_capacity),
475        }
476    }
477
478    /// Get reference to the underlying store
479    pub fn store(&self) -> &S {
480        &self.store
481    }
482
483    /// Get cache statistics
484    pub fn cache_stats(&self) -> TieredCacheStats {
485        self.cache.stats()
486    }
487}
488
489#[async_trait]
490impl<S: BlockStore> BlockStore for TieredCachedBlockStore<S> {
491    async fn put(&self, block: &Block) -> Result<()> {
492        // Write-through: update cache and store
493        self.cache.put(block.clone());
494        self.store.put(block).await
495    }
496
497    async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
498        // Check cache first
499        if let Some(block) = self.cache.get(cid) {
500            return Ok(Some(block));
501        }
502
503        // Cache miss: fetch from store
504        if let Some(block) = self.store.get(cid).await? {
505            self.cache.put(block.clone());
506            Ok(Some(block))
507        } else {
508            Ok(None)
509        }
510    }
511
512    async fn has(&self, cid: &Cid) -> Result<bool> {
513        // Check cache first
514        if self.cache.get(cid).is_some() {
515            return Ok(true);
516        }
517        self.store.has(cid).await
518    }
519
520    async fn delete(&self, cid: &Cid) -> Result<()> {
521        self.cache.remove(cid);
522        self.store.delete(cid).await
523    }
524
525    fn list_cids(&self) -> Result<Vec<Cid>> {
526        self.store.list_cids()
527    }
528
529    fn len(&self) -> usize {
530        self.store.len()
531    }
532
533    fn is_empty(&self) -> bool {
534        self.store.is_empty()
535    }
536
537    async fn flush(&self) -> Result<()> {
538        self.store.flush().await
539    }
540
541    async fn close(&self) -> Result<()> {
542        self.cache.clear();
543        self.store.close().await
544    }
545}