1use crate::error::{AmateRSError, ErrorContext, Result};
7use parking_lot::RwLock;
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10
/// Identifies one block inside one SSTable file.
///
/// Two keys are equal (and hash identically) only when both the path and the
/// block index match, which makes the type usable as a `HashMap` key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BlockCacheKey {
    /// Path of the SSTable file the block belongs to.
    pub sstable_path: String,
    /// Index of the block within that SSTable.
    pub block_index: usize,
}

impl BlockCacheKey {
    /// Builds a key from an SSTable path and a block index.
    pub fn new(sstable_path: String, block_index: usize) -> Self {
        BlockCacheKey {
            block_index,
            sstable_path,
        }
    }
}
29
/// A block's bytes held behind an `Arc`, together with its byte length.
///
/// Cloning a `CachedBlock` clones the `Arc`, so clones share one buffer.
#[derive(Debug, Clone)]
pub struct CachedBlock {
    /// Shared, immutable block contents.
    pub data: Arc<Vec<u8>>,
    /// Length of `data` in bytes, captured when the block was created.
    pub size: usize,
}

impl CachedBlock {
    /// Wraps `data` in an `Arc` and records its length as `size`.
    pub fn new(data: Vec<u8>) -> Self {
        // `size` is listed first so the length is read before `data` is moved.
        CachedBlock {
            size: data.len(),
            data: Arc::new(data),
        }
    }

    /// Borrows the block contents as a byte slice.
    pub fn as_slice(&self) -> &[u8] {
        self.data.as_slice()
    }
}
54
/// Tunables for a block cache.
#[derive(Debug, Clone)]
pub struct BlockCacheConfig {
    /// Upper bound, in bytes, on the total size of cached blocks.
    pub max_size_bytes: usize,
    /// Whether the cache maintains hit/miss/eviction statistics.
    pub enable_stats: bool,
}

impl Default for BlockCacheConfig {
    /// Defaults to a 128 MiB budget with statistics enabled.
    fn default() -> Self {
        const DEFAULT_MAX_SIZE_BYTES: usize = 128 * 1024 * 1024;

        BlockCacheConfig {
            enable_stats: true,
            max_size_bytes: DEFAULT_MAX_SIZE_BYTES,
        }
    }
}
72
/// Snapshot of cache counters.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Number of lookups that found a block.
    pub hits: u64,
    /// Number of lookups that found nothing.
    pub misses: u64,
    /// Number of blocks evicted to make room.
    pub evictions: u64,
    /// Number of blocks held when the snapshot was last refreshed.
    pub block_count: usize,
    /// Total bytes held when the snapshot was last refreshed.
    pub size_bytes: usize,
}

impl CacheStats {
    /// Fraction of lookups that were hits, in `[0.0, 1.0]`.
    ///
    /// Returns `0.0` when no lookups have been recorded.
    pub fn hit_rate(&self) -> f64 {
        match self.hits + self.misses {
            0 => 0.0,
            lookups => self.hits as f64 / lookups as f64,
        }
    }

