// amaters_core/storage/block_cache.rs

//! Block cache for SSTable blocks
//!
//! LRU (Least Recently Used) cache for caching SSTable data blocks in memory.
//! Reduces disk I/O by keeping frequently accessed blocks in memory.

use crate::error::{AmateRSError, ErrorContext, Result};
use parking_lot::RwLock;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

/// Cache key identifying a specific block in a specific SSTable
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BlockCacheKey {
    /// SSTable file path
    pub sstable_path: String,
    /// Block index within the SSTable
    pub block_index: usize,
}

impl BlockCacheKey {
    /// Build a key from an SSTable path and the index of a block inside it.
    pub fn new(sstable_path: String, block_index: usize) -> Self {
        BlockCacheKey {
            sstable_path,
            block_index,
        }
    }
}
29
/// Cached block data
#[derive(Debug, Clone)]
pub struct CachedBlock {
    /// The block data
    pub data: Arc<Vec<u8>>,
    /// Size in bytes
    pub size: usize,
}

impl CachedBlock {
    /// Wrap raw block bytes, recording their length as the cached size.
    pub fn new(data: Vec<u8>) -> Self {
        CachedBlock {
            size: data.len(),
            data: Arc::new(data),
        }
    }

    /// Borrow the block contents as a byte slice.
    pub fn as_slice(&self) -> &[u8] {
        self.data.as_slice()
    }
}
54
/// LRU block cache configuration
#[derive(Debug, Clone)]
pub struct BlockCacheConfig {
    /// Maximum cache size in bytes
    pub max_size_bytes: usize,
    /// Whether to track cache statistics
    pub enable_stats: bool,
}

impl Default for BlockCacheConfig {
    /// 128 MiB capacity with statistics tracking enabled.
    fn default() -> Self {
        BlockCacheConfig {
            max_size_bytes: 128 << 20, // 128 MiB
            enable_stats: true,
        }
    }
}
72
/// Cache statistics
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Number of cache hits
    pub hits: u64,
    /// Number of cache misses
    pub misses: u64,
    /// Number of evictions
    pub evictions: u64,
    /// Current number of blocks in cache
    pub block_count: usize,
    /// Current cache size in bytes
    pub size_bytes: usize,
}

impl CacheStats {
    /// Fraction of lookups that hit the cache (0.0 to 1.0).
    ///
    /// Returns 0.0 when no lookups have been recorded yet.
    pub fn hit_rate(&self) -> f64 {
        match self.hits + self.misses {
            0 => 0.0,
            total => self.hits as f64 / total as f64,
        }
    }

