1use crate::error::{AmateRSError, ErrorContext, Result};
7use parking_lot::RwLock;
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10
/// Identifies a single block of a single SSTable in the block cache.
///
/// Two keys are equal only when both the SSTable path and the block index match.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct BlockCacheKey {
    /// Path of the SSTable file the block belongs to.
    pub sstable_path: String,
    /// Index of the block within that SSTable.
    pub block_index: usize,
}

impl BlockCacheKey {
    /// Builds a key from an owned SSTable path and a block index.
    pub fn new(sstable_path: String, block_index: usize) -> Self {
        BlockCacheKey {
            block_index,
            sstable_path,
        }
    }
}
29
/// An immutable cached block payload; clones share the bytes via reference counting.
#[derive(Clone, Debug)]
pub struct CachedBlock {
    /// Block bytes; cloning a `CachedBlock` only bumps this `Arc`'s refcount.
    pub data: Arc<Vec<u8>>,
    /// Payload length in bytes as recorded by `CachedBlock::new`; the cache uses
    /// this value for its size accounting.
    pub size: usize,
}

impl CachedBlock {
    /// Wraps `data`, recording its length as the block size.
    pub fn new(data: Vec<u8>) -> Self {
        let size = data.len();
        let data = Arc::new(data);
        CachedBlock { data, size }
    }

    /// Borrows the block bytes.
    pub fn as_slice(&self) -> &[u8] {
        self.data.as_slice()
    }
}
54
/// Construction-time settings for a block cache.
#[derive(Clone, Debug)]
pub struct BlockCacheConfig {
    /// Upper bound on the total bytes of cached blocks.
    pub max_size_bytes: usize,
    /// Whether hit/miss/eviction/occupancy statistics are maintained.
    pub enable_stats: bool,
}

impl Default for BlockCacheConfig {
    /// 128 MiB capacity with statistics enabled.
    fn default() -> Self {
        const DEFAULT_CAPACITY_BYTES: usize = 128 << 20;
        BlockCacheConfig {
            enable_stats: true,
            max_size_bytes: DEFAULT_CAPACITY_BYTES,
        }
    }
}
72
/// Counters describing cache effectiveness and occupancy.
#[derive(Clone, Debug, Default)]
pub struct CacheStats {
    /// Lookups that found the requested block.
    pub hits: u64,
    /// Lookups that did not find the requested block.
    pub misses: u64,
    /// Blocks dropped to make room for new insertions.
    pub evictions: u64,
    /// Number of cached blocks as of the last update.
    pub block_count: usize,
    /// Total cached bytes as of the last update.
    pub size_bytes: usize,
}

impl CacheStats {
    /// Fraction of lookups that were hits; `0.0` when no lookups were recorded.
    pub fn hit_rate(&self) -> f64 {
        match self.hits + self.misses {
            0 => 0.0,
            lookups => self.hits as f64 / lookups as f64,
        }
    }

