Skip to main content

oxirs_vec/
advanced_caching_eviction.rs

1//! Eviction policies and cache storage implementations for the advanced caching system.
2//!
3//! Contains:
4//! - `MemoryCache` — in-process LRU/LFU/ARC/FIFO/TTL cache
5//! - `PersistentCache` — disk-backed cache with optional RLE compression
6
7use crate::advanced_caching::{CacheConfig, CacheEntry, CacheKey, CacheStats, EvictionPolicy};
8use anyhow::{anyhow, Result};
9use std::collections::{HashMap, VecDeque};
10use std::hash::{Hash, Hasher};
11use std::time::{Duration, Instant};
12
/// Memory cache implementation
///
/// Stores live entries in a hash map and keeps, alongside them, the
/// bookkeeping each eviction policy consults: a recency queue, per-key access
/// counters, a running byte total, and the four lists used by ARC.
pub struct MemoryCache {
    /// Capacity limits (`max_memory_entries`, `max_memory_bytes`) and the active eviction policy.
    pub(super) config: CacheConfig,
    /// Live entries, keyed by cache key.
    pub(super) entries: HashMap<CacheKey, CacheEntry>,
    // Front = least recently touched key, back = most recently touched key.
    access_order: VecDeque<CacheKey>,      // For LRU
    // Number of tracked accesses per key (incremented on insert and on each hit).
    frequency_map: HashMap<CacheKey, u64>, // For LFU
    // Running sum of `size_bytes` over everything in `entries`.
    current_memory_bytes: usize,
    // ARC state
    arc_t1: VecDeque<CacheKey>, // Recently accessed pages
    arc_t2: VecDeque<CacheKey>, // Frequently accessed pages
    arc_b1: VecDeque<CacheKey>, // Ghost list for T1
    arc_b2: VecDeque<CacheKey>, // Ghost list for T2
    arc_p: usize,               // Target size for T1
}
27
28impl MemoryCache {
29    pub fn new(config: CacheConfig) -> Self {
30        Self {
31            config,
32            entries: HashMap::new(),
33            access_order: VecDeque::new(),
34            frequency_map: HashMap::new(),
35            current_memory_bytes: 0,
36            arc_t1: VecDeque::new(),
37            arc_t2: VecDeque::new(),
38            arc_b1: VecDeque::new(),
39            arc_b2: VecDeque::new(),
40            arc_p: 0,
41        }
42    }
43
44    /// Insert or update cache entry
45    pub fn insert(&mut self, key: CacheKey, entry: CacheEntry) -> Result<()> {
46        // Remove expired entries first
47        self.clean_expired();
48
49        // Check if we need to evict
50        while self.should_evict(&entry) {
51            self.evict_one()?;
52        }
53
54        // Remove existing entry if present
55        if let Some(old_entry) = self.entries.remove(&key) {
56            self.current_memory_bytes -= old_entry.size_bytes;
57            self.remove_from_tracking(&key);
58        }
59
60        // Insert new entry
61        self.current_memory_bytes += entry.size_bytes;
62        self.entries.insert(key.clone(), entry);
63        self.track_access(&key);
64
65        Ok(())
66    }
67
68    /// Get cache entry
69    pub fn get(&mut self, key: &CacheKey) -> Option<crate::Vector> {
70        // Check if entry exists and is not expired
71        let should_remove = if let Some(entry) = self.entries.get(key) {
72            entry.is_expired()
73        } else {
74            false
75        };
76
77        if should_remove {
78            self.remove(key);
79            return None;
80        }
81
82        if let Some(entry) = self.entries.get_mut(key) {
83            let data = entry.data.clone();
84            entry.touch();
85            self.track_access(key);
86            Some(data)
87        } else {
88            None
89        }
90    }
91
92    /// Remove entry from cache
93    pub fn remove(&mut self, key: &CacheKey) -> Option<CacheEntry> {
94        if let Some(entry) = self.entries.remove(key) {
95            self.current_memory_bytes -= entry.size_bytes;
96            self.remove_from_tracking(key);
97            Some(entry)
98        } else {
99            None
100        }
101    }
102
103    /// Clear all entries
104    pub fn clear(&mut self) {
105        self.entries.clear();
106        self.access_order.clear();
107        self.frequency_map.clear();
108        self.current_memory_bytes = 0;
109    }
110
111    /// Check if eviction is needed
112    fn should_evict(&self, new_entry: &CacheEntry) -> bool {
113        self.entries.len() >= self.config.max_memory_entries
114            || self.current_memory_bytes + new_entry.size_bytes > self.config.max_memory_bytes
115    }
116
117    /// Evict one entry based on policy
118    fn evict_one(&mut self) -> Result<()> {
119        let key_to_evict = match self.config.eviction_policy {
120            EvictionPolicy::LRU => self.find_lru_key(),
121            EvictionPolicy::LFU => self.find_lfu_key(),
122            EvictionPolicy::ARC => self.find_arc_key(),
123            EvictionPolicy::FIFO => self.find_fifo_key(),
124            EvictionPolicy::TTL => self.find_expired_key(),
125        };
126
127        if let Some(key) = key_to_evict {
128            self.remove(&key);
129            Ok(())
130        } else if !self.entries.is_empty() {
131            // Fallback: remove first entry
132            let key = self
133                .entries
134                .keys()
135                .next()
136                .expect("entries should not be empty when at capacity")
137                .clone();
138            self.remove(&key);
139            Ok(())
140        } else {
141            Err(anyhow!("No entries to evict"))
142        }
143    }
144
145    /// Find LRU key
146    fn find_lru_key(&self) -> Option<CacheKey> {
147        self.access_order.front().cloned()
148    }
149
150    /// Find LFU key
151    fn find_lfu_key(&self) -> Option<CacheKey> {
152        self.frequency_map
153            .iter()
154            .min_by_key(|&(_, &freq)| freq)
155            .map(|(key, _)| key.clone())
156    }
157
158    /// Find ARC key using Adaptive Replacement Cache algorithm
159    fn find_arc_key(&mut self) -> Option<CacheKey> {
160        let c = self.config.max_memory_entries;
161
162        // If T1 is not empty and |T1| > p, evict from T1
163        if !self.arc_t1.is_empty()
164            && (self.arc_t1.len() > self.arc_p
165                || (self.arc_t2.is_empty() && self.arc_t1.len() == self.arc_p))
166        {
167            if let Some(key) = self.arc_t1.pop_front() {
168                // Move to B1
169                self.arc_b1.push_back(key.clone());
170                if self.arc_b1.len() > c {
171                    self.arc_b1.pop_front();
172                }
173                return Some(key);
174            }
175        }
176
177        // Otherwise evict from T2
178        if let Some(key) = self.arc_t2.pop_front() {
179            // Move to B2
180            self.arc_b2.push_back(key.clone());
181            if self.arc_b2.len() > c {
182                self.arc_b2.pop_front();
183            }
184            return Some(key);
185        }
186
187        // Fallback to LRU if ARC lists are empty
188        self.find_lru_key()
189    }
190
191    /// Find FIFO key (oldest entry)
192    fn find_fifo_key(&self) -> Option<CacheKey> {
193        self.entries
194            .iter()
195            .min_by_key(|(_, entry)| entry.created_at)
196            .map(|(key, _)| key.clone())
197    }
198
199    /// Find expired key
200    fn find_expired_key(&self) -> Option<CacheKey> {
201        self.entries
202            .iter()
203            .find(|(_, entry)| entry.is_expired())
204            .map(|(key, _)| key.clone())
205    }
206
207    /// Track access for LRU/LFU/ARC
208    fn track_access(&mut self, key: &CacheKey) {
209        // Update LRU order
210        if let Some(pos) = self.access_order.iter().position(|k| k == key) {
211            self.access_order.remove(pos);
212        }
213        self.access_order.push_back(key.clone());
214
215        // Update LFU frequency
216        *self.frequency_map.entry(key.clone()).or_insert(0) += 1;
217
218        // Update ARC tracking
219        if self.config.eviction_policy == EvictionPolicy::ARC {
220            self.track_arc_access(key);
221        }
222    }
223
224    /// Track access for ARC algorithm
225    fn track_arc_access(&mut self, key: &CacheKey) {
226        let c = self.config.max_memory_entries;
227
228        // Check if key is in T1 or T2
229        if let Some(pos) = self.arc_t1.iter().position(|k| k == key) {
230            // Move from T1 to T2 (promote to frequent)
231            self.arc_t1.remove(pos);
232            self.arc_t2.push_back(key.clone());
233        } else if let Some(pos) = self.arc_t2.iter().position(|k| k == key) {
234            // Move to end of T2 (most recently used)
235            self.arc_t2.remove(pos);
236            self.arc_t2.push_back(key.clone());
237        } else if let Some(pos) = self.arc_b1.iter().position(|k| k == key) {
238            // Hit in B1: increase p and move to T2
239            self.arc_b1.remove(pos);
240            self.arc_p = (self.arc_p + 1.max(self.arc_b2.len() / self.arc_b1.len())).min(c);
241            self.arc_t2.push_back(key.clone());
242        } else if let Some(pos) = self.arc_b2.iter().position(|k| k == key) {
243            // Hit in B2: decrease p and move to T2
244            self.arc_b2.remove(pos);
245            self.arc_p = self
246                .arc_p
247                .saturating_sub(1.max(self.arc_b1.len() / self.arc_b2.len()));
248            self.arc_t2.push_back(key.clone());
249        } else {
250            // New key: add to T1
251            self.arc_t1.push_back(key.clone());
252        }
253    }
254
255    /// Remove from tracking structures
256    fn remove_from_tracking(&mut self, key: &CacheKey) {
257        if let Some(pos) = self.access_order.iter().position(|k| k == key) {
258            self.access_order.remove(pos);
259        }
260        self.frequency_map.remove(key);
261
262        // Remove from ARC structures
263        if self.config.eviction_policy == EvictionPolicy::ARC {
264            if let Some(pos) = self.arc_t1.iter().position(|k| k == key) {
265                self.arc_t1.remove(pos);
266            }
267            if let Some(pos) = self.arc_t2.iter().position(|k| k == key) {
268                self.arc_t2.remove(pos);
269            }
270            if let Some(pos) = self.arc_b1.iter().position(|k| k == key) {
271                self.arc_b1.remove(pos);
272            }
273            if let Some(pos) = self.arc_b2.iter().position(|k| k == key) {
274                self.arc_b2.remove(pos);
275            }
276        }
277    }
278
279    /// Clean expired entries
280    fn clean_expired(&mut self) {
281        let expired_keys: Vec<CacheKey> = self
282            .entries
283            .iter()
284            .filter(|(_, entry)| entry.is_expired())
285            .map(|(key, _)| key.clone())
286            .collect();
287
288        for key in expired_keys {
289            self.remove(&key);
290        }
291    }
292
293    /// Get cache statistics
294    pub fn stats(&self) -> CacheStats {
295        CacheStats {
296            entries: self.entries.len(),
297            memory_bytes: self.current_memory_bytes,
298            max_entries: self.config.max_memory_entries,
299            max_memory_bytes: self.config.max_memory_bytes,
300            hit_ratio: 0.0, // Would need to track hits/misses
301        }
302    }
303}
304
305// ---------------------------------------------------------------------------
306// PersistentCache
307// ---------------------------------------------------------------------------
308
/// Persistent cache for disk storage
///
/// Each entry is written to its own `.cache` file underneath `cache_dir`,
/// optionally run-length compressed when `config.enable_compression` is set.
pub struct PersistentCache {
    /// Cache settings; this type reads `persistent_cache_dir` and `enable_compression`.
    pub(super) config: CacheConfig,
    /// Root directory that holds the per-hash subdirectories and entry files.
    pub(super) cache_dir: std::path::PathBuf,
}
314
315impl PersistentCache {
316    pub fn new(config: CacheConfig) -> Result<Self> {
317        let cache_dir = config
318            .persistent_cache_dir
319            .clone()
320            .unwrap_or_else(|| std::env::temp_dir().join("oxirs_vec_cache"));
321
322        std::fs::create_dir_all(&cache_dir)?;
323
324        Ok(Self { config, cache_dir })
325    }
326
327    /// Store entry to disk
328    pub fn store(&self, key: &CacheKey, entry: &CacheEntry) -> Result<()> {
329        let file_path = self.get_file_path(key);
330
331        if let Some(parent) = file_path.parent() {
332            std::fs::create_dir_all(parent)?;
333        }
334
335        let serialized = self.serialize_entry(entry)?;
336        let final_data = if self.config.enable_compression {
337            self.compress_data(&serialized)?
338        } else {
339            serialized
340        };
341
342        std::fs::write(file_path, final_data)?;
343        Ok(())
344    }
345
346    /// Load entry from disk
347    pub fn load(&self, key: &CacheKey) -> Result<Option<CacheEntry>> {
348        let file_path = self.get_file_path(key);
349
350        if !file_path.exists() {
351            return Ok(None);
352        }
353
354        let data = std::fs::read(&file_path)?;
355
356        let decompressed = if self.config.enable_compression {
357            self.decompress_data(&data)?
358        } else {
359            data
360        };
361
362        let entry = self.deserialize_entry(&decompressed)?;
363
364        // Check if entry has expired
365        if entry.is_expired() {
366            // Remove expired entry
367            let _ = std::fs::remove_file(file_path);
368            Ok(None)
369        } else {
370            Ok(Some(entry))
371        }
372    }
373
374    /// Remove entry from disk
375    pub fn remove(&self, key: &CacheKey) -> Result<()> {
376        let file_path = self.get_file_path(key);
377        if file_path.exists() {
378            std::fs::remove_file(file_path)?;
379        }
380        Ok(())
381    }
382
383    /// Clear all persistent cache
384    pub fn clear(&self) -> Result<()> {
385        if self.cache_dir.exists() {
386            std::fs::remove_dir_all(&self.cache_dir)?;
387            std::fs::create_dir_all(&self.cache_dir)?;
388        }
389        Ok(())
390    }
391
392    /// Get file path for cache key
393    pub(super) fn get_file_path(&self, key: &CacheKey) -> std::path::PathBuf {
394        let key_str = key.to_string();
395        let hash = self.hash_key(&key_str);
396
397        // Create subdirectory structure to avoid too many files in one directory
398        let sub_dir = format!("{:02x}", (hash % 256) as u8);
399
400        // Encode key information in filename for reconstruction during cleanup
401        let encoded_key = self.encode_cache_key_for_filename(key);
402        let filename = format!("{hash:016x}_{encoded_key}.cache");
403
404        self.cache_dir.join(sub_dir).join(filename)
405    }
406
407    /// Encode cache key information into filename-safe format
408    fn encode_cache_key_for_filename(&self, key: &CacheKey) -> String {
409        let key_data = serde_json::json!({
410            "namespace": key.namespace,
411            "key": key.key,
412            "variant": key.variant
413        });
414
415        // Use base64 encoding to safely include key information in filename
416        use base64::{engine::general_purpose, Engine as _};
417        general_purpose::URL_SAFE_NO_PAD.encode(key_data.to_string().as_bytes())
418    }
419
420    /// Decode cache key from filename
421    pub(super) fn decode_cache_key_from_filename(&self, filename: &str) -> Option<CacheKey> {
422        if let Some(encoded_part) = filename
423            .strip_suffix(".cache")
424            .and_then(|s| s.split('_').nth(1))
425        {
426            use base64::{engine::general_purpose, Engine as _};
427            if let Ok(decoded_bytes) = general_purpose::URL_SAFE_NO_PAD.decode(encoded_part) {
428                if let Ok(decoded_str) = String::from_utf8(decoded_bytes) {
429                    if let Ok(key_data) = serde_json::from_str::<serde_json::Value>(&decoded_str) {
430                        return Some(CacheKey {
431                            namespace: key_data["namespace"].as_str()?.to_string(),
432                            key: key_data["key"].as_str()?.to_string(),
433                            variant: key_data["variant"].as_str().map(|s| s.to_string()),
434                        });
435                    }
436                }
437            }
438        }
439        None
440    }
441
442    /// Hash cache key
443    fn hash_key(&self, key: &str) -> u64 {
444        let mut hasher = std::collections::hash_map::DefaultHasher::new();
445        key.hash(&mut hasher);
446        hasher.finish()
447    }
448
449    /// Serialize cache entry to bytes
450    pub(super) fn serialize_entry(&self, entry: &CacheEntry) -> Result<Vec<u8>> {
451        // Custom binary serialization since CacheEntry has Instant fields
452        let mut data = Vec::new();
453
454        // Serialize vector data
455        let vector_data = &entry.data.as_f32();
456        data.extend_from_slice(&(vector_data.len() as u32).to_le_bytes());
457        for &value in vector_data {
458            data.extend_from_slice(&value.to_le_bytes());
459        }
460
461        // Serialize timestamps as epoch nanos from creation
462        let created_nanos = entry.created_at.elapsed().as_nanos() as u64;
463        let accessed_nanos = entry.last_accessed.elapsed().as_nanos() as u64;
464        data.extend_from_slice(&created_nanos.to_le_bytes());
465        data.extend_from_slice(&accessed_nanos.to_le_bytes());
466
467        // Serialize other fields
468        data.extend_from_slice(&entry.access_count.to_le_bytes());
469        data.extend_from_slice(&(entry.size_bytes as u64).to_le_bytes());
470
471        // Serialize TTL
472        if let Some(ttl) = entry.ttl {
473            data.push(1); // TTL present
474            data.extend_from_slice(&ttl.as_nanos().to_le_bytes());
475        } else {
476            data.push(0); // No TTL
477        }
478
479        // Serialize tags
480        data.extend_from_slice(&(entry.tags.len() as u32).to_le_bytes());
481        for (key, value) in &entry.tags {
482            data.extend_from_slice(&(key.len() as u32).to_le_bytes());
483            data.extend_from_slice(key.as_bytes());
484            data.extend_from_slice(&(value.len() as u32).to_le_bytes());
485            data.extend_from_slice(value.as_bytes());
486        }
487
488        Ok(data)
489    }
490
491    /// Deserialize cache entry from bytes
492    pub(super) fn deserialize_entry(&self, data: &[u8]) -> Result<CacheEntry> {
493        // Check if data is empty or too small
494        if data.len() < 4 {
495            return Err(anyhow::anyhow!(
496                "Invalid cache entry data: too small (expected at least 4 bytes, got {})",
497                data.len()
498            ));
499        }
500
501        let mut offset = 0;
502
503        // Deserialize vector data
504        let vector_len = u32::from_le_bytes([
505            data[offset],
506            data[offset + 1],
507            data[offset + 2],
508            data[offset + 3],
509        ]) as usize;
510        offset += 4;
511
512        let mut vector_data = Vec::with_capacity(vector_len);
513        for _ in 0..vector_len {
514            let value = f32::from_le_bytes([
515                data[offset],
516                data[offset + 1],
517                data[offset + 2],
518                data[offset + 3],
519            ]);
520            vector_data.push(value);
521            offset += 4;
522        }
523        let vector = crate::Vector::new(vector_data);
524
525        // Deserialize timestamps (stored as elapsed nanos, convert back to Instant)
526        let created_nanos = u64::from_le_bytes([
527            data[offset],
528            data[offset + 1],
529            data[offset + 2],
530            data[offset + 3],
531            data[offset + 4],
532            data[offset + 5],
533            data[offset + 6],
534            data[offset + 7],
535        ]);
536        offset += 8;
537
538        let accessed_nanos = u64::from_le_bytes([
539            data[offset],
540            data[offset + 1],
541            data[offset + 2],
542            data[offset + 3],
543            data[offset + 4],
544            data[offset + 5],
545            data[offset + 6],
546            data[offset + 7],
547        ]);
548        offset += 8;
549
550        // Reconstruct timestamps (approximation - will be recent)
551        let now = Instant::now();
552        let created_at = now - Duration::from_nanos(created_nanos);
553        let last_accessed = now - Duration::from_nanos(accessed_nanos);
554
555        // Deserialize other fields
556        let access_count = u64::from_le_bytes([
557            data[offset],
558            data[offset + 1],
559            data[offset + 2],
560            data[offset + 3],
561            data[offset + 4],
562            data[offset + 5],
563            data[offset + 6],
564            data[offset + 7],
565        ]);
566        offset += 8;
567
568        let size_bytes = u64::from_le_bytes([
569            data[offset],
570            data[offset + 1],
571            data[offset + 2],
572            data[offset + 3],
573            data[offset + 4],
574            data[offset + 5],
575            data[offset + 6],
576            data[offset + 7],
577        ]) as usize;
578        offset += 8;
579
580        // Deserialize TTL
581        let ttl = if data[offset] == 1 {
582            offset += 1;
583            let ttl_nanos = u128::from_le_bytes([
584                data[offset],
585                data[offset + 1],
586                data[offset + 2],
587                data[offset + 3],
588                data[offset + 4],
589                data[offset + 5],
590                data[offset + 6],
591                data[offset + 7],
592                data[offset + 8],
593                data[offset + 9],
594                data[offset + 10],
595                data[offset + 11],
596                data[offset + 12],
597                data[offset + 13],
598                data[offset + 14],
599                data[offset + 15],
600            ]);
601            offset += 16;
602            Some(Duration::from_nanos(ttl_nanos as u64))
603        } else {
604            offset += 1;
605            None
606        };
607
608        // Deserialize tags
609        let tags_len = u32::from_le_bytes([
610            data[offset],
611            data[offset + 1],
612            data[offset + 2],
613            data[offset + 3],
614        ]) as usize;
615        offset += 4;
616
617        let mut tags = HashMap::new();
618        for _ in 0..tags_len {
619            let key_len = u32::from_le_bytes([
620                data[offset],
621                data[offset + 1],
622                data[offset + 2],
623                data[offset + 3],
624            ]) as usize;
625            offset += 4;
626            let key = String::from_utf8(data[offset..offset + key_len].to_vec())?;
627            offset += key_len;
628
629            let value_len = u32::from_le_bytes([
630                data[offset],
631                data[offset + 1],
632                data[offset + 2],
633                data[offset + 3],
634            ]) as usize;
635            offset += 4;
636            let value = String::from_utf8(data[offset..offset + value_len].to_vec())?;
637            offset += value_len;
638
639            tags.insert(key, value);
640        }
641
642        Ok(CacheEntry {
643            data: vector,
644            created_at,
645            last_accessed,
646            access_count,
647            size_bytes,
648            ttl,
649            tags,
650        })
651    }
652
653    /// Compress data using simple RLE compression
654    pub(super) fn compress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
655        // Simple run-length encoding for demonstration
656        let mut compressed = Vec::new();
657
658        if data.is_empty() {
659            return Ok(compressed);
660        }
661
662        let mut current_byte = data[0];
663        let mut count = 1u8;
664
665        for &byte in &data[1..] {
666            if byte == current_byte && count < 255 {
667                count += 1;
668            } else {
669                compressed.push(count);
670                compressed.push(current_byte);
671                current_byte = byte;
672                count = 1;
673            }
674        }
675
676        // Add the last run
677        compressed.push(count);
678        compressed.push(current_byte);
679
680        Ok(compressed)
681    }
682
683    /// Decompress data using RLE decompression
684    pub(super) fn decompress_data(&self, data: &[u8]) -> Result<Vec<u8>> {
685        let mut decompressed = Vec::new();
686
687        if data.len() % 2 != 0 {
688            return Err(anyhow!("Invalid compressed data length"));
689        }
690
691        for chunk in data.chunks(2) {
692            let count = chunk[0];
693            let byte = chunk[1];
694
695            for _ in 0..count {
696                decompressed.push(byte);
697            }
698        }
699
700        Ok(decompressed)
701    }
702}