    /// Fraction of lookups that missed the cache (0.0 to 1.0).
    ///
    /// Defined as the complement of [`CacheStats::hit_rate`].
    pub fn miss_rate(&self) -> f64 {
        1.0 - self.hit_rate()
    }
}
104
/// LRU cache entry
///
/// NOTE(review): this struct is never referenced anywhere in this file —
/// the cache stores `CachedBlock` values directly in the `HashMap` and
/// tracks recency in a separate `VecDeque<BlockCacheKey>`. Presumably a
/// leftover from an earlier design that bundled key and block together;
/// confirm and delete it (or add `#[allow(dead_code)]` if it is kept
/// intentionally for a planned refactor).
struct CacheEntry {
    key: BlockCacheKey,
    block: CachedBlock,
}
110
/// LRU (Least Recently Used) block cache
///
/// Thread-safe cache using RwLock for concurrent read access.
/// Evicts least recently used blocks when cache is full.
///
/// Locking discipline: methods that take more than one of the locks below
/// always acquire them in the order `cache` -> `lru_order` -> `current_size`
/// (with `stats` taken last or after the others are released). Keep that
/// order when adding methods to avoid deadlock.
pub struct BlockCache {
    /// Configuration (capacity limit and stats toggle); read-only after construction
    config: BlockCacheConfig,
    /// Cache entries (HashMap for O(1) lookups)
    cache: Arc<RwLock<HashMap<BlockCacheKey, CachedBlock>>>,
    /// LRU order (most recent at back, eviction candidate at front)
    lru_order: Arc<RwLock<VecDeque<BlockCacheKey>>>,
    /// Current cache size in bytes (sum of the `size` of every cached block)
    current_size: Arc<RwLock<usize>>,
    /// Cache statistics; only updated when `config.enable_stats` is true
    stats: Arc<RwLock<CacheStats>>,
}
127
128impl BlockCache {
129    /// Create a new block cache with default configuration
130    pub fn new() -> Self {
131        Self::with_config(BlockCacheConfig::default())
132    }
133
134    /// Create a new block cache with custom configuration
135    pub fn with_config(config: BlockCacheConfig) -> Self {
136        Self {
137            config,
138            cache: Arc::new(RwLock::new(HashMap::new())),
139            lru_order: Arc::new(RwLock::new(VecDeque::new())),
140            current_size: Arc::new(RwLock::new(0)),
141            stats: Arc::new(RwLock::new(CacheStats::default())),
142        }
143    }
144
145    /// Get a block from the cache
146    pub fn get(&self, key: &BlockCacheKey) -> Option<CachedBlock> {
147        // First, check if block exists and clone it
148        let block = {
149            let cache = self.cache.read();
150            cache.get(key).cloned()
151        };
152
153        // Update LRU and stats after releasing cache lock
154        if let Some(ref block) = block {
155            // Update LRU order (move to back)
156            self.touch(key);
157
158            // Update statistics
159            if self.config.enable_stats {
160                let mut stats = self.stats.write();
161                stats.hits += 1;
162            }
163
164            Some(block.clone())
165        } else {
166            // Update statistics
167            if self.config.enable_stats {
168                let mut stats = self.stats.write();
169                stats.misses += 1;
170            }
171
172            None
173        }
174    }
175
176    /// Put a block into the cache
177    pub fn put(&self, key: BlockCacheKey, block: CachedBlock) -> Result<()> {
178        let block_size = block.size;
179
180        // Check if we need to evict blocks to make room
181        self.evict_if_needed(block_size)?;
182
183        // Insert into cache
184        let (new_block_count, new_size_bytes) = {
185            let mut cache = self.cache.write();
186            let mut lru_order = self.lru_order.write();
187            let mut current_size = self.current_size.write();
188
189            // Remove old entry if exists
190            if let Some(old_block) = cache.remove(&key) {
191                *current_size -= old_block.size;
192                // Remove from LRU order
193                lru_order.retain(|k| k != &key);
194            }
195
196            // Insert new entry
197            cache.insert(key.clone(), block);
198            lru_order.push_back(key);
199            *current_size += block_size;
200
201            // Return stats while we have the locks
202            (cache.len(), *current_size)
203        };
204
205        // Update statistics after releasing locks
206        if self.config.enable_stats {
207            let mut stats = self.stats.write();
208            stats.block_count = new_block_count;
209            stats.size_bytes = new_size_bytes;
210        }
211
212        Ok(())
213    }
214
215    /// Touch a key (move to most recent position)
216    fn touch(&self, key: &BlockCacheKey) {
217        let mut lru_order = self.lru_order.write();
218
219        // Remove from current position
220        lru_order.retain(|k| k != key);
221
222        // Add to back (most recent)
223        lru_order.push_back(key.clone());
224    }
225
226    /// Evict blocks if needed to make room for new block
227    fn evict_if_needed(&self, new_block_size: usize) -> Result<()> {
228        if new_block_size > self.config.max_size_bytes {
229            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(format!(
230                "Block size {} exceeds cache size {}",
231                new_block_size, self.config.max_size_bytes
232            ))));
233        }
234
235        let current_size = *self.current_size.read();
236        let mut size_to_free =
237            (current_size + new_block_size).saturating_sub(self.config.max_size_bytes);
238
239        while size_to_free > 0 {
240            // Get least recently used key (front of queue) and evict it atomically
241            let (evicted_size, should_update_stats) = {
242                let mut cache = self.cache.write();
243                let mut lru_order = self.lru_order.write();
244                let mut current_size = self.current_size.write();
245
246                // Get front key
247                if let Some(key) = lru_order.front().cloned() {
248                    if let Some(block) = cache.remove(&key) {
249                        lru_order.pop_front();
250                        *current_size -= block.size;
251                        (block.size, self.config.enable_stats)
252                    } else {
253                        (0, false)
254                    }
255                } else {
256                    // No more blocks to evict
257                    (0, false)
258                }
259            };
260
261            if evicted_size == 0 {
262                // No more blocks to evict
263                break;
264            }
265
266            // Update statistics after releasing locks
267            if should_update_stats {
268                let mut stats = self.stats.write();
269                stats.evictions += 1;
270            }
271
272            if evicted_size >= size_to_free {
273                size_to_free = 0;
274            } else {
275                size_to_free -= evicted_size;
276            }
277        }
278
279        Ok(())
280    }
281
282    /// Clear all blocks from the cache
283    pub fn clear(&self) {
284        let mut cache = self.cache.write();
285        let mut lru_order = self.lru_order.write();
286        let mut current_size = self.current_size.write();
287
288        cache.clear();
289        lru_order.clear();
290        *current_size = 0;
291
292        if self.config.enable_stats {
293            let mut stats = self.stats.write();
294            stats.block_count = 0;
295            stats.size_bytes = 0;
296        }
297    }
298
299    /// Get cache statistics
300    pub fn stats(&self) -> CacheStats {
301        self.stats.read().clone()
302    }
303
304    /// Get current cache size in bytes
305    pub fn current_size(&self) -> usize {
306        *self.current_size.read()
307    }
308
309    /// Get number of blocks in cache
310    pub fn block_count(&self) -> usize {
311        self.cache.read().len()
312    }
313
314    /// Check if cache contains a key
315    pub fn contains(&self, key: &BlockCacheKey) -> bool {
316        self.cache.read().contains_key(key)
317    }
318
319    /// Remove a specific block from cache
320    pub fn remove(&self, key: &BlockCacheKey) -> Option<CachedBlock> {
321        let mut cache = self.cache.write();
322        let mut lru_order = self.lru_order.write();
323        let mut current_size = self.current_size.write();
324
325        if let Some(block) = cache.remove(key) {
326            lru_order.retain(|k| k != key);
327            *current_size -= block.size;
328
329            if self.config.enable_stats {
330                let mut stats = self.stats.write();
331                stats.block_count = cache.len();
332                stats.size_bytes = *current_size;
333            }
334
335            Some(block)
336        } else {
337            None
338        }
339    }
340
341    /// Invalidate all blocks for a specific SSTable
342    pub fn invalidate_sstable(&self, sstable_path: &str) {
343        let mut cache = self.cache.write();
344        let mut lru_order = self.lru_order.write();
345        let mut current_size = self.current_size.write();
346
347        // Collect keys to remove
348        let keys_to_remove: Vec<BlockCacheKey> = cache
349            .keys()
350            .filter(|k| k.sstable_path == sstable_path)
351            .cloned()
352            .collect();
353
354        // Remove blocks
355        for key in keys_to_remove {
356            if let Some(block) = cache.remove(&key) {
357                *current_size -= block.size;
358                lru_order.retain(|k| k != &key);
359            }
360        }
361
362        if self.config.enable_stats {
363            let mut stats = self.stats.write();
364            stats.block_count = cache.len();
365            stats.size_bytes = *current_size;
366        }
367    }
368}
369
370impl Default for BlockCache {
371    fn default() -> Self {
372        Self::new()
373    }
374}
375
#[cfg(test)]
mod tests {
    use super::*;

