1use crate::{NetworkError, Result};
4use bytes::Bytes;
5use dashmap::DashMap;
6use haagenti_fragments::FragmentId;
7use serde::{Deserialize, Serialize};
8use std::path::PathBuf;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use tokio::fs;
12use tracing::{debug, info};
13
/// Settings that control where the fragment cache lives and how large it may grow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheConfig {
    /// Root directory of the cache. It is created on open if missing; the
    /// `metadata.bin` index and the `fragments/` tree are stored beneath it.
    pub path: PathBuf,
    /// Disk budget in bytes that eviction decisions are measured against.
    pub max_size: u64,
    /// Upper bound, in bytes, on fragment payloads kept in the in-memory tier.
    pub max_memory_size: u64,
    /// Fraction of `max_size` at which a `put` starts evicting entries.
    pub eviction_threshold: f32,
}
26
27impl Default for CacheConfig {
28 fn default() -> Self {
29 Self {
30 path: PathBuf::from("./fragment_cache"),
31 max_size: 10 * 1024 * 1024 * 1024, max_memory_size: 512 * 1024 * 1024, eviction_threshold: 0.9,
34 }
35 }
36}
37
/// Bookkeeping record kept for each cached fragment.
///
/// Records are persisted with `bincode` as part of `metadata.bin`, so the
/// field order and types below are part of the on-disk layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheEntry {
    /// Identifier of the fragment this record describes.
    pub fragment_id: FragmentId,
    /// Size in bytes attributed to the fragment; summed into the disk-usage
    /// total when metadata is loaded and subtracted again on eviction.
    pub size: u64,
    /// Optional entity tag; compared against a remote tag by
    /// `FragmentCache::needs_revalidation`.
    pub etag: Option<String>,
    /// Optional last-modified marker supplied by the caller. It is stored but
    /// not interpreted in this file (presumably an HTTP `Last-Modified`
    /// value; confirm with callers).
    pub last_modified: Option<String>,
    /// Unix timestamp (seconds) taken when the record was created.
    pub cached_at: u64,
    /// Unix timestamp (seconds) of the most recent `touch`.
    pub last_accessed: u64,
    /// Access counter; starts at 1 and is incremented by `touch`.
    pub access_count: u32,
}
56
57impl CacheEntry {
58 pub fn new(fragment_id: FragmentId, size: u64) -> Self {
60 let now = std::time::SystemTime::now()
61 .duration_since(std::time::UNIX_EPOCH)
62 .unwrap()
63 .as_secs();
64
65 Self {
66 fragment_id,
67 size,
68 etag: None,
69 last_modified: None,
70 cached_at: now,
71 last_accessed: now,
72 access_count: 1,
73 }
74 }
75
76 pub fn with_etag(mut self, etag: impl Into<String>) -> Self {
78 self.etag = Some(etag.into());
79 self
80 }
81
82 pub fn with_last_modified(mut self, last_modified: impl Into<String>) -> Self {
84 self.last_modified = Some(last_modified.into());
85 self
86 }
87
88 pub fn touch(&mut self) {
90 self.last_accessed = std::time::SystemTime::now()
91 .duration_since(std::time::UNIX_EPOCH)
92 .unwrap()
93 .as_secs();
94 self.access_count += 1;
95 }
96
97 pub fn eviction_score(&self) -> f64 {
99 let age = std::time::SystemTime::now()
100 .duration_since(std::time::UNIX_EPOCH)
101 .unwrap()
102 .as_secs()
103 - self.last_accessed;
104
105 let recency = 1.0 / (age as f64 + 1.0);
107 let frequency = (self.access_count as f64).ln().max(1.0);
108
109 recency * frequency
110 }
111}
112
/// Point-in-time snapshot of the cache's counters.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Number of fragments tracked by the cache.
    pub entries: usize,
    /// Bytes attributed to fragments stored on disk.
    pub disk_size: u64,
    /// Bytes of fragment data held in memory.
    pub memory_size: u64,
    /// Lookups that returned data.
    pub hits: u64,
    /// Lookups that returned nothing.
    pub misses: u64,
    /// Entries removed by eviction.
    pub evictions: u64,
}

