forest/db/
blockstore_with_read_cache.rs1use cid::Cid;
5use fvm_ipld_blockstore::Blockstore;
6use std::sync::{
7 Arc,
8 atomic::{self, AtomicUsize},
9};
10
11use crate::utils::{cache::SizeTrackingLruCache, get_size};
12
/// Read-through cache interface used by [`BlockstoreWithReadCache`] to
/// memoize blocks fetched from an underlying blockstore.
pub trait BlockstoreReadCache {
    /// Returns the cached block bytes for `k`, or `None` when not cached.
    fn get(&self, k: &Cid) -> Option<Vec<u8>>;

    /// Inserts `block` into the cache under key `k`.
    fn put(&self, k: Cid, block: Vec<u8>);
}
18
/// LRU-backed [`BlockstoreReadCache`] implementation, keyed by CID.
pub type LruBlockstoreReadCache = SizeTrackingLruCache<get_size::CidWrapper, Vec<u8>>;
20
21impl BlockstoreReadCache for SizeTrackingLruCache<get_size::CidWrapper, Vec<u8>> {
22 fn get(&self, k: &Cid) -> Option<Vec<u8>> {
23 self.get_cloned(&(*k).into())
24 }
25
26 fn put(&self, k: Cid, block: Vec<u8>) {
27 self.push(k.into(), block);
28 }
29}
30
31impl<T: BlockstoreReadCache> BlockstoreReadCache for Arc<T> {
32 fn get(&self, k: &Cid) -> Option<Vec<u8>> {
33 self.as_ref().get(k)
34 }
35
36 fn put(&self, k: Cid, block: Vec<u8>) {
37 self.as_ref().put(k, block)
38 }
39}
40
/// Hit/miss accounting interface for a blockstore read cache.
pub trait BlockstoreReadCacheStats {
    /// Returns the number of reads served from the cache.
    fn hit(&self) -> usize;

    /// Records one cache hit.
    fn track_hit(&self);

    /// Returns the number of reads that fell through to the inner store.
    fn miss(&self) -> usize;

    /// Records one cache miss.
    fn track_miss(&self);
}
50
/// Thread-safe hit/miss counters backed by atomics.
#[derive(Debug, Default)]
pub struct DefaultBlockstoreReadCacheStats {
    // Reads served from the cache.
    hit: AtomicUsize,
    // Reads that fell through to the inner blockstore.
    miss: AtomicUsize,
}
56
57impl BlockstoreReadCacheStats for DefaultBlockstoreReadCacheStats {
58 fn hit(&self) -> usize {
59 self.hit.load(atomic::Ordering::Relaxed)
60 }
61
62 fn track_hit(&self) {
63 self.hit.fetch_add(1, atomic::Ordering::Relaxed);
64 }
65
66 fn miss(&self) -> usize {
67 self.miss.load(atomic::Ordering::Relaxed)
68 }
69
70 fn track_miss(&self) {
71 self.miss.fetch_add(1, atomic::Ordering::Relaxed);
72 }
73}
74
/// A [`Blockstore`] decorator that serves `get`s from an in-memory read
/// cache and optionally tracks hit/miss statistics.
///
/// Constructed via the `derive_more`-generated `new(inner, cache, stats)`.
#[derive(derive_more::Constructor)]
pub struct BlockstoreWithReadCache<
    DB: Blockstore,
    CACHE: BlockstoreReadCache,
    STATS: BlockstoreReadCacheStats,