    /// Complement of [`CacheStats::hit_rate`].
    ///
    /// Because `hit_rate` is `0.0` with no recorded lookups, this returns
    /// `1.0` in that case.
    pub fn miss_rate(&self) -> f64 {
        let hit_fraction = self.hit_rate();
        1.0 - hit_fraction
    }
}
104
105struct CacheEntry {
107 key: BlockCacheKey,
108 block: CachedBlock,
109}
110
111pub struct BlockCache {
116 config: BlockCacheConfig,
118 cache: Arc<RwLock<HashMap<BlockCacheKey, CachedBlock>>>,
120 lru_order: Arc<RwLock<VecDeque<BlockCacheKey>>>,
122 current_size: Arc<RwLock<usize>>,
124 stats: Arc<RwLock<CacheStats>>,
126}
127
128impl BlockCache {
129 pub fn new() -> Self {
131 Self::with_config(BlockCacheConfig::default())
132 }
133
134 pub fn with_config(config: BlockCacheConfig) -> Self {
136 Self {
137 config,
138 cache: Arc::new(RwLock::new(HashMap::new())),
139 lru_order: Arc::new(RwLock::new(VecDeque::new())),
140 current_size: Arc::new(RwLock::new(0)),
141 stats: Arc::new(RwLock::new(CacheStats::default())),
142 }
143 }
144
145 pub fn get(&self, key: &BlockCacheKey) -> Option<CachedBlock> {
147 let block = {
149 let cache = self.cache.read();
150 cache.get(key).cloned()
151 };
152
153 if let Some(ref block) = block {
155 self.touch(key);
157
158 if self.config.enable_stats {
160 let mut stats = self.stats.write();
161 stats.hits += 1;
162 }
163
164 Some(block.clone())
165 } else {
166 if self.config.enable_stats {
168 let mut stats = self.stats.write();
169 stats.misses += 1;
170 }
171
172 None
173 }
174 }
175
176 pub fn put(&self, key: BlockCacheKey, block: CachedBlock) -> Result<()> {
178 let block_size = block.size;
179
180 self.evict_if_needed(block_size)?;
182
183 let (new_block_count, new_size_bytes) = {
185 let mut cache = self.cache.write();
186 let mut lru_order = self.lru_order.write();
187 let mut current_size = self.current_size.write();
188
189 if let Some(old_block) = cache.remove(&key) {
191 *current_size -= old_block.size;
192 lru_order.retain(|k| k != &key);
194 }
195
196 cache.insert(key.clone(), block);
198 lru_order.push_back(key);
199 *current_size += block_size;
200
201 (cache.len(), *current_size)
203 };
204
205 if self.config.enable_stats {
207 let mut stats = self.stats.write();
208 stats.block_count = new_block_count;
209 stats.size_bytes = new_size_bytes;
210 }
211
212 Ok(())
213 }
214
215 fn touch(&self, key: &BlockCacheKey) {
217 let mut lru_order = self.lru_order.write();
218
219 lru_order.retain(|k| k != key);
221
222 lru_order.push_back(key.clone());
224 }
225
226 fn evict_if_needed(&self, new_block_size: usize) -> Result<()> {
228 if new_block_size > self.config.max_size_bytes {
229 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
230 "Block size {} exceeds cache size {}",
231 new_block_size, self.config.max_size_bytes
232 ))));
233 }
234
235 let current_size = *self.current_size.read();
236 let mut size_to_free =
237 (current_size + new_block_size).saturating_sub(self.config.max_size_bytes);
238
239 while size_to_free > 0 {
240 let (evicted_size, should_update_stats) = {
242 let mut cache = self.cache.write();
243 let mut lru_order = self.lru_order.write();
244 let mut current_size = self.current_size.write();
245
246 if let Some(key) = lru_order.front().cloned() {
248 if let Some(block) = cache.remove(&key) {
249 lru_order.pop_front();
250 *current_size -= block.size;
251 (block.size, self.config.enable_stats)
252 } else {
253 (0, false)
254 }
255 } else {
256 (0, false)
258 }
259 };
260
261 if evicted_size == 0 {
262 break;
264 }
265
266 if should_update_stats {
268 let mut stats = self.stats.write();
269 stats.evictions += 1;
270 }
271
272 if evicted_size >= size_to_free {
273 size_to_free = 0;
274 } else {
275 size_to_free -= evicted_size;
276 }
277 }
278
279 Ok(())
280 }
281
282 pub fn clear(&self) {
284 let mut cache = self.cache.write();
285 let mut lru_order = self.lru_order.write();
286 let mut current_size = self.current_size.write();
287
288 cache.clear();
289 lru_order.clear();
290 *current_size = 0;
291
292 if self.config.enable_stats {
293 let mut stats = self.stats.write();
294 stats.block_count = 0;
295 stats.size_bytes = 0;
296 }
297 }
298
299 pub fn stats(&self) -> CacheStats {
301 self.stats.read().clone()
302 }
303
304 pub fn current_size(&self) -> usize {
306 *self.current_size.read()
307 }
308
309 pub fn block_count(&self) -> usize {
311 self.cache.read().len()
312 }
313
314 pub fn contains(&self, key: &BlockCacheKey) -> bool {
316 self.cache.read().contains_key(key)
317 }
318
319 pub fn remove(&self, key: &BlockCacheKey) -> Option<CachedBlock> {
321 let mut cache = self.cache.write();
322 let mut lru_order = self.lru_order.write();
323 let mut current_size = self.current_size.write();
324
325 if let Some(block) = cache.remove(key) {
326 lru_order.retain(|k| k != key);
327 *current_size -= block.size;
328
329 if self.config.enable_stats {
330 let mut stats = self.stats.write();
331 stats.block_count = cache.len();
332 stats.size_bytes = *current_size;
333 }
334
335 Some(block)
336 } else {
337 None
338 }
339 }
340
341 pub fn invalidate_sstable(&self, sstable_path: &str) {
343 let mut cache = self.cache.write();
344 let mut lru_order = self.lru_order.write();
345 let mut current_size = self.current_size.write();
346
347 let keys_to_remove: Vec<BlockCacheKey> = cache
349 .keys()
350 .filter(|k| k.sstable_path == sstable_path)
351 .cloned()
352 .collect();
353
354 for key in keys_to_remove {
356 if let Some(block) = cache.remove(&key) {
357 *current_size -= block.size;
358 lru_order.retain(|k| k != &key);
359 }
360 }
361
362 if self.config.enable_stats {
363 let mut stats = self.stats.write();
364 stats.block_count = cache.len();
365 stats.size_bytes = *current_size;
366 }
367 }
368}
369
370impl Default for BlockCache {
371 fn default() -> Self {
372 Self::new()
373 }
374}
375
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a key for block `index` of the SSTable at `path`.
    fn key_for(path: &str, index: usize) -> BlockCacheKey {
        BlockCacheKey::new(path.to_string(), index)
    }

    /// Builds a zero-filled block of `len` bytes.
    fn zeroed_block(len: usize) -> CachedBlock {
        CachedBlock::new(vec![0u8; len])
    }

    /// Builds a cache limited to 100 bytes with statistics enabled.
    fn hundred_byte_cache() -> BlockCache {
        BlockCache::with_config(BlockCacheConfig {
            max_size_bytes: 100,
            enable_stats: true,
        })
    }

    /// Inserts `count` zero-filled blocks of `len` bytes under `path`,
    /// using indices `0..count`.
    fn fill(cache: &BlockCache, path: &str, count: usize, len: usize) -> Result<()> {
        for index in 0..count {
            cache.put(key_for(path, index), zeroed_block(len))?;
        }
        Ok(())
    }

    #[test]
    fn test_block_cache_basic() -> Result<()> {
        let cache = BlockCache::new();
        let key = key_for("test.sst", 0);
        let block = CachedBlock::new(vec![1, 2, 3, 4, 5]);

        // Nothing cached yet.
        assert!(cache.get(&key).is_none());

        cache.put(key.clone(), block.clone())?;

        let retrieved = cache.get(&key).expect("Block should be in cache after put");
        assert_eq!(retrieved.as_slice(), &[1, 2, 3, 4, 5]);

        Ok(())
    }

    #[test]
    fn test_block_cache_lru_eviction() -> Result<()> {
        let cache = hundred_byte_cache();

        // 5 x 30 bytes cannot fit in 100 bytes, so the oldest must go.
        fill(&cache, "test.sst", 5, 30)?;

        assert!(cache.current_size() <= 100);

        assert!(cache.get(&key_for("test.sst", 0)).is_none());
        assert!(cache.get(&key_for("test.sst", 1)).is_none());

        assert!(cache.get(&key_for("test.sst", 4)).is_some());

        Ok(())
    }

    #[test]
    fn test_block_cache_touch() -> Result<()> {
        let cache = hundred_byte_cache();
        fill(&cache, "test.sst", 3, 30)?;

        // Reading block 0 makes it the most recently used.
        let key0 = key_for("test.sst", 0);
        cache.get(&key0);

        // A fourth block forces one eviction.
        cache.put(key_for("test.sst", 3), zeroed_block(30))?;

        assert!(cache.get(&key0).is_some());

        // Block 1 was the least recently used, so it is the one evicted.
        assert!(cache.get(&key_for("test.sst", 1)).is_none());

        Ok(())
    }

    #[test]
    fn test_block_cache_stats() -> Result<()> {
        let cache = BlockCache::new();
        let key = key_for("test.sst", 0);

        // One miss before the block exists.
        cache.get(&key);

        cache.put(key.clone(), CachedBlock::new(vec![1, 2, 3]))?;

        // Two hits afterwards.
        cache.get(&key);
        cache.get(&key);

        let stats = cache.stats();
        assert_eq!(stats.hits, 2);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.hit_rate(), 2.0 / 3.0);

        Ok(())
    }

    #[test]
    fn test_block_cache_clear() -> Result<()> {
        let cache = BlockCache::new();
        fill(&cache, "test.sst", 5, 100)?;

        assert!(cache.block_count() > 0);
        assert!(cache.current_size() > 0);

        cache.clear();

        assert_eq!(cache.block_count(), 0);
        assert_eq!(cache.current_size(), 0);

        Ok(())
    }

    #[test]
    fn test_block_cache_remove() -> Result<()> {
        let cache = BlockCache::new();
        let key = key_for("test.sst", 0);

        cache.put(key.clone(), CachedBlock::new(vec![1, 2, 3]))?;
        assert!(cache.contains(&key));

        cache.remove(&key);
        assert!(!cache.contains(&key));

        Ok(())
    }

    #[test]
    fn test_block_cache_invalidate_sstable() -> Result<()> {
        let cache = BlockCache::new();

        fill(&cache, "test1.sst", 3, 100)?;
        fill(&cache, "test2.sst", 3, 100)?;

        assert_eq!(cache.block_count(), 6);

        cache.invalidate_sstable("test1.sst");

        assert_eq!(cache.block_count(), 3);

        // Only blocks of the invalidated table are gone.
        assert!(!cache.contains(&key_for("test1.sst", 0)));
        assert!(cache.contains(&key_for("test2.sst", 0)));

        Ok(())
    }

    #[test]
    fn test_block_cache_concurrent() -> Result<()> {
        use std::sync::Arc;
        use std::thread;

        let cache = Arc::new(BlockCache::new());

        // Four writers, each inserting and re-reading 100 blocks of its own table.
        let handles: Vec<_> = (0..4)
            .map(|thread_id| {
                let cache = Arc::clone(&cache);
                thread::spawn(move || {
                    for i in 0..100 {
                        let key = BlockCacheKey::new(format!("test_{}.sst", thread_id), i);
                        let block = CachedBlock::new(vec![thread_id as u8; 100]);
                        cache
                            .put(key.clone(), block)
                            .expect("Cache put should succeed in concurrent test");
                        cache.get(&key);
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().expect("Thread should complete successfully");
        }

        assert!(cache.block_count() > 0);
        let stats = cache.stats();
        assert!(stats.hits > 0);

        Ok(())
    }
}