use std::hash::Hasher;
use std::sync::atomic::AtomicU64;

use rustc_hash::FxHasher;
use tokio::sync::{Mutex, RwLock, RwLockReadGuard};
use tracing::*;

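/// Errors returned by [`DataBlockCache`] construction and block insertion.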
#[derive(Debug, thiserror::Error)]
pub enum CacheError<E> {
    #[error("Cache with too few blocks ({0})")]
    TooFewBlocks(usize),
    #[error("Position out of bounds")]
    OutOfBounds,
    #[error("Unexpected empty block")]
    UnexpectedEmpty,
    #[error("Block insertion error: {0}")]
    Insert(E),
}
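/// A single cached block of data. `pos` records which block position the
/// buffer currently holds; `IndexCache` uses it to detect slot reuse, while
/// `LRUCache` leaves it unset.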
pub struct Block {
    pub data: bytes::BytesMut,
    pub pos: Option<u64>,
}
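/// A fixed-capacity cache of fixed-size data blocks keyed by block position.
/// `insert_lock` holds the block's write lock while the supplied future
/// produces the data, so concurrent callers for the same position wait for a
/// single fill instead of fetching independently.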
#[async_trait::async_trait]
pub trait DataBlockCache<InsertError>: Send + Sync + std::fmt::Display + Sized + 'static {
    fn new(
        capacity_mb: u64,
        block_size: u64,
        total_size: u64,
    ) -> Result<Self, CacheError<InsertError>>;

    async fn get(&self, pos: u64) -> Option<RwLockReadGuard<Block>>;

    async fn insert_lock<A: AsRef<[u8]> + Send>(
        &self,
        pos: u64,
        f: impl std::future::Future<Output = Result<A, InsertError>> + Send,
    ) -> Result<RwLockReadGuard<Block>, CacheError<InsertError>>;

    fn stats(&self) -> &CacheStats;
}
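/// Static cache dimensions plus hit/miss counters that are updated atomically
/// from concurrent lookups.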
#[derive(Default)]
pub struct CacheStats {
    pub capacity_blocks: usize,
    pub capacity_mb: u64,
    pub block_size: u64,
    pub hits: AtomicU64,
    pub misses: AtomicU64,
}
impl std::fmt::Display for CacheStats {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let hits = self.hits.load(std::sync::atomic::Ordering::SeqCst);
        let misses = self.misses.load(std::sync::atomic::Ordering::SeqCst);
        // Treat the unavoidable first fill of each block as a free miss.
        let misses_excluding = misses.saturating_sub(self.capacity_blocks as u64);
        write!(
            f,
            "Cache of capacity {} blocks of {:.2} MB (total {} MB). {} hits, {} hits excluding first-fill misses",
            self.capacity_blocks,
            self.block_size as f64 / 1e6,
            self.capacity_mb,
            crate::utils::OutOf::new(hits, hits + misses).display_full(),
            crate::utils::OutOf::new(hits, hits + misses_excluding).display_full(),
        )
    }
}

impl CacheStats {
    pub fn add_hit(&self) {
        self.hits.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    }
    pub fn add_miss(&self) {
        self.misses
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    }
    pub fn remove_miss(&self) {
        self.misses
            .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
    }
}
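/// Cache with one slot per block of the underlying data; buffers are allocated
/// on first use and, once `capacity_blocks` of them exist, reused via LRU
/// eviction.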
pub struct LRUCache {
    lru: Mutex<lru::LruCache<u64, ()>>,
    data: Vec<RwLock<Option<Block>>>,
    stats: CacheStats,
}
impl std::fmt::Display for LRUCache {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self.stats)
    }
}

#[async_trait::async_trait]
impl<E> DataBlockCache<E> for LRUCache {
    fn new(capacity_mb: u64, block_size: u64, total_size: u64) -> Result<Self, CacheError<E>> {
        let capacity_blocks = (capacity_mb as f64 / block_size as f64 * 1e6).ceil() as usize;
        let total_blocks = (total_size as f64 / block_size as f64).ceil() as usize;
        debug!(capacity_blocks, capacity_mb, block_size, "Allocating LRU cache");
        if capacity_blocks < 4 {
            return Err(CacheError::TooFewBlocks(capacity_blocks));
        }

        Ok(Self {
            stats: CacheStats {
                capacity_blocks,
                capacity_mb,
                block_size,
                ..Default::default()
            },
            data: (0..total_blocks).map(|_| None).map(RwLock::new).collect(),
            lru: Mutex::new(lru::LruCache::<u64, ()>::unbounded()),
        })
    }
    async fn get(&self, pos: u64) -> Option<RwLockReadGuard<Block>> {
        assert!((pos as usize) < self.data.len());
        {
            // Refresh recency for this position (no-op if it is not cached yet).
            let mut lru = self.lru.lock().await;
            lru.get(&pos);
        }
        let data = self.data[pos as usize].read().await;
        if data.is_some() {
            self.stats.add_hit();
            return Some(RwLockReadGuard::map(data, |x| x.as_ref().unwrap()));
        }
        self.stats.add_miss();
        None
    }
    async fn insert_lock<A: AsRef<[u8]> + Send>(
        &self,
        pos: u64,
        f: impl std::future::Future<Output = Result<A, E>> + Send,
    ) -> Result<RwLockReadGuard<Block>, CacheError<E>> {
        if (pos as usize) >= self.data.len() {
            return Err(CacheError::OutOfBounds);
        }
        let mut block = self.data[pos as usize].write().await;
        if block.is_some() {
            // Block already present: compensate the miss counter and return it.
            self.stats.remove_miss();
            return Ok(RwLockReadGuard::map(block.downgrade(), |f| {
                f.as_ref().unwrap()
            }));
        }

        let buf = f.await.map_err(CacheError::Insert)?;
        let buf = buf.as_ref();

        let mut lru = self.lru.lock().await;
        if lru.len() < self.stats.capacity_blocks {
            // Still below capacity: allocate a fresh buffer for this block.
            lru.put(pos, ());
            let mut data = bytes::BytesMut::with_capacity(self.stats.block_size as usize);
            data.resize(buf.len(), 0);
            data.copy_from_slice(buf);
            *block = Some(Block { data, pos: None });
        } else {
            // At capacity: evict the least recently used block and reuse its buffer.
            let (old, _) = lru.pop_lru().unwrap();
            let mut block_old = self.data[old as usize].write().await;
            lru.put(pos, ());
            let mut block_old = block_old.take().ok_or(CacheError::UnexpectedEmpty)?;
            block_old.data.clear();
            block_old.data.resize(buf.len(), 0);
            block_old.data.copy_from_slice(buf);

            *block = Some(block_old);
        }
        Ok(RwLockReadGuard::map(block.downgrade(), |f| {
            f.as_ref().unwrap()
        }))
    }
    fn stats(&self) -> &CacheStats {
        &self.stats
    }
}
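
// A hedged usage sketch, not part of the original API: how a caller might read
// a block through the cache, falling back to a fetch on a miss. `cached_read`
// and its inline fetch future are illustrative placeholders; `std::io::Error`
// stands in for whatever error type the real fetch path uses.
#[allow(dead_code)]
async fn cached_read(cache: &LRUCache, pos: u64) -> Result<Vec<u8>, CacheError<std::io::Error>> {
    // Fast path: the block is already resident.
    if let Some(block) = DataBlockCache::<std::io::Error>::get(cache, pos).await {
        return Ok(block.data.to_vec());
    }
    // Miss: fill the slot under its write lock so concurrent callers for the
    // same position wait for one fetch instead of issuing their own.
    let block = cache
        .insert_lock(pos, async move {
            // Hypothetical loader standing in for real I/O.
            Ok::<Vec<u8>, std::io::Error>(vec![0u8; 4096])
        })
        .await?;
    Ok(block.data.to_vec())
}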

/// Direct-mapped cache: each position maps to a fixed slot (`pos % capacity`,
/// or an FxHash of the position when `use_hash` is set), and a colliding
/// insert simply overwrites the slot's previous block.
pub struct IndexCache {
    data: Vec<RwLock<Block>>,
    collisions: AtomicU64,
    pub stats: CacheStats,
    use_hash: bool,
}

impl IndexCache {
    /// Map a block position to its slot index. With `use_hash` the position is
    /// mixed through `FxHasher` first; otherwise the position indexes slots
    /// directly, modulo the capacity.
    fn key(&self, pos: u64) -> usize {
        if self.use_hash {
            let mut hasher = FxHasher::default();
            hasher.write_u64(pos);
            hasher.finish() as usize % self.stats.capacity_blocks
        } else {
            pos as usize % self.stats.capacity_blocks
        }
    }
}
impl std::fmt::Display for IndexCache {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let collisions = self.collisions.load(std::sync::atomic::Ordering::SeqCst);
        write!(f, "{}, {} collisions", self.stats, collisions)
    }
}
impl std::fmt::Debug for IndexCache {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self)
    }
}

#[async_trait::async_trait]
impl<E> DataBlockCache<E> for IndexCache {
    fn new(capacity_mb: u64, block_size: u64, _total_size: u64) -> Result<Self, CacheError<E>> {
        let capacity_blocks = (capacity_mb as f64 / block_size as f64 * 1e6).ceil() as usize;
        debug!(capacity_blocks, capacity_mb, "Allocating index cache");
        if capacity_blocks < 4 {
            return Err(CacheError::TooFewBlocks(capacity_blocks));
        }

        Ok(Self {
            stats: CacheStats {
                capacity_mb,
                capacity_blocks,
                block_size,
                ..Default::default()
            },
            data: (0..capacity_blocks)
                .map(|_| bytes::BytesMut::zeroed(block_size as usize))
                .map(|data| Block { pos: None, data })
                .map(RwLock::new)
                .collect(),
            collisions: Default::default(),
            use_hash: false,
        })
    }
    fn stats(&self) -> &CacheStats {
        &self.stats
    }
    async fn get(&self, pos: u64) -> Option<RwLockReadGuard<Block>> {
        let data = self.data[self.key(pos)].read().await;
        if data.pos == Some(pos) {
            self.stats.add_hit();
            return Some(data);
        }
        self.stats.add_miss();
        None
    }
    async fn insert_lock<A: AsRef<[u8]> + Send>(
        &self,
        pos: u64,
        f: impl std::future::Future<Output = Result<A, E>> + Send,
    ) -> Result<RwLockReadGuard<Block>, CacheError<E>> {
        let key = self.key(pos);
        let mut block = self.data[key].write().await;
        match block.pos {
            Some(pos2) if pos2 == pos => {
                // Block already present: compensate the miss counter and return it.
                self.stats.remove_miss();
                return Ok(block.downgrade());
            }
            Some(_) => {
                // Slot holds a different position: this insert overwrites it.
                self.collisions
                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            }
            None => {}
        }

        let buf = f.await.map_err(CacheError::Insert)?;
        let buf = buf.as_ref();
        // Only claim the slot once the fetch has succeeded, so a failed fetch
        // does not leave the slot's old data attributed to `pos`.
        block.pos = Some(pos);
        block.data.resize(buf.len(), 0);
        block.data.copy_from_slice(buf);
        Ok(block.downgrade())
    }
}
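
// A hedged sketch of the expected behaviour, assuming tokio's test macro is
// available to this crate; the error type `std::io::Error` and the sizes used
// here are placeholders chosen only for the example.
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn index_cache_round_trip() {
        // 1 MB capacity with 4 KiB blocks gives ceil(1e6 / 4096) = 245 slots.
        let cache =
            <IndexCache as DataBlockCache<std::io::Error>>::new(1, 4096, 0).expect("cache");

        // Cold lookup: nothing cached yet.
        assert!(DataBlockCache::<std::io::Error>::get(&cache, 7).await.is_none());

        // Fill the slot, then read the data back through the returned guard.
        let guard = cache
            .insert_lock(7, async { Ok::<_, std::io::Error>(vec![1u8, 2, 3]) })
            .await
            .expect("insert");
        assert_eq!(&guard.data[..], &[1, 2, 3]);
        drop(guard);

        // The same position is now a hit.
        assert!(DataBlockCache::<std::io::Error>::get(&cache, 7).await.is_some());
    }
}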