// warpdrive_proxy/cache/memory.rs

//! In-memory LRU cache implementation
//!
//! This module provides a high-performance in-memory cache with:
//! - LRU eviction using randomized sampling
//! - TTL-based expiration
//! - Thread-safe concurrent access via RwLock
//! - Configurable capacity and max item size
//!
//! # Eviction Strategy
//!
//! The cache uses a probabilistic LRU approximation:
//! - When eviction is needed, randomly sample 5 entries
//! - Evict the oldest one (by last_accessed_at)
//! - If an expired entry is found during sampling, evict it immediately
//!
//! This approach is O(5) = O(1) instead of O(n) for scanning all entries,
//! while still evicting items in the oldest ~20% on average.
//!
//! # Performance Characteristics
//!
//! - Get: O(1) hash lookup + O(1) expiry check
//! - Set: O(1) hash insertion + O(5) eviction sampling if needed
//! - Thread-safe: Uses parking_lot::RwLock for better performance than std::Mutex
//! - Values are returned as owned copies (`Vec<u8>`) cloned out of the cache
//!
//! # Example
//!
//! ```no_run
//! use warpdrive::cache::memory::MemoryCache;
//! use warpdrive::cache::Cache;
//!
//! # async fn example() -> anyhow::Result<()> {
//! // 32 MB cache, 1 MB max item size
//! let cache = MemoryCache::new(32 * 1024 * 1024, 1024 * 1024);
//!
//! // Store data with 60 second TTL
//! cache.set("user:123", b"user_data", 60).await?;
//!
//! // Retrieve data
//! if let Some(data) = cache.get("user:123").await? {
//!     println!("Found {} bytes", data.len());
//! }
//! # Ok(())
//! # }
//! ```

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use async_trait::async_trait;
use parking_lot::RwLock;

use crate::cache::Cache;

/// Hash key type for cache entries
///
/// We use u64 hashing for efficient key storage. The hash is computed
/// from the string key using a fast hashing algorithm.
///
/// Note: the original string key is NOT stored, so two distinct string
/// keys whose u64 hashes collide will share (and overwrite) the same
/// cache entry.
type CacheKey = u64;

/// Cache entry with metadata
///
/// Stores the value along with access time and expiration metadata
/// required for LRU eviction and TTL support.
#[derive(Clone)]
struct MemoryCacheEntry {
    /// Last time this entry was accessed (for LRU); refreshed on every
    /// cache hit
    last_accessed_at: SystemTime,

    /// Absolute time when this entry expires; an entry is treated as
    /// expired when `expires_at < now`
    expires_at: SystemTime,

    /// The cached value (binary data); its byte length is what counts
    /// toward the cache's size budget
    value: Vec<u8>,
}

/// Inner cache state protected by RwLock
///
/// All mutable state is contained within this struct, which is
/// protected by a RwLock for thread-safe concurrent access.
///
/// # Invariants (maintained by `MemoryCache`'s methods)
///
/// - `keys` contains exactly the keys present in `items`; the `Vec` exists
///   so eviction can sample random entries in O(1), which a `HashMap`
///   cannot provide
/// - `size` equals the sum of `value.len()` over all entries in `items`
///   (metadata overhead is not counted)
struct MemoryCacheInner {
    /// Maximum total size in bytes
    capacity: usize,

    /// Maximum size of a single item in bytes
    max_item_size: usize,

    /// Current total size of all cached items (value bytes only)
    size: usize,

    /// List of cache keys (for random sampling during eviction)
    keys: Vec<CacheKey>,

    /// Map of cache key to entry
    items: HashMap<CacheKey, MemoryCacheEntry>,
}

