Skip to main content

oxigdal_cloud/cache/
eviction.rs

1//! LRU, LFU, and ARC eviction policy implementations
2
3#[cfg(feature = "cache")]
4use dashmap::DashMap;
5#[cfg(feature = "cache")]
6use lru::LruCache;
7use std::collections::VecDeque;
8use std::num::NonZeroUsize;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11
12#[cfg(feature = "async")]
13use tokio::sync::RwLock;
14
15use bytes::Bytes;
16use std::time::Duration;
17
18use super::CacheConfig;
19use super::metadata::{CacheEntry, CacheKey, CacheStats};
20use crate::error::{CacheError, CloudError, Result};
21
/// LRU cache with TTL support.
///
/// Entries are evicted in least-recently-used order when the byte budget
/// (`config.max_memory_size`) is exceeded, and are dropped lazily on access
/// once their TTL or the configured maximum age has elapsed.
#[cfg(feature = "cache")]
pub struct LruTtlCache {
    /// LRU-ordered storage, guarded by an async `RwLock`.
    pub(crate) cache: Arc<RwLock<LruCache<CacheKey, CacheEntry>>>,
    /// Running total of `CacheEntry::size` across all stored entries.
    pub(crate) current_size: Arc<AtomicUsize>,
    /// Capacity, TTL, and compression settings.
    config: CacheConfig,
    /// Hit/miss/eviction/write counters.
    stats: CacheStats,
}
34
#[cfg(feature = "cache")]
impl LruTtlCache {
    /// Creates a new LRU TTL cache from `config`.
    ///
    /// The entry capacity is `config.max_entries`, clamped to at least 1.
    ///
    /// # Errors
    /// Returns `CacheError::Full` if a non-zero capacity cannot be built
    /// (unreachable in practice because of the `max(1)` clamp).
    pub fn new(config: CacheConfig) -> Result<Self> {
        let capacity = NonZeroUsize::new(config.max_entries.max(1)).ok_or_else(|| {
            CloudError::Cache(CacheError::Full {
                message: "Invalid cache capacity".to_string(),
            })
        })?;

        Ok(Self {
            cache: Arc::new(RwLock::new(LruCache::new(capacity))),
            current_size: Arc::new(AtomicUsize::new(0)),
            config,
            stats: CacheStats::default(),
        })
    }

    /// Gets an entry from the cache, transparently decompressing it.
    ///
    /// Entries whose TTL has elapsed, or which are older than
    /// `config.max_age`, are evicted lazily here and reported as a miss.
    ///
    /// # Errors
    /// Returns `CacheError::Miss` when the key is absent or expired, and
    /// `CacheError::Decompression` if stored compressed data is corrupt.
    pub async fn get(&self, key: &CacheKey) -> Result<Bytes> {
        let mut cache = self.cache.write().await;

        if let Some(entry) = cache.get_mut(key) {
            // Lazy expiration: per-entry TTL first, then the global
            // max-age bound, if one is configured.
            let too_old = self
                .config
                .max_age
                .map_or(false, |max_age| entry.age() > max_age);
            if entry.is_expired() || too_old {
                self.evict_one(&mut cache, key);
                return Err(CloudError::Cache(CacheError::Miss { key: key.clone() }));
            }

            entry.record_access();
            self.stats.hits.fetch_add(1, Ordering::Relaxed);

            if entry.compressed {
                self.decompress(&entry.data)
            } else {
                Ok(entry.data.clone())
            }
        } else {
            self.stats.misses.fetch_add(1, Ordering::Relaxed);
            Err(CloudError::Cache(CacheError::Miss { key: key.clone() }))
        }
    }

