1use crate::traits::BlockStore;
4use async_trait::async_trait;
5use ipfrs_core::{Block, Cid, Result};
6use lru::LruCache;
7use parking_lot::Mutex;
8use std::num::NonZeroUsize;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11
12#[derive(Debug, Clone, Default)]
14pub struct CacheStats {
15 pub hits: u64,
17 pub misses: u64,
19 pub size: usize,
21 pub capacity: usize,
23}
24
25impl CacheStats {
26 pub fn hit_rate(&self) -> f64 {
28 let total = self.hits + self.misses;
29 if total == 0 {
30 0.0
31 } else {
32 self.hits as f64 / total as f64
33 }
34 }
35
36 pub fn miss_rate(&self) -> f64 {
38 1.0 - self.hit_rate()
39 }
40}
41
42pub struct BlockCache {
44 cache: Arc<Mutex<LruCache<Cid, Block>>>,
45 capacity: usize,
46 hits: Arc<AtomicU64>,
47 misses: Arc<AtomicU64>,
48}
49
50impl BlockCache {
51 pub fn new(capacity: usize) -> Self {
53 let cap_val = capacity;
54 let capacity = NonZeroUsize::new(capacity).unwrap_or(NonZeroUsize::new(1000).unwrap());
55 Self {
56 cache: Arc::new(Mutex::new(LruCache::new(capacity))),
57 capacity: cap_val,
58 hits: Arc::new(AtomicU64::new(0)),
59 misses: Arc::new(AtomicU64::new(0)),
60 }
61 }
62
63 #[inline]
65 pub fn get(&self, cid: &Cid) -> Option<Block> {
66 let result = self.cache.lock().get(cid).cloned();
67 if result.is_some() {
68 self.hits.fetch_add(1, Ordering::Relaxed);
69 } else {
70 self.misses.fetch_add(1, Ordering::Relaxed);
71 }
72 result
73 }
74
75 #[inline]
77 pub fn put(&self, block: Block) {
78 self.cache.lock().put(*block.cid(), block);
79 }
80
81 pub fn remove(&self, cid: &Cid) {
83 self.cache.lock().pop(cid);
84 }
85
86 pub fn clear(&self) {
88 self.cache.lock().clear();
89 self.hits.store(0, Ordering::Relaxed);
90 self.misses.store(0, Ordering::Relaxed);
91 }
92
93 pub fn stats(&self) -> CacheStats {
95 CacheStats {
96 hits: self.hits.load(Ordering::Relaxed),
97 misses: self.misses.load(Ordering::Relaxed),
98 size: self.cache.lock().len(),
99 capacity: self.capacity,
100 }
101 }
102
103 pub fn len(&self) -> usize {
105 self.cache.lock().len()
106 }
107
108 pub fn is_empty(&self) -> bool {
110 self.cache.lock().is_empty()
111 }
112}
113
114pub struct CachedBlockStore<S: BlockStore> {
116 store: S,
117 cache: BlockCache,
118}
119
120impl<S: BlockStore> CachedBlockStore<S> {
121 pub fn new(store: S, cache_capacity: usize) -> Self {
123 Self {
124 store,
125 cache: BlockCache::new(cache_capacity),
126 }
127 }
128
129 pub fn store(&self) -> &S {
131 &self.store
132 }
133
134 pub fn cache(&self) -> &BlockCache {
136 &self.cache
137 }
138
139 pub fn cache_stats(&self) -> CacheStats {
141 self.cache.stats()
142 }
143}
144
145#[async_trait]
146impl<S: BlockStore> BlockStore for CachedBlockStore<S> {
147 async fn put(&self, block: &Block) -> Result<()> {
148 self.cache.put(block.clone());
150 self.store.put(block).await
151 }
152
153 async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
154 if let Some(block) = self.cache.get(cid) {
156 return Ok(Some(block));
157 }
158
159 if let Some(block) = self.store.get(cid).await? {
161 self.cache.put(block.clone());
162 Ok(Some(block))
163 } else {
164 Ok(None)
165 }
166 }
167
168 async fn has(&self, cid: &Cid) -> Result<bool> {
169 if self.cache.get(cid).is_some() {
171 return Ok(true);
172 }
173 self.store.has(cid).await
174 }
175
176 async fn delete(&self, cid: &Cid) -> Result<()> {
177 self.cache.remove(cid);
178 self.store.delete(cid).await
179 }
180
181 fn list_cids(&self) -> Result<Vec<Cid>> {
182 self.store.list_cids()
183 }
184
185 fn len(&self) -> usize {
186 self.store.len()
187 }
188
189 fn is_empty(&self) -> bool {
190 self.store.is_empty()
191 }
192
193 async fn flush(&self) -> Result<()> {
194 self.store.flush().await
195 }
196
197 async fn close(&self) -> Result<()> {
198 self.cache.clear();
199 self.store.close().await
200 }
201
202 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Block>>> {
204 let mut results = Vec::with_capacity(cids.len());
205 let mut cache_misses = Vec::new();
206 let mut miss_indices = Vec::new();
207
208 {
210 let cache = self.cache.cache.lock();
211 for (i, cid) in cids.iter().enumerate() {
212 if let Some(block) = cache.peek(cid) {
213 results.push(Some(block.clone()));
214 } else {
215 results.push(None);
216 cache_misses.push(*cid);
217 miss_indices.push(i);
218 }
219 }
220 }
221
222 if !cache_misses.is_empty() {
224 let fetched = self.store.get_many(&cache_misses).await?;
225
226 {
228 let mut cache = self.cache.cache.lock();
229 for (idx, block_opt) in miss_indices.iter().zip(fetched.iter()) {
230 if let Some(block) = block_opt {
231 cache.put(*block.cid(), block.clone());
232 results[*idx] = Some(block.clone());
233 }
234 }
235 }
236 }
237
238 Ok(results)
239 }
240
241 async fn put_many(&self, blocks: &[Block]) -> Result<()> {
242 {
244 let mut cache = self.cache.cache.lock();
245 for block in blocks {
246 cache.put(*block.cid(), block.clone());
247 }
248 }
249
250 self.store.put_many(blocks).await
252 }
253
254 async fn has_many(&self, cids: &[Cid]) -> Result<Vec<bool>> {
255 let mut results = Vec::with_capacity(cids.len());
256 let mut cache_misses = Vec::new();
257 let mut miss_indices = Vec::new();
258
259 {
261 let cache = self.cache.cache.lock();
262 for (i, cid) in cids.iter().enumerate() {
263 if cache.contains(cid) {
264 results.push(true);
265 } else {
266 results.push(false);
267 cache_misses.push(*cid);
268 miss_indices.push(i);
269 }
270 }
271 }
272
273 if !cache_misses.is_empty() {
275 let store_results = self.store.has_many(&cache_misses).await?;
276 for (idx, &exists) in miss_indices.iter().zip(store_results.iter()) {
277 results[*idx] = exists;
278 }
279 }
280
281 Ok(results)
282 }
283
284 async fn delete_many(&self, cids: &[Cid]) -> Result<()> {
285 {
287 let mut cache = self.cache.cache.lock();
288 for cid in cids {
289 cache.pop(cid);
290 }
291 }
292
293 self.store.delete_many(cids).await
294 }
295}
296
297pub struct TieredBlockCache {
301 l1_cache: Arc<Mutex<LruCache<Cid, Block>>>,
303 l2_cache: Arc<Mutex<LruCache<Cid, Block>>>,
305 l1_capacity: usize,
307 l2_capacity: usize,
309 l1_hits: Arc<AtomicU64>,
311 l2_hits: Arc<AtomicU64>,
313 misses: Arc<AtomicU64>,
315}
316
317impl TieredBlockCache {
318 pub fn new(l1_capacity: usize, l2_capacity: usize) -> Self {
324 let l1_cap = NonZeroUsize::new(l1_capacity).unwrap_or(NonZeroUsize::new(100).unwrap());
325 let l2_cap = NonZeroUsize::new(l2_capacity).unwrap_or(NonZeroUsize::new(1000).unwrap());
326
327 Self {
328 l1_cache: Arc::new(Mutex::new(LruCache::new(l1_cap))),
329 l2_cache: Arc::new(Mutex::new(LruCache::new(l2_cap))),
330 l1_capacity,
331 l2_capacity,
332 l1_hits: Arc::new(AtomicU64::new(0)),
333 l2_hits: Arc::new(AtomicU64::new(0)),
334 misses: Arc::new(AtomicU64::new(0)),
335 }
336 }
337
338 #[inline]
340 pub fn get(&self, cid: &Cid) -> Option<Block> {
341 if let Some(block) = self.l1_cache.lock().get(cid) {
343 self.l1_hits.fetch_add(1, Ordering::Relaxed);
344 return Some(block.clone());
345 }
346
347 if let Some(block) = self.l2_cache.lock().get(cid) {
349 self.l2_hits.fetch_add(1, Ordering::Relaxed);
350 let block_clone = block.clone();
351 self.l1_cache.lock().put(*cid, block_clone.clone());
353 return Some(block_clone);
354 }
355
356 self.misses.fetch_add(1, Ordering::Relaxed);
357 None
358 }
359
360 #[inline]
362 pub fn put(&self, block: Block) {
363 let cid = *block.cid();
364
365 if let Some(evicted) = self.l1_cache.lock().push(cid, block.clone()) {
367 self.l2_cache.lock().put(evicted.0, evicted.1);
369 }
370 }
371
372 pub fn remove(&self, cid: &Cid) {
374 self.l1_cache.lock().pop(cid);
375 self.l2_cache.lock().pop(cid);
376 }
377
378 pub fn clear(&self) {
380 self.l1_cache.lock().clear();
381 self.l2_cache.lock().clear();
382 self.l1_hits.store(0, Ordering::Relaxed);
383 self.l2_hits.store(0, Ordering::Relaxed);
384 self.misses.store(0, Ordering::Relaxed);
385 }
386
387 pub fn stats(&self) -> TieredCacheStats {
389 TieredCacheStats {
390 l1_size: self.l1_cache.lock().len(),
391 l1_capacity: self.l1_capacity,
392 l2_size: self.l2_cache.lock().len(),
393 l2_capacity: self.l2_capacity,
394 l1_hits: self.l1_hits.load(Ordering::Relaxed),
395 l2_hits: self.l2_hits.load(Ordering::Relaxed),
396 misses: self.misses.load(Ordering::Relaxed),
397 }
398 }
399}
400
401#[derive(Debug, Clone)]
403pub struct TieredCacheStats {
404 pub l1_size: usize,
406 pub l1_capacity: usize,
408 pub l2_size: usize,
410 pub l2_capacity: usize,
412 pub l1_hits: u64,
414 pub l2_hits: u64,
416 pub misses: u64,
418}
419
420impl TieredCacheStats {
421 pub fn hit_rate(&self) -> f64 {
423 let total_hits = self.l1_hits + self.l2_hits;
424 let total = total_hits + self.misses;
425 if total == 0 {
426 0.0
427 } else {
428 total_hits as f64 / total as f64
429 }
430 }
431
432 pub fn l1_hit_rate(&self) -> f64 {
434 let total = self.l1_hits + self.l2_hits + self.misses;
435 if total == 0 {
436 0.0
437 } else {
438 self.l1_hits as f64 / total as f64
439 }
440 }
441
442 pub fn l2_hit_rate(&self) -> f64 {
444 let total = self.l1_hits + self.l2_hits + self.misses;
445 if total == 0 {
446 0.0
447 } else {
448 self.l2_hits as f64 / total as f64
449 }
450 }
451
452 pub fn miss_rate(&self) -> f64 {
454 1.0 - self.hit_rate()
455 }
456}
457
458pub struct TieredCachedBlockStore<S: BlockStore> {
460 store: S,
461 cache: TieredBlockCache,
462}
463
464impl<S: BlockStore> TieredCachedBlockStore<S> {
465 pub fn new(store: S, l1_capacity: usize, l2_capacity: usize) -> Self {
472 Self {
473 store,
474 cache: TieredBlockCache::new(l1_capacity, l2_capacity),
475 }
476 }
477
478 pub fn store(&self) -> &S {
480 &self.store
481 }
482
483 pub fn cache_stats(&self) -> TieredCacheStats {
485 self.cache.stats()
486 }
487}
488
489#[async_trait]
490impl<S: BlockStore> BlockStore for TieredCachedBlockStore<S> {
491 async fn put(&self, block: &Block) -> Result<()> {
492 self.cache.put(block.clone());
494 self.store.put(block).await
495 }
496
497 async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
498 if let Some(block) = self.cache.get(cid) {
500 return Ok(Some(block));
501 }
502
503 if let Some(block) = self.store.get(cid).await? {
505 self.cache.put(block.clone());
506 Ok(Some(block))
507 } else {
508 Ok(None)
509 }
510 }
511
512 async fn has(&self, cid: &Cid) -> Result<bool> {
513 if self.cache.get(cid).is_some() {
515 return Ok(true);
516 }
517 self.store.has(cid).await
518 }
519
520 async fn delete(&self, cid: &Cid) -> Result<()> {
521 self.cache.remove(cid);
522 self.store.delete(cid).await
523 }
524
525 fn list_cids(&self) -> Result<Vec<Cid>> {
526 self.store.list_cids()
527 }
528
529 fn len(&self) -> usize {
530 self.store.len()
531 }
532
533 fn is_empty(&self) -> bool {
534 self.store.is_empty()
535 }
536
537 async fn flush(&self) -> Result<()> {
538 self.store.flush().await
539 }
540
541 async fn close(&self) -> Result<()> {
542 self.cache.clear();
543 self.store.close().await
544 }
545}