/// In-memory LRU cache with TTL support
///
/// This cache provides ultra-fast local caching for frequently accessed data.
/// It complements distributed caches (like Redis) by providing:
/// - Sub-microsecond latency (no network round trip)
/// - Automatic memory management via LRU eviction
/// - TTL-based expiration
/// - Thread-safe concurrent access
///
/// # Thread Safety
///
/// The cache uses `Arc<RwLock<...>>` for thread-safe access:
/// - Multiple concurrent reads (via read lock)
/// - Exclusive writes (via write lock)
/// - Clone is cheap (just increments Arc reference count)
///
/// # Size Management
///
/// The cache tracks size in bytes and enforces two limits:
/// - Total capacity: Sum of all cached values
/// - Max item size: Individual value size limit
///
/// When capacity is exceeded, items are evicted using probabilistic LRU.
///
/// Note: `get` returns an owned clone of the stored bytes, not a
/// reference into the cache.
#[derive(Clone)]
pub struct MemoryCache {
    /// Shared, lock-protected state; cloning a `MemoryCache` shares this
    /// same underlying cache
    inner: Arc<RwLock<MemoryCacheInner>>,
}

126impl MemoryCache {
127    /// Create a new in-memory cache
128    ///
129    /// # Parameters
130    ///
131    /// - `capacity`: Maximum total size in bytes (sum of all values)
132    /// - `max_item_size`: Maximum size of a single item in bytes
133    ///
134    /// # Example
135    ///
136    /// ```
137    /// use warpdrive::cache::memory::MemoryCache;
138    ///
139    /// // 32 MB cache with 1 MB max item size
140    /// let cache = MemoryCache::new(32 * 1024 * 1024, 1024 * 1024);
141    /// ```
142    pub fn new(capacity: usize, max_item_size: usize) -> Self {
143        MemoryCache {
144            inner: Arc::new(RwLock::new(MemoryCacheInner {
145                capacity,
146                max_item_size,
147                size: 0,
148                keys: Vec::new(),
149                items: HashMap::new(),
150            })),
151        }
152    }
153
154    /// Hash a string key to a u64
155    ///
156    /// Uses the default hash algorithm (SipHash 1-3) for security and speed.
157    #[inline]
158    fn hash_key(key: &str) -> CacheKey {
159        use std::collections::hash_map::DefaultHasher;
160        use std::hash::{Hash, Hasher};
161
162        let mut hasher = DefaultHasher::new();
163        key.hash(&mut hasher);
164        hasher.finish()
165    }
166
167    /// Get current time (used internally)
168    ///
169    /// Isolated to a function for potential testing/mocking in the future.
170    #[inline]
171    fn current_time() -> SystemTime {
172        SystemTime::now()
173    }
174
175    /// Internal get implementation with write lock
176    ///
177    /// This is used by the public get() method and requires a write lock
178    /// because we update last_accessed_at on cache hits.
179    fn get_internal(&self, key: CacheKey) -> Option<Vec<u8>> {
180        let mut inner = self.inner.write();
181        let now = Self::current_time();
182
183        // Check if entry exists
184        let entry = inner.items.get_mut(&key)?;
185
186        // Check if expired
187        if entry.expires_at < now {
188            tracing::debug!("Cache: entry expired");
189            return None;
190        }
191
192        // Update last accessed time (for LRU)
193        entry.last_accessed_at = now;
194
195        // Return cloned value
196        Some(entry.value.clone())
197    }
198
199    /// Internal set implementation
200    ///
201    /// Handles size validation, eviction, and insertion.
202    fn set_internal(&self, key: CacheKey, value: Vec<u8>, expires_at: SystemTime) {
203        let mut inner = self.inner.write();
204
205        let item_size = value.len();
206
207        // Reject items that are too large
208        if item_size > inner.max_item_size || item_size > inner.capacity {
209            tracing::debug!(
210                "Cache: item is too large to store, len={}, max_item_size={}, capacity={}",
211                item_size,
212                inner.max_item_size,
213                inner.capacity
214            );
215            return;
216        }
217
218        // Evict items until we have enough space
219        let limit = inner.capacity - item_size;
220        while inner.size > limit {
221            tracing::debug!(
222                "Cache: evicting item to make space, current_size={}, need_size={}",
223                inner.size,
224                limit
225            );
226            Self::evict_oldest_item(&mut inner);
227        }
228
229        // If key exists, subtract old value size
230        if let Some(existing) = inner.items.get(&key) {
231            inner.size -= existing.value.len();
232        } else {
233            // New key, add to keys list
234            inner.keys.push(key);
235        }
236
237        // Insert new entry
238        inner.items.insert(
239            key,
240            MemoryCacheEntry {
241                last_accessed_at: Self::current_time(),
242                expires_at,
243                value,
244            },
245        );
246
247        // Update total size
248        inner.size += item_size;
249
250        tracing::debug!(
251            "Cache: added item, key={}, size={}, expires_at={:?}",
252            key,
253            item_size,
254            expires_at
255        );
256    }
257
258    /// Evict the oldest item using randomized sampling
259    ///
260    /// This implements a probabilistic LRU eviction strategy:
261    /// 1. Pick 5 random items from the cache
262    /// 2. Find the one with the oldest last_accessed_at
263    /// 3. If any expired item is found, evict it immediately
264    /// 4. Remove the selected item from cache
265    ///
266    /// This is O(5) = O(1) instead of O(n), while still providing
267    /// good approximation of LRU behavior (evicts from oldest ~20%).
268    fn evict_oldest_item(inner: &mut MemoryCacheInner) {
269        use rand::Rng;
270
271        if inner.keys.is_empty() {
272            return;
273        }
274
275        let mut oldest_key: CacheKey = 0;
276        let mut oldest_index: usize = 0;
277        let mut oldest_time = SystemTime::UNIX_EPOCH;
278
279        let now = Self::current_time();
280        let mut rng = rand::rng();
281
282        // Sample up to 5 random items (or fewer if cache is small)
283        let sample_size = std::cmp::min(5, inner.keys.len());
284
285        for _ in 0..sample_size {
286            let index = rng.random_range(0..inner.keys.len());
287            let key = inner.keys[index];
288
289            if let Some(entry) = inner.items.get(&key) {
290                // If expired, evict immediately
291                if entry.expires_at < now {
292                    oldest_key = key;
293                    oldest_index = index;
294                    break;
295                }
296
297                // Track oldest by last_accessed_at
298                if oldest_time == SystemTime::UNIX_EPOCH || entry.last_accessed_at < oldest_time {
299                    oldest_time = entry.last_accessed_at;
300                    oldest_key = key;
301                    oldest_index = index;
302                }
303            }
304        }
305
306        // Remove from keys list (swap with last element, then pop)
307        let keys_len = inner.keys.len();
308        if oldest_index < keys_len {
309            inner.keys.swap(oldest_index, keys_len - 1);
310            inner.keys.pop();
311        }
312
313        // Remove from items map and update size
314        if let Some(entry) = inner.items.remove(&oldest_key) {
315            inner.size -= entry.value.len();
316            tracing::debug!(
317                "Cache: evicted item, key={}, size={}",
318                oldest_key,
319                entry.value.len()
320            );
321        }
322    }
323
324    /// Get cache statistics (for debugging/monitoring)
325    ///
326    /// Returns current size, capacity, and item count.
327    ///
328    /// # Example
329    ///
330    /// ```
331    /// # use warpdrive::cache::memory::MemoryCache;
332    /// let cache = MemoryCache::new(1024, 256);
333    /// let (size, capacity, count) = cache.stats();
334    /// println!("Cache: {}/{} bytes, {} items", size, capacity, count);
335    /// ```
336    pub fn stats(&self) -> (usize, usize, usize) {
337        let inner = self.inner.read();
338        (inner.size, inner.capacity, inner.items.len())
339    }
340
341    /// Clear all entries from the cache
342    ///
343    /// Resets the cache to empty state.
344    ///
345    /// # Example
346    ///
347    /// ```
348    /// # use warpdrive::cache::memory::MemoryCache;
349    /// let cache = MemoryCache::new(1024, 256);
350    /// // ... use cache ...
351    /// cache.clear();
352    /// ```
353    pub fn clear(&self) {
354        let mut inner = self.inner.write();
355        inner.keys.clear();
356        inner.items.clear();
357        inner.size = 0;
358        tracing::debug!("Cache: cleared all entries");
359    }
360}
361
362#[async_trait]
363impl Cache for MemoryCache {
364    async fn get(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
365        let hash = Self::hash_key(key);
366        Ok(self.get_internal(hash))
367    }
368
369    async fn set(&self, key: &str, value: &[u8], ttl_seconds: u64) -> anyhow::Result<()> {
370        let hash = Self::hash_key(key);
371        let expires_at = if ttl_seconds > 0 {
372            Self::current_time() + Duration::from_secs(ttl_seconds)
373        } else {
374            // No TTL = expire far in future (100 years)
375            Self::current_time() + Duration::from_secs(100 * 365 * 24 * 3600)
376        };
377
378        self.set_internal(hash, value.to_vec(), expires_at);
379        Ok(())
380    }
381
382    async fn delete(&self, key: &str) -> anyhow::Result<()> {
383        let hash = Self::hash_key(key);
384        let mut inner = self.inner.write();
385
386        // Remove from items map
387        if let Some(entry) = inner.items.remove(&hash) {
388            inner.size -= entry.value.len();
389
390            // Remove from keys list
391            if let Some(pos) = inner.keys.iter().position(|&k| k == hash) {
392                let keys_len = inner.keys.len();
393                inner.keys.swap(pos, keys_len - 1);
394                inner.keys.pop();
395            }
396
397            tracing::debug!("Cache: deleted item, key={}", hash);
398        }
399
400        Ok(())
401    }
402
403    async fn exists(&self, key: &str) -> anyhow::Result<bool> {
404        let hash = Self::hash_key(key);
405        let inner = self.inner.read();
406        let now = Self::current_time();
407
408        if let Some(entry) = inner.items.get(&hash) {
409            // Check if not expired
410            Ok(entry.expires_at >= now)
411        } else {
412            Ok(false)
413        }
414    }
415}
416
#[cfg(test)]
mod tests {
    use super::*;

