// forest/db/blockstore_with_read_cache.rs

// Copyright 2019-2025 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT
3
4use cid::Cid;
5use fvm_ipld_blockstore::Blockstore;
6use std::sync::{
7    Arc,
8    atomic::{self, AtomicUsize},
9};
10
11use crate::utils::{cache::SizeTrackingLruCache, get_size};
12
/// Read-through cache interface used by [`BlockstoreWithReadCache`] to
/// memoize blocks fetched from an underlying [`Blockstore`].
pub trait BlockstoreReadCache {
    /// Returns the cached block bytes for `k`, or `None` on a cache miss.
    fn get(&self, k: &Cid) -> Option<Vec<u8>>;

    /// Inserts `block` into the cache under key `k`.
    fn put(&self, k: Cid, block: Vec<u8>);
}
18
19pub type LruBlockstoreReadCache = SizeTrackingLruCache<get_size::CidWrapper, Vec<u8>>;
20
21impl BlockstoreReadCache for SizeTrackingLruCache<get_size::CidWrapper, Vec<u8>> {
22    fn get(&self, k: &Cid) -> Option<Vec<u8>> {
23        self.get_cloned(&(*k).into())
24    }
25
26    fn put(&self, k: Cid, block: Vec<u8>) {
27        self.push(k.into(), block);
28    }
29}
30
31impl<T: BlockstoreReadCache> BlockstoreReadCache for Arc<T> {
32    fn get(&self, k: &Cid) -> Option<Vec<u8>> {
33        self.as_ref().get(k)
34    }
35
36    fn put(&self, k: Cid, block: Vec<u8>) {
37        self.as_ref().put(k, block)
38    }
39}
40
/// Hit/miss counters for a [`BlockstoreReadCache`].
pub trait BlockstoreReadCacheStats {
    /// Number of cache hits recorded so far.
    fn hit(&self) -> usize;

    /// Records one cache hit.
    fn track_hit(&self);

    /// Number of cache misses recorded so far.
    fn miss(&self) -> usize;

