// elif_cache/backends/memory.rs

//! In-memory cache backend with LRU eviction

use crate::{CacheBackend, CacheConfig, CacheResult, CacheStats};
use async_trait::async_trait;
use dashmap::DashMap;
use lru::LruCache;
use parking_lot::{RwLock, Mutex};
use std::{
    num::NonZeroUsize,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    time::{Duration, Instant},
};

/// Entry in the memory cache
#[derive(Debug)]
struct CacheEntry {
    // Raw serialized value bytes.
    data: Vec<u8>,
    // Instant the entry was inserted.
    created_at: Instant,
    // Absolute expiry deadline; `None` means the entry never expires.
    expires_at: Option<Instant>,
    // Number of reads; updated lock-free on every access.
    access_count: AtomicU64,
    // Timestamp of the most recent read; RwLock lets `access` update it
    // through a shared `&self` reference.
    last_accessed: RwLock<Instant>,
}
26
27impl Clone for CacheEntry {
28    fn clone(&self) -> Self {
29        Self {
30            data: self.data.clone(),
31            created_at: self.created_at,
32            expires_at: self.expires_at,
33            access_count: AtomicU64::new(self.access_count.load(Ordering::Relaxed)),
34            last_accessed: RwLock::new(*self.last_accessed.read()),
35        }
36    }
37}
38
39impl CacheEntry {
40    fn new(data: Vec<u8>, ttl: Option<Duration>) -> Self {
41        let now = Instant::now();
42        Self {
43            data,
44            created_at: now,
45            expires_at: ttl.map(|ttl| now + ttl),
46            access_count: AtomicU64::new(1),
47            last_accessed: RwLock::new(now),
48        }
49    }
50    
51    fn is_expired(&self) -> bool {
52        self.expires_at.map_or(false, |exp| Instant::now() > exp)
53    }
54    
55    fn access(&self) -> Vec<u8> {
56        self.access_count.fetch_add(1, Ordering::Relaxed);
57        *self.last_accessed.write() = Instant::now();
58        self.data.clone()
59    }
60    
61    fn size(&self) -> usize {
62        self.data.len() + std::mem::size_of::<Self>()
63    }
64}
65
/// High-performance LRU tracker with O(1) operations using the lru crate
/// Much safer and more reliable than manual implementation
struct LruTracker {
    // Ordering-only structure: keys map to `()`; the actual payloads live
    // in `MemoryBackend::entries`. Guarded by a parking_lot RwLock.
    cache: RwLock<LruCache<String, ()>>,
}
71
72impl LruTracker {
73    fn new() -> Self {
74        // Start with a reasonable default capacity, will resize as needed
75        let capacity = NonZeroUsize::new(1000).expect("1000 is non-zero");
76        Self {
77            cache: RwLock::new(LruCache::new(capacity)),
78        }
79    }
80    
81    /// Access a key, moving it to the front (most recently used)
82    /// O(1) time complexity
83    fn access(&self, key: &str) {
84        let mut cache = self.cache.write();
85        
86        // If the key is not in the cache and the cache is full, resize it first to avoid eviction
87        if cache.peek(key).is_none() && cache.len() == cache.cap().get() {
88            let new_capacity = NonZeroUsize::new(cache.cap().get() * 2)
89                .expect("doubled capacity should be non-zero");
90            cache.resize(new_capacity);
91        }
92        
93        // Now, this put will not cause an eviction of an existing item
94        // If key doesn't exist, it's inserted. If it exists, its position is updated.
95        cache.put(key.to_string(), ());
96    }
97    
98    /// Remove a key from the tracker
99    /// O(1) time complexity  
100    fn remove(&self, key: &str) {
101        let mut cache = self.cache.write();
102        cache.pop(key);
103    }
104    
105    /// Get the least recently used key
106    /// O(1) time complexity
107    fn least_recently_used(&self) -> Option<String> {
108        let cache = self.cache.read();
109        cache.iter().next_back().map(|(key, _)| key.clone())
110    }
111    
112    /// Clear all entries from the tracker
113    fn clear(&self) {
114        let mut cache = self.cache.write();
115        cache.clear();
116    }
117}
118
/// In-memory cache backend with LRU eviction
pub struct MemoryBackend {
    // Concurrent key -> entry map; DashMap shards internally so access to
    // different keys rarely contends.
    entries: DashMap<String, CacheEntry>,
    // Recency ordering used to pick eviction victims; kept in sync with
    // `entries` on every insert/remove.
    lru: LruTracker,
    // Backend settings, including optional max-entries / max-memory limits.
    config: CacheConfig,
    // Hit/miss/size counters exposed to callers via `stats()`.
    stats: Arc<Mutex<CacheStats>>,
}
126
127impl MemoryBackend {
128    /// Create a new memory backend with the given configuration
129    pub fn new(config: CacheConfig) -> Self {
130        Self {
131            entries: DashMap::new(),
132            lru: LruTracker::new(),
133            config,
134            stats: Arc::new(Mutex::new(CacheStats::default())),
135        }
136    }
137    
138    /// Get current memory usage in bytes
139    fn memory_usage(&self) -> usize {
140        self.entries.iter().map(|entry| entry.value().size()).sum()
141    }
142    
143    /// Check if we need to evict entries
144    fn should_evict(&self) -> bool {
145        if let Some(max_entries) = self.config.get_max_entries() {
146            if self.entries.len() >= *max_entries {
147                return true;
148            }
149        }
150        
151        if let Some(max_memory) = self.config.get_max_memory() {
152            if self.memory_usage() >= *max_memory {
153                return true;
154            }
155        }
156        
157        false
158    }
159    
160    /// Evict expired and least recently used entries
161    async fn evict(&self) -> CacheResult<()> {
162        // First, remove expired entries
163        let expired_keys: Vec<String> = self.entries
164            .iter()
165            .filter_map(|entry| {
166                if entry.value().is_expired() {
167                    Some(entry.key().clone())
168                } else {
169                    None
170                }
171            })
172            .collect();
173        
174        for key in expired_keys {
175            if let Some((_, removed_entry)) = self.entries.remove(&key) {
176                self.lru.remove(&key);
177                let mut stats = self.stats.lock();
178                stats.total_keys = stats.total_keys.saturating_sub(1);
179                stats.memory_usage = stats.memory_usage.saturating_sub(removed_entry.size() as u64);
180            }
181        }
182        
183        // Then, evict LRU entries if still over limits
184        while self.should_evict() {
185            if let Some(lru_key) = self.lru.least_recently_used() {
186                if let Some((_, removed_entry)) = self.entries.remove(&lru_key) {
187                    self.lru.remove(&lru_key);
188                    let mut stats = self.stats.lock();
189                    stats.total_keys = stats.total_keys.saturating_sub(1);
190                    stats.memory_usage = stats.memory_usage.saturating_sub(removed_entry.size() as u64);
191                } else {
192                    break;
193                }
194            } else {
195                break;
196            }
197        }
198        
199        Ok(())
200    }
201    
202    /// Clean up expired entries (background task)
203    async fn cleanup_expired(&self) {
204        let expired_keys: Vec<String> = self.entries
205            .iter()
206            .filter_map(|entry| {
207                if entry.value().is_expired() {
208                    Some(entry.key().clone())
209                } else {
210                    None
211                }
212            })
213            .collect();
214        
215        for key in expired_keys {
216            if let Some((_, removed_entry)) = self.entries.remove(&key) {
217                self.lru.remove(&key);
218                let mut stats = self.stats.lock();
219                stats.total_keys = stats.total_keys.saturating_sub(1);
220                stats.memory_usage = stats.memory_usage.saturating_sub(removed_entry.size() as u64);
221            }
222        }
223    }
224}
225
226#[async_trait]
227impl CacheBackend for MemoryBackend {
228    async fn get(&self, key: &str) -> CacheResult<Option<Vec<u8>>> {
229        // Clean up expired entries occasionally
230        if rand::random::<f64>() < 0.01 { // 1% chance
231            self.cleanup_expired().await;
232        }
233        
234        if let Some(entry) = self.entries.get(key) {
235            if entry.is_expired() {
236                // Get entry size before dropping
237                let entry_size = entry.size() as u64;
238                drop(entry);
239                
240                // Remove expired entry
241                if self.entries.remove(key).is_some() {
242                    self.lru.remove(key);
243                    
244                    // Update stats
245                    let mut stats = self.stats.lock();
246                    stats.misses += 1;
247                    stats.total_keys = stats.total_keys.saturating_sub(1);
248                    stats.memory_usage = stats.memory_usage.saturating_sub(entry_size);
249                }
250                
251                return Ok(None);
252            }
253            
254            // Access the entry (updates LRU and access count)
255            let data = entry.access();
256            self.lru.access(key);
257            
258            // Update stats
259            self.stats.lock().hits += 1;
260            
261            Ok(Some(data))
262        } else {
263            // Update stats
264            self.stats.lock().misses += 1;
265            
266            Ok(None)
267        }
268    }
269    
270    async fn put(&self, key: &str, value: Vec<u8>, ttl: Option<Duration>) -> CacheResult<()> {
271        // Evict if necessary before adding
272        if self.should_evict() {
273            self.evict().await?;
274        }
275        
276        let entry = CacheEntry::new(value, ttl);
277        let entry_size = entry.size() as u64;
278        
279        // Insert or update entry
280        let old_entry = self.entries.insert(key.to_string(), entry);
281        
282        let mut stats = self.stats.lock();
283        if let Some(old_entry) = old_entry {
284            // Existing entry - update memory usage with size difference
285            let old_size = old_entry.size() as u64;
286            stats.memory_usage = stats.memory_usage.saturating_sub(old_size) + entry_size;
287        } else {
288            // New entry - increment total count and memory usage
289            stats.total_keys += 1;
290            stats.memory_usage += entry_size;
291        }
292        
293        // Update LRU
294        self.lru.access(key);
295        
296        Ok(())
297    }
298    
299    async fn forget(&self, key: &str) -> CacheResult<bool> {
300        if let Some((_, removed_entry)) = self.entries.remove(key) {
301            self.lru.remove(key);
302            
303            // Update stats efficiently
304            let mut stats = self.stats.lock();
305            stats.total_keys = stats.total_keys.saturating_sub(1);
306            stats.memory_usage = stats.memory_usage.saturating_sub(removed_entry.size() as u64);
307            
308            Ok(true)
309        } else {
310            Ok(false)
311        }
312    }
313    
314    async fn exists(&self, key: &str) -> CacheResult<bool> {
315        if let Some(entry) = self.entries.get(key) {
316            if entry.is_expired() {
317                // Get entry size before dropping
318                let entry_size = entry.size() as u64;
319                drop(entry);
320                
321                // Clean up expired entry
322                if self.entries.remove(key).is_some() {
323                    self.lru.remove(key);
324                    
325                    let mut stats = self.stats.lock();
326                    stats.total_keys = stats.total_keys.saturating_sub(1);
327                    stats.memory_usage = stats.memory_usage.saturating_sub(entry_size);
328                }
329                
330                return Ok(false);
331            }
332            Ok(true)
333        } else {
334            Ok(false)
335        }
336    }
337    
338    async fn flush(&self) -> CacheResult<()> {
339        self.entries.clear();
340        
341        // Reset LRU tracker
342        self.lru.clear();
343        
344        // Reset stats
345        let mut stats = self.stats.lock();
346        stats.total_keys = 0;
347        stats.memory_usage = 0;
348        
349        Ok(())
350    }
351    
352    async fn get_many(&self, keys: &[&str]) -> CacheResult<Vec<Option<Vec<u8>>>> {
353        let mut results = Vec::with_capacity(keys.len());
354        
355        for key in keys {
356            results.push(self.get(key).await?);
357        }
358        
359        Ok(results)
360    }
361    
362    async fn put_many(&self, entries: &[(&str, Vec<u8>, Option<Duration>)]) -> CacheResult<()> {
363        for (key, value, ttl) in entries {
364            self.put(key, value.clone(), *ttl).await?;
365        }
366        
367        Ok(())
368    }
369    
370    async fn forget_many(&self, keys: &[&str]) -> CacheResult<usize> {
371        let mut removed_count = 0;
372        let mut total_freed_memory = 0u64;
373        
374        // Remove entries and track freed memory
375        for key in keys {
376            if let Some((_, removed_entry)) = self.entries.remove(*key) {
377                self.lru.remove(key);
378                total_freed_memory += removed_entry.size() as u64;
379                removed_count += 1;
380            }
381        }
382        
383        // Update stats once for all removals
384        if removed_count > 0 {
385            let mut stats = self.stats.lock();
386            stats.total_keys = stats.total_keys.saturating_sub(removed_count as u64);
387            stats.memory_usage = stats.memory_usage.saturating_sub(total_freed_memory);
388        }
389        
390        Ok(removed_count)
391    }
392    
393    async fn stats(&self) -> CacheResult<CacheStats> {
394        Ok(self.stats.lock().clone())
395    }
396}
397
#[cfg(test)]
mod tests {
    // Integration-style tests driving the backend through the public
    // `CacheBackend` trait methods.
    use super::*;
    use std::time::Duration;
    use tokio::time::sleep;

