// forest/db/blockstore_with_read_cache.rs

use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use std::sync::atomic::{self, AtomicUsize};

use crate::utils::{cache::SizeTrackingLruCache, get_size};

/// Read-through cache interface keyed by [`Cid`], storing raw block bytes.
///
/// `auto_impl` derives implementations for `&T` and `Arc<T>`, so a cache can
/// be shared by reference or across threads without extra boilerplate.
#[auto_impl::auto_impl(&, Arc)]
pub trait BlockstoreReadCache {
    /// Returns the cached block bytes for `k`, if present.
    fn get(&self, k: &Cid) -> Option<Vec<u8>>;

    /// Inserts (or refreshes) the block bytes stored under `k`.
    fn put(&self, k: Cid, block: Vec<u8>);
}
16
/// LRU-backed read cache that tracks the size of its entries.
pub type LruBlockstoreReadCache = SizeTrackingLruCache<get_size::CidWrapper, Vec<u8>>;
18
19impl BlockstoreReadCache for SizeTrackingLruCache<get_size::CidWrapper, Vec<u8>> {
20 fn get(&self, k: &Cid) -> Option<Vec<u8>> {
21 self.get_cloned(&(*k).into())
22 }
23
24 fn put(&self, k: Cid, block: Vec<u8>) {
25 self.push(k.into(), block);
26 }
27}
28
/// Hit/miss counters for a [`BlockstoreReadCache`].
pub trait BlockstoreReadCacheStats {
    /// Total number of cache hits recorded so far.
    fn hit(&self) -> usize;

    /// Records one cache hit.
    fn track_hit(&self);

    /// Total number of cache misses recorded so far.
    fn miss(&self) -> usize;

    /// Records one cache miss.
    fn track_miss(&self);
}
38
/// Default [`BlockstoreReadCacheStats`] implementation backed by atomic
/// counters. Each counter is `CachePadded` so the hit and miss counters live
/// on separate cache lines, avoiding false sharing when different threads
/// bump them concurrently.
#[derive(Debug, Default)]
pub struct DefaultBlockstoreReadCacheStats {
    // Reads served from the cache.
    hit: crossbeam_utils::CachePadded<AtomicUsize>,
    // Reads that fell through to the inner blockstore.
    miss: crossbeam_utils::CachePadded<AtomicUsize>,
}
44
45impl BlockstoreReadCacheStats for DefaultBlockstoreReadCacheStats {
46 fn hit(&self) -> usize {
47 self.hit.load(atomic::Ordering::Relaxed)
48 }
49
50 fn track_hit(&self) {
51 self.hit.fetch_add(1, atomic::Ordering::Relaxed);
52 }
53
54 fn miss(&self) -> usize {
55 self.miss.load(atomic::Ordering::Relaxed)
56 }
57
58 fn track_miss(&self) {
59 self.miss.fetch_add(1, atomic::Ordering::Relaxed);
60 }
61}
62
/// A [`Blockstore`] wrapper that serves reads through a
/// [`BlockstoreReadCache`] and optionally records hit/miss statistics.
///
/// Writes are forwarded to the inner store untouched; only successful reads
/// populate the cache.
#[derive(derive_more::Constructor)]
pub struct BlockstoreWithReadCache<
    DB: Blockstore,
    CACHE: BlockstoreReadCache,
    STATS: BlockstoreReadCacheStats,