    /// Basic put/get round-trip: a block is absent before put and
    /// retrievable (with identical bytes) afterwards.
    #[test]
    fn test_block_cache_basic() -> Result<()> {
        let cache = BlockCache::new();

        let key = BlockCacheKey::new("test.sst".to_string(), 0);
        let block = CachedBlock::new(vec![1, 2, 3, 4, 5]);

        // Initially not in cache
        assert!(cache.get(&key).is_none());

        // Put in cache
        cache.put(key.clone(), block.clone())?;

        // Now should be in cache
        let retrieved = cache.get(&key).expect("Block should be in cache after put");
        assert_eq!(retrieved.as_slice(), &[1, 2, 3, 4, 5]);

        Ok(())
    }

    /// Inserting 5 x 30-byte blocks into a 100-byte cache forces LRU
    /// eviction: blocks 0 and 1 are evicted, blocks 2..=4 remain (90 bytes).
    #[test]
    fn test_block_cache_lru_eviction() -> Result<()> {
        let config = BlockCacheConfig {
            max_size_bytes: 100,
            enable_stats: true,
        };
        let cache = BlockCache::with_config(config);

        // Add blocks that exceed cache size
        for i in 0..5 {
            let key = BlockCacheKey::new("test.sst".to_string(), i);
            let block = CachedBlock::new(vec![0u8; 30]); // 30 bytes each
            cache.put(key, block)?;
        }

        // Cache should have evicted oldest blocks
        assert!(cache.current_size() <= 100);

        // First blocks should be evicted
        let key0 = BlockCacheKey::new("test.sst".to_string(), 0);
        let key1 = BlockCacheKey::new("test.sst".to_string(), 1);
        assert!(cache.get(&key0).is_none());
        assert!(cache.get(&key1).is_none());

        // Recent blocks should still be present
        let key4 = BlockCacheKey::new("test.sst".to_string(), 4);
        assert!(cache.get(&key4).is_some());

        Ok(())
    }

