oxigdal_cloud/cache/eviction.rs

//! LRU, LFU, and ARC eviction policy implementations

#[cfg(feature = "cache")]
use dashmap::DashMap;
#[cfg(feature = "cache")]
use lru::LruCache;
use std::collections::VecDeque;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

#[cfg(feature = "async")]
use tokio::sync::RwLock;

use bytes::Bytes;
use std::time::Duration;

use super::CacheConfig;
use super::metadata::{CacheEntry, CacheKey, CacheStats};
use crate::error::{CacheError, CloudError, Result};

/// LRU cache with TTL support
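///
/// # Example
///
/// A minimal usage sketch; `CacheConfig::default()` and the `CacheKey`
/// conversion are assumptions about the surrounding crate, so the block is
/// marked `ignore` rather than compiled as a doc-test:
///
/// ```ignore
/// let cache = LruTtlCache::new(CacheConfig::default())?;
/// let key = CacheKey::from("bucket/object");
/// cache.put(key.clone(), Bytes::from_static(b"hello"), Some(Duration::from_secs(30))).await?;
/// assert_eq!(cache.get(&key).await?.as_ref(), b"hello");
/// ```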
#[cfg(feature = "cache")]
pub struct LruTtlCache {
    /// LRU cache storage
    pub(crate) cache: Arc<RwLock<LruCache<CacheKey, CacheEntry>>>,
    /// Current size tracking
    pub(crate) current_size: Arc<AtomicUsize>,
    /// Configuration
    config: CacheConfig,
    /// Statistics
    stats: CacheStats,
}

#[cfg(feature = "cache")]
impl LruTtlCache {
    /// Creates a new LRU TTL cache
    pub fn new(config: CacheConfig) -> Result<Self> {
        let capacity = NonZeroUsize::new(config.max_entries.max(1)).ok_or_else(|| {
            CloudError::Cache(CacheError::Full {
                message: "Invalid cache capacity".to_string(),
            })
        })?;

        Ok(Self {
            cache: Arc::new(RwLock::new(LruCache::new(capacity))),
            current_size: Arc::new(AtomicUsize::new(0)),
            config,
            stats: CacheStats::default(),
        })
    }

    /// Gets an entry from the cache
    pub async fn get(&self, key: &CacheKey) -> Result<Bytes> {
        let mut cache = self.cache.write().await;

        if let Some(entry) = cache.get_mut(key) {
            // Check TTL expiration
            if entry.is_expired() {
                let size = entry.size;
                cache.pop(key);
                self.current_size.fetch_sub(size, Ordering::SeqCst);
                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
                return Err(CloudError::Cache(CacheError::Miss { key: key.clone() }));
            }

            // Check max age
            if let Some(max_age) = self.config.max_age {
                if entry.age() > max_age {
                    let size = entry.size;
                    cache.pop(key);
                    self.current_size.fetch_sub(size, Ordering::SeqCst);
                    self.stats.evictions.fetch_add(1, Ordering::Relaxed);
                    return Err(CloudError::Cache(CacheError::Miss { key: key.clone() }));
                }
            }

            entry.record_access();
            self.stats.hits.fetch_add(1, Ordering::Relaxed);

            let data = if entry.compressed {
                self.decompress(&entry.data)?
            } else {
                entry.data.clone()
            };

            Ok(data)
        } else {
            self.stats.misses.fetch_add(1, Ordering::Relaxed);
            Err(CloudError::Cache(CacheError::Miss { key: key.clone() }))
        }
    }

    /// Puts an entry with optional TTL
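    ///
    /// Payloads above `compress_threshold` are gzip-compressed when
    /// `config.compress` is set; expired entries are dropped first, then LRU
    /// entries are evicted until the new entry fits in `max_memory_size`.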
    pub async fn put(&self, key: CacheKey, data: Bytes, ttl: Option<Duration>) -> Result<()> {
        let (final_data, is_compressed) =
            if self.config.compress && data.len() > self.config.compress_threshold {
                (self.compress(&data)?, true)
            } else {
                (data, false)
            };

        let entry = if let Some(ttl_duration) = ttl.or(self.config.default_ttl) {
            CacheEntry::with_ttl(final_data, is_compressed, ttl_duration)
        } else {
            CacheEntry::new(final_data, is_compressed)
        };

        let entry_size = entry.size;
        let mut cache = self.cache.write().await;

        // Evict expired entries first
        self.evict_expired(&mut cache).await;

        // Evict entries if necessary to make room
        while self.current_size.load(Ordering::SeqCst) + entry_size > self.config.max_memory_size
            && !cache.is_empty()
        {
            if let Some((_, evicted_entry)) = cache.pop_lru() {
                self.current_size
                    .fetch_sub(evicted_entry.size, Ordering::SeqCst);
                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
            }
        }

        if let Some(old_entry) = cache.put(key, entry) {
            self.current_size
                .fetch_sub(old_entry.size, Ordering::SeqCst);
        }

        self.current_size.fetch_add(entry_size, Ordering::SeqCst);
        self.stats.writes.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }

    /// Evicts expired entries
    async fn evict_expired(&self, cache: &mut LruCache<CacheKey, CacheEntry>) {
        let mut keys_to_remove = Vec::new();

        for (key, entry) in cache.iter() {
            if entry.is_expired() {
                keys_to_remove.push(key.clone());
            }
        }

        for key in keys_to_remove {
            if let Some(entry) = cache.pop(&key) {
                self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Removes an entry
    pub async fn remove(&self, key: &CacheKey) -> Result<()> {
        let mut cache = self.cache.write().await;
        if let Some(entry) = cache.pop(key) {
            self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
        }
        Ok(())
    }

    /// Clears the cache
    pub async fn clear(&self) -> Result<()> {
        let mut cache = self.cache.write().await;
        cache.clear();
        self.current_size.store(0, Ordering::SeqCst);
        Ok(())
    }

    /// Returns cache statistics
    #[must_use]
    pub fn stats(&self) -> &CacheStats {
        &self.stats
    }

    /// Compresses data using gzip
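    /// at level 6, the usual middle ground between ratio and speed.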
    fn compress(&self, data: &Bytes) -> Result<Bytes> {
        oxiarc_archive::gzip::compress(data, 6)
            .map(Bytes::from)
            .map_err(|e| {
                CloudError::Cache(CacheError::Compression {
                    message: format!("Compression failed: {e}"),
                })
            })
    }

    /// Decompresses data
    fn decompress(&self, data: &Bytes) -> Result<Bytes> {
        let mut reader = std::io::Cursor::new(data.as_ref());
        oxiarc_archive::gzip::decompress(&mut reader)
            .map(Bytes::from)
            .map_err(|e| {
                CloudError::Cache(CacheError::Decompression {
                    message: format!("Decompression failed: {e}"),
                })
            })
    }
}

/// LFU (Least Frequently Used) cache
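///
/// Each hit bumps a per-key counter in `frequency_map`; eviction removes the
/// key with the smallest counter. Unlike [`LruTtlCache`], payloads are stored
/// uncompressed.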
#[cfg(feature = "cache")]
pub struct LfuCache {
    /// Storage map
    storage: Arc<DashMap<CacheKey, CacheEntry>>,
    /// Frequency tracking
    frequency_map: Arc<DashMap<CacheKey, u64>>,
    /// Minimum frequency tracker
    min_frequency: Arc<AtomicU64>,
    /// Current size
    current_size: Arc<AtomicUsize>,
    /// Configuration
    config: CacheConfig,
    /// Statistics
    stats: CacheStats,
}

#[cfg(feature = "cache")]
impl LfuCache {
    /// Creates a new LFU cache
    pub fn new(config: CacheConfig) -> Self {
        Self {
            storage: Arc::new(DashMap::new()),
            frequency_map: Arc::new(DashMap::new()),
            min_frequency: Arc::new(AtomicU64::new(0)),
            current_size: Arc::new(AtomicUsize::new(0)),
            config,
            stats: CacheStats::default(),
        }
    }

    /// Gets an entry
    pub async fn get(&self, key: &CacheKey) -> Result<Bytes> {
        if let Some(mut entry) = self.storage.get_mut(key) {
            // Check expiration
            if entry.is_expired() {
                drop(entry);
                self.remove(key).await?;
                return Err(CloudError::Cache(CacheError::Miss { key: key.clone() }));
            }

            entry.record_access();
            self.frequency_map
                .entry(key.clone())
                .and_modify(|f| *f += 1)
                .or_insert(1);

            self.stats.hits.fetch_add(1, Ordering::Relaxed);
            Ok(entry.data.clone())
        } else {
            self.stats.misses.fetch_add(1, Ordering::Relaxed);
            Err(CloudError::Cache(CacheError::Miss { key: key.clone() }))
        }
    }

    /// Puts an entry
    pub async fn put(&self, key: CacheKey, data: Bytes, ttl: Option<Duration>) -> Result<()> {
        let entry = if let Some(ttl_duration) = ttl.or(self.config.default_ttl) {
            CacheEntry::with_ttl(data, false, ttl_duration)
        } else {
            CacheEntry::new(data, false)
        };

        let entry_size = entry.size;

        // Evict entries if necessary
        while self.current_size.load(Ordering::SeqCst) + entry_size > self.config.max_memory_size
            && !self.storage.is_empty()
        {
            self.evict_lfu().await;
        }

        if let Some(old_entry) = self.storage.insert(key.clone(), entry) {
            self.current_size
                .fetch_sub(old_entry.size, Ordering::SeqCst);
        }

        self.current_size.fetch_add(entry_size, Ordering::SeqCst);
        self.frequency_map.insert(key, 1);
        self.min_frequency.store(1, Ordering::SeqCst);
        self.stats.writes.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }

    /// Evicts the least frequently used entry
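    ///
    /// Linear scan over the frequency map, so each eviction is O(n) in the
    /// number of cached entries; ties are broken by iteration order.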
    async fn evict_lfu(&self) {
        // Find the entry with minimum frequency
        let mut min_freq = u64::MAX;
        let mut min_key: Option<CacheKey> = None;

        for entry in self.frequency_map.iter() {
            if *entry.value() < min_freq {
                min_freq = *entry.value();
                min_key = Some(entry.key().clone());
            }
        }

        if let Some(key) = min_key {
            if let Some((_, entry)) = self.storage.remove(&key) {
                self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
                self.frequency_map.remove(&key);
                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Removes an entry
    pub async fn remove(&self, key: &CacheKey) -> Result<()> {
        if let Some((_, entry)) = self.storage.remove(key) {
            self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
            self.frequency_map.remove(key);
        }
        Ok(())
    }

    /// Clears the cache
    pub async fn clear(&self) -> Result<()> {
        self.storage.clear();
        self.frequency_map.clear();
        self.current_size.store(0, Ordering::SeqCst);
        self.min_frequency.store(0, Ordering::SeqCst);
        Ok(())
    }

    /// Returns cache statistics
    #[must_use]
    pub fn stats(&self) -> &CacheStats {
        &self.stats
    }
}

/// ARC (Adaptive Replacement Cache)
///
/// Combines LRU and LFU strategies adaptively based on access patterns.
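///
/// Four lists are maintained: `T1` (keys seen once, recency), `T2` (keys seen
/// at least twice, frequency), and ghost lists `B1`/`B2` holding keys recently
/// evicted from `T1`/`T2`. The target size `p` for `T1` adapts on ghost hits:
/// a hit in `B1` grows `p` by `max(1, |B2| / |B1|)`, a hit in `B2` shrinks it
/// by `max(1, |B1| / |B2|)`, clamped to `[0, c]`.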
#[cfg(feature = "cache")]
pub struct ArcCache {
    /// T1: Recent entries (LRU)
    t1: Arc<RwLock<VecDeque<CacheKey>>>,
    /// T2: Frequent entries (LFU)
    t2: Arc<RwLock<VecDeque<CacheKey>>>,
    /// B1: Ghost entries from T1
    b1: Arc<RwLock<VecDeque<CacheKey>>>,
    /// B2: Ghost entries from T2
    b2: Arc<RwLock<VecDeque<CacheKey>>>,
    /// Data storage
    storage: Arc<DashMap<CacheKey, CacheEntry>>,
    /// Adaptation parameter p
    p: Arc<RwLock<f64>>,
    /// Target cache size
    c: usize,
    /// Current size
    current_size: Arc<AtomicUsize>,
    /// Configuration
    config: CacheConfig,
    /// Statistics
    stats: CacheStats,
}

#[cfg(feature = "cache")]
impl ArcCache {
    /// Creates a new ARC cache
    pub fn new(config: CacheConfig) -> Self {
        let c = config.max_entries;
        Self {
            t1: Arc::new(RwLock::new(VecDeque::new())),
            t2: Arc::new(RwLock::new(VecDeque::new())),
            b1: Arc::new(RwLock::new(VecDeque::new())),
            b2: Arc::new(RwLock::new(VecDeque::new())),
            storage: Arc::new(DashMap::new()),
            p: Arc::new(RwLock::new(0.0)),
            c,
            current_size: Arc::new(AtomicUsize::new(0)),
            config,
            stats: CacheStats::default(),
        }
    }

    /// Gets an entry using ARC algorithm
    pub async fn get(&self, key: &CacheKey) -> Result<Bytes> {
        // Check if in T1 or T2; clone the payload and release the DashMap guard
        // before taking the list locks, since holding a shard guard across an
        // await point risks deadlock.
        let data = if let Some(mut entry) = self.storage.get_mut(key) {
            if entry.is_expired() {
                drop(entry);
                self.remove(key).await?;
                return Err(CloudError::Cache(CacheError::Miss { key: key.clone() }));
            }

            entry.record_access();
            entry.data.clone()
        } else {
            self.stats.misses.fetch_add(1, Ordering::Relaxed);
            return Err(CloudError::Cache(CacheError::Miss { key: key.clone() }));
        };

        // Promote from T1 to T2 on a repeat access (if in T1)
        let mut t1 = self.t1.write().await;
        if let Some(pos) = t1.iter().position(|k| k == key) {
            t1.remove(pos);
            let mut t2 = self.t2.write().await;
            t2.push_back(key.clone());
        }

        self.stats.hits.fetch_add(1, Ordering::Relaxed);
        Ok(data)
    }

    /// Puts an entry using ARC algorithm
    pub async fn put(&self, key: CacheKey, data: Bytes, ttl: Option<Duration>) -> Result<()> {
        let entry = if let Some(ttl_duration) = ttl.or(self.config.default_ttl) {
            CacheEntry::with_ttl(data, false, ttl_duration)
        } else {
            CacheEntry::new(data, false)
        };

        let entry_size = entry.size;

        // Check ghost lists and adapt
        let in_b1 = {
            let b1 = self.b1.read().await;
            b1.contains(&key)
        };
        let in_b2 = {
            let b2 = self.b2.read().await;
            b2.contains(&key)
        };

        if in_b1 {
            // Case II: key in B1 (recently evicted from T1)
            // Adapt p: increase preference for T1
            let b1_len = self.b1.read().await.len();
            let b2_len = self.b2.read().await.len();
            let delta = if b1_len >= b2_len {
                1.0
            } else {
                b2_len as f64 / b1_len as f64
            };
            let mut p = self.p.write().await;
            *p = (*p + delta).min(self.c as f64);

            // Remove from B1
            let mut b1 = self.b1.write().await;
            if let Some(pos) = b1.iter().position(|k| k == &key) {
                b1.remove(pos);
            }
        } else if in_b2 {
            // Case III: key in B2 (recently evicted from T2)
            // Adapt p: decrease preference for T1
            let b1_len = self.b1.read().await.len();
            let b2_len = self.b2.read().await.len();
            let delta = if b2_len >= b1_len {
                1.0
            } else {
                b1_len as f64 / b2_len as f64
            };
            let mut p = self.p.write().await;
            *p = (*p - delta).max(0.0);

            // Remove from B2
            let mut b2 = self.b2.write().await;
            if let Some(pos) = b2.iter().position(|k| k == &key) {
                b2.remove(pos);
            }
        }

        // Ensure we have space; stop once the cache is empty so an entry larger
        // than the memory budget cannot spin this loop forever.
        while self.current_size.load(Ordering::SeqCst) + entry_size > self.config.max_memory_size
            && !self.storage.is_empty()
        {
            self.replace(&key).await;
        }

        // Per ARC, a ghost hit re-enters at T2 (the key has history); only
        // genuinely new keys go to T1
        if in_b1 || in_b2 {
            let mut t2 = self.t2.write().await;
            t2.push_back(key.clone());
        } else {
            let mut t1 = self.t1.write().await;
            t1.push_back(key.clone());
        }

        if let Some(old) = self.storage.insert(key, entry) {
            self.current_size.fetch_sub(old.size, Ordering::SeqCst);
        }
        self.current_size.fetch_add(entry_size, Ordering::SeqCst);
        self.stats.writes.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }

    /// ARC replacement policy
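    ///
    /// Evicts from T1 when |T1| exceeds the adaptive target `p` (or T2 is
    /// empty), otherwise from T2, remembering the evicted key in the matching
    /// ghost list (capped at `c` entries). The textbook REPLACE also consults
    /// whether the incoming key sits in B2; this simplified variant ignores
    /// `_key` and decides on |T1| versus `p` alone.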
    async fn replace(&self, _key: &CacheKey) {
        let t1_len = self.t1.read().await.len();
        let p = *self.p.read().await;

        if !self.storage.is_empty() {
            if t1_len > 0 && (t1_len as f64 > p || self.t2.read().await.is_empty()) {
                // Evict from T1
                let mut t1 = self.t1.write().await;
                if let Some(evict_key) = t1.pop_front() {
                    if let Some((_, entry)) = self.storage.remove(&evict_key) {
                        self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
                    }
                    // Add to B1 ghost list
                    let mut b1 = self.b1.write().await;
                    b1.push_back(evict_key);
                    // Limit ghost list size
                    while b1.len() > self.c {
                        b1.pop_front();
                    }
                    self.stats.evictions.fetch_add(1, Ordering::Relaxed);
                }
            } else {
                // Evict from T2
                let mut t2 = self.t2.write().await;
                if let Some(evict_key) = t2.pop_front() {
                    if let Some((_, entry)) = self.storage.remove(&evict_key) {
                        self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
                    }
                    // Add to B2 ghost list
                    let mut b2 = self.b2.write().await;
                    b2.push_back(evict_key);
                    // Limit ghost list size
                    while b2.len() > self.c {
                        b2.pop_front();
                    }
                    self.stats.evictions.fetch_add(1, Ordering::Relaxed);
                }
            }
        }
    }

    /// Removes an entry
    pub async fn remove(&self, key: &CacheKey) -> Result<()> {
        if let Some((_, entry)) = self.storage.remove(key) {
            self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
        }

        // Remove from T1 or T2
        {
            let mut t1 = self.t1.write().await;
            if let Some(pos) = t1.iter().position(|k| k == key) {
                t1.remove(pos);
            }
        }
        {
            let mut t2 = self.t2.write().await;
            if let Some(pos) = t2.iter().position(|k| k == key) {
                t2.remove(pos);
            }
        }

        Ok(())
    }

    /// Clears the cache
    pub async fn clear(&self) -> Result<()> {
        self.storage.clear();
        self.t1.write().await.clear();
        self.t2.write().await.clear();
        self.b1.write().await.clear();
        self.b2.write().await.clear();
        self.current_size.store(0, Ordering::SeqCst);
        *self.p.write().await = 0.0;
        Ok(())
    }

    /// Returns cache statistics
    #[must_use]
    pub fn stats(&self) -> &CacheStats {
        &self.stats
    }
}
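
// A minimal smoke-test sketch. `CacheConfig::default()` and the `CacheKey`
// conversion below are assumptions about the surrounding crate; adjust them to
// the real constructors before enabling.
#[cfg(all(test, feature = "cache"))]
mod tests {
    use super::*;

    #[tokio::test]
    async fn lru_roundtrip_and_miss() {
        let cache = LruTtlCache::new(CacheConfig::default()).expect("valid config");
        let key = CacheKey::from("bucket/object"); // assumed `From<&str>` impl

        cache
            .put(key.clone(), Bytes::from_static(b"payload"), None)
            .await
            .expect("put succeeds");
        assert_eq!(cache.get(&key).await.expect("hit").as_ref(), b"payload");

        let missing = CacheKey::from("bucket/absent"); // assumed `From<&str>` impl
        assert!(cache.get(&missing).await.is_err());
    }
}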