impl CacheStats {
    /// Returns the share of lookups that were hits, as a value in `[0.0, 1.0]`.
    ///
    /// Yields `0.0` when no lookup has been recorded yet.
    pub fn hit_rate(&self) -> f64 {
        match self.hits + self.misses {
            0 => 0.0,
            lookups => self.hits as f64 / lookups as f64,
        }
    }
}
141
/// Two-tier fragment cache: an in-memory map in front of files on disk.
pub struct FragmentCache {
    /// Settings supplied when the cache was opened.
    config: CacheConfig,
    /// Fragment payloads currently held in memory, keyed by fragment id.
    memory: DashMap<FragmentId, Arc<Bytes>>,
    /// One bookkeeping record per fragment known to the cache.
    metadata: DashMap<FragmentId, CacheEntry>,
    /// Running total, in bytes, attributed to fragments on disk.
    disk_size: AtomicU64,
    /// Running total, in bytes, of payloads held in `memory`.
    memory_size: AtomicU64,
    /// Hit / miss / eviction counters reported through `stats()`.
    stats: Arc<CacheStatsInner>,
}
156
/// Atomic counters backing the hit, miss and eviction figures in `CacheStats`.
struct CacheStatsInner {
    /// Lookups served from memory or disk.
    hits: AtomicU64,
    /// Lookups that found nothing.
    misses: AtomicU64,
    /// Entries removed by the eviction pass.
    evictions: AtomicU64,
}
162
163impl FragmentCache {
164 pub async fn open(config: CacheConfig) -> Result<Self> {
166 fs::create_dir_all(&config.path).await?;
167
168 let cache = Self {
169 config,
170 memory: DashMap::new(),
171 metadata: DashMap::new(),
172 disk_size: AtomicU64::new(0),
173 memory_size: AtomicU64::new(0),
174 stats: Arc::new(CacheStatsInner {
175 hits: AtomicU64::new(0),
176 misses: AtomicU64::new(0),
177 evictions: AtomicU64::new(0),
178 }),
179 };
180
181 cache.load_metadata().await?;
183
184 Ok(cache)
185 }
186
187 async fn load_metadata(&self) -> Result<()> {
189 let meta_path = self.config.path.join("metadata.bin");
190
191 if !meta_path.exists() {
192 return Ok(());
193 }
194
195 let data = fs::read(&meta_path).await?;
196 let entries: Vec<CacheEntry> =
197 bincode::deserialize(&data).map_err(|e| NetworkError::Cache(e.to_string()))?;
198
199 let mut total_size = 0u64;
200 for entry in entries {
201 total_size += entry.size;
202 self.metadata.insert(entry.fragment_id, entry);
203 }
204
205 self.disk_size.store(total_size, Ordering::Relaxed);
206 info!(
207 "Loaded cache metadata: {} entries, {} bytes",
208 self.metadata.len(),
209 total_size
210 );
211
212 Ok(())
213 }
214
215 async fn save_metadata(&self) -> Result<()> {
217 let entries: Vec<CacheEntry> = self.metadata.iter().map(|e| e.value().clone()).collect();
218 let data = bincode::serialize(&entries).map_err(|e| NetworkError::Cache(e.to_string()))?;
219
220 let meta_path = self.config.path.join("metadata.bin");
221 let tmp_path = meta_path.with_extension("tmp");
222
223 fs::write(&tmp_path, &data).await?;
224 fs::rename(&tmp_path, &meta_path).await?;
225
226 Ok(())
227 }
228
229 pub async fn get(&self, fragment_id: &FragmentId) -> Option<Bytes> {
231 if let Some(data) = self.memory.get(fragment_id) {
233 self.stats.hits.fetch_add(1, Ordering::Relaxed);
234 if let Some(mut entry) = self.metadata.get_mut(fragment_id) {
235 entry.touch();
236 }
237 return Some(data.as_ref().clone());
238 }
239
240 if let Some(mut entry) = self.metadata.get_mut(fragment_id) {
242 let path = self.fragment_path(fragment_id);
243 if let Ok(data) = fs::read(&path).await {
244 let bytes = Bytes::from(data);
245 entry.touch();
246
247 self.promote_to_memory(fragment_id, bytes.clone());
249
250 self.stats.hits.fetch_add(1, Ordering::Relaxed);
251 return Some(bytes);
252 }
253 }
254
255 self.stats.misses.fetch_add(1, Ordering::Relaxed);
256 None
257 }
258
259 pub async fn put(&self, fragment_id: FragmentId, data: Bytes, entry: CacheEntry) -> Result<()> {
261 let size = data.len() as u64;
262
263 self.maybe_evict(size).await?;
265
266 let path = self.fragment_path(&fragment_id);
268 if let Some(parent) = path.parent() {
269 fs::create_dir_all(parent).await?;
270 }
271 fs::write(&path, &data).await?;
272
273 self.metadata.insert(fragment_id, entry);
275 self.disk_size.fetch_add(size, Ordering::Relaxed);
276
277 self.promote_to_memory(&fragment_id, data);
279
280 Ok(())
281 }
282
283 fn promote_to_memory(&self, fragment_id: &FragmentId, data: Bytes) {
285 let size = data.len() as u64;
286 let current = self.memory_size.load(Ordering::Relaxed);
287
288 if current + size <= self.config.max_memory_size {
289 self.memory.insert(*fragment_id, Arc::new(data));
290 self.memory_size.fetch_add(size, Ordering::Relaxed);
291 }
292 }
293
294 async fn maybe_evict(&self, needed_size: u64) -> Result<()> {
296 let current = self.disk_size.load(Ordering::Relaxed);
297 let threshold =
298 (self.config.max_size as f64 * self.config.eviction_threshold as f64) as u64;
299
300 if current + needed_size < threshold {
301 return Ok(());
302 }
303
304 let mut entries: Vec<_> = self.metadata.iter().map(|e| e.value().clone()).collect();
306 entries.sort_by(|a, b| a.eviction_score().partial_cmp(&b.eviction_score()).unwrap());
307
308 let target = self.config.max_size - needed_size - (self.config.max_size / 10); let mut freed = 0u64;
311
312 for entry in entries {
313 if current - freed <= target {
314 break;
315 }
316
317 if self.evict(&entry.fragment_id).await.is_ok() {
318 freed += entry.size;
319 self.stats.evictions.fetch_add(1, Ordering::Relaxed);
320 }
321 }
322
323 debug!("Evicted {} bytes from cache", freed);
324 Ok(())
325 }
326
327 async fn evict(&self, fragment_id: &FragmentId) -> Result<()> {
329 if let Some((_, data)) = self.memory.remove(fragment_id) {
331 self.memory_size
332 .fetch_sub(data.len() as u64, Ordering::Relaxed);
333 }
334
335 if let Some((_, entry)) = self.metadata.remove(fragment_id) {
337 let path = self.fragment_path(fragment_id);
338 if path.exists() {
339 fs::remove_file(&path).await?;
340 }
341 self.disk_size.fetch_sub(entry.size, Ordering::Relaxed);
342 }
343
344 Ok(())
345 }
346
347 pub fn contains(&self, fragment_id: &FragmentId) -> bool {
349 self.metadata.contains_key(fragment_id)
350 }
351
352 pub fn get_entry(&self, fragment_id: &FragmentId) -> Option<CacheEntry> {
354 self.metadata.get(fragment_id).map(|e| e.value().clone())
355 }
356
357 pub fn needs_revalidation(&self, fragment_id: &FragmentId, etag: Option<&str>) -> bool {
359 if let Some(entry) = self.metadata.get(fragment_id) {
360 if let (Some(cached_etag), Some(remote_etag)) = (&entry.etag, etag) {
361 return cached_etag != remote_etag;
362 }
363 }
364 true
365 }
366
367 pub fn stats(&self) -> CacheStats {
369 CacheStats {
370 entries: self.metadata.len(),
371 disk_size: self.disk_size.load(Ordering::Relaxed),
372 memory_size: self.memory_size.load(Ordering::Relaxed),
373 hits: self.stats.hits.load(Ordering::Relaxed),
374 misses: self.stats.misses.load(Ordering::Relaxed),
375 evictions: self.stats.evictions.load(Ordering::Relaxed),
376 }
377 }
378
379 pub async fn clear(&self) -> Result<()> {
381 self.memory.clear();
382 self.metadata.clear();
383 self.disk_size.store(0, Ordering::Relaxed);
384 self.memory_size.store(0, Ordering::Relaxed);
385
386 let fragments_dir = self.config.path.join("fragments");
388 if fragments_dir.exists() {
389 fs::remove_dir_all(&fragments_dir).await?;
390 }
391
392 info!("Cache cleared");
393 Ok(())
394 }
395
396 pub async fn sync(&self) -> Result<()> {
398 self.save_metadata().await
399 }
400
401 fn fragment_path(&self, id: &FragmentId) -> PathBuf {
403 let hex = id.to_hex();
404 self.config
405 .path
406 .join("fragments")
407 .join(&hex[..2])
408 .join(format!("{}.bin", hex))
409 }
410}
411
412impl Drop for FragmentCache {
413 fn drop(&mut self) {
414 let meta = self
416 .metadata
417 .iter()
418 .map(|e| e.value().clone())
419 .collect::<Vec<_>>();
420 if let Ok(data) = bincode::serialize(&meta) {
421 let _ = std::fs::write(self.config.path.join("metadata.bin"), data);
422 }
423 }
424}
425
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Opens a cache with default limits rooted at `root`.
    async fn open_cache_in(root: &std::path::Path) -> FragmentCache {
        let config = CacheConfig {
            path: root.to_path_buf(),
            ..Default::default()
        };
        FragmentCache::open(config).await.unwrap()
    }

    /// A fragment that was stored can be read back unchanged.
    #[tokio::test]
    async fn test_cache_put_get() {
        let dir = tempdir().unwrap();
        let cache = open_cache_in(dir.path()).await;

        let id = FragmentId::new([1; 16]);
        let payload = Bytes::from(vec![42u8; 1024]);
        let record = CacheEntry::new(id, 1024);

        cache.put(id, payload.clone(), record).await.unwrap();

        let fetched = cache.get(&id).await.unwrap();
        assert_eq!(fetched, payload);
    }

    /// Looking up an unknown fragment yields `None` and counts one miss.
    #[tokio::test]
    async fn test_cache_miss() {
        let dir = tempdir().unwrap();
        let cache = open_cache_in(dir.path()).await;

        let unknown = FragmentId::new([99; 16]);
        assert!(cache.get(&unknown).await.is_none());

        assert_eq!(cache.stats().misses, 1);
    }
}