Skip to main content

haagenti_network/
cache.rs

1//! Fragment caching with disk and memory tiers
2
3use crate::{NetworkError, Result};
4use bytes::Bytes;
5use dashmap::DashMap;
6use haagenti_fragments::FragmentId;
7use serde::{Deserialize, Serialize};
8use std::path::PathBuf;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use tokio::fs;
12use tracing::{debug, info};
13
14/// Cache configuration
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct CacheConfig {
17    /// Cache directory
18    pub path: PathBuf,
19    /// Maximum cache size (bytes)
20    pub max_size: u64,
21    /// Maximum memory cache size (bytes)
22    pub max_memory_size: u64,
23    /// Eviction threshold (0.0 - 1.0)
24    pub eviction_threshold: f32,
25}
26
27impl Default for CacheConfig {
28    fn default() -> Self {
29        Self {
30            path: PathBuf::from("./fragment_cache"),
31            max_size: 10 * 1024 * 1024 * 1024,  // 10GB
32            max_memory_size: 512 * 1024 * 1024, // 512MB
33            eviction_threshold: 0.9,
34        }
35    }
36}
37
38/// Cache entry metadata
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct CacheEntry {
41    /// Fragment ID
42    pub fragment_id: FragmentId,
43    /// Size in bytes
44    pub size: u64,
45    /// ETag for validation
46    pub etag: Option<String>,
47    /// Last modified timestamp
48    pub last_modified: Option<String>,
49    /// Cache timestamp
50    pub cached_at: u64,
51    /// Last access timestamp
52    pub last_accessed: u64,
53    /// Access count
54    pub access_count: u32,
55}
56
57impl CacheEntry {
58    /// Create a new cache entry
59    pub fn new(fragment_id: FragmentId, size: u64) -> Self {
60        let now = std::time::SystemTime::now()
61            .duration_since(std::time::UNIX_EPOCH)
62            .unwrap()
63            .as_secs();
64
65        Self {
66            fragment_id,
67            size,
68            etag: None,
69            last_modified: None,
70            cached_at: now,
71            last_accessed: now,
72            access_count: 1,
73        }
74    }
75
76    /// With etag
77    pub fn with_etag(mut self, etag: impl Into<String>) -> Self {
78        self.etag = Some(etag.into());
79        self
80    }
81
82    /// With last modified
83    pub fn with_last_modified(mut self, last_modified: impl Into<String>) -> Self {
84        self.last_modified = Some(last_modified.into());
85        self
86    }
87
88    /// Update access timestamp
89    pub fn touch(&mut self) {
90        self.last_accessed = std::time::SystemTime::now()
91            .duration_since(std::time::UNIX_EPOCH)
92            .unwrap()
93            .as_secs();
94        self.access_count += 1;
95    }
96
97    /// Compute eviction score (lower = evict first)
98    pub fn eviction_score(&self) -> f64 {
99        let age = std::time::SystemTime::now()
100            .duration_since(std::time::UNIX_EPOCH)
101            .unwrap()
102            .as_secs()
103            - self.last_accessed;
104
105        // LRU-K hybrid: consider both recency and frequency
106        let recency = 1.0 / (age as f64 + 1.0);
107        let frequency = (self.access_count as f64).ln().max(1.0);
108
109        recency * frequency
110    }
111}
112
/// Point-in-time snapshot of cache counters.
#[derive(Clone, Debug, Default)]
pub struct CacheStats {
    /// Number of entries currently tracked.
    pub entries: usize,
    /// Bytes accounted to the disk tier.
    pub disk_size: u64,
    /// Bytes accounted to the memory tier.
    pub memory_size: u64,
    /// Lookups that were served from the cache.
    pub hits: u64,
    /// Lookups that found nothing.
    pub misses: u64,
    /// Entries removed by eviction.
    pub evictions: u64,
}
129
130impl CacheStats {
131    /// Hit rate
132    pub fn hit_rate(&self) -> f64 {
133        let total = self.hits + self.misses;
134        if total == 0 {
135            0.0
136        } else {
137            self.hits as f64 / total as f64
138        }
139    }
140}
141
142/// Two-tier fragment cache (memory + disk)
143pub struct FragmentCache {
144    config: CacheConfig,
145    /// Memory cache
146    memory: DashMap<FragmentId, Arc<Bytes>>,
147    /// Disk cache metadata
148    metadata: DashMap<FragmentId, CacheEntry>,
149    /// Current disk size
150    disk_size: AtomicU64,
151    /// Current memory size
152    memory_size: AtomicU64,
153    /// Statistics
154    stats: Arc<CacheStatsInner>,
155}
156
/// Atomic counters backing the values reported through `CacheStats`.
struct CacheStatsInner {
    /// Lookups served from either tier.
    hits: AtomicU64,
    /// Lookups that found nothing.
    misses: AtomicU64,
    /// Entries removed by eviction.
    evictions: AtomicU64,
}
162
163impl FragmentCache {
164    /// Create or open a cache
165    pub async fn open(config: CacheConfig) -> Result<Self> {
166        fs::create_dir_all(&config.path).await?;
167
168        let cache = Self {
169            config,
170            memory: DashMap::new(),
171            metadata: DashMap::new(),
172            disk_size: AtomicU64::new(0),
173            memory_size: AtomicU64::new(0),
174            stats: Arc::new(CacheStatsInner {
175                hits: AtomicU64::new(0),
176                misses: AtomicU64::new(0),
177                evictions: AtomicU64::new(0),
178            }),
179        };
180
181        // Load existing metadata
182        cache.load_metadata().await?;
183
184        Ok(cache)
185    }
186
187    /// Load cache metadata from disk
188    async fn load_metadata(&self) -> Result<()> {
189        let meta_path = self.config.path.join("metadata.bin");
190
191        if !meta_path.exists() {
192            return Ok(());
193        }
194
195        let data = fs::read(&meta_path).await?;
196        let entries: Vec<CacheEntry> =
197            bincode::deserialize(&data).map_err(|e| NetworkError::Cache(e.to_string()))?;
198
199        let mut total_size = 0u64;
200        for entry in entries {
201            total_size += entry.size;
202            self.metadata.insert(entry.fragment_id, entry);
203        }
204
205        self.disk_size.store(total_size, Ordering::Relaxed);
206        info!(
207            "Loaded cache metadata: {} entries, {} bytes",
208            self.metadata.len(),
209            total_size
210        );
211
212        Ok(())
213    }
214
215    /// Save cache metadata to disk
216    async fn save_metadata(&self) -> Result<()> {
217        let entries: Vec<CacheEntry> = self.metadata.iter().map(|e| e.value().clone()).collect();
218        let data = bincode::serialize(&entries).map_err(|e| NetworkError::Cache(e.to_string()))?;
219
220        let meta_path = self.config.path.join("metadata.bin");
221        let tmp_path = meta_path.with_extension("tmp");
222
223        fs::write(&tmp_path, &data).await?;
224        fs::rename(&tmp_path, &meta_path).await?;
225
226        Ok(())
227    }
228
229    /// Get a fragment from cache
230    pub async fn get(&self, fragment_id: &FragmentId) -> Option<Bytes> {
231        // Check memory cache first
232        if let Some(data) = self.memory.get(fragment_id) {
233            self.stats.hits.fetch_add(1, Ordering::Relaxed);
234            if let Some(mut entry) = self.metadata.get_mut(fragment_id) {
235                entry.touch();
236            }
237            return Some(data.as_ref().clone());
238        }
239
240        // Check disk cache
241        if let Some(mut entry) = self.metadata.get_mut(fragment_id) {
242            let path = self.fragment_path(fragment_id);
243            if let Ok(data) = fs::read(&path).await {
244                let bytes = Bytes::from(data);
245                entry.touch();
246
247                // Promote to memory cache if space available
248                self.promote_to_memory(fragment_id, bytes.clone());
249
250                self.stats.hits.fetch_add(1, Ordering::Relaxed);
251                return Some(bytes);
252            }
253        }
254
255        self.stats.misses.fetch_add(1, Ordering::Relaxed);
256        None
257    }
258
259    /// Put a fragment into cache
260    pub async fn put(&self, fragment_id: FragmentId, data: Bytes, entry: CacheEntry) -> Result<()> {
261        let size = data.len() as u64;
262
263        // Evict if needed
264        self.maybe_evict(size).await?;
265
266        // Write to disk
267        let path = self.fragment_path(&fragment_id);
268        if let Some(parent) = path.parent() {
269            fs::create_dir_all(parent).await?;
270        }
271        fs::write(&path, &data).await?;
272
273        // Update metadata
274        self.metadata.insert(fragment_id, entry);
275        self.disk_size.fetch_add(size, Ordering::Relaxed);
276
277        // Add to memory cache if space available
278        self.promote_to_memory(&fragment_id, data);
279
280        Ok(())
281    }
282
283    /// Promote to memory cache
284    fn promote_to_memory(&self, fragment_id: &FragmentId, data: Bytes) {
285        let size = data.len() as u64;
286        let current = self.memory_size.load(Ordering::Relaxed);
287
288        if current + size <= self.config.max_memory_size {
289            self.memory.insert(*fragment_id, Arc::new(data));
290            self.memory_size.fetch_add(size, Ordering::Relaxed);
291        }
292    }
293
294    /// Maybe evict entries
295    async fn maybe_evict(&self, needed_size: u64) -> Result<()> {
296        let current = self.disk_size.load(Ordering::Relaxed);
297        let threshold =
298            (self.config.max_size as f64 * self.config.eviction_threshold as f64) as u64;
299
300        if current + needed_size < threshold {
301            return Ok(());
302        }
303
304        // Collect entries sorted by eviction score
305        let mut entries: Vec<_> = self.metadata.iter().map(|e| e.value().clone()).collect();
306        entries.sort_by(|a, b| a.eviction_score().partial_cmp(&b.eviction_score()).unwrap());
307
308        // Evict until we have enough space
309        let target = self.config.max_size - needed_size - (self.config.max_size / 10); // 10% buffer
310        let mut freed = 0u64;
311
312        for entry in entries {
313            if current - freed <= target {
314                break;
315            }
316
317            if self.evict(&entry.fragment_id).await.is_ok() {
318                freed += entry.size;
319                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
320            }
321        }
322
323        debug!("Evicted {} bytes from cache", freed);
324        Ok(())
325    }
326
327    /// Evict a single entry
328    async fn evict(&self, fragment_id: &FragmentId) -> Result<()> {
329        // Remove from memory
330        if let Some((_, data)) = self.memory.remove(fragment_id) {
331            self.memory_size
332                .fetch_sub(data.len() as u64, Ordering::Relaxed);
333        }
334
335        // Remove from disk
336        if let Some((_, entry)) = self.metadata.remove(fragment_id) {
337            let path = self.fragment_path(fragment_id);
338            if path.exists() {
339                fs::remove_file(&path).await?;
340            }
341            self.disk_size.fetch_sub(entry.size, Ordering::Relaxed);
342        }
343
344        Ok(())
345    }
346
347    /// Check if fragment exists in cache
348    pub fn contains(&self, fragment_id: &FragmentId) -> bool {
349        self.metadata.contains_key(fragment_id)
350    }
351
352    /// Get cache entry metadata
353    pub fn get_entry(&self, fragment_id: &FragmentId) -> Option<CacheEntry> {
354        self.metadata.get(fragment_id).map(|e| e.value().clone())
355    }
356
357    /// Validate cache entry against remote
358    pub fn needs_revalidation(&self, fragment_id: &FragmentId, etag: Option<&str>) -> bool {
359        if let Some(entry) = self.metadata.get(fragment_id) {
360            if let (Some(cached_etag), Some(remote_etag)) = (&entry.etag, etag) {
361                return cached_etag != remote_etag;
362            }
363        }
364        true
365    }
366
367    /// Get cache statistics
368    pub fn stats(&self) -> CacheStats {
369        CacheStats {
370            entries: self.metadata.len(),
371            disk_size: self.disk_size.load(Ordering::Relaxed),
372            memory_size: self.memory_size.load(Ordering::Relaxed),
373            hits: self.stats.hits.load(Ordering::Relaxed),
374            misses: self.stats.misses.load(Ordering::Relaxed),
375            evictions: self.stats.evictions.load(Ordering::Relaxed),
376        }
377    }
378
379    /// Clear all cached data
380    pub async fn clear(&self) -> Result<()> {
381        self.memory.clear();
382        self.metadata.clear();
383        self.disk_size.store(0, Ordering::Relaxed);
384        self.memory_size.store(0, Ordering::Relaxed);
385
386        // Remove all files
387        let fragments_dir = self.config.path.join("fragments");
388        if fragments_dir.exists() {
389            fs::remove_dir_all(&fragments_dir).await?;
390        }
391
392        info!("Cache cleared");
393        Ok(())
394    }
395
396    /// Persist cache state
397    pub async fn sync(&self) -> Result<()> {
398        self.save_metadata().await
399    }
400
401    /// Get fragment path
402    fn fragment_path(&self, id: &FragmentId) -> PathBuf {
403        let hex = id.to_hex();
404        self.config
405            .path
406            .join("fragments")
407            .join(&hex[..2])
408            .join(format!("{}.bin", hex))
409    }
410}
411
412impl Drop for FragmentCache {
413    fn drop(&mut self) {
414        // Best effort sync on drop
415        let meta = self
416            .metadata
417            .iter()
418            .map(|e| e.value().clone())
419            .collect::<Vec<_>>();
420        if let Ok(data) = bincode::serialize(&meta) {
421            let _ = std::fs::write(self.config.path.join("metadata.bin"), data);
422        }
423    }
424}
425
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Default configuration, re-rooted at `root`.
    fn config_at(root: &std::path::Path) -> CacheConfig {
        CacheConfig {
            path: root.to_path_buf(),
            ..Default::default()
        }
    }

    /// A stored fragment can be read back unchanged.
    #[tokio::test]
    async fn test_cache_put_get() {
        let dir = tempdir().unwrap();
        let cache = FragmentCache::open(config_at(dir.path())).await.unwrap();

        let id = FragmentId::new([1; 16]);
        let payload = Bytes::from(vec![42u8; 1024]);

        cache
            .put(id, payload.clone(), CacheEntry::new(id, 1024))
            .await
            .unwrap();

        let fetched = cache.get(&id).await.unwrap();
        assert_eq!(fetched, payload);
    }

    /// Looking up an unknown fragment yields `None` and counts one miss.
    #[tokio::test]
    async fn test_cache_miss() {
        let dir = tempdir().unwrap();
        let cache = FragmentCache::open(config_at(dir.path())).await.unwrap();

        let unknown = FragmentId::new([99; 16]);
        assert!(cache.get(&unknown).await.is_none());

        assert_eq!(cache.stats().misses, 1);
    }
}
467}