amaters_core/storage/
block_cache.rs

1//! Block cache for SSTable blocks
2//!
3//! LRU (Least Recently Used) cache for caching SSTable data blocks in memory.
4//! Reduces disk I/O by keeping frequently accessed blocks in memory.
5
6use crate::error::{AmateRSError, ErrorContext, Result};
7use parking_lot::RwLock;
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10
/// Identifies one data block inside one SSTable file.
///
/// Serves as the lookup key for [`BlockCache`]; derives `Eq` + `Hash`
/// so it can index the cache's `HashMap`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BlockCacheKey {
    /// Path of the SSTable file the block belongs to
    pub sstable_path: String,
    /// Zero-based index of the block within that SSTable
    pub block_index: usize,
}

impl BlockCacheKey {
    /// Build a key for block `block_index` of the SSTable at `sstable_path`.
    pub fn new(sstable_path: String, block_index: usize) -> Self {
        Self { sstable_path, block_index }
    }
}
29
/// A block of SSTable data held in the cache.
///
/// The payload sits behind an `Arc`, so cloning a `CachedBlock` (as
/// cache lookups do) is a refcount bump — the bytes are never copied.
#[derive(Debug, Clone)]
pub struct CachedBlock {
    /// Shared, immutable block bytes
    pub data: Arc<Vec<u8>>,
    /// Payload length in bytes (equals `data.len()` at construction)
    pub size: usize,
}

impl CachedBlock {
    /// Wrap raw block bytes, recording their length as the cached size.
    pub fn new(data: Vec<u8>) -> Self {
        let size = data.len();
        let data = Arc::new(data);
        Self { data, size }
    }

    /// Borrow the block bytes as a slice.
    pub fn as_slice(&self) -> &[u8] {
        self.data.as_slice()
    }
}
54
/// Tunable settings for [`BlockCache`].
#[derive(Debug, Clone)]
pub struct BlockCacheConfig {
    /// Upper bound on the total bytes of cached block data
    pub max_size_bytes: usize,
    /// Whether hit/miss/eviction counters are maintained
    pub enable_stats: bool,
}

impl Default for BlockCacheConfig {
    /// 128 MB capacity with statistics collection turned on.
    fn default() -> Self {
        let max_size_bytes = 128 * 1024 * 1024; // 128 MB default
        Self {
            max_size_bytes,
            enable_stats: true,
        }
    }
}
72
/// Counters describing cache effectiveness.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Lookups that found their block
    pub hits: u64,
    /// Lookups that did not
    pub misses: u64,
    /// Blocks removed to make room for new ones
    pub evictions: u64,
    /// Blocks currently resident
    pub block_count: usize,
    /// Total bytes currently resident
    pub size_bytes: usize,
}

impl CacheStats {
    /// Fraction of lookups that hit, in `[0.0, 1.0]`.
    /// Returns 0.0 when no lookups have happened yet.
    pub fn hit_rate(&self) -> f64 {
        match self.hits + self.misses {
            0 => 0.0,
            total => self.hits as f64 / total as f64,
        }
    }