    // put/get round-trip, exists, and forget on a single key.
    #[tokio::test]
    async fn test_memory_backend_basic_operations() {
        let backend = MemoryBackend::new(CacheConfig::default());

        // Test put and get
        backend.put("test", b"value".to_vec(), Some(Duration::from_secs(60))).await.unwrap();
        let result = backend.get("test").await.unwrap();
        assert_eq!(result, Some(b"value".to_vec()));

        // Test exists
        assert!(backend.exists("test").await.unwrap());
        assert!(!backend.exists("nonexistent").await.unwrap());

        // Test forget
        assert!(backend.forget("test").await.unwrap());
        assert!(!backend.exists("test").await.unwrap());
    }

    // An entry stored with a short TTL becomes invisible after it elapses.
    #[tokio::test]
    async fn test_memory_backend_ttl() {
        let backend = MemoryBackend::new(CacheConfig::default());

        // Put with very short TTL
        backend.put("ttl_test", b"value".to_vec(), Some(Duration::from_millis(50))).await.unwrap();

        // Should exist initially
        assert!(backend.exists("ttl_test").await.unwrap());

        // Wait for expiration
        sleep(Duration::from_millis(100)).await;

        // Should be expired
        assert!(!backend.exists("ttl_test").await.unwrap());
        let result = backend.get("ttl_test").await.unwrap();
        assert_eq!(result, None);
    }