    /// A cache hit promotes the block to most-recently-used, so the next
    /// eviction skips it and takes the oldest untouched block instead.
    /// NOTE: the ordering here depends on get() touching the LRU queue,
    /// so the exact sequence of operations matters.
    #[test]
    fn test_block_cache_touch() -> Result<()> {
        let config = BlockCacheConfig {
            max_size_bytes: 100,
            enable_stats: true,
        };
        let cache = BlockCache::with_config(config);

        // Add 3 blocks
        for i in 0..3 {
            let key = BlockCacheKey::new("test.sst".to_string(), i);
            let block = CachedBlock::new(vec![0u8; 30]);
            cache.put(key, block)?;
        }

        // Touch block 0 (make it most recent)
        let key0 = BlockCacheKey::new("test.sst".to_string(), 0);
        cache.get(&key0);

        // Add a new block (should evict block 1, not block 0)
        let key3 = BlockCacheKey::new("test.sst".to_string(), 3);
        let block3 = CachedBlock::new(vec![0u8; 30]);
        cache.put(key3, block3)?;

        // Block 0 should still be present (touched)
        assert!(cache.get(&key0).is_some());

        // Block 1 should be evicted (oldest untouched)
        let key1 = BlockCacheKey::new("test.sst".to_string(), 1);
        assert!(cache.get(&key1).is_none());

        Ok(())
    }

    /// One miss followed by two hits yields hits=2, misses=1,
    /// hit_rate = 2/3.
    #[test]
    fn test_block_cache_stats() -> Result<()> {
        let cache = BlockCache::new();

        let key = BlockCacheKey::new("test.sst".to_string(), 0);
        let block = CachedBlock::new(vec![1, 2, 3]);

        // Miss
        cache.get(&key);

        // Put
        cache.put(key.clone(), block)?;

        // Hit
        cache.get(&key);
        cache.get(&key);

        let stats = cache.stats();
        assert_eq!(stats.hits, 2);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.hit_rate(), 2.0 / 3.0);

        Ok(())
    }

    /// clear() empties the cache and resets block count and size to zero.
    #[test]
    fn test_block_cache_clear() -> Result<()> {
        let cache = BlockCache::new();

        for i in 0..5 {
            let key = BlockCacheKey::new("test.sst".to_string(), i);
            let block = CachedBlock::new(vec![0u8; 100]);
            cache.put(key, block)?;
        }

        assert!(cache.block_count() > 0);
        assert!(cache.current_size() > 0);

        cache.clear();

        assert_eq!(cache.block_count(), 0);
        assert_eq!(cache.current_size(), 0);

        Ok(())
    }

    /// remove() deletes a single key; contains() reflects the change.
    #[test]
    fn test_block_cache_remove() -> Result<()> {
        let cache = BlockCache::new();

        let key = BlockCacheKey::new("test.sst".to_string(), 0);
        let block = CachedBlock::new(vec![1, 2, 3]);

        cache.put(key.clone(), block)?;
        assert!(cache.contains(&key));

        cache.remove(&key);
        assert!(!cache.contains(&key));

        Ok(())
    }

    /// invalidate_sstable() drops only the blocks belonging to the named
    /// SSTable, leaving other tables' blocks intact.
    #[test]
    fn test_block_cache_invalidate_sstable() -> Result<()> {
        let cache = BlockCache::new();

        // Add blocks from two SSTables
        for i in 0..3 {
            let key = BlockCacheKey::new("test1.sst".to_string(), i);
            let block = CachedBlock::new(vec![0u8; 100]);
            cache.put(key, block)?;
        }

        for i in 0..3 {
            let key = BlockCacheKey::new("test2.sst".to_string(), i);
            let block = CachedBlock::new(vec![0u8; 100]);
            cache.put(key, block)?;
        }

        assert_eq!(cache.block_count(), 6);

        // Invalidate one SSTable
        cache.invalidate_sstable("test1.sst");

        assert_eq!(cache.block_count(), 3);

        // test1.sst blocks should be gone
        let key1 = BlockCacheKey::new("test1.sst".to_string(), 0);
        assert!(!cache.contains(&key1));

        // test2.sst blocks should still be present
        let key2 = BlockCacheKey::new("test2.sst".to_string(), 0);
        assert!(cache.contains(&key2));

        Ok(())
    }

    /// Smoke test for thread safety: 4 threads each put and get 100 blocks
    /// concurrently (distinct key spaces per thread, so every get after a
    /// put should hit unless evicted).
    #[test]
    fn test_block_cache_concurrent() -> Result<()> {
        use std::sync::Arc;
        use std::thread;

        let cache = Arc::new(BlockCache::new());
        let mut handles = vec![];

        // Spawn multiple threads doing cache operations
        for thread_id in 0..4 {
            let cache = Arc::clone(&cache);
            let handle = thread::spawn(move || {
                for i in 0..100 {
                    let key = BlockCacheKey::new(format!("test_{}.sst", thread_id), i);
                    let block = CachedBlock::new(vec![thread_id as u8; 100]);
                    cache
                        .put(key.clone(), block)
                        .expect("Cache put should succeed in concurrent test");
                    cache.get(&key);
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().expect("Thread should complete successfully");
        }

        // Cache should have blocks from all threads
        assert!(cache.block_count() > 0);
        let stats = cache.stats();
        assert!(stats.hits > 0);

        Ok(())
    }
}