    const KB: usize = 1024;
    const MB: usize = 1024 * KB;

    /// A roomy cache (32 MB total, 1 MB max item) used by most tests.
    fn roomy() -> MemoryCache {
        MemoryCache::new(32 * MB, MB)
    }

    #[tokio::test]
    async fn test_store_and_retrieve() {
        let mc = roomy();
        mc.set("test_key", b"hello world", 30)
            .await
            .expect("set failed");

        let fetched = mc.get("test_key").await.expect("get failed");
        assert_eq!(fetched, Some(b"hello world".to_vec()));
    }

    #[tokio::test]
    async fn test_storing_updates_existing_value() {
        let mc = roomy();
        mc.set("key", b"first", 30).await.expect("set failed");
        mc.set("key", b"second", 30).await.expect("set failed");

        assert_eq!(
            mc.get("key").await.expect("get failed"),
            Some(b"second".to_vec())
        );
    }

    #[tokio::test]
    async fn test_storing_existing_value_keeps_size_correct() {
        let mc = roomy();
        mc.set("key", b"first", 30).await.expect("set failed");
        mc.set("key", b"second", 30).await.expect("set failed");

        let (bytes_used, _, item_count) = mc.stats();
        assert_eq!(item_count, 1);
        // Only the latest value ("second", 6 bytes) counts toward size.
        assert_eq!(bytes_used, b"second".len());
    }

