forest/db/blockstore_with_read_cache.rs

use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use std::sync::{
    Arc,
    atomic::{self, AtomicUsize},
};

use crate::utils::{cache::SizeTrackingLruCache, get_size};

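/// A cache of raw block bytes keyed by [`Cid`], used to serve repeated reads
/// without hitting the underlying store.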
pub trait BlockstoreReadCache {
    fn get(&self, k: &Cid) -> Option<Vec<u8>>;

    fn put(&self, k: Cid, block: Vec<u8>);
}

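/// An LRU-backed [`BlockstoreReadCache`] built on [`SizeTrackingLruCache`], keyed by CID.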
pub type LruBlockstoreReadCache = SizeTrackingLruCache<get_size::CidWrapper, Vec<u8>>;

impl BlockstoreReadCache for SizeTrackingLruCache<get_size::CidWrapper, Vec<u8>> {
    fn get(&self, k: &Cid) -> Option<Vec<u8>> {
        self.get_cloned(&(*k).into())
    }

    fn put(&self, k: Cid, block: Vec<u8>) {
        self.push(k.into(), block);
    }
}

impl<T: BlockstoreReadCache> BlockstoreReadCache for Arc<T> {
    fn get(&self, k: &Cid) -> Option<Vec<u8>> {
        self.as_ref().get(k)
    }

    fn put(&self, k: Cid, block: Vec<u8>) {
        self.as_ref().put(k, block)
    }
}

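/// Hit/miss counters for a [`BlockstoreReadCache`].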
pub trait BlockstoreReadCacheStats {
    fn hit(&self) -> usize;

    fn track_hit(&self);

    fn miss(&self) -> usize;

    fn track_miss(&self);
}

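/// Default [`BlockstoreReadCacheStats`] implementation backed by relaxed atomic counters.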
#[derive(Debug, Default)]
pub struct DefaultBlockstoreReadCacheStats {
    hit: AtomicUsize,
    miss: AtomicUsize,
}

impl BlockstoreReadCacheStats for DefaultBlockstoreReadCacheStats {
    fn hit(&self) -> usize {
        self.hit.load(atomic::Ordering::Relaxed)
    }

    fn track_hit(&self) {
        self.hit.fetch_add(1, atomic::Ordering::Relaxed);
    }

    fn miss(&self) -> usize {
        self.miss.load(atomic::Ordering::Relaxed)
    }

    fn track_miss(&self) {
        self.miss.fetch_add(1, atomic::Ordering::Relaxed);
    }
}

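/// A [`Blockstore`] wrapper that serves `get` requests from a read cache before
/// falling back to the inner store, optionally recording hit/miss statistics.
/// Writes are forwarded to the inner store and do not populate the cache.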
pub struct BlockstoreWithReadCache<
    DB: Blockstore,
    CACHE: BlockstoreReadCache,
    STATS: BlockstoreReadCacheStats,
> {
    inner: DB,
    cache: CACHE,
    stats: Option<STATS>,
}

impl<DB: Blockstore, CACHE: BlockstoreReadCache, STATS: BlockstoreReadCacheStats>
    BlockstoreWithReadCache<DB, CACHE, STATS>
{
    pub fn new(db: DB, cache: CACHE, stats: Option<STATS>) -> Self {
        Self {
            inner: db,
            cache,
            stats,
        }
    }

    pub fn stats(&self) -> Option<&STATS> {
        self.stats.as_ref()
    }
}

impl<DB: Blockstore, CACHE: BlockstoreReadCache, STATS: BlockstoreReadCacheStats> Blockstore
    for BlockstoreWithReadCache<DB, CACHE, STATS>
{
    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
        if let Some(cached) = self.cache.get(k) {
            // Cache hit: serve the block without touching the inner store.
            self.stats.as_ref().map(BlockstoreReadCacheStats::track_hit);
            Ok(Some(cached))
        } else {
            // Cache miss: read from the inner store and, if the block exists,
            // populate the cache for subsequent reads.
            let block = self.inner.get(k)?;
            self.stats
                .as_ref()
                .map(BlockstoreReadCacheStats::track_miss);
            if let Some(block) = &block {
                self.cache.put(*k, block.clone());
            }
            Ok(block)
        }
    }

    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
        // Writes bypass the read cache entirely.
        self.inner.put_keyed(k, block)
    }
}

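// Usage: a minimal sketch mirroring the test below. `MemoryDB` and the
// `new_without_metrics_registry` constructor are the ones exercised there;
// the cache name and capacity are illustrative values only.
//
//     let cache_capacity: usize = 1024;
//     let store = BlockstoreWithReadCache::new(
//         MemoryDB::default(),
//         LruBlockstoreReadCache::new_without_metrics_registry(
//             "example_cache".into(),
//             cache_capacity.try_into().unwrap(),
//         ),
//         Some(DefaultBlockstoreReadCacheStats::default()),
//     );
//     // `store.get(&cid)` now consults the LRU cache first; hit/miss counts
//     // are available via `store.stats()`.
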
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{db::MemoryDB, utils::rand::forest_rng};
    use fvm_ipld_encoding::DAG_CBOR;
    use multihash_codetable::Code::Blake2b256;
    use multihash_codetable::MultihashDigest as _;
    use rand::Rng as _;

    #[test]
    fn test_blockstore_read_cache() {
        const N_RECORDS: usize = 4;
        const CACHE_SIZE: usize = 2;
        let mem_db = Arc::new(MemoryDB::default());
        let mut records = Vec::with_capacity(N_RECORDS);
        for _ in 0..N_RECORDS {
            let mut record = [0; 1024];
            forest_rng().fill(&mut record);
            let key = Cid::new_v1(DAG_CBOR, Blake2b256.digest(record.as_slice()));
            mem_db.put_keyed(&key, &record).unwrap();
            records.push((key, record));
        }
        let cache = Arc::new(LruBlockstoreReadCache::new_without_metrics_registry(
            "test_blockstore_read_cache".into(),
            CACHE_SIZE.try_into().unwrap(),
        ));
        let db = BlockstoreWithReadCache::new(
            mem_db.clone(),
            cache.clone(),
            Some(DefaultBlockstoreReadCacheStats::default()),
        );

        assert_eq!(cache.len(), 0);
        assert_eq!(db.stats().unwrap().hit(), 0);
        assert_eq!(db.stats().unwrap().miss(), 0);

        for (i, (k, v)) in records.iter().enumerate() {
            // First read of each record misses, the repeated read hits.
            assert_eq!(&db.get(k).unwrap().unwrap(), v);

            assert_eq!(cache.len(), CACHE_SIZE.min(i + 1));
            assert_eq!(db.stats().unwrap().hit(), i);
            assert_eq!(db.stats().unwrap().miss(), i + 1);

            assert_eq!(&db.get(k).unwrap().unwrap(), v);

            assert_eq!(cache.len(), CACHE_SIZE.min(i + 1));
            assert_eq!(db.stats().unwrap().hit(), i + 1);
            assert_eq!(db.stats().unwrap().miss(), i + 1);
        }

        // The first record has been evicted from the 2-entry LRU by now, so its
        // first read misses again while the repeated read hits.
        let (k0, v0) = &records[0];

        assert_eq!(&db.get(k0).unwrap().unwrap(), v0);

        assert_eq!(cache.len(), CACHE_SIZE);
        assert_eq!(db.stats().unwrap().hit(), 4);
        assert_eq!(db.stats().unwrap().miss(), 5);

        assert_eq!(&db.get(k0).unwrap().unwrap(), v0);

        assert_eq!(cache.len(), CACHE_SIZE);
        assert_eq!(db.stats().unwrap().hit(), 5);
        assert_eq!(db.stats().unwrap().miss(), 5);
    }
}