    /// Puts an entry with an optional TTL (falling back to
    /// `config.default_ttl`), compressing payloads above
    /// `config.compress_threshold` when compression is enabled.
    ///
    /// Expired entries are purged first; then least-recently-used entries
    /// are evicted until the new entry fits within
    /// `config.max_memory_size`.
    ///
    /// # Errors
    /// Returns `CacheError::Compression` if compression fails.
    pub async fn put(&self, key: CacheKey, data: Bytes, ttl: Option<Duration>) -> Result<()> {
        let (final_data, is_compressed) =
            if self.config.compress && data.len() > self.config.compress_threshold {
                (self.compress(&data)?, true)
            } else {
                (data, false)
            };

        // The buffer is moved straight into the entry; the original code
        // cloned it needlessly.
        let entry = if let Some(ttl_duration) = ttl.or(self.config.default_ttl) {
            CacheEntry::with_ttl(final_data, is_compressed, ttl_duration)
        } else {
            CacheEntry::new(final_data, is_compressed)
        };

        let entry_size = entry.size;
        let mut cache = self.cache.write().await;

        // Reclaim space held by expired entries before evicting live ones.
        self.evict_expired(&mut cache);

        // Evict LRU entries until the new entry fits (or the cache is
        // empty; an entry larger than the whole budget is still stored).
        while self.current_size.load(Ordering::SeqCst) + entry_size > self.config.max_memory_size
            && !cache.is_empty()
        {
            if let Some((_, evicted_entry)) = cache.pop_lru() {
                self.current_size
                    .fetch_sub(evicted_entry.size, Ordering::SeqCst);
                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
            }
        }

        // Replacing an existing key must not double-count its size.
        if let Some(old_entry) = cache.put(key, entry) {
            self.current_size
                .fetch_sub(old_entry.size, Ordering::SeqCst);
        }

        self.current_size.fetch_add(entry_size, Ordering::SeqCst);
        self.stats.writes.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }

    /// Removes `key` from `cache`, updating the size counter and the
    /// eviction statistic. No-op if the key is absent.
    fn evict_one(&self, cache: &mut LruCache<CacheKey, CacheEntry>, key: &CacheKey) {
        if let Some(entry) = cache.pop(key) {
            self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
            self.stats.evictions.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Evicts all expired entries, updating size and eviction counters.
    ///
    /// Synchronous (it awaits nothing); the caller already holds the
    /// cache write lock.
    fn evict_expired(&self, cache: &mut LruCache<CacheKey, CacheEntry>) {
        // Collect keys first: the cache cannot be mutated while iterated.
        let expired: Vec<CacheKey> = cache
            .iter()
            .filter(|(_, entry)| entry.is_expired())
            .map(|(key, _)| key.clone())
            .collect();

        for key in expired {
            self.evict_one(cache, &key);
        }
    }

    /// Removes an entry if present, releasing its tracked size.
    pub async fn remove(&self, key: &CacheKey) -> Result<()> {
        let mut cache = self.cache.write().await;
        if let Some(entry) = cache.pop(key) {
            self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
        }
        Ok(())
    }

    /// Clears all entries and resets the size counter.
    pub async fn clear(&self) -> Result<()> {
        let mut cache = self.cache.write().await;
        cache.clear();
        self.current_size.store(0, Ordering::SeqCst);
        Ok(())
    }

    /// Returns cache statistics.
    #[must_use]
    pub fn stats(&self) -> &CacheStats {
        &self.stats
    }

    /// Compresses data with gzip (flate2, default compression level).
    ///
    /// # Errors
    /// Returns `CacheError::Compression` if the encoder fails.
    fn compress(&self, data: &Bytes) -> Result<Bytes> {
        use flate2::Compression;
        use flate2::write::GzEncoder;
        use std::io::Write;

        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(data).map_err(|e| {
            CloudError::Cache(CacheError::Compression {
                message: format!("Compression failed: {e}"),
            })
        })?;

        let compressed = encoder.finish().map_err(|e| {
            CloudError::Cache(CacheError::Compression {
                message: format!("Compression failed: {e}"),
            })
        })?;

        Ok(Bytes::from(compressed))
    }

    /// Decompresses gzip data produced by [`Self::compress`].
    ///
    /// # Errors
    /// Returns `CacheError::Decompression` on corrupt input.
    fn decompress(&self, data: &Bytes) -> Result<Bytes> {
        use flate2::read::GzDecoder;
        use std::io::Read;

        let mut decoder = GzDecoder::new(&data[..]);
        let mut decompressed = Vec::new();

        decoder.read_to_end(&mut decompressed).map_err(|e| {
            CloudError::Cache(CacheError::Decompression {
                message: format!("Decompression failed: {e}"),
            })
        })?;

        Ok(Bytes::from(decompressed))
    }
}
217
/// LFU (Least Frequently Used) cache.
///
/// Access counts are tracked per key in `frequency_map`; the key with the
/// smallest count is evicted when the byte budget is exceeded.
#[cfg(feature = "cache")]
pub struct LfuCache {
    /// Entry storage.
    storage: Arc<DashMap<CacheKey, CacheEntry>>,
    /// Per-key access counts used to pick eviction victims.
    frequency_map: Arc<DashMap<CacheKey, u64>>,
    /// Lowest access count currently tracked (reset to 1 on insert).
    min_frequency: Arc<AtomicU64>,
    /// Running total of `CacheEntry::size` across all stored entries.
    current_size: Arc<AtomicUsize>,
    /// Capacity and TTL settings.
    config: CacheConfig,
    /// Hit/miss/eviction/write counters.
    stats: CacheStats,
}
234
#[cfg(feature = "cache")]
impl LfuCache {
    /// Creates a new, empty LFU cache from `config`.
    pub fn new(config: CacheConfig) -> Self {
        Self {
            storage: Arc::new(DashMap::new()),
            frequency_map: Arc::new(DashMap::new()),
            min_frequency: Arc::new(AtomicU64::new(0)),
            current_size: Arc::new(AtomicUsize::new(0)),
            config,
            stats: CacheStats::default(),
        }
    }

    /// Gets an entry, bumping its access frequency.
    ///
    /// Expired entries are removed lazily and reported as a miss.
    ///
    /// # Errors
    /// Returns `CacheError::Miss` when the key is absent or expired.
    pub async fn get(&self, key: &CacheKey) -> Result<Bytes> {
        if let Some(mut entry) = self.storage.get_mut(key) {
            if entry.is_expired() {
                // Release the shard guard before mutating the map again.
                drop(entry);
                self.remove(key).await?;
                return Err(CloudError::Cache(CacheError::Miss { key: key.clone() }));
            }

            entry.record_access();
            self.frequency_map
                .entry(key.clone())
                .and_modify(|f| *f += 1)
                .or_insert(1);

            self.stats.hits.fetch_add(1, Ordering::Relaxed);
            Ok(entry.data.clone())
        } else {
            self.stats.misses.fetch_add(1, Ordering::Relaxed);
            Err(CloudError::Cache(CacheError::Miss { key: key.clone() }))
        }
    }

    /// Puts an entry with an optional TTL (falling back to
    /// `config.default_ttl`), evicting least-frequently-used entries
    /// until it fits within `config.max_memory_size`.
    pub async fn put(&self, key: CacheKey, data: Bytes, ttl: Option<Duration>) -> Result<()> {
        let entry = if let Some(ttl_duration) = ttl.or(self.config.default_ttl) {
            CacheEntry::with_ttl(data, false, ttl_duration)
        } else {
            CacheEntry::new(data, false)
        };

        let entry_size = entry.size;

        // Make room; stop when eviction cannot make progress so an
        // oversized entry or desynced bookkeeping cannot spin forever
        // (the original loop had no such guard).
        while self.current_size.load(Ordering::SeqCst) + entry_size > self.config.max_memory_size
            && !self.storage.is_empty()
        {
            if !self.evict_lfu().await {
                break;
            }
        }

        // Replacing an existing key must not double-count its size.
        if let Some(old_entry) = self.storage.insert(key.clone(), entry) {
            self.current_size
                .fetch_sub(old_entry.size, Ordering::SeqCst);
        }

        self.current_size.fetch_add(entry_size, Ordering::SeqCst);
        // A fresh insert starts (or restarts) at frequency 1.
        self.frequency_map.insert(key, 1);
        self.min_frequency.store(1, Ordering::SeqCst);
        self.stats.writes.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }

    /// Evicts the least frequently used entry.
    ///
    /// Returns `true` when a victim was found (its frequency record is
    /// removed even if the storage entry was already gone, so the same
    /// stale key cannot be selected again), and `false` when there was
    /// nothing to evict.
    async fn evict_lfu(&self) -> bool {
        // Pick the key with the smallest access count. Uses `CacheKey`
        // directly; the original hard-coded `Option<String>`, which only
        // compiled while `CacheKey` happened to alias `String`.
        let victim: Option<CacheKey> = self
            .frequency_map
            .iter()
            .min_by_key(|entry| *entry.value())
            .map(|entry| entry.key().clone());

        match victim {
            Some(key) => {
                if let Some((_, entry)) = self.storage.remove(&key) {
                    self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
                    self.stats.evictions.fetch_add(1, Ordering::Relaxed);
                }
                self.frequency_map.remove(&key);
                true
            }
            None => false,
        }
    }

    /// Removes an entry and its frequency record if present.
    pub async fn remove(&self, key: &CacheKey) -> Result<()> {
        if let Some((_, entry)) = self.storage.remove(key) {
            self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
        }
        // Drop the frequency record unconditionally so no stale count can
        // linger for a key that is no longer stored.
        self.frequency_map.remove(key);
        Ok(())
    }

    /// Clears all entries, frequency records, and counters.
    pub async fn clear(&self) -> Result<()> {
        self.storage.clear();
        self.frequency_map.clear();
        self.current_size.store(0, Ordering::SeqCst);
        self.min_frequency.store(0, Ordering::SeqCst);
        Ok(())
    }

    /// Returns cache statistics.
    #[must_use]
    pub fn stats(&self) -> &CacheStats {
        &self.stats
    }
}
349
/// ARC (Adaptive Replacement Cache)
///
/// Combines LRU and LFU strategies adaptively based on access patterns:
/// T1/T2 hold resident keys on the recency/frequency side respectively,
/// while B1/B2 are "ghost" lists remembering recently evicted keys (no
/// payload) used to steer the adaptation parameter `p`.
#[cfg(feature = "cache")]
pub struct ArcCache {
    /// T1: recently inserted keys (recency side, LRU order).
    t1: Arc<RwLock<VecDeque<CacheKey>>>,
    /// T2: keys promoted after repeat access (frequency side).
    t2: Arc<RwLock<VecDeque<CacheKey>>>,
    /// B1: ghost keys recently evicted from T1.
    b1: Arc<RwLock<VecDeque<CacheKey>>>,
    /// B2: ghost keys recently evicted from T2.
    b2: Arc<RwLock<VecDeque<CacheKey>>>,
    /// Payload storage for all resident keys.
    storage: Arc<DashMap<CacheKey, CacheEntry>>,
    /// Adaptation parameter `p`: the target share of capacity for T1.
    p: Arc<RwLock<f64>>,
    /// Target cache size in entries; also caps each ghost list.
    c: usize,
    /// Running total of `CacheEntry::size` across all stored entries.
    current_size: Arc<AtomicUsize>,
    /// Capacity and TTL settings.
    config: CacheConfig,
    /// Hit/miss/eviction/write counters.
    stats: CacheStats,
}
376
#[cfg(feature = "cache")]
impl ArcCache {
    /// Creates a new ARC cache; the target size `c` (and the ghost-list
    /// cap) is `config.max_entries`.
    pub fn new(config: CacheConfig) -> Self {
        let c = config.max_entries;
        Self {
            t1: Arc::new(RwLock::new(VecDeque::new())),
            t2: Arc::new(RwLock::new(VecDeque::new())),
            b1: Arc::new(RwLock::new(VecDeque::new())),
            b2: Arc::new(RwLock::new(VecDeque::new())),
            storage: Arc::new(DashMap::new()),
            p: Arc::new(RwLock::new(0.0)),
            c,
            current_size: Arc::new(AtomicUsize::new(0)),
            config,
            stats: CacheStats::default(),
        }
    }

    /// Gets an entry using the ARC algorithm: a hit on a T1 (recent)
    /// entry promotes it to T2 (frequent).
    ///
    /// # Errors
    /// Returns `CacheError::Miss` when the key is absent or expired.
    pub async fn get(&self, key: &CacheKey) -> Result<Bytes> {
        // Copy the payload and release the DashMap shard guard *before*
        // taking the async queue locks: the original awaited while still
        // holding the guard, which can deadlock against writers touching
        // the same shard.
        let data = if let Some(mut entry) = self.storage.get_mut(key) {
            if entry.is_expired() {
                drop(entry);
                self.remove(key).await?;
                return Err(CloudError::Cache(CacheError::Miss { key: key.clone() }));
            }
            entry.record_access();
            entry.data.clone()
        } else {
            self.stats.misses.fetch_add(1, Ordering::Relaxed);
            return Err(CloudError::Cache(CacheError::Miss { key: key.clone() }));
        };

        // Promote from T1 to T2 on a repeat access.
        {
            let mut t1 = self.t1.write().await;
            if let Some(pos) = t1.iter().position(|k| k == key) {
                t1.remove(pos);
                let mut t2 = self.t2.write().await;
                t2.push_back(key.clone());
            }
        }

        self.stats.hits.fetch_add(1, Ordering::Relaxed);
        Ok(data)
    }

    /// Puts an entry using the ARC algorithm.
    ///
    /// A key found in ghost list B1/B2 adapts the balance parameter `p`
    /// toward recency (B1) or frequency (B2) before space is reclaimed;
    /// the entry itself always lands in T1.
    pub async fn put(&self, key: CacheKey, data: Bytes, ttl: Option<Duration>) -> Result<()> {
        let entry = if let Some(ttl_duration) = ttl.or(self.config.default_ttl) {
            CacheEntry::with_ttl(data, false, ttl_duration)
        } else {
            CacheEntry::new(data, false)
        };

        let entry_size = entry.size;

        // Ghost-list membership drives the adaptation of `p`.
        let in_b1 = self.b1.read().await.contains(&key);
        let in_b2 = self.b2.read().await.contains(&key);

        if in_b1 {
            // Case II: recently evicted from T1 — grow p (favor recency).
            // `in_b1` guarantees b1_len >= 1, so the division is safe.
            let b1_len = self.b1.read().await.len();
            let b2_len = self.b2.read().await.len();
            let delta = if b1_len >= b2_len {
                1.0
            } else {
                b2_len as f64 / b1_len as f64
            };
            {
                let mut p = self.p.write().await;
                *p = (*p + delta).min(self.c as f64);
            }

            let mut b1 = self.b1.write().await;
            if let Some(pos) = b1.iter().position(|k| k == &key) {
                b1.remove(pos);
            }
        } else if in_b2 {
            // Case III: recently evicted from T2 — shrink p (favor
            // frequency). `in_b2` guarantees b2_len >= 1.
            let b1_len = self.b1.read().await.len();
            let b2_len = self.b2.read().await.len();
            let delta = if b2_len >= b1_len {
                1.0
            } else {
                b1_len as f64 / b2_len as f64
            };
            {
                let mut p = self.p.write().await;
                *p = (*p - delta).max(0.0);
            }

            let mut b2 = self.b2.write().await;
            if let Some(pos) = b2.iter().position(|k| k == &key) {
                b2.remove(pos);
            }
        }

        // Make room. `replace` reports whether it made progress, so an
        // entry larger than the whole budget (or stale queues) cannot
        // spin this loop forever — the original unguarded loop could.
        while self.current_size.load(Ordering::SeqCst) + entry_size > self.config.max_memory_size {
            if !self.replace(&key).await {
                break;
            }
        }

        // A re-inserted key must not be queued twice: the original pushed
        // to T1 unconditionally, so rewriting a resident key duplicated it
        // in the queues and corrupted eviction accounting.
        {
            let mut t1 = self.t1.write().await;
            if let Some(pos) = t1.iter().position(|k| k == &key) {
                t1.remove(pos);
            }
        }
        {
            let mut t2 = self.t2.write().await;
            if let Some(pos) = t2.iter().position(|k| k == &key) {
                t2.remove(pos);
            }
        }

        // New (and rewritten) entries go to T1.
        self.t1.write().await.push_back(key.clone());

        if let Some(old) = self.storage.insert(key, entry) {
            self.current_size.fetch_sub(old.size, Ordering::SeqCst);
        }
        self.current_size.fetch_add(entry_size, Ordering::SeqCst);
        self.stats.writes.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }

    /// ARC replacement policy: evicts one entry from T1 or T2, guided by
    /// the adaptation parameter `p`, and records the victim in the
    /// matching ghost list (capped at `c` keys).
    ///
    /// Returns `true` when a queue entry was consumed (progress made),
    /// `false` when there is nothing left to evict.
    async fn replace(&self, _key: &CacheKey) -> bool {
        if self.storage.is_empty() {
            return false;
        }

        let t1_len = self.t1.read().await.len();
        let p = *self.p.read().await;

        // Evict from T1 when it exceeds its target share, or when T2 has
        // nothing to offer.
        let from_t1 = t1_len > 0 && (t1_len as f64 > p || self.t2.read().await.is_empty());

        if from_t1 {
            let evict_key = self.t1.write().await.pop_front();
            if let Some(evict_key) = evict_key {
                if let Some((_, entry)) = self.storage.remove(&evict_key) {
                    self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
                }
                // Remember the victim in the B1 ghost list, bounded by c.
                let mut b1 = self.b1.write().await;
                b1.push_back(evict_key);
                while b1.len() > self.c {
                    b1.pop_front();
                }
                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
                return true;
            }
            false
        } else {
            let evict_key = self.t2.write().await.pop_front();
            if let Some(evict_key) = evict_key {
                if let Some((_, entry)) = self.storage.remove(&evict_key) {
                    self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
                }
                // Remember the victim in the B2 ghost list, bounded by c.
                let mut b2 = self.b2.write().await;
                b2.push_back(evict_key);
                while b2.len() > self.c {
                    b2.pop_front();
                }
                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
                return true;
            }
            false
        }
    }

    /// Removes an entry from storage and from the resident queues.
    pub async fn remove(&self, key: &CacheKey) -> Result<()> {
        if let Some((_, entry)) = self.storage.remove(key) {
            self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
        }

        // Drop the key from whichever resident queue holds it.
        {
            let mut t1 = self.t1.write().await;
            if let Some(pos) = t1.iter().position(|k| k == key) {
                t1.remove(pos);
            }
        }
        {
            let mut t2 = self.t2.write().await;
            if let Some(pos) = t2.iter().position(|k| k == key) {
                t2.remove(pos);
            }
        }

        Ok(())
    }

    /// Clears all storage, queues, ghost lists, and the adaptation state.
    pub async fn clear(&self) -> Result<()> {
        self.storage.clear();
        self.t1.write().await.clear();
        self.t2.write().await.clear();
        self.b1.write().await.clear();
        self.b2.write().await.clear();
        self.current_size.store(0, Ordering::SeqCst);
        *self.p.write().await = 0.0;
        Ok(())
    }

    /// Returns cache statistics.
    #[must_use]
    pub fn stats(&self) -> &CacheStats {
        &self.stats
    }
}