    /// Fraction of lookups that missed; the complement of [`Self::hit_rate`].
    pub fn miss_rate(&self) -> f64 {
        1.0 - self.hit_rate()
    }
}
104
/// LRU cache entry
///
/// NOTE(review): this struct is not referenced anywhere else in this
/// file — the cache stores `CachedBlock`s directly in a `HashMap` and
/// keeps keys in a separate `VecDeque`. It looks like a leftover from
/// an earlier design; being private and unused it should trip the
/// `dead_code` lint. Consider removing it (confirm no use elsewhere).
struct CacheEntry {
    // Key identifying the cached block
    key: BlockCacheKey,
    // The cached block payload
    block: CachedBlock,
}
110
/// LRU (Least Recently Used) block cache
///
/// Thread-safe cache using RwLock for concurrent read access.
/// Evicts least recently used blocks when cache is full.
///
/// Locking: writers in this file take the locks in the order
/// `cache` -> `lru_order` -> `current_size` -> `stats` (some paths take
/// only a suffix/subset), which avoids lock-order inversions between
/// the four independent locks.
///
/// Intended invariants (may be violated transiently between lock
/// scopes): `lru_order` holds the keys of `cache` with the least
/// recently used key at the front, and `current_size` equals the sum
/// of the cached blocks' sizes.
pub struct BlockCache {
    /// Configuration
    config: BlockCacheConfig,
    /// Cache entries (HashMap for O(1) lookups)
    cache: Arc<RwLock<HashMap<BlockCacheKey, CachedBlock>>>,
    /// LRU order (most recent at back)
    lru_order: Arc<RwLock<VecDeque<BlockCacheKey>>>,
    /// Current cache size in bytes
    current_size: Arc<RwLock<usize>>,
    /// Cache statistics
    stats: Arc<RwLock<CacheStats>>,
}
127
128impl BlockCache {
129    /// Create a new block cache with default configuration
130    pub fn new() -> Self {
131        Self::with_config(BlockCacheConfig::default())
132    }
133
134    /// Create a new block cache with custom configuration
135    pub fn with_config(config: BlockCacheConfig) -> Self {
136        Self {
137            config,
138            cache: Arc::new(RwLock::new(HashMap::new())),
139            lru_order: Arc::new(RwLock::new(VecDeque::new())),
140            current_size: Arc::new(RwLock::new(0)),
141            stats: Arc::new(RwLock::new(CacheStats::default())),
142        }
143    }
144
145    /// Get a block from the cache
146    pub fn get(&self, key: &BlockCacheKey) -> Option<CachedBlock> {
147        // First, check if block exists and clone it
148        let block = {
149            let cache = self.cache.read();
150            cache.get(key).cloned()
151        };
152
153        // Update LRU and stats after releasing cache lock
154        if let Some(ref block) = block {
155            // Update LRU order (move to back)
156            self.touch(key);
157
158            // Update statistics
159            if self.config.enable_stats {
160                let mut stats = self.stats.write();
161                stats.hits += 1;
162            }
163
164            Some(block.clone())
165        } else {
166            // Update statistics
167            if self.config.enable_stats {
168                let mut stats = self.stats.write();
169                stats.misses += 1;
170            }
171
172            None
173        }
174    }
175
176    /// Put a block into the cache
177    pub fn put(&self, key: BlockCacheKey, block: CachedBlock) -> Result<()> {
178        let block_size = block.size;
179
180        // Check if we need to evict blocks to make room
181        self.evict_if_needed(block_size)?;
182
183        // Insert into cache
184        let (new_block_count, new_size_bytes) = {
185            let mut cache = self.cache.write();
186            let mut lru_order = self.lru_order.write();
187            let mut current_size = self.current_size.write();
188
189            // Remove old entry if exists
190            if let Some(old_block) = cache.remove(&key) {
191                *current_size -= old_block.size;
192                // Remove from LRU order
193                lru_order.retain(|k| k != &key);
194            }
195
196            // Insert new entry
197            cache.insert(key.clone(), block);
198            lru_order.push_back(key);
199            *current_size += block_size;
200
201            // Return stats while we have the locks
202            (cache.len(), *current_size)
203        };
204
205        // Update statistics after releasing locks
206        if self.config.enable_stats {
207            let mut stats = self.stats.write();
208            stats.block_count = new_block_count;
209            stats.size_bytes = new_size_bytes;
210        }
211
212        Ok(())
213    }
214
215    /// Touch a key (move to most recent position)
216    fn touch(&self, key: &BlockCacheKey) {
217        let mut lru_order = self.lru_order.write();
218
219        // Remove from current position
220        lru_order.retain(|k| k != key);
221
222        // Add to back (most recent)
223        lru_order.push_back(key.clone());
224    }
225
226    /// Evict blocks if needed to make room for new block
227    fn evict_if_needed(&self, new_block_size: usize) -> Result<()> {
228        if new_block_size > self.config.max_size_bytes {
229            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
230                "Block size {} exceeds cache size {}",
231                new_block_size, self.config.max_size_bytes
232            ))));
233        }
234
235        let current_size = *self.current_size.read();
236        let mut size_to_free = if current_size + new_block_size > self.config.max_size_bytes {
237            current_size + new_block_size - self.config.max_size_bytes
238        } else {
239            0
240        };
241
242        while size_to_free > 0 {
243            // Get least recently used key (front of queue) and evict it atomically
244            let (evicted_size, should_update_stats) = {
245                let mut cache = self.cache.write();
246                let mut lru_order = self.lru_order.write();
247                let mut current_size = self.current_size.write();
248
249                // Get front key
250                if let Some(key) = lru_order.front().cloned() {
251                    if let Some(block) = cache.remove(&key) {
252                        lru_order.pop_front();
253                        *current_size -= block.size;
254                        (block.size, self.config.enable_stats)
255                    } else {
256                        (0, false)
257                    }
258                } else {
259                    // No more blocks to evict
260                    (0, false)
261                }
262            };
263
264            if evicted_size == 0 {
265                // No more blocks to evict
266                break;
267            }
268
269            // Update statistics after releasing locks
270            if should_update_stats {
271                let mut stats = self.stats.write();
272                stats.evictions += 1;
273            }
274
275            if evicted_size >= size_to_free {
276                size_to_free = 0;
277            } else {
278                size_to_free -= evicted_size;
279            }
280        }
281
282        Ok(())
283    }
284
285    /// Clear all blocks from the cache
286    pub fn clear(&self) {
287        let mut cache = self.cache.write();
288        let mut lru_order = self.lru_order.write();
289        let mut current_size = self.current_size.write();
290
291        cache.clear();
292        lru_order.clear();
293        *current_size = 0;
294
295        if self.config.enable_stats {
296            let mut stats = self.stats.write();
297            stats.block_count = 0;
298            stats.size_bytes = 0;
299        }
300    }
301
302    /// Get cache statistics
303    pub fn stats(&self) -> CacheStats {
304        self.stats.read().clone()
305    }
306
307    /// Get current cache size in bytes
308    pub fn current_size(&self) -> usize {
309        *self.current_size.read()
310    }
311
312    /// Get number of blocks in cache
313    pub fn block_count(&self) -> usize {
314        self.cache.read().len()
315    }
316
317    /// Check if cache contains a key
318    pub fn contains(&self, key: &BlockCacheKey) -> bool {
319        self.cache.read().contains_key(key)
320    }
321
322    /// Remove a specific block from cache
323    pub fn remove(&self, key: &BlockCacheKey) -> Option<CachedBlock> {
324        let mut cache = self.cache.write();
325        let mut lru_order = self.lru_order.write();
326        let mut current_size = self.current_size.write();
327
328        if let Some(block) = cache.remove(key) {
329            lru_order.retain(|k| k != key);
330            *current_size -= block.size;
331
332            if self.config.enable_stats {
333                let mut stats = self.stats.write();
334                stats.block_count = cache.len();
335                stats.size_bytes = *current_size;
336            }
337
338            Some(block)
339        } else {
340            None
341        }
342    }
343
344    /// Invalidate all blocks for a specific SSTable
345    pub fn invalidate_sstable(&self, sstable_path: &str) {
346        let mut cache = self.cache.write();
347        let mut lru_order = self.lru_order.write();
348        let mut current_size = self.current_size.write();
349
350        // Collect keys to remove
351        let keys_to_remove: Vec<BlockCacheKey> = cache
352            .keys()
353            .filter(|k| k.sstable_path == sstable_path)
354            .cloned()
355            .collect();
356
357        // Remove blocks
358        for key in keys_to_remove {
359            if let Some(block) = cache.remove(&key) {
360                *current_size -= block.size;
361                lru_order.retain(|k| k != &key);
362            }
363        }
364
365        if self.config.enable_stats {
366            let mut stats = self.stats.write();
367            stats.block_count = cache.len();
368            stats.size_bytes = *current_size;
369        }
370    }
371}
372
373impl Default for BlockCache {
374    fn default() -> Self {
375        Self::new()
376    }
377}
378
#[cfg(test)]
mod tests {
    //! Unit tests for the block cache: basic round trips, LRU eviction
    //! and promotion, statistics, clearing, targeted removal,
    //! per-SSTable invalidation, and multi-threaded access.
    use super::*;