    /// Records one cache miss.
    fn track_miss(&self);
}
50
/// Default [`BlockstoreReadCacheStats`] implementation backed by atomic
/// counters, so it can be shared across threads without locking.
#[derive(Debug, Default)]
pub struct DefaultBlockstoreReadCacheStats {
    // Count of cache hits.
    hit: AtomicUsize,
    // Count of cache misses.
    miss: AtomicUsize,
}
56
57impl BlockstoreReadCacheStats for DefaultBlockstoreReadCacheStats {
58    fn hit(&self) -> usize {
59        self.hit.load(atomic::Ordering::Relaxed)
60    }
61
62    fn track_hit(&self) {
63        self.hit.fetch_add(1, atomic::Ordering::Relaxed);
64    }
65
66    fn miss(&self) -> usize {
67        self.miss.load(atomic::Ordering::Relaxed)
68    }
69
70    fn track_miss(&self) {
71        self.miss.fetch_add(1, atomic::Ordering::Relaxed);
72    }
73}
74
/// A blockstore decorator that consults an in-memory read cache before the
/// inner store, optionally recording hit/miss statistics.
///
/// Trait bounds are deliberately omitted from the struct definition and
/// declared only on the `impl` blocks that need them (Rust API Guidelines,
/// C-STRUCT-BOUNDS). This is backward-compatible: every previously valid
/// instantiation remains valid.
pub struct BlockstoreWithReadCache<DB, CACHE, STATS> {
    // The underlying blockstore; all writes and cache misses go here.
    inner: DB,
    // Read cache consulted before `inner`.
    cache: CACHE,
    // Optional hit/miss counters; `None` disables stats tracking.
    stats: Option<STATS>,
}
84
85impl<DB: Blockstore, CACHE: BlockstoreReadCache, STATS: BlockstoreReadCacheStats>
86    BlockstoreWithReadCache<DB, CACHE, STATS>
87{
88    pub fn new(db: DB, cache: CACHE, stats: Option<STATS>) -> Self {
89        Self {
90            inner: db,
91            cache,
92            stats,
93        }
94    }
95
96    pub fn stats(&self) -> Option<&STATS> {
97        self.stats.as_ref()
98    }
99}
100
101impl<DB: Blockstore, CACHE: BlockstoreReadCache, STATS: BlockstoreReadCacheStats> Blockstore
102    for BlockstoreWithReadCache<DB, CACHE, STATS>
103{
104    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
105        if let Some(cached) = self.cache.get(k) {
106            self.stats.as_ref().map(BlockstoreReadCacheStats::track_hit);
107            Ok(Some(cached))
108        } else {
109            let block = self.inner.get(k)?;
110            self.stats
111                .as_ref()
112                .map(BlockstoreReadCacheStats::track_miss);
113            if let Some(block) = &block {
114                self.cache.put(*k, block.clone());
115            }
116            Ok(block)
117        }
118    }
119
120    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
121        self.inner.put_keyed(k, block)
122    }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{db::MemoryDB, utils::rand::forest_rng};
    use fvm_ipld_encoding::DAG_CBOR;
    use multihash_codetable::Code::Blake2b256;
    use multihash_codetable::MultihashDigest as _;
    use rand::Rng as _;

    // Exercises BlockstoreWithReadCache against an in-memory store with a
    // cache (capacity 2) smaller than the working set (4 records), checking
    // the cache length and the hit/miss counters after every read.
    #[test]
    fn test_blockstore_read_cache() {
        const N_RECORDS: usize = 4;
        const CACHE_SIZE: usize = 2;
        let mem_db = Arc::new(MemoryDB::default());
        // Populate the backing store with N_RECORDS random 1 KiB blocks.
        let mut records = Vec::with_capacity(N_RECORDS);
        for _ in 0..N_RECORDS {
            let mut record = [0; 1024];
            forest_rng().fill(&mut record);
            let key = Cid::new_v1(DAG_CBOR, Blake2b256.digest(record.as_slice()));
            mem_db.put_keyed(&key, &record).unwrap();
            records.push((key, record));
        }
        let cache = Arc::new(LruBlockstoreReadCache::new_without_metrics_registry(
            "test_blockstore_read_cache".into(),
            CACHE_SIZE.try_into().unwrap(),
        ));
        let db = BlockstoreWithReadCache::new(
            mem_db.clone(),
            cache.clone(),
            Some(DefaultBlockstoreReadCacheStats::default()),
        );

        // Nothing has been read yet: cache empty, counters zero.
        assert_eq!(cache.len(), 0);
        assert_eq!(db.stats().unwrap().hit(), 0);
        assert_eq!(db.stats().unwrap().miss(), 0);

        for (i, (k, v)) in records.iter().enumerate() {
            // First read of each key: a miss that populates the cache.
            assert_eq!(&db.get(k).unwrap().unwrap(), v);

            // Cache grows until it reaches CACHE_SIZE, then stays there.
            assert_eq!(cache.len(), CACHE_SIZE.min(i + 1));
            assert_eq!(db.stats().unwrap().hit(), i);
            assert_eq!(db.stats().unwrap().miss(), i + 1);

            // Second read of the same key: served from the cache (a hit).
            assert_eq!(&db.get(k).unwrap().unwrap(), v);

            assert_eq!(cache.len(), CACHE_SIZE.min(i + 1));
            assert_eq!(db.stats().unwrap().hit(), i + 1);
            assert_eq!(db.stats().unwrap().miss(), i + 1);
        }

        // records[0] should have been evicted by the LRU policy (only the
        // CACHE_SIZE most recently read keys remain), so re-reading it is a
        // miss again...
        let (k0, v0) = &records[0];

        assert_eq!(&db.get(k0).unwrap().unwrap(), v0);

        assert_eq!(cache.len(), CACHE_SIZE);
        assert_eq!(db.stats().unwrap().hit(), 4);
        assert_eq!(db.stats().unwrap().miss(), 5);

        // ...and the immediate re-read is a hit.
        assert_eq!(&db.get(k0).unwrap().unwrap(), v0);

        assert_eq!(cache.len(), CACHE_SIZE);
        assert_eq!(db.stats().unwrap().hit(), 5);
        assert_eq!(db.stats().unwrap().miss(), 5);
    }
}
189}