1use std::collections::HashMap;
2use std::num::NonZeroUsize;
3use std::sync::Arc;
4
5use lru::LruCache;
6use parking_lot::{Condvar, Mutex};
7use smallvec::SmallVec;
8
9use crate::error::Result;
10
/// Key under which a chunk is stored in [`ChunkCache`].
///
/// Two keys are equal (and hash identically) only when both the dataset
/// address and the full list of chunk offsets match.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ChunkKey {
    /// Address component of the key.
    // NOTE(review): presumably the file address of the owning dataset — confirm against callers.
    pub dataset_addr: u64,
    /// Offsets component of the key; up to four values are stored inline
    /// before the `SmallVec` spills to the heap.
    pub chunk_offsets: SmallVec<[u64; 4]>,
}
20
/// Byte- and slot-bounded LRU cache of chunk buffers.
///
/// All mutable state lives behind a single mutex, so the cache can be used
/// through a shared reference.
pub struct ChunkCache {
    /// Mutable cache state (entries, accounting, counters, in-flight loads).
    inner: Mutex<ChunkCacheState>,
    /// Byte budget for stored buffers; `0` makes `insert` skip storage entirely.
    max_bytes: usize,
    /// Slot capacity the underlying LRU map was created with.
    max_slots: usize,
}
30
/// Everything in [`ChunkCache`] that is guarded by its mutex.
struct ChunkCacheState {
    /// Cached buffers in least-recently-used order.
    cache: LruCache<ChunkKey, Arc<Vec<u8>>>,
    /// Running total of buffer lengths accounted to `cache`.
    current_bytes: usize,
    /// Loads currently being executed by `get_or_insert_with`, keyed by chunk.
    in_flight: HashMap<ChunkKey, Arc<InFlightLoad>>,
    /// Number of lookups that found an entry.
    hits: u64,
    /// Number of lookups that found nothing.
    misses: u64,
    /// Number of buffers stored via `insert`.
    inserts: u64,
    /// Number of entries removed to make room for a new one.
    evictions: u64,
}
40
/// Rendezvous point between the thread running a load and threads waiting
/// for the same chunk.
struct InFlightLoad {
    /// Set to `true` by the loading thread once its load has finished.
    completed: Mutex<bool>,
    /// Notified (all waiters) right after `completed` is set.
    ready: Condvar,
}
45
/// Point-in-time snapshot of cache counters and limits, as returned by
/// `ChunkCache::stats`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ChunkCacheStats {
    /// Lookups that found an entry.
    pub hits: u64,
    /// Lookups that found nothing.
    pub misses: u64,
    /// Buffers stored in the cache.
    pub inserts: u64,
    /// Entries evicted to make room.
    pub evictions: u64,
    /// Bytes currently accounted to cached buffers.
    pub current_bytes: usize,
    /// Number of entries currently cached.
    pub entries: usize,
    /// Number of loads currently in progress.
    pub in_flight: usize,
    /// Configured byte budget.
    pub max_bytes: usize,
    /// Configured slot capacity.
    pub max_slots: usize,
}
59
60impl ChunkCache {
61 pub fn new(max_bytes: usize, max_slots: usize) -> Self {
66 let slots = NonZeroUsize::new(max_slots).unwrap_or(NonZeroUsize::new(521).unwrap());
67 ChunkCache {
68 inner: Mutex::new(ChunkCacheState {
69 cache: LruCache::new(slots),
70 current_bytes: 0,
71 in_flight: HashMap::new(),
72 hits: 0,
73 misses: 0,
74 inserts: 0,
75 evictions: 0,
76 }),
77 max_bytes,
78 max_slots: slots.get(),
79 }
80 }
81
82 pub fn get(&self, key: &ChunkKey) -> Option<Arc<Vec<u8>>> {
84 let mut cache = self.inner.lock();
85 let result = cache.cache.get(key).cloned();
86 if result.is_some() {
87 cache.hits += 1;
88 } else {
89 cache.misses += 1;
90 }
91 result
92 }
93
94 pub fn insert(&self, key: ChunkKey, data: Vec<u8>) -> Arc<Vec<u8>> {
96 let data_len = data.len();
97 let arc = Arc::new(data);
98
99 if self.max_bytes == 0 || data_len > self.max_bytes {
100 return arc;
101 }
102
103 let mut state = self.inner.lock();
104 while state.current_bytes + data_len > self.max_bytes && !state.cache.is_empty() {
106 if let Some((_, evicted)) = state.cache.pop_lru() {
107 state.current_bytes = state.current_bytes.saturating_sub(evicted.len());
108 state.evictions += 1;
109 }
110 }
111
112 if let Some(replaced) = state.cache.peek(&key) {
113 state.current_bytes = state.current_bytes.saturating_sub(replaced.len());
114 }
115 state.current_bytes += data_len;
116 state.inserts += 1;
117 state.cache.put(key, arc.clone());
118
119 arc
120 }
121
122 pub fn get_or_insert_with<F>(&self, key: ChunkKey, load: F) -> Result<Arc<Vec<u8>>>
124 where
125 F: FnOnce() -> Result<Vec<u8>>,
126 {
127 loop {
128 let in_flight = {
129 let mut state = self.inner.lock();
130 if let Some(cached) = state.cache.get(&key).cloned() {
131 state.hits += 1;
132 return Ok(cached);
133 }
134 state.misses += 1;
135
136 if let Some(in_flight) = state.in_flight.get(&key) {
137 Arc::clone(in_flight)
138 } else {
139 let in_flight = Arc::new(InFlightLoad {
140 completed: Mutex::new(false),
141 ready: Condvar::new(),
142 });
143 state.in_flight.insert(key.clone(), Arc::clone(&in_flight));
144 drop(state);
145
146 let result = load().map(|data| self.insert(key.clone(), data));
147
148 let mut state = self.inner.lock();
149 state.in_flight.remove(&key);
150 let mut completed = in_flight.completed.lock();
151 *completed = true;
152 in_flight.ready.notify_all();
153
154 return result;
155 }
156 };
157
158 let mut completed = in_flight.completed.lock();
159 while !*completed {
160 in_flight.ready.wait(&mut completed);
161 }
162 }
163 }
164
165 pub fn stats(&self) -> ChunkCacheStats {
167 let state = self.inner.lock();
168 ChunkCacheStats {
169 hits: state.hits,
170 misses: state.misses,
171 inserts: state.inserts,
172 evictions: state.evictions,
173 current_bytes: state.current_bytes,
174 entries: state.cache.len(),
175 in_flight: state.in_flight.len(),
176 max_bytes: self.max_bytes,
177 max_slots: self.max_slots,
178 }
179 }
180}
181
182impl Default for ChunkCache {
183 fn default() -> Self {
184 Self::new(64 * 1024 * 1024, 521)
185 }
186}
187
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a key from a dataset address and a list of chunk offsets.
    fn chunk_key(dataset_addr: u64, offsets: &[u64]) -> ChunkKey {
        ChunkKey {
            dataset_addr,
            chunk_offsets: SmallVec::from_vec(offsets.to_vec()),
        }
    }

    #[test]
    fn test_cache_insert_and_get() {
        let cache = ChunkCache::new(1024, 10);
        let key = chunk_key(100, &[0, 0]);

        cache.insert(key.clone(), vec![1, 2, 3]);

        let stored = cache.get(&key).unwrap();
        assert_eq!(stored.as_slice(), &[1, 2, 3]);
    }

    #[test]
    fn test_cache_eviction() {
        // 10-byte budget: only two 4-byte chunks fit at once.
        let cache = ChunkCache::new(10, 10);
        for offset in 0..5 {
            cache.insert(chunk_key(100, &[offset]), vec![0; 4]);
        }

        // The oldest entry must have been pushed out by the later inserts.
        let oldest = chunk_key(100, &[0]);
        assert!(cache.get(&oldest).is_none());
    }

    #[test]
    fn test_cache_disabled_bypasses_storage() {
        let cache = ChunkCache::new(0, 10);
        let key = chunk_key(100, &[0]);

        cache.insert(key.clone(), vec![1, 2, 3]);

        assert!(cache.get(&key).is_none());
    }

    #[test]
    fn test_cache_promotes_on_get() {
        // 12-byte budget: exactly three 4-byte chunks fit.
        let cache = ChunkCache::new(12, 10);
        let key_a = chunk_key(1, &[0]);
        let key_b = chunk_key(2, &[0]);
        let key_c = chunk_key(3, &[0]);

        cache.insert(key_a.clone(), vec![0; 4]);
        cache.insert(key_b.clone(), vec![0; 4]);
        cache.insert(key_c.clone(), vec![0; 4]);

        // Touch A so that B becomes the least-recently-used entry.
        assert!(cache.get(&key_a).is_some());

        // A fourth chunk forces exactly one eviction.
        cache.insert(chunk_key(4, &[0]), vec![0; 4]);

        assert!(cache.get(&key_a).is_some());
        assert!(cache.get(&key_b).is_none());
    }

    #[test]
    fn test_cache_replacement_updates_accounting() {
        let cache = ChunkCache::new(8, 10);
        let key = chunk_key(100, &[0]);
        let other = chunk_key(100, &[1]);

        cache.insert(key.clone(), vec![1, 2, 3, 4]);
        cache.insert(key.clone(), vec![5, 6]);
        cache.insert(other.clone(), vec![7, 8, 9, 10]);

        // Both entries fit only if the replaced value's bytes were released.
        assert_eq!(cache.get(&key).unwrap().as_slice(), &[5, 6]);
        assert!(cache.get(&other).is_some());
    }

    #[test]
    fn test_cache_get_or_insert_with_deduplicates_concurrent_loads() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let cache = Arc::new(ChunkCache::new(1024, 10));
        let key = chunk_key(100, &[0, 0]);
        let load_count = Arc::new(AtomicUsize::new(0));

        std::thread::scope(|scope| {
            for _ in 0..8 {
                let worker_cache = Arc::clone(&cache);
                let worker_key = key.clone();
                let worker_count = Arc::clone(&load_count);
                scope.spawn(move || {
                    let loader = || {
                        worker_count.fetch_add(1, Ordering::SeqCst);
                        // Keep the load in flight long enough for the other
                        // threads to pile up behind it.
                        std::thread::sleep(std::time::Duration::from_millis(10));
                        Ok(vec![1, 2, 3, 4])
                    };
                    let value = worker_cache
                        .get_or_insert_with(worker_key, loader)
                        .unwrap();
                    assert_eq!(value.as_slice(), &[1, 2, 3, 4]);
                });
            }
        });

        assert_eq!(load_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_cache_stats_track_hits_misses_and_evictions() {
        let cache = ChunkCache::new(8, 2);
        let key_a = chunk_key(1, &[0]);
        let key_b = chunk_key(1, &[1]);
        let key_c = chunk_key(1, &[2]);

        assert!(cache.get(&key_a).is_none());
        cache.insert(key_a.clone(), vec![1, 2, 3, 4]);
        assert!(cache.get(&key_a).is_some());
        cache.insert(key_b, vec![5, 6, 7, 8]);
        cache.insert(key_c, vec![9, 10, 11, 12]);

        let snapshot = cache.stats();
        assert_eq!(snapshot.hits, 1);
        assert_eq!(snapshot.misses, 1);
        assert_eq!(snapshot.inserts, 3);
        assert_eq!(snapshot.evictions, 1);
        assert_eq!(snapshot.entries, 2);
        assert_eq!(snapshot.current_bytes, 8);
        assert_eq!(snapshot.max_bytes, 8);
        assert_eq!(snapshot.max_slots, 2);
    }
}