    // Single put/get round trip through a default-sized cache.
    #[test]
    fn test_block_cache_basic() -> Result<()> {
        let cache = BlockCache::new();

        let key = BlockCacheKey::new("test.sst".to_string(), 0);
        let block = CachedBlock::new(vec![1, 2, 3, 4, 5]);

        // Initially not in cache
        assert!(cache.get(&key).is_none());

        // Put in cache
        cache.put(key.clone(), block.clone())?;

        // Now should be in cache
        let retrieved = cache.get(&key).expect("Block should be in cache after put");
        assert_eq!(retrieved.as_slice(), &[1, 2, 3, 4, 5]);

        Ok(())
    }

    // Five 30-byte blocks into a 100-byte cache: the two oldest must go.
    #[test]
    fn test_block_cache_lru_eviction() -> Result<()> {
        let config = BlockCacheConfig {
            max_size_bytes: 100,
            enable_stats: true,
        };
        let cache = BlockCache::with_config(config);

        // Add blocks that exceed cache size
        for i in 0..5 {
            let key = BlockCacheKey::new("test.sst".to_string(), i);
            let block = CachedBlock::new(vec![0u8; 30]); // 30 bytes each
            cache.put(key, block)?;
        }

        // Cache should have evicted oldest blocks
        assert!(cache.current_size() <= 100);

        // First blocks should be evicted
        let key0 = BlockCacheKey::new("test.sst".to_string(), 0);
        let key1 = BlockCacheKey::new("test.sst".to_string(), 1);
        assert!(cache.get(&key0).is_none());
        assert!(cache.get(&key1).is_none());

        // Recent blocks should still be present
        let key4 = BlockCacheKey::new("test.sst".to_string(), 4);
        assert!(cache.get(&key4).is_some());

        Ok(())
    }