    #[tokio::test]
    async fn test_expiry() {
        let mc = roomy();

        // Store with a 1 second TTL.
        mc.set("key", b"hello world", 1).await.expect("set failed");

        // A fresh entry is readable right away.
        assert_eq!(
            mc.get("key").await.expect("get failed"),
            Some(b"hello world".to_vec())
        );

        // Once the TTL elapses, the entry is gone.
        tokio::time::sleep(Duration::from_secs(2)).await;
        assert_eq!(mc.get("key").await.expect("get failed"), None);
    }

    #[tokio::test]
    async fn test_does_not_store_items_over_cache_limit() {
        let mc = MemoryCache::new(3 * KB, 50 * KB);

        let oversized = vec![0u8; 10 * KB];
        mc.set("key", &oversized, 3600).await.expect("set failed");

        assert_eq!(mc.get("key").await.expect("get failed"), None);
    }

    #[tokio::test]
    async fn test_cache_of_size_zero_does_not_store_items() {
        let mc = MemoryCache::new(0, KB);

        mc.set("key", b"no storage", 3600)
            .await
            .expect("set failed");

        assert_eq!(mc.get("key").await.expect("get failed"), None);
    }

    #[tokio::test]
    async fn test_items_are_evicted_to_make_space() {
        let max_cache_size = 10 * KB;
        let mc = MemoryCache::new(max_cache_size, KB);

        // Insert twice the capacity's worth of 1 KB items.
        for i in 0u32..20 {
            let key = format!("key_{}", i);
            let payload = vec![i as u8; KB];
            mc.set(&key, &payload, 3600).await.expect("set failed");

            // The just-written item must always survive eviction.
            assert_eq!(mc.get(&key).await.expect("get failed"), Some(payload));
        }

        // The cache should sit exactly at capacity.
        let (bytes_used, capacity, _) = mc.stats();
        assert_eq!(bytes_used, capacity);
        assert_eq!(bytes_used, max_cache_size);
    }

