Skip to main content

forest/db/
blockstore_with_read_cache.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use cid::Cid;
5use fvm_ipld_blockstore::Blockstore;
6use std::sync::{
7    Arc,
8    atomic::{self, AtomicUsize},
9};
10
11use crate::utils::{cache::SizeTrackingLruCache, get_size};
12
13pub trait BlockstoreReadCache {
14    fn get(&self, k: &Cid) -> Option<Vec<u8>>;
15
16    fn put(&self, k: Cid, block: Vec<u8>);
17}
18
19pub type LruBlockstoreReadCache = SizeTrackingLruCache<get_size::CidWrapper, Vec<u8>>;
20
21impl BlockstoreReadCache for SizeTrackingLruCache<get_size::CidWrapper, Vec<u8>> {
22    fn get(&self, k: &Cid) -> Option<Vec<u8>> {
23        self.get_cloned(&(*k).into())
24    }
25
26    fn put(&self, k: Cid, block: Vec<u8>) {
27        self.push(k.into(), block);
28    }
29}
30
31impl<T: BlockstoreReadCache> BlockstoreReadCache for Arc<T> {
32    fn get(&self, k: &Cid) -> Option<Vec<u8>> {
33        self.as_ref().get(k)
34    }
35
36    fn put(&self, k: Cid, block: Vec<u8>) {
37        self.as_ref().put(k, block)
38    }
39}
40
/// Hit/miss bookkeeping for a blockstore read cache.
///
/// Every method takes `&self`, so an implementation that counts anything has
/// to rely on interior mutability.
pub trait BlockstoreReadCacheStats {
    /// Number of hits recorded so far.
    fn hit(&self) -> usize;

    /// Number of misses recorded so far.
    fn miss(&self) -> usize;

    /// Records one hit.
    fn track_hit(&self);

    /// Records one miss.
    fn track_miss(&self);
}
50
/// A pair of atomic counters for cache hits and misses.
///
/// `Default` yields both counters at zero.
#[derive(Debug, Default)]
pub struct DefaultBlockstoreReadCacheStats {
    /// Count of recorded hits.
    hit: AtomicUsize,
    /// Count of recorded misses.
    miss: AtomicUsize,
}
56
57impl BlockstoreReadCacheStats for DefaultBlockstoreReadCacheStats {
58    fn hit(&self) -> usize {
59        self.hit.load(atomic::Ordering::Relaxed)
60    }
61
62    fn track_hit(&self) {
63        self.hit.fetch_add(1, atomic::Ordering::Relaxed);
64    }
65
66    fn miss(&self) -> usize {
67        self.miss.load(atomic::Ordering::Relaxed)
68    }
69
70    fn track_miss(&self) {
71        self.miss.fetch_add(1, atomic::Ordering::Relaxed);
72    }
73}
74
75#[derive(derive_more::Constructor)]
76pub struct BlockstoreWithReadCache<
77    DB: Blockstore,
78    CACHE: BlockstoreReadCache,
79    STATS: BlockstoreReadCacheStats,
80> {
81    inner: DB,
82    cache: CACHE,
83    stats: Option<STATS>,
84}
85
86impl<DB: Blockstore, CACHE: BlockstoreReadCache, STATS: BlockstoreReadCacheStats>
87    BlockstoreWithReadCache<DB, CACHE, STATS>
88{
89    pub fn stats(&self) -> Option<&STATS> {
90        self.stats.as_ref()
91    }
92}
93
94impl<DB: Blockstore, CACHE: BlockstoreReadCache, STATS: BlockstoreReadCacheStats> Blockstore
95    for BlockstoreWithReadCache<DB, CACHE, STATS>
96{
97    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
98        if let Some(cached) = self.cache.get(k) {
99            self.stats.as_ref().map(BlockstoreReadCacheStats::track_hit);
100            Ok(Some(cached))
101        } else {
102            let block = self.inner.get(k)?;
103            self.stats
104                .as_ref()
105                .map(BlockstoreReadCacheStats::track_miss);
106            if let Some(block) = &block {
107                self.cache.put(*k, block.clone());
108            }
109            Ok(block)
110        }
111    }
112
113    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
114        self.inner.put_keyed(k, block)
115    }
116}
117
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{db::MemoryDB, utils::rand::forest_rng};
    use fvm_ipld_encoding::DAG_CBOR;
    use multihash_codetable::Code::Blake2b256;
    use multihash_codetable::MultihashDigest as _;
    use rand::Rng as _;

    /// Exercises the read-through cache end to end: first reads are misses,
    /// repeated reads are hits, the cache never grows past its capacity, and
    /// a record that has been pushed out is a miss again.
    #[test]
    fn test_blockstore_read_cache() {
        // N_RECORDS must stay larger than CACHE_SIZE: the final phase expects
        // the first record to have been pushed out of the cache by later ones.
        const N_RECORDS: usize = 4;
        const CACHE_SIZE: usize = 2;

        let mem_db = Arc::new(MemoryDB::default());
        let mut records = Vec::with_capacity(N_RECORDS);
        for _ in 0..N_RECORDS {
            let mut record = [0; 1024];
            forest_rng().fill(&mut record);
            let key = Cid::new_v1(DAG_CBOR, Blake2b256.digest(record.as_slice()));
            mem_db.put_keyed(&key, &record).unwrap();
            records.push((key, record));
        }
        let cache = Arc::new(LruBlockstoreReadCache::new_without_metrics_registry(
            "test_blockstore_read_cache".into(),
            CACHE_SIZE.try_into().unwrap(),
        ));
        let db = BlockstoreWithReadCache::new(
            mem_db.clone(),
            cache.clone(),
            Some(DefaultBlockstoreReadCacheStats::default()),
        );
        // Snapshot of the counters as `(hits, misses)`.
        let counters = || {
            let stats = db.stats().unwrap();
            (stats.hit(), stats.miss())
        };

        assert_eq!(cache.len(), 0);
        assert_eq!(counters(), (0, 0));

        for (i, (k, v)) in records.iter().enumerate() {
            // First read of each record misses and populates the cache.
            assert_eq!(&db.get(k).unwrap().unwrap(), v);
            assert_eq!(cache.len(), CACHE_SIZE.min(i + 1));
            assert_eq!(counters(), (i, i + 1));

            // Immediate re-read is served from the cache.
            assert_eq!(&db.get(k).unwrap().unwrap(), v);
            assert_eq!(cache.len(), CACHE_SIZE.min(i + 1));
            assert_eq!(counters(), (i + 1, i + 1));
        }

        // After the loop every record produced exactly one hit and one miss.
        let (k0, v0) = &records[0];

        // The first record is no longer cached, so this read is one more miss.
        assert_eq!(&db.get(k0).unwrap().unwrap(), v0);
        assert_eq!(cache.len(), CACHE_SIZE);
        assert_eq!(counters(), (N_RECORDS, N_RECORDS + 1));

        // It was re-cached by the previous read, so this one is a hit.
        assert_eq!(&db.get(k0).unwrap().unwrap(), v0);
        assert_eq!(cache.len(), CACHE_SIZE);
        assert_eq!(counters(), (N_RECORDS + 1, N_RECORDS + 1));
    }
}