velesdb_core/cache/lockfree.rs

//! Lock-Free LRU Cache with DashMap L1 (US-CORE-003-15)
//!
//! Two-tier cache architecture for high concurrent throughput:
//! - **L1**: DashMap (lock-free concurrent HashMap) for hot keys
//! - **L2**: LruCache (with LRU eviction) for capacity management
//!
//! # Performance
//!
//! | Operation | L1 Hit | L1 Miss + L2 Hit |
//! |-----------|--------|------------------|
//! | get() | ~50ns (lock-free) | ~500ns (with promotion) |
//! | peek_l1() | ~30ns (L1 only) | N/A |
//! | insert() | ~100ns (write-through) | N/A |
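//!
//! # Example
//!
//! A minimal usage sketch; the import path below is an assumption (hence the
//! `ignore` doc-test marker):
//!
//! ```ignore
//! use velesdb_core::cache::lockfree::LockFreeLruCache;
//!
//! let cache: LockFreeLruCache<u64, String> = LockFreeLruCache::new(100, 1_000);
//! cache.insert(1, "hot".to_string()); // write-through to L1 and L2
//! assert_eq!(cache.get(&1).as_deref(), Some("hot")); // served from L1
//! ```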

use dashmap::DashMap;
use std::hash::Hash;
use std::sync::atomic::{AtomicU64, Ordering};

use super::LruCache;

/// Lock-free two-tier cache with DashMap L1 and LruCache L2.
///
/// Optimized for read-heavy workloads with hot keys.
pub struct LockFreeLruCache<K, V>
where
    K: Hash + Eq + Clone + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// L1: Lock-free concurrent cache for hot keys.
    l1: DashMap<K, V>,
    /// L2: LRU cache with eviction for capacity management.
    l2: LruCache<K, V>,
    /// Maximum L1 entries before excess entries are evicted (they remain in L2).
    l1_capacity: usize,
    /// L1 hit counter.
    l1_hits: AtomicU64,
    /// L2 hit counter (L1 miss, L2 hit).
    l2_hits: AtomicU64,
    /// Total miss counter.
    misses: AtomicU64,
}

/// Statistics for the two-tier cache.
#[derive(Debug, Clone, Default)]
pub struct LockFreeCacheStats {
    /// L1 cache hits.
    pub l1_hits: u64,
    /// L2 cache hits (L1 miss, L2 hit).
    pub l2_hits: u64,
    /// Total misses.
    pub misses: u64,
    /// L1 current size.
    pub l1_size: usize,
    /// L2 current size.
    pub l2_size: usize,
}

impl LockFreeCacheStats {
    /// Calculate L1 hit rate.
    #[must_use]
    pub fn l1_hit_rate(&self) -> f64 {
        let total = self.l1_hits + self.l2_hits + self.misses;
        if total == 0 {
            0.0
        } else {
            self.l1_hits as f64 / total as f64
        }
    }

    /// Calculate total hit rate (L1 + L2).
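    ///
    /// # Example
    ///
    /// A short worked sketch; the import path is an assumption (hence `ignore`):
    ///
    /// ```ignore
    /// use velesdb_core::cache::lockfree::LockFreeCacheStats;
    ///
    /// let stats = LockFreeCacheStats { l1_hits: 50, l2_hits: 10, misses: 40, ..Default::default() };
    /// // (50 + 10) / (50 + 10 + 40) = 0.6
    /// assert!((stats.total_hit_rate() - 0.6).abs() < 1e-9);
    /// ```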
    #[must_use]
    pub fn total_hit_rate(&self) -> f64 {
        let total = self.l1_hits + self.l2_hits + self.misses;
        if total == 0 {
            0.0
        } else {
            (self.l1_hits + self.l2_hits) as f64 / total as f64
        }
    }
}

impl<K, V> LockFreeLruCache<K, V>
where
    K: Hash + Eq + Clone + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Create a new lock-free LRU cache.
    ///
    /// # Arguments
    ///
    /// * `l1_capacity` - Maximum entries in L1 (hot cache)
    /// * `l2_capacity` - Maximum entries in L2 (LRU backing store)
    #[must_use]
    pub fn new(l1_capacity: usize, l2_capacity: usize) -> Self {
        Self {
            l1: DashMap::with_capacity(l1_capacity),
            l2: LruCache::new(l2_capacity),
            l1_capacity,
            l1_hits: AtomicU64::new(0),
            l2_hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
        }
    }

    /// Get a value, checking L1 first then L2.
    ///
    /// If found in L2, promotes to L1 for faster subsequent access.
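    ///
    /// # Example
    ///
    /// A minimal sketch of the miss/hit flow (import path assumed, hence
    /// `ignore`):
    ///
    /// ```ignore
    /// let cache: LockFreeLruCache<u64, String> = LockFreeLruCache::new(10, 100);
    /// assert_eq!(cache.get(&1), None); // miss
    /// cache.insert(1, "v".to_string());
    /// assert_eq!(cache.get(&1), Some("v".to_string())); // L1 hit
    /// assert_eq!(cache.stats().misses, 1);
    /// ```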
    #[must_use]
    pub fn get(&self, key: &K) -> Option<V> {
        // Fast path: L1 lookup (lock-free)
        if let Some(entry) = self.l1.get(key) {
            self.l1_hits.fetch_add(1, Ordering::Relaxed);
            return Some(entry.value().clone());
        }

        // Slow path: L2 lookup
        if let Some(value) = self.l2.get(key) {
            self.l2_hits.fetch_add(1, Ordering::Relaxed);
            // Promote to L1
            self.promote_to_l1(key.clone(), value.clone());
            return Some(value);
        }

        self.misses.fetch_add(1, Ordering::Relaxed);
        None
    }

    /// Peek at L1 only (ultra-fast, no promotion).
    ///
    /// Returns None if not in L1, even if present in L2.
    #[must_use]
    pub fn peek_l1(&self, key: &K) -> Option<V> {
        self.l1.get(key).map(|entry| entry.value().clone())
    }

    /// Peek at L2 only (no L1 check, no promotion).
    #[must_use]
    pub fn peek_l2(&self, key: &K) -> Option<V> {
        self.l2.peek(key)
    }

    /// Insert a key-value pair (write-through to L1 and L2).
    pub fn insert(&self, key: K, value: V) {
        // Write to L1
        self.l1.insert(key.clone(), value.clone());

        // Evict from L1 if over capacity
        self.maybe_evict_l1();

        // Write to L2 (backing store)
        self.l2.insert(key, value);
    }

    /// Remove a key from both L1 and L2.
    pub fn remove(&self, key: &K) {
        self.l1.remove(key);
        self.l2.remove(key);
    }

    /// Clear both L1 and L2.
    pub fn clear(&self) {
        self.l1.clear();
        self.l2.clear();
    }

    /// Get cache statistics.
    #[must_use]
    pub fn stats(&self) -> LockFreeCacheStats {
        LockFreeCacheStats {
            l1_hits: self.l1_hits.load(Ordering::Relaxed),
            l2_hits: self.l2_hits.load(Ordering::Relaxed),
            misses: self.misses.load(Ordering::Relaxed),
            l1_size: self.l1.len(),
            l2_size: self.l2.len(),
        }
    }

    /// Get L1 capacity.
    #[must_use]
    pub fn l1_capacity(&self) -> usize {
        self.l1_capacity
    }

    /// Get L2 capacity.
    #[must_use]
    pub fn l2_capacity(&self) -> usize {
        self.l2.capacity()
    }

    /// Promote a key from L2 to L1.
    fn promote_to_l1(&self, key: K, value: V) {
        self.l1.insert(key, value);
        self.maybe_evict_l1();
    }

    /// Evict entries from L1 if over capacity.
    ///
    /// Eviction follows DashMap's unspecified iteration order rather than
    /// LRU order; thanks to write-through, evicted keys are still served
    /// from L2 on the next `get` (unless L2's own LRU has dropped them).
    /// A bounded loop prevents infinite spinning under contention.
    fn maybe_evict_l1(&self) {
        // Cap eviction rounds so concurrent inserts cannot keep this
        // loop spinning indefinitely.
        let mut attempts = 0;
        let max_attempts = 10;

        while self.l1.len() > self.l1_capacity && attempts < max_attempts {
            attempts += 1;

            // Collect keys first: removing while holding DashMap iterator
            // guards can deadlock.
            let keys_to_remove: Vec<K> = self
                .l1
                .iter()
                .take(self.l1.len().saturating_sub(self.l1_capacity).max(1))
                .map(|entry| entry.key().clone())
                .collect();

            if keys_to_remove.is_empty() {
                break;
            }

            for key in keys_to_remove {
                self.l1.remove(&key);
            }
        }
    }
}

impl<K, V> Default for LockFreeLruCache<K, V>
where
    K: Hash + Eq + Clone + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    fn default() -> Self {
        // Default: L1 = 1K hot entries, L2 = 10K LRU entries
        Self::new(1_000, 10_000)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;
    use std::time::Instant;

    #[test]
    fn test_lockfree_cache_new() {
        let cache: LockFreeLruCache<u64, String> = LockFreeLruCache::new(100, 1000);

        assert_eq!(cache.l1_capacity(), 100);
        assert_eq!(cache.l2_capacity(), 1000);
        assert_eq!(cache.stats().l1_size, 0);
        assert_eq!(cache.stats().l2_size, 0);
    }

    #[test]
    fn test_lockfree_cache_insert_and_get() {
        let cache: LockFreeLruCache<u64, String> = LockFreeLruCache::new(100, 1000);

        cache.insert(1, "value_1".to_string());

        // Should be in both L1 and L2
        assert_eq!(cache.get(&1), Some("value_1".to_string()));
        assert_eq!(cache.peek_l1(&1), Some("value_1".to_string()));
        assert_eq!(cache.peek_l2(&1), Some("value_1".to_string()));
    }

    #[test]
    fn test_lockfree_cache_get_l1_hit() {
        let cache: LockFreeLruCache<u64, String> = LockFreeLruCache::new(100, 1000);

        cache.insert(1, "value_1".to_string());

        // First get
        let _ = cache.get(&1);
        // Second get (should hit L1)
        let _ = cache.get(&1);

        let stats = cache.stats();
        assert_eq!(stats.l1_hits, 2);
        assert_eq!(stats.l2_hits, 0);
        assert_eq!(stats.misses, 0);
    }

    #[test]
    fn test_lockfree_cache_get_l2_promotion() {
        let cache: LockFreeLruCache<u64, String> = LockFreeLruCache::new(10, 100);

        // Insert via L2 directly (simulate L1 eviction scenario)
        cache.l2.insert(99, "from_l2".to_string());

        // Get should find in L2 and promote to L1
        let result = cache.get(&99);
        assert_eq!(result, Some("from_l2".to_string()));

        let stats = cache.stats();
        assert_eq!(stats.l1_hits, 0);
        assert_eq!(stats.l2_hits, 1);

        // Now should be in L1
        assert!(cache.peek_l1(&99).is_some());

        // Second get should hit L1
        let _ = cache.get(&99);
        let stats = cache.stats();
        assert_eq!(stats.l1_hits, 1);
    }

    #[test]
    fn test_lockfree_cache_miss() {
        let cache: LockFreeLruCache<u64, String> = LockFreeLruCache::new(100, 1000);

        let result = cache.get(&999);

        assert_eq!(result, None);
        assert_eq!(cache.stats().misses, 1);
    }

    #[test]
    fn test_lockfree_cache_remove() {
        let cache: LockFreeLruCache<u64, String> = LockFreeLruCache::new(100, 1000);

        cache.insert(1, "value_1".to_string());
        assert!(cache.get(&1).is_some());

        cache.remove(&1);

        assert!(cache.peek_l1(&1).is_none());
        assert!(cache.peek_l2(&1).is_none());
    }

    #[test]
    fn test_lockfree_cache_clear() {
        let cache: LockFreeLruCache<u64, String> = LockFreeLruCache::new(100, 1000);

        for i in 0..50 {
            cache.insert(i, format!("value_{i}"));
        }

        assert_eq!(cache.stats().l1_size, 50);

        cache.clear();

        assert_eq!(cache.stats().l1_size, 0);
        assert_eq!(cache.stats().l2_size, 0);
    }

    #[test]
    fn test_lockfree_cache_l1_eviction() {
        let cache: LockFreeLruCache<u64, String> = LockFreeLruCache::new(10, 100);

        // Insert more than L1 capacity
        for i in 0..20 {
            cache.insert(i, format!("value_{i}"));
        }

        // L1 should be at or near capacity (bounded eviction may leave slightly over)
        assert!(
            cache.stats().l1_size <= 15,
            "L1 size {} should be <= 15",
            cache.stats().l1_size
        );
        // L2 should have all entries
        assert_eq!(cache.stats().l2_size, 20);
    }

    #[test]
    fn test_lockfree_cache_concurrent_reads() {
        let cache = Arc::new(LockFreeLruCache::<u64, String>::new(100, 1000));

        // Pre-populate
        for i in 0..100 {
            cache.insert(i, format!("value_{i}"));
        }

        let mut handles = vec![];

        // 8 threads doing concurrent reads
        for _ in 0..8 {
            let cache_clone = Arc::clone(&cache);
            handles.push(thread::spawn(move || {
                for i in 0..1000 {
                    let key = i % 100;
                    let _ = cache_clone.get(&key);
                }
            }));
        }

        for h in handles {
            h.join().expect("Thread panicked");
        }

        // Should have many L1 hits
        let stats = cache.stats();
        assert!(stats.l1_hits > 0);
    }

    #[test]
    fn test_lockfree_cache_scaling_8_threads() {
        // Measure throughput with 1 thread vs 8 threads
        let cache = Arc::new(LockFreeLruCache::<u64, String>::new(1_000, 10_000));

        // Pre-populate
        for i in 0..1000 {
            cache.insert(i, format!("value_{i}"));
        }

        let ops_per_thread = 5_000;

        // Single thread baseline
        let start = Instant::now();
        for i in 0..ops_per_thread {
            let _ = cache.get(&(i % 1000));
        }
        let single_thread_time = start.elapsed();

        // 8 threads
        let start = Instant::now();
        let mut handles = vec![];
        for t in 0..8 {
            let cache_clone = Arc::clone(&cache);
            handles.push(thread::spawn(move || {
                // Stagger each thread's starting key so threads are less
                // likely to promote the same keys at the same time
                let offset = t * 100;
                for i in 0..ops_per_thread {
                    let key = ((i + offset) % 1000) as u64;
                    let _ = cache_clone.get(&key);
                }
            }));
        }
        for h in handles {
            h.join().expect("Thread panicked");
        }
        let eight_thread_time = start.elapsed();

        // Calculate throughput
        let single_throughput = ops_per_thread as f64 / single_thread_time.as_secs_f64();
        let eight_throughput = (8 * ops_per_thread) as f64 / eight_thread_time.as_secs_f64();
        let scaling_factor = eight_throughput / single_throughput;

        println!("LockFreeLruCache scaling test:");
        println!("  1 thread:  {single_throughput:.0} ops/sec");
        println!("  8 threads: {eight_throughput:.0} ops/sec");
        println!("  Scaling:   {scaling_factor:.2}x");

        // DashMap L1 should not cause a severe regression.
        // Note: L2 promotion takes locks, so scaling depends on the L1 hit
        // rate and on system load; on CI or loaded machines, contention may
        // reduce throughput.
        assert!(
            scaling_factor > 0.5,
            "Scaling factor {scaling_factor:.2}x should be > 0.5x (no severe regression)"
        );
    }

    #[test]
    fn test_lockfree_cache_hit_rate() {
        let cache: LockFreeLruCache<u64, String> = LockFreeLruCache::new(100, 1000);

        // Insert some values
        for i in 0..50 {
            cache.insert(i, format!("value_{i}"));
        }

        // Access existing keys (hits)
        for i in 0..50 {
            let _ = cache.get(&i);
        }

        // Access non-existing keys (misses)
        for i in 100..110 {
            let _ = cache.get(&i);
        }

        let stats = cache.stats();
        assert_eq!(stats.l1_hits, 50);
        assert_eq!(stats.misses, 10);

        // Total hit rate should be 50/60 ≈ 0.833
        let hit_rate = stats.total_hit_rate();
        assert!((hit_rate - 0.833).abs() < 0.01);
    }
}
479}