    // With max_entries = 2, inserting a third key evicts the least
    // recently used one (key2, since key1 was read in between).
    #[tokio::test]
    async fn test_memory_backend_lru_eviction() {
        let config = CacheConfig::builder()
            .max_entries_limit(2)
            .build_config();
        let backend = MemoryBackend::new(config);

        // Fill cache to capacity
        backend.put("key1", b"value1".to_vec(), None).await.unwrap();
        backend.put("key2", b"value2".to_vec(), None).await.unwrap();

        // Access key1 to make it more recently used
        backend.get("key1").await.unwrap();

        // Add third key, should evict key2 (least recently used)
        backend.put("key3", b"value3".to_vec(), None).await.unwrap();

        // key1 and key3 should exist, key2 should be evicted
        assert!(backend.exists("key1").await.unwrap());
        assert!(!backend.exists("key2").await.unwrap());
        assert!(backend.exists("key3").await.unwrap());
    }

    // Hit/miss/total_keys/memory_usage counters track operations.
    #[tokio::test]
    async fn test_memory_backend_stats() {
        let backend = MemoryBackend::new(CacheConfig::default());

        // Initial stats
        let stats = backend.stats().await.unwrap();
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 0);
        assert_eq!(stats.total_keys, 0);

        // Add some data
        backend.put("test1", b"value1".to_vec(), None).await.unwrap();
        backend.put("test2", b"value2".to_vec(), None).await.unwrap();