    #[tokio::test]
    async fn test_does_not_store_items_over_item_limit() {
        let mc = MemoryCache::new(50 * KB, 3 * KB);

        let oversized = vec![0u8; 10 * KB];
        mc.set("key", &oversized, 3600).await.expect("set failed");

        assert_eq!(mc.get("key").await.expect("get failed"), None);
    }

    #[tokio::test]
    async fn test_delete() {
        let mc = roomy();
        mc.set("key", b"value", 30).await.expect("set failed");
        assert!(mc.exists("key").await.expect("exists failed"));

        mc.delete("key").await.expect("delete failed");

        assert!(!mc.exists("key").await.expect("exists failed"));
        assert_eq!(mc.get("key").await.expect("get failed"), None);
    }

    #[tokio::test]
    async fn test_exists() {
        let mc = roomy();

        // Unknown key is absent.
        assert!(!mc.exists("missing").await.expect("exists failed"));

        // Stored key is present.
        mc.set("key", b"value", 30).await.expect("set failed");
        assert!(mc.exists("key").await.expect("exists failed"));
    }

    #[tokio::test]
    async fn test_exists_returns_false_for_expired() {
        let mc = roomy();

        // Store with a 1 second TTL; visible immediately.
        mc.set("key", b"value", 1).await.expect("set failed");
        assert!(mc.exists("key").await.expect("exists failed"));

        // Invisible once the TTL elapses.
        tokio::time::sleep(Duration::from_secs(2)).await;
        assert!(!mc.exists("key").await.expect("exists failed"));
    }

    #[tokio::test]
    async fn test_clear() {
        let mc = roomy();

        for i in 0..10 {
            mc.set(&format!("key_{}", i), b"value", 30)
                .await
                .expect("set failed");
        }
        assert_eq!(mc.stats().2, 10);

        mc.clear();

        let (bytes_used, _, item_count) = mc.stats();
        assert_eq!(item_count, 0);
        assert_eq!(bytes_used, 0);
    }

    #[tokio::test]
    async fn test_concurrent_access() {
        use std::sync::Arc;
        use tokio::task;

        let mc = Arc::new(roomy());

        // Ten tasks, each doing 100 interleaved writes and reads.
        let mut workers = Vec::new();
        for i in 0..10 {
            let mc = Arc::clone(&mc);
            workers.push(task::spawn(async move {
                for j in 0..100 {
                    let key = format!("key_{}_{}", i, j);
                    let value = format!("value_{}_{}", i, j);

                    mc.set(&key, value.as_bytes(), 30).await.unwrap();
                    assert_eq!(mc.get(&key).await.unwrap(), Some(value.into_bytes()));
                }
            }));
        }

        for worker in workers {
            worker.await.expect("task failed");
        }

        // The cache ends up non-empty.
        assert!(mc.stats().2 > 0);
    }

    #[test]
    fn test_hash_key_consistency() {
        // Equal inputs hash equal; distinct inputs should differ.
        assert_eq!(
            MemoryCache::hash_key("test_key"),
            MemoryCache::hash_key("test_key")
        );
        assert_ne!(
            MemoryCache::hash_key("test_key"),
            MemoryCache::hash_key("different_key")
        );
    }
}