    /// Complement of `hit_rate()`; note this is `1.0` when no lookups were recorded.
    pub fn miss_rate(&self) -> f64 {
        let hit_rate = self.hit_rate();
        1.0 - hit_rate
    }
}
104
/// Pairs a cache key with its block.
///
/// NOTE(review): nothing in this file constructs or reads `CacheEntry`; it appears
/// to be dead code (the `dead_code` lint would flag it). Confirm, then remove it or
/// wire it into `BlockCache`.
struct CacheEntry {
    /// Key the block is stored under.
    key: BlockCacheKey,
    /// The cached block.
    block: CachedBlock,
}
110
/// Size-bounded LRU cache of SSTable blocks, shareable across threads.
///
/// Each piece of state sits behind its own `RwLock`; the methods in this file
/// acquire them in the order `cache` -> `lru_order` -> `current_size` -> `stats`.
/// NOTE(review): `BlockCache` is not `Clone` and the fields are private, so the
/// `Arc` wrappers appear redundant — confirm before simplifying.
pub struct BlockCache {
    /// Settings fixed at construction time.
    config: BlockCacheConfig,
    /// Cached blocks by key.
    cache: Arc<RwLock<HashMap<BlockCacheKey, CachedBlock>>>,
    /// Cached keys in recency order: the front is evicted first, and accessed or
    /// inserted keys are pushed to the back.
    lru_order: Arc<RwLock<VecDeque<BlockCacheKey>>>,
    /// Sum of `CachedBlock::size` over the cached blocks.
    current_size: Arc<RwLock<usize>>,
    /// Statistics; only updated when `config.enable_stats` is true.
    stats: Arc<RwLock<CacheStats>>,
}
127
128impl BlockCache {
129 pub fn new() -> Self {
131 Self::with_config(BlockCacheConfig::default())
132 }
133
134 pub fn with_config(config: BlockCacheConfig) -> Self {
136 Self {
137 config,
138 cache: Arc::new(RwLock::new(HashMap::new())),
139 lru_order: Arc::new(RwLock::new(VecDeque::new())),
140 current_size: Arc::new(RwLock::new(0)),
141 stats: Arc::new(RwLock::new(CacheStats::default())),
142 }
143 }
144
145 pub fn get(&self, key: &BlockCacheKey) -> Option<CachedBlock> {
147 let block = {
149 let cache = self.cache.read();
150 cache.get(key).cloned()
151 };
152
153 if let Some(ref block) = block {
155 self.touch(key);
157
158 if self.config.enable_stats {
160 let mut stats = self.stats.write();
161 stats.hits += 1;
162 }
163
164 Some(block.clone())
165 } else {
166 if self.config.enable_stats {
168 let mut stats = self.stats.write();
169 stats.misses += 1;
170 }
171
172 None
173 }
174 }
175
176 pub fn put(&self, key: BlockCacheKey, block: CachedBlock) -> Result<()> {
178 let block_size = block.size;
179
180 self.evict_if_needed(block_size)?;
182
183 let (new_block_count, new_size_bytes) = {
185 let mut cache = self.cache.write();
186 let mut lru_order = self.lru_order.write();
187 let mut current_size = self.current_size.write();
188
189 if let Some(old_block) = cache.remove(&key) {
191 *current_size -= old_block.size;
192 lru_order.retain(|k| k != &key);
194 }
195
196 cache.insert(key.clone(), block);
198 lru_order.push_back(key);
199 *current_size += block_size;
200
201 (cache.len(), *current_size)
203 };
204
205 if self.config.enable_stats {
207 let mut stats = self.stats.write();
208 stats.block_count = new_block_count;
209 stats.size_bytes = new_size_bytes;
210 }
211
212 Ok(())
213 }
214
215 fn touch(&self, key: &BlockCacheKey) {
217 let mut lru_order = self.lru_order.write();
218
219 lru_order.retain(|k| k != key);
221
222 lru_order.push_back(key.clone());
224 }
225
226 fn evict_if_needed(&self, new_block_size: usize) -> Result<()> {
228 if new_block_size > self.config.max_size_bytes {
229 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
230 "Block size {} exceeds cache size {}",
231 new_block_size, self.config.max_size_bytes
232 ))));
233 }
234
235 let current_size = *self.current_size.read();
236 let mut size_to_free = if current_size + new_block_size > self.config.max_size_bytes {
237 current_size + new_block_size - self.config.max_size_bytes
238 } else {
239 0
240 };
241
242 while size_to_free > 0 {
243 let (evicted_size, should_update_stats) = {
245 let mut cache = self.cache.write();
246 let mut lru_order = self.lru_order.write();
247 let mut current_size = self.current_size.write();
248
249 if let Some(key) = lru_order.front().cloned() {
251 if let Some(block) = cache.remove(&key) {
252 lru_order.pop_front();
253 *current_size -= block.size;
254 (block.size, self.config.enable_stats)
255 } else {
256 (0, false)
257 }
258 } else {
259 (0, false)
261 }
262 };
263
264 if evicted_size == 0 {
265 break;
267 }
268
269 if should_update_stats {
271 let mut stats = self.stats.write();
272 stats.evictions += 1;
273 }
274
275 if evicted_size >= size_to_free {
276 size_to_free = 0;
277 } else {
278 size_to_free -= evicted_size;
279 }
280 }
281
282 Ok(())
283 }
284
285 pub fn clear(&self) {
287 let mut cache = self.cache.write();
288 let mut lru_order = self.lru_order.write();
289 let mut current_size = self.current_size.write();
290
291 cache.clear();
292 lru_order.clear();
293 *current_size = 0;
294
295 if self.config.enable_stats {
296 let mut stats = self.stats.write();
297 stats.block_count = 0;
298 stats.size_bytes = 0;
299 }
300 }
301
302 pub fn stats(&self) -> CacheStats {
304 self.stats.read().clone()
305 }
306
307 pub fn current_size(&self) -> usize {
309 *self.current_size.read()
310 }
311
312 pub fn block_count(&self) -> usize {
314 self.cache.read().len()
315 }
316
317 pub fn contains(&self, key: &BlockCacheKey) -> bool {
319 self.cache.read().contains_key(key)
320 }
321
322 pub fn remove(&self, key: &BlockCacheKey) -> Option<CachedBlock> {
324 let mut cache = self.cache.write();
325 let mut lru_order = self.lru_order.write();
326 let mut current_size = self.current_size.write();
327
328 if let Some(block) = cache.remove(key) {
329 lru_order.retain(|k| k != key);
330 *current_size -= block.size;
331
332 if self.config.enable_stats {
333 let mut stats = self.stats.write();
334 stats.block_count = cache.len();
335 stats.size_bytes = *current_size;
336 }
337
338 Some(block)
339 } else {
340 None
341 }
342 }
343
344 pub fn invalidate_sstable(&self, sstable_path: &str) {
346 let mut cache = self.cache.write();
347 let mut lru_order = self.lru_order.write();
348 let mut current_size = self.current_size.write();
349
350 let keys_to_remove: Vec<BlockCacheKey> = cache
352 .keys()
353 .filter(|k| k.sstable_path == sstable_path)
354 .cloned()
355 .collect();
356
357 for key in keys_to_remove {
359 if let Some(block) = cache.remove(&key) {
360 *current_size -= block.size;
361 lru_order.retain(|k| k != &key);
362 }
363 }
364
365 if self.config.enable_stats {
366 let mut stats = self.stats.write();
367 stats.block_count = cache.len();
368 stats.size_bytes = *current_size;
369 }
370 }
371}
372
373impl Default for BlockCache {
374 fn default() -> Self {
375 Self::new()
376 }
377}
378
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_block_cache_basic() -> Result<()> {
        let cache = BlockCache::new();

        let key = BlockCacheKey::new("test.sst".to_string(), 0);
        let block = CachedBlock::new(vec![1, 2, 3, 4, 5]);

        assert!(cache.get(&key).is_none());

        cache.put(key.clone(), block.clone())?;

        let retrieved = cache.get(&key).expect("Block should be in cache after put");
        assert_eq!(retrieved.as_slice(), &[1, 2, 3, 4, 5]);

        Ok(())
    }

    #[test]
    fn test_block_cache_lru_eviction() -> Result<()> {
        let config = BlockCacheConfig {
            max_size_bytes: 100,
            enable_stats: true,
        };
        let cache = BlockCache::with_config(config);

        for i in 0..5 {
            let key = BlockCacheKey::new("test.sst".to_string(), i);
            let block = CachedBlock::new(vec![0u8; 30]);
            cache.put(key, block)?;
        }

        assert!(cache.current_size() <= 100);
        // Blocks 0 and 1 had to make room for blocks 3 and 4.
        assert_eq!(cache.stats().evictions, 2);

        let key0 = BlockCacheKey::new("test.sst".to_string(), 0);
        let key1 = BlockCacheKey::new("test.sst".to_string(), 1);
        assert!(cache.get(&key0).is_none());
        assert!(cache.get(&key1).is_none());

        let key4 = BlockCacheKey::new("test.sst".to_string(), 4);
        assert!(cache.get(&key4).is_some());

        Ok(())
    }

    #[test]
    fn test_block_cache_touch() -> Result<()> {
        let config = BlockCacheConfig {
            max_size_bytes: 100,
            enable_stats: true,
        };
        let cache = BlockCache::with_config(config);

        for i in 0..3 {
            let key = BlockCacheKey::new("test.sst".to_string(), i);
            let block = CachedBlock::new(vec![0u8; 30]);
            cache.put(key, block)?;
        }

        let key0 = BlockCacheKey::new("test.sst".to_string(), 0);
        cache.get(&key0);

        let key3 = BlockCacheKey::new("test.sst".to_string(), 3);
        let block3 = CachedBlock::new(vec![0u8; 30]);
        cache.put(key3, block3)?;

        assert!(cache.get(&key0).is_some());

        let key1 = BlockCacheKey::new("test.sst".to_string(), 1);
        assert!(cache.get(&key1).is_none());

        Ok(())
    }

    #[test]
    fn test_block_cache_stats() -> Result<()> {
        let cache = BlockCache::new();

        let key = BlockCacheKey::new("test.sst".to_string(), 0);
        let block = CachedBlock::new(vec![1, 2, 3]);

        cache.get(&key);

        cache.put(key.clone(), block)?;

        cache.get(&key);
        cache.get(&key);

        let stats = cache.stats();
        assert_eq!(stats.hits, 2);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.hit_rate(), 2.0 / 3.0);

        Ok(())
    }

    #[test]
    fn test_block_cache_stats_disabled() -> Result<()> {
        let cache = BlockCache::with_config(BlockCacheConfig {
            max_size_bytes: 100,
            enable_stats: false,
        });
        let key = BlockCacheKey::new("test.sst".to_string(), 0);

        cache.get(&key);
        cache.put(key.clone(), CachedBlock::new(vec![0u8; 10]))?;
        cache.get(&key);

        let stats = cache.stats();
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 0);
        assert_eq!(stats.block_count, 0);
        assert_eq!(stats.size_bytes, 0);
        // Size accounting itself is independent of statistics.
        assert_eq!(cache.current_size(), 10);

        Ok(())
    }

    #[test]
    fn test_cache_stats_rates_without_lookups() {
        let stats = CacheStats::default();
        assert_eq!(stats.hit_rate(), 0.0);
        assert_eq!(stats.miss_rate(), 1.0);
    }

    #[test]
    fn test_block_cache_rejects_oversized_block() {
        let cache = BlockCache::with_config(BlockCacheConfig {
            max_size_bytes: 10,
            enable_stats: true,
        });
        let key = BlockCacheKey::new("test.sst".to_string(), 0);

        assert!(cache.put(key.clone(), CachedBlock::new(vec![0u8; 11])).is_err());
        assert!(!cache.contains(&key));
        assert_eq!(cache.current_size(), 0);
        assert_eq!(cache.block_count(), 0);
    }

    #[test]
    fn test_block_cache_replace_updates_accounting() -> Result<()> {
        let cache = BlockCache::new();
        let key = BlockCacheKey::new("test.sst".to_string(), 0);

        cache.put(key.clone(), CachedBlock::new(vec![1u8; 10]))?;
        cache.put(key.clone(), CachedBlock::new(vec![2u8; 25]))?;

        assert_eq!(cache.block_count(), 1);
        assert_eq!(cache.current_size(), 25);
        let stats = cache.stats();
        assert_eq!(stats.block_count, 1);
        assert_eq!(stats.size_bytes, 25);

        let retrieved = cache.get(&key).expect("Replaced block should be cached");
        assert_eq!(retrieved.as_slice(), &[2u8; 25][..]);

        Ok(())
    }

    #[test]
    fn test_block_cache_clear() -> Result<()> {
        let cache = BlockCache::new();

        for i in 0..5 {
            let key = BlockCacheKey::new("test.sst".to_string(), i);
            let block = CachedBlock::new(vec![0u8; 100]);
            cache.put(key, block)?;
        }

        assert!(cache.block_count() > 0);
        assert!(cache.current_size() > 0);

        cache.clear();

        assert_eq!(cache.block_count(), 0);
        assert_eq!(cache.current_size(), 0);

        Ok(())
    }

    #[test]
    fn test_block_cache_remove() -> Result<()> {
        let cache = BlockCache::new();

        let key = BlockCacheKey::new("test.sst".to_string(), 0);
        let block = CachedBlock::new(vec![1, 2, 3]);

        cache.put(key.clone(), block)?;
        assert!(cache.contains(&key));

        let removed = cache.remove(&key);
        assert_eq!(removed.map(|b| b.size), Some(3));
        assert!(!cache.contains(&key));
        assert_eq!(cache.current_size(), 0);
        assert!(cache.remove(&key).is_none());

        Ok(())
    }

    #[test]
    fn test_block_cache_invalidate_sstable() -> Result<()> {
        let cache = BlockCache::new();

        for i in 0..3 {
            let key = BlockCacheKey::new("test1.sst".to_string(), i);
            let block = CachedBlock::new(vec![0u8; 100]);
            cache.put(key, block)?;
        }

        for i in 0..3 {
            let key = BlockCacheKey::new("test2.sst".to_string(), i);
            let block = CachedBlock::new(vec![0u8; 100]);
            cache.put(key, block)?;
        }

        assert_eq!(cache.block_count(), 6);

        cache.invalidate_sstable("test1.sst");

        assert_eq!(cache.block_count(), 3);
        assert_eq!(cache.current_size(), 300);
        let stats = cache.stats();
        assert_eq!(stats.block_count, 3);
        assert_eq!(stats.size_bytes, 300);

        let key1 = BlockCacheKey::new("test1.sst".to_string(), 0);
        assert!(!cache.contains(&key1));

        let key2 = BlockCacheKey::new("test2.sst".to_string(), 0);
        assert!(cache.contains(&key2));

        Ok(())
    }

    #[test]
    fn test_block_cache_concurrent() -> Result<()> {
        use std::sync::Arc;
        use std::thread;

        let cache = Arc::new(BlockCache::new());
        let mut handles = vec![];

        for thread_id in 0..4 {
            let cache = Arc::clone(&cache);
            let handle = thread::spawn(move || {
                for i in 0..100 {
                    let key = BlockCacheKey::new(format!("test_{}.sst", thread_id), i);
                    let block = CachedBlock::new(vec![thread_id as u8; 100]);
                    cache
                        .put(key.clone(), block)
                        .expect("Cache put should succeed in concurrent test");
                    cache.get(&key);
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().expect("Thread should complete successfully");
        }

        assert!(cache.block_count() > 0);
        let stats = cache.stats();
        assert!(stats.hits > 0);

        Ok(())
    }
}