        // Check stats after puts
        let stats = backend.stats().await.unwrap();
        assert_eq!(stats.total_keys, 2);
        assert!(stats.memory_usage > 0);

        // Test cache hit
        backend.get("test1").await.unwrap();
        let stats = backend.stats().await.unwrap();
        assert_eq!(stats.hits, 1);

        // Test cache miss
        backend.get("nonexistent").await.unwrap();
        let stats = backend.stats().await.unwrap();
        assert_eq!(stats.misses, 1);

        // Check hit ratio
        assert_eq!(stats.hit_ratio(), 0.5);
    }

    // Batch removal returns an accurate count and updates stats once;
    // also covers missing keys and the empty-slice edge case.
    #[tokio::test]
    async fn test_memory_backend_forget_many() {
        let backend = MemoryBackend::new(CacheConfig::default());

        // Add test data
        backend.put("key1", b"value1".to_vec(), None).await.unwrap();
        backend.put("key2", b"value2".to_vec(), None).await.unwrap();
        backend.put("key3", b"value3".to_vec(), None).await.unwrap();
        backend.put("key4", b"value4".to_vec(), None).await.unwrap();

        // Verify all keys exist
        assert!(backend.exists("key1").await.unwrap());
        assert!(backend.exists("key2").await.unwrap());
        assert!(backend.exists("key3").await.unwrap());
        assert!(backend.exists("key4").await.unwrap());

        // Get initial stats
        let initial_stats = backend.stats().await.unwrap();
        assert_eq!(initial_stats.total_keys, 4);

        // Remove multiple keys at once
        let keys_to_remove = ["key1", "key2", "key3"];
        let removed_count = backend.forget_many(&keys_to_remove).await.unwrap();
        assert_eq!(removed_count, 3);

        // Verify keys were removed
        assert!(!backend.exists("key1").await.unwrap());
        assert!(!backend.exists("key2").await.unwrap());
        assert!(!backend.exists("key3").await.unwrap());
        assert!(backend.exists("key4").await.unwrap());

        // Check stats were updated correctly
        let final_stats = backend.stats().await.unwrap();
        assert_eq!(final_stats.total_keys, 1);
        assert!(final_stats.memory_usage < initial_stats.memory_usage);

        // Test removing non-existent keys
        let nonexistent_keys = ["nonexistent1", "nonexistent2"];
        let removed_count = backend.forget_many(&nonexistent_keys).await.unwrap();
        assert_eq!(removed_count, 0);

        // Test empty key array
        let empty_keys: Vec<&str> = vec![];
        let removed_count = backend.forget_many(&empty_keys).await.unwrap();
        assert_eq!(removed_count, 0);
    }

    // flush() empties the map and zeroes the size counters.
    #[tokio::test]
    async fn test_memory_backend_flush() {
        let backend = MemoryBackend::new(CacheConfig::default());

        // Add some data
        backend.put("test1", b"value1".to_vec(), None).await.unwrap();
        backend.put("test2", b"value2".to_vec(), None).await.unwrap();

        // Verify data exists
        assert!(backend.exists("test1").await.unwrap());
        assert!(backend.exists("test2").await.unwrap());

        // Flush cache
        backend.flush().await.unwrap();

        // Verify cache is empty
        assert!(!backend.exists("test1").await.unwrap());
        assert!(!backend.exists("test2").await.unwrap());

        let stats = backend.stats().await.unwrap();
        assert_eq!(stats.total_keys, 0);
        assert_eq!(stats.memory_usage, 0);
    }

    // Regression test: the LRU tracker's internal LruCache must resize
    // past its initial 1000-slot capacity instead of silently evicting
    // tracked keys.
    #[tokio::test]
    async fn test_lru_tracker_consistency() {
        // Create a backend that starts with small LRU capacity to test edge case
        let backend = MemoryBackend::new(CacheConfig::default());

        // Fill the LRU tracker to its initial capacity (1000 items)
        // This tests that the LRU tracker properly resizes without evicting tracked items
        for i in 0..1200 {
            let key = format!("consistency_test_{}", i);
            let value = format!("value_{}", i).into_bytes();
            backend.put(&key, value, None).await.unwrap();
        }

        // All items should still be accessible (no premature LRU eviction)
        for i in 0..1200 {
            let key = format!("consistency_test_{}", i);
            assert!(backend.exists(&key).await.unwrap(), 
                   "Key {} should exist but was not found", key);
        }

        // Access some items to change LRU order
        for i in (0..100).rev() {
            let key = format!("consistency_test_{}", i);
            backend.get(&key).await.unwrap();
        }

        // Verify LRU tracking is still consistent - all items should still be accessible
        for i in 0..1200 {
            let key = format!("consistency_test_{}", i);
            assert!(backend.exists(&key).await.unwrap(), 
                   "Key {} should still exist after LRU access", key);
        }
    }
}