    // A get() promotes its block, so the next eviction takes the oldest
    // block that was NOT touched.
    #[test]
    fn test_block_cache_touch() -> Result<()> {
        let config = BlockCacheConfig {
            max_size_bytes: 100,
            enable_stats: true,
        };
        let cache = BlockCache::with_config(config);

        // Add 3 blocks
        for i in 0..3 {
            let key = BlockCacheKey::new("test.sst".to_string(), i);
            let block = CachedBlock::new(vec![0u8; 30]);
            cache.put(key, block)?;
        }

        // Touch block 0 (make it most recent)
        let key0 = BlockCacheKey::new("test.sst".to_string(), 0);
        cache.get(&key0);

        // Add a new block (should evict block 1, not block 0)
        let key3 = BlockCacheKey::new("test.sst".to_string(), 3);
        let block3 = CachedBlock::new(vec![0u8; 30]);
        cache.put(key3, block3)?;

        // Block 0 should still be present (touched)
        assert!(cache.get(&key0).is_some());

        // Block 1 should be evicted (oldest untouched)
        let key1 = BlockCacheKey::new("test.sst".to_string(), 1);
        assert!(cache.get(&key1).is_none());

        Ok(())
    }

    // One miss followed by two hits: verify the counters and hit_rate.
    #[test]
    fn test_block_cache_stats() -> Result<()> {
        let cache = BlockCache::new();

        let key = BlockCacheKey::new("test.sst".to_string(), 0);
        let block = CachedBlock::new(vec![1, 2, 3]);

        // Miss
        cache.get(&key);

        // Put
        cache.put(key.clone(), block)?;

        // Hit
        cache.get(&key);
        cache.get(&key);

        let stats = cache.stats();
        assert_eq!(stats.hits, 2);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.hit_rate(), 2.0 / 3.0);

        Ok(())
    }

    // clear() empties the cache and zeroes the size accounting.
    #[test]
    fn test_block_cache_clear() -> Result<()> {
        let cache = BlockCache::new();

        for i in 0..5 {
            let key = BlockCacheKey::new("test.sst".to_string(), i);
            let block = CachedBlock::new(vec![0u8; 100]);
            cache.put(key, block)?;
        }

        assert!(cache.block_count() > 0);
        assert!(cache.current_size() > 0);

        cache.clear();

        assert_eq!(cache.block_count(), 0);
        assert_eq!(cache.current_size(), 0);

        Ok(())
    }

    // remove() deletes exactly the requested key.
    #[test]
    fn test_block_cache_remove() -> Result<()> {
        let cache = BlockCache::new();

        let key = BlockCacheKey::new("test.sst".to_string(), 0);
        let block = CachedBlock::new(vec![1, 2, 3]);

        cache.put(key.clone(), block)?;
        assert!(cache.contains(&key));

        cache.remove(&key);
        assert!(!cache.contains(&key));

        Ok(())
    }

    // Invalidating one SSTable leaves the other SSTable's blocks alone.
    #[test]
    fn test_block_cache_invalidate_sstable() -> Result<()> {
        let cache = BlockCache::new();

        // Add blocks from two SSTables
        for i in 0..3 {
            let key = BlockCacheKey::new("test1.sst".to_string(), i);
            let block = CachedBlock::new(vec![0u8; 100]);
            cache.put(key, block)?;
        }

        for i in 0..3 {
            let key = BlockCacheKey::new("test2.sst".to_string(), i);
            let block = CachedBlock::new(vec![0u8; 100]);
            cache.put(key, block)?;
        }

        assert_eq!(cache.block_count(), 6);

        // Invalidate one SSTable
        cache.invalidate_sstable("test1.sst");

        assert_eq!(cache.block_count(), 3);

        // test1.sst blocks should be gone
        let key1 = BlockCacheKey::new("test1.sst".to_string(), 0);
        assert!(!cache.contains(&key1));

        // test2.sst blocks should still be present
        let key2 = BlockCacheKey::new("test2.sst".to_string(), 0);
        assert!(cache.contains(&key2));

        Ok(())
    }

    // Four threads hammer put/get concurrently on disjoint key spaces;
    // exercises the locking without asserting exact contents.
    #[test]
    fn test_block_cache_concurrent() -> Result<()> {
        use std::sync::Arc;
        use std::thread;

        let cache = Arc::new(BlockCache::new());
        let mut handles = vec![];

        // Spawn multiple threads doing cache operations
        for thread_id in 0..4 {
            let cache = Arc::clone(&cache);
            let handle = thread::spawn(move || {
                for i in 0..100 {
                    let key = BlockCacheKey::new(format!("test_{}.sst", thread_id), i);
                    let block = CachedBlock::new(vec![thread_id as u8; 100]);
                    cache
                        .put(key.clone(), block)
                        .expect("Cache put should succeed in concurrent test");
                    cache.get(&key);
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().expect("Thread should complete successfully");
        }

        // Cache should have blocks from all threads
        assert!(cache.block_count() > 0);
        let stats = cache.stats();
        assert!(stats.hits > 0);

        Ok(())
    }
}