> {
    // Underlying persistent store, consulted on cache misses.
    inner: DB,
    // Read-through cache for raw block bytes.
    cache: CACHE,
    // Optional hit/miss counters; `None` disables stats tracking.
    stats: Option<STATS>,
}
73
impl<DB: Blockstore, CACHE: BlockstoreReadCache, STATS: BlockstoreReadCacheStats>
    BlockstoreWithReadCache<DB, CACHE, STATS>
{
    /// Returns the stats tracker, if one was configured at construction.
    pub fn stats(&self) -> Option<&STATS> {
        self.stats.as_ref()
    }
}
81
82impl<DB: Blockstore, CACHE: BlockstoreReadCache, STATS: BlockstoreReadCacheStats> Blockstore
83 for BlockstoreWithReadCache<DB, CACHE, STATS>
84{
85 fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
86 if let Some(cached) = self.cache.get(k) {
87 self.stats.as_ref().map(BlockstoreReadCacheStats::track_hit);
88 Ok(Some(cached))
89 } else {
90 let block = self.inner.get(k)?;
91 self.stats
92 .as_ref()
93 .map(BlockstoreReadCacheStats::track_miss);
94 if let Some(block) = &block {
95 self.cache.put(*k, block.clone());
96 }
97 Ok(block)
98 }
99 }
100
101 fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
102 self.inner.put_keyed(k, block)
103 }
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{db::MemoryDB, utils::rand::forest_rng};
    use fvm_ipld_encoding::DAG_CBOR;
    use multihash_codetable::Code::Blake2b256;
    use multihash_codetable::MultihashDigest as _;
    use rand::Rng as _;
    use std::sync::Arc;

    #[test]
    fn test_blockstore_read_cache() {
        // More records than cache slots, so LRU eviction is exercised.
        const N_RECORDS: usize = 4;
        const CACHE_SIZE: usize = 2;
        let mem_db = Arc::new(MemoryDB::default());
        // Populate the backing store with random 1 KiB blocks keyed by CID.
        let mut records = Vec::with_capacity(N_RECORDS);
        for _ in 0..N_RECORDS {
            let mut record = [0; 1024];
            forest_rng().fill(&mut record);
            let key = Cid::new_v1(DAG_CBOR, Blake2b256.digest(record.as_slice()));
            mem_db.put_keyed(&key, &record).unwrap();
            records.push((key, record));
        }
        let cache = Arc::new(LruBlockstoreReadCache::new_without_metrics_registry(
            "test_blockstore_read_cache".into(),
            CACHE_SIZE.try_into().unwrap(),
        ));
        let db = BlockstoreWithReadCache::new(
            mem_db.clone(),
            cache.clone(),
            Some(DefaultBlockstoreReadCacheStats::default()),
        );

        // Fresh cache: empty, with no hits or misses recorded yet.
        assert_eq!(cache.len(), 0);
        assert_eq!(db.stats().unwrap().hit(), 0);
        assert_eq!(db.stats().unwrap().miss(), 0);

        for (i, (k, v)) in records.iter().enumerate() {
            // First read of each record is a miss that populates the cache.
            assert_eq!(&db.get(k).unwrap().unwrap(), v);

            // Cache grows until it saturates at CACHE_SIZE entries.
            assert_eq!(cache.len(), CACHE_SIZE.min(i + 1));
            assert_eq!(db.stats().unwrap().hit(), i);
            assert_eq!(db.stats().unwrap().miss(), i + 1);

            // Second read of the same record is served from the cache.
            assert_eq!(&db.get(k).unwrap().unwrap(), v);

            assert_eq!(cache.len(), CACHE_SIZE.min(i + 1));
            assert_eq!(db.stats().unwrap().hit(), i + 1);
            assert_eq!(db.stats().unwrap().miss(), i + 1);
        }

        // The first record was evicted (the cache holds only the last
        // CACHE_SIZE records), so reading it again is one more miss...
        let (k0, v0) = &records[0];

        assert_eq!(&db.get(k0).unwrap().unwrap(), v0);

        assert_eq!(cache.len(), CACHE_SIZE);
        assert_eq!(db.stats().unwrap().hit(), 4);
        assert_eq!(db.stats().unwrap().miss(), 5);

        // ...and the follow-up read of the now re-cached record is a hit.
        assert_eq!(&db.get(k0).unwrap().unwrap(), v0);

        assert_eq!(cache.len(), CACHE_SIZE);
        assert_eq!(db.stats().unwrap().hit(), 5);
        assert_eq!(db.stats().unwrap().miss(), 5);
    }
}
171}