Skip to main content

forest/db/
blockstore_with_read_cache.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use cid::Cid;
5use fvm_ipld_blockstore::Blockstore;
6use std::sync::atomic::{self, AtomicUsize};
7
8use crate::utils::{cache::SizeTrackingLruCache, get_size};
9
10#[auto_impl::auto_impl(&, Arc)]
11pub trait BlockstoreReadCache {
12    fn get(&self, k: &Cid) -> Option<Vec<u8>>;
13
14    fn put(&self, k: Cid, block: Vec<u8>);
15}
16
17pub type LruBlockstoreReadCache = SizeTrackingLruCache<get_size::CidWrapper, Vec<u8>>;
18
19impl BlockstoreReadCache for SizeTrackingLruCache<get_size::CidWrapper, Vec<u8>> {
20    fn get(&self, k: &Cid) -> Option<Vec<u8>> {
21        self.get_cloned(&(*k).into())
22    }
23
24    fn put(&self, k: Cid, block: Vec<u8>) {
25        self.push(k.into(), block);
26    }
27}
28
29pub trait BlockstoreReadCacheStats {
30    fn hit(&self) -> usize;
31
32    fn track_hit(&self);
33
34    fn miss(&self) -> usize;
35
36    fn track_miss(&self);
37}
38
39#[derive(Debug, Default)]
40pub struct DefaultBlockstoreReadCacheStats {
41    hit: crossbeam_utils::CachePadded<AtomicUsize>,
42    miss: crossbeam_utils::CachePadded<AtomicUsize>,
43}
44
45impl BlockstoreReadCacheStats for DefaultBlockstoreReadCacheStats {
46    fn hit(&self) -> usize {
47        self.hit.load(atomic::Ordering::Relaxed)
48    }
49
50    fn track_hit(&self) {
51        self.hit.fetch_add(1, atomic::Ordering::Relaxed);
52    }
53
54    fn miss(&self) -> usize {
55        self.miss.load(atomic::Ordering::Relaxed)
56    }
57
58    fn track_miss(&self) {
59        self.miss.fetch_add(1, atomic::Ordering::Relaxed);
60    }
61}
62
63#[derive(derive_more::Constructor)]
64pub struct BlockstoreWithReadCache<
65    DB: Blockstore,
66    CACHE: BlockstoreReadCache,
67    STATS: BlockstoreReadCacheStats,
68> {
69    inner: DB,
70    cache: CACHE,
71    stats: Option<STATS>,
72}
73
74impl<DB: Blockstore, CACHE: BlockstoreReadCache, STATS: BlockstoreReadCacheStats>
75    BlockstoreWithReadCache<DB, CACHE, STATS>
76{
77    pub fn stats(&self) -> Option<&STATS> {
78        self.stats.as_ref()
79    }
80}
81
82impl<DB: Blockstore, CACHE: BlockstoreReadCache, STATS: BlockstoreReadCacheStats> Blockstore
83    for BlockstoreWithReadCache<DB, CACHE, STATS>
84{
85    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
86        if let Some(cached) = self.cache.get(k) {
87            self.stats.as_ref().map(BlockstoreReadCacheStats::track_hit);
88            Ok(Some(cached))
89        } else {
90            let block = self.inner.get(k)?;
91            self.stats
92                .as_ref()
93                .map(BlockstoreReadCacheStats::track_miss);
94            if let Some(block) = &block {
95                self.cache.put(*k, block.clone());
96            }
97            Ok(block)
98        }
99    }
100
101    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
102        self.inner.put_keyed(k, block)
103    }
104}
105
106#[cfg(test)]
107mod tests {
108    use super::*;
109    use crate::{db::MemoryDB, utils::rand::forest_rng};
110    use fvm_ipld_encoding::DAG_CBOR;
111    use multihash_codetable::Code::Blake2b256;
112    use multihash_codetable::MultihashDigest as _;
113    use rand::Rng as _;
114    use std::sync::Arc;
115
116    #[test]
117    fn test_blockstore_read_cache() {
118        const N_RECORDS: usize = 4;
119        const CACHE_SIZE: usize = 2;
120        let mem_db = Arc::new(MemoryDB::default());
121        let mut records = Vec::with_capacity(N_RECORDS);
122        for _ in 0..N_RECORDS {
123            let mut record = [0; 1024];
124            forest_rng().fill(&mut record);
125            let key = Cid::new_v1(DAG_CBOR, Blake2b256.digest(record.as_slice()));
126            mem_db.put_keyed(&key, &record).unwrap();
127            records.push((key, record));
128        }
129        let cache = Arc::new(LruBlockstoreReadCache::new_without_metrics_registry(
130            "test_blockstore_read_cache".into(),
131            CACHE_SIZE.try_into().unwrap(),
132        ));
133        let db = BlockstoreWithReadCache::new(
134            mem_db.clone(),
135            cache.clone(),
136            Some(DefaultBlockstoreReadCacheStats::default()),
137        );
138
139        assert_eq!(cache.len(), 0);
140        assert_eq!(db.stats().unwrap().hit(), 0);
141        assert_eq!(db.stats().unwrap().miss(), 0);
142
143        for (i, (k, v)) in records.iter().enumerate() {
144            assert_eq!(&db.get(k).unwrap().unwrap(), v);
145
146            assert_eq!(cache.len(), CACHE_SIZE.min(i + 1));
147            assert_eq!(db.stats().unwrap().hit(), i);
148            assert_eq!(db.stats().unwrap().miss(), i + 1);
149
150            assert_eq!(&db.get(k).unwrap().unwrap(), v);
151
152            assert_eq!(cache.len(), CACHE_SIZE.min(i + 1));
153            assert_eq!(db.stats().unwrap().hit(), i + 1);
154            assert_eq!(db.stats().unwrap().miss(), i + 1);
155        }
156
157        let (k0, v0) = &records[0];
158
159        assert_eq!(&db.get(k0).unwrap().unwrap(), v0);
160
161        assert_eq!(cache.len(), CACHE_SIZE);
162        assert_eq!(db.stats().unwrap().hit(), 4);
163        assert_eq!(db.stats().unwrap().miss(), 5);
164
165        assert_eq!(&db.get(k0).unwrap().unwrap(), v0);
166
167        assert_eq!(cache.len(), CACHE_SIZE);
168        assert_eq!(db.stats().unwrap().hit(), 5);
169        assert_eq!(db.stats().unwrap().miss(), 5);
170    }
171}