> {
    // Backing store; all writes and cache misses go here.
    inner: DB,
    // Read cache populated on `get` misses.
    cache: CACHE,
    // Optional hit/miss counters; `None` disables tracking.
    stats: Option<STATS>,
}
85
impl<DB: Blockstore, CACHE: BlockstoreReadCache, STATS: BlockstoreReadCacheStats>
    BlockstoreWithReadCache<DB, CACHE, STATS>
{
    /// Returns the stats tracker, if one was configured at construction.
    pub fn stats(&self) -> Option<&STATS> {
        self.stats.as_ref()
    }
}
93
94impl<DB: Blockstore, CACHE: BlockstoreReadCache, STATS: BlockstoreReadCacheStats> Blockstore
95 for BlockstoreWithReadCache<DB, CACHE, STATS>
96{
97 fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
98 if let Some(cached) = self.cache.get(k) {
99 self.stats.as_ref().map(BlockstoreReadCacheStats::track_hit);
100 Ok(Some(cached))
101 } else {
102 let block = self.inner.get(k)?;
103 self.stats
104 .as_ref()
105 .map(BlockstoreReadCacheStats::track_miss);
106 if let Some(block) = &block {
107 self.cache.put(*k, block.clone());
108 }
109 Ok(block)
110 }
111 }
112
113 fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
114 self.inner.put_keyed(k, block)
115 }
116}
117
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{db::MemoryDB, utils::rand::forest_rng};
    use fvm_ipld_encoding::DAG_CBOR;
    use multihash_codetable::Code::Blake2b256;
    use multihash_codetable::MultihashDigest as _;
    use rand::Rng as _;

    #[test]
    fn test_blockstore_read_cache() {
        // 4 random 1 KiB records, but the LRU cache only holds 2, so the
        // later inserts evict the earlier entries.
        const N_RECORDS: usize = 4;
        const CACHE_SIZE: usize = 2;
        let mem_db = Arc::new(MemoryDB::default());
        let mut records = Vec::with_capacity(N_RECORDS);
        for _ in 0..N_RECORDS {
            let mut record = [0; 1024];
            forest_rng().fill(&mut record);
            // Key each record by the CID of its content, as a real blockstore would.
            let key = Cid::new_v1(DAG_CBOR, Blake2b256.digest(record.as_slice()));
            mem_db.put_keyed(&key, &record).unwrap();
            records.push((key, record));
        }
        let cache = Arc::new(LruBlockstoreReadCache::new_without_metrics_registry(
            "test_blockstore_read_cache".into(),
            CACHE_SIZE.try_into().unwrap(),
        ));
        let db = BlockstoreWithReadCache::new(
            mem_db.clone(),
            cache.clone(),
            Some(DefaultBlockstoreReadCacheStats::default()),
        );

        // Fresh cache: empty, no hits or misses recorded yet.
        assert_eq!(cache.len(), 0);
        assert_eq!(db.stats().unwrap().hit(), 0);
        assert_eq!(db.stats().unwrap().miss(), 0);

        for (i, (k, v)) in records.iter().enumerate() {
            // First read of each key is a miss that populates the cache...
            assert_eq!(&db.get(k).unwrap().unwrap(), v);

            // Cache grows until it saturates at CACHE_SIZE entries.
            assert_eq!(cache.len(), CACHE_SIZE.min(i + 1));
            assert_eq!(db.stats().unwrap().hit(), i);
            assert_eq!(db.stats().unwrap().miss(), i + 1);

            // ...and an immediate second read of the same key is a hit.
            assert_eq!(&db.get(k).unwrap().unwrap(), v);

            assert_eq!(cache.len(), CACHE_SIZE.min(i + 1));
            assert_eq!(db.stats().unwrap().hit(), i + 1);
            assert_eq!(db.stats().unwrap().miss(), i + 1);
        }

        // records[0] was evicted by the last two inserts (LRU capacity 2),
        // so reading it again is one more miss that re-populates the cache...
        let (k0, v0) = &records[0];

        assert_eq!(&db.get(k0).unwrap().unwrap(), v0);

        assert_eq!(cache.len(), CACHE_SIZE);
        assert_eq!(db.stats().unwrap().hit(), 4);
        assert_eq!(db.stats().unwrap().miss(), 5);

        // ...and the follow-up read is served from the cache again.
        assert_eq!(&db.get(k0).unwrap().unwrap(), v0);

        assert_eq!(cache.len(), CACHE_SIZE);
        assert_eq!(db.stats().unwrap().hit(), 5);
        assert_eq!(db.stats().unwrap().miss(), 5);
    }
}