use crate::error::{GuardianError, Result};
use blake3::Hasher;
use bytes::Bytes;
use iroh_blobs::BlobFormat;
use lru::LruCache;
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, info, instrument, warn};

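/// Multi-tier blob cache: a hot LRU tier holding uncompressed bytes, a
/// larger zstd-compressed cold tier, a metadata map, and an access
/// predictor that records request history for eviction decisions.
///
/// A minimal usage sketch (assumes a Tokio runtime; the CID string is a
/// placeholder):
///
/// ```ignore
/// let cache = OptimizedCache::new(CacheConfig::default());
/// cache.put("bafy...", Bytes::from_static(b"payload")).await?;
/// assert!(cache.get("bafy...").await.is_some());
/// ```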
pub struct OptimizedCache {
    /// Hot tier: uncompressed entries with LRU eviction.
    data_cache: Arc<RwLock<LruCache<String, CacheEntry>>>,
    /// Per-CID blob metadata (size, format, providers).
    metadata_cache: Arc<RwLock<HashMap<String, MetadataEntry>>>,
    /// Cold tier: zstd-compressed entries with LRU eviction.
    compressed_cache: Arc<RwLock<LruCache<String, CompressedEntry>>>,
    /// Hit/miss counters and savings accounting.
    stats: Arc<RwLock<CacheStats>>,
    /// Configuration captured at construction time.
    config: CacheConfig,
    /// Access-history tracker, used when prediction is enabled.
    access_predictor: Arc<Mutex<AccessPredictor>>,
}

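/// An uncompressed entry in the hot data cache.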
#[derive(Debug, Clone)]
pub struct CacheEntry {
    pub data: Bytes,
    pub created_at: Instant,
    pub last_accessed: Instant,
    pub access_count: u64,
    /// Eviction priority; higher values are evicted later.
    pub priority: u8,
    pub original_size: usize,
    /// BLAKE3 hash of `data`, for integrity checks.
    pub integrity_hash: [u8; 32],
}

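/// A zstd-compressed entry in the cold cache tier.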
#[derive(Debug, Clone)]
pub struct CompressedEntry {
    pub compressed_data: Bytes,
    pub original_size: usize,
    pub compression_level: i32,
    pub compressed_at: Instant,
    /// Compressed size divided by original size (lower is better).
    pub compression_ratio: f64,
}

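/// Metadata tracked per blob: size, format, known providers, and access
/// statistics.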
#[derive(Debug, Clone)]
pub struct MetadataEntry {
    pub size: u64,
    pub format: BlobFormat,
    pub providers: Vec<String>,
    pub discovered_at: Instant,
    pub avg_access_latency_ms: f64,
    pub popularity_score: f64,
}

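/// Aggregate cache counters; `hit_rate` is recomputed by `get_stats`.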
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    pub data_cache_hits: u64,
    pub data_cache_misses: u64,
    pub compressed_cache_hits: u64,
    pub compressed_cache_misses: u64,
    pub total_bytes_cached: u64,
    pub bytes_saved_compression: u64,
    pub bytes_saved_network: u64,
    pub avg_access_time_us: f64,
    pub hit_rate: f64,
    pub evictions_count: u64,
    pub compressions_count: u64,
}

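/// Tunable cache limits and behavior switches; see the `Default` impl
/// below for the baseline values.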
#[derive(Debug, Clone)]
pub struct CacheConfig {
    /// Byte budget for the uncompressed tier.
    pub max_data_cache_size: usize,
    /// Entry cap for the uncompressed tier's LRU.
    pub max_data_entries: usize,
    /// Byte budget for the compressed tier.
    pub max_compressed_cache_size: usize,
    /// Entry cap for the compressed tier's LRU.
    pub max_compressed_entries: usize,
    pub default_ttl_secs: u64,
    /// Entries at or above this size (bytes) are compressed on `put`.
    pub compression_threshold: usize,
    /// zstd level passed to `zstd::bulk::compress`.
    pub compression_level: i32,
    /// Fraction of the combined byte budget that triggers eviction.
    pub eviction_threshold: f64,
    pub enable_access_prediction: bool,
}

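/// Records per-CID access timestamps inside a sliding analysis window.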
#[derive(Debug)]
pub struct AccessPredictor {
    access_history: HashMap<String, Vec<Instant>>,
    #[allow(dead_code)]
    patterns: HashMap<String, AccessPattern>,
    analysis_window_secs: u64,
}

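/// A summarized per-CID access pattern (reserved for future prediction
/// logic; see the `dead_code` allowance on `patterns` above).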
#[derive(Debug, Clone)]
pub struct AccessPattern {
    pub avg_frequency: f64,
    pub peak_hours: Vec<u8>,
    pub reaccess_probability: f64,
    pub pattern_type: PatternType,
}

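/// Coarse classification of how a blob tends to be accessed.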
#[derive(Debug, Clone, PartialEq)]
pub enum PatternType {
    OneTime,
    Regular,
    Burst,
    Seasonal,
}

impl Default for CacheConfig {
    fn default() -> Self {
        Self {
            max_data_cache_size: 256 * 1024 * 1024, // 256 MiB hot tier
            max_data_entries: 10_000,
            max_compressed_cache_size: 1024 * 1024 * 1024, // 1 GiB cold tier
            max_compressed_entries: 50_000,
            default_ttl_secs: 3600,
            compression_threshold: 64 * 1024, // compress entries >= 64 KiB
            compression_level: 6,
            eviction_threshold: 0.85, // evict at 85% of the byte budget
            enable_access_prediction: true,
        }
    }
}

impl OptimizedCache {
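    /// Builds a cache from `config`, falling back to 10,000 / 50,000 LRU
    /// slots if the configured entry caps are zero.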
    pub fn new(config: CacheConfig) -> Self {
        let data_cache_size = NonZeroUsize::new(config.max_data_entries)
            .unwrap_or(NonZeroUsize::new(10_000).unwrap());
        let compressed_cache_size = NonZeroUsize::new(config.max_compressed_entries)
            .unwrap_or(NonZeroUsize::new(50_000).unwrap());

        Self {
            data_cache: Arc::new(RwLock::new(LruCache::new(data_cache_size))),
            metadata_cache: Arc::new(RwLock::new(HashMap::new())),
            compressed_cache: Arc::new(RwLock::new(LruCache::new(compressed_cache_size))),
            stats: Arc::new(RwLock::new(CacheStats::default())),
            config,
            access_predictor: Arc::new(Mutex::new(AccessPredictor {
                access_history: HashMap::new(),
                patterns: HashMap::new(),
                analysis_window_secs: 3600 * 24, // 24-hour sliding window
            })),
        }
    }

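    /// Looks up `cid`, checking the hot tier first and then the compressed
    /// tier; a compressed hit is decompressed and promoted back into the
    /// hot tier. Returns `None` on a miss in both tiers.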
    #[instrument(skip(self))]
    pub async fn get(&self, cid: &str) -> Option<Bytes> {
        let start_time = Instant::now();

        if self.config.enable_access_prediction {
            self.update_access_history(cid).await;
        }

        // Hot tier: uncompressed data.
        {
            let mut cache = self.data_cache.write().await;
            if let Some(entry) = cache.get_mut(cid) {
                entry.last_accessed = Instant::now();
                entry.access_count += 1;

                let mut stats = self.stats.write().await;
                stats.data_cache_hits += 1;
                stats.avg_access_time_us =
                    (stats.avg_access_time_us + start_time.elapsed().as_micros() as f64) / 2.0;

                debug!("Cache hit (data): {} ({} bytes)", cid, entry.data.len());
                return Some(entry.data.clone());
            }
        }

        // Cold tier: decompress and promote into the hot tier on hit.
        {
            let mut compressed_cache = self.compressed_cache.write().await;
            if let Some(compressed_entry) = compressed_cache.get_mut(cid) {
                match self
                    .decompress_data(
                        &compressed_entry.compressed_data,
                        compressed_entry.original_size,
                    )
                    .await
                {
                    Ok(decompressed) => {
                        let cache_entry = CacheEntry {
                            data: decompressed.clone(),
                            created_at: compressed_entry.compressed_at,
                            last_accessed: Instant::now(),
                            access_count: 1,
                            priority: 7, // promoted entries get elevated priority
                            original_size: compressed_entry.original_size,
                            integrity_hash: self.calculate_hash(&decompressed),
                        };

                        {
                            let mut data_cache = self.data_cache.write().await;
                            data_cache.put(cid.to_string(), cache_entry);
                        }

                        let mut stats = self.stats.write().await;
                        stats.compressed_cache_hits += 1;
                        stats.avg_access_time_us = (stats.avg_access_time_us
                            + start_time.elapsed().as_micros() as f64)
                            / 2.0;

                        debug!(
                            "Cache hit (compressed): {} ({} bytes decompressed)",
                            cid,
                            decompressed.len()
                        );
                        return Some(decompressed);
                    }
                    Err(e) => {
                        warn!("Failed to decompress cached data for {}: {}", cid, e);
                        // The entry is unreadable; drop it from the cold tier.
                        compressed_cache.pop(cid);
                    }
                }
            }
        }

        let mut stats = self.stats.write().await;
        stats.data_cache_misses += 1;
        stats.compressed_cache_misses += 1;

        debug!("Cache miss: {}", cid);
        None
    }

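    /// Inserts `data` under `cid`. Entries at or above the configured
    /// compression threshold go to the compressed tier (falling back to
    /// uncompressed storage if compression fails); smaller entries go
    /// straight to the hot tier. May trigger eviction afterwards.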
    #[instrument(skip(self, data))]
    pub async fn put(&self, cid: &str, data: Bytes) -> Result<()> {
        let data_size = data.len();
        let integrity_hash = self.calculate_hash(&data);

        let should_compress = data_size >= self.config.compression_threshold;

        if should_compress {
            match self.compress_data(&data).await {
                Ok((compressed_data, compression_ratio)) => {
                    let compressed_entry = CompressedEntry {
                        compressed_data,
                        original_size: data_size,
                        compression_level: self.config.compression_level,
                        compressed_at: Instant::now(),
                        compression_ratio,
                    };

                    {
                        let mut compressed_cache = self.compressed_cache.write().await;
                        compressed_cache.put(cid.to_string(), compressed_entry);
                    }

                    let mut stats = self.stats.write().await;
                    stats.compressions_count += 1;
                    stats.bytes_saved_compression +=
                        (data_size as f64 * (1.0 - compression_ratio)) as u64;
                    stats.total_bytes_cached += data_size as u64;

                    info!(
                        "Compressed and stored: {} ({} bytes -> {} bytes, ratio: {:.2})",
                        cid,
                        data_size,
                        (data_size as f64 * compression_ratio) as usize,
                        compression_ratio
                    );
                }
                Err(e) => {
                    warn!(
                        "Compression failed for {}: {}. Storing uncompressed.",
                        cid, e
                    );
                    self.store_uncompressed(cid, data, integrity_hash).await?;
                }
            }
        } else {
            self.store_uncompressed(cid, data, integrity_hash).await?;
        }

        self.check_and_evict().await?;

        Ok(())
    }

    async fn store_uncompressed(
        &self,
        cid: &str,
        data: Bytes,
        integrity_hash: [u8; 32],
    ) -> Result<()> {
        let cache_entry = CacheEntry {
            data: data.clone(),
            created_at: Instant::now(),
            last_accessed: Instant::now(),
            access_count: 1,
            priority: 5, // default mid-range priority
            original_size: data.len(),
            integrity_hash,
        };

        {
            let mut data_cache = self.data_cache.write().await;
            data_cache.put(cid.to_string(), cache_entry);
        }

        let mut stats = self.stats.write().await;
        stats.total_bytes_cached += data.len() as u64;

        debug!(
            "Stored uncompressed: {} ({} bytes)",
            cid,
            data.len()
        );
        Ok(())
    }

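    /// Compresses `data` with zstd on a blocking thread (via
    /// `spawn_blocking`) so the CPU-bound work does not stall the async
    /// executor. Returns the compressed bytes and the compressed/original
    /// size ratio.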
    async fn compress_data(&self, data: &Bytes) -> Result<(Bytes, f64)> {
        let original_size = data.len();

        let compressed = tokio::task::spawn_blocking({
            let data = data.clone();
            let compression_level = self.config.compression_level;
            move || {
                zstd::bulk::compress(&data, compression_level)
                    .map_err(|e| GuardianError::Other(format!("Compression failed: {}", e)))
            }
        })
        .await
        .map_err(|e| GuardianError::Other(format!("Compression task failed: {}", e)))??;

        let compressed_size = compressed.len();
        let compression_ratio = compressed_size as f64 / original_size as f64;

        Ok((Bytes::from(compressed), compression_ratio))
    }

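    /// Inverse of `compress_data`: decompresses on a blocking thread.
    /// `expected_size` is the original size recorded at compression time,
    /// which zstd uses as the output-buffer capacity.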
    async fn decompress_data(
        &self,
        compressed_data: &Bytes,
        expected_size: usize,
    ) -> Result<Bytes> {
        let decompressed = tokio::task::spawn_blocking({
            let compressed_data = compressed_data.clone();
            move || {
                zstd::bulk::decompress(&compressed_data, expected_size)
                    .map_err(|e| GuardianError::Other(format!("Decompression failed: {}", e)))
            }
        })
        .await
        .map_err(|e| GuardianError::Other(format!("Decompression task failed: {}", e)))??;

        Ok(Bytes::from(decompressed))
    }

    /// BLAKE3 digest of `data`, used as the entry's integrity hash.
    fn calculate_hash(&self, data: &Bytes) -> [u8; 32] {
        let mut hasher = Hasher::new();
        hasher.update(data);
        hasher.finalize().into()
    }

    /// Records an access to `cid` and prunes history entries that fall
    /// outside the predictor's analysis window.
    async fn update_access_history(&self, cid: &str) {
        let mut predictor = self.access_predictor.lock().await;
        let now = Instant::now();

        predictor
            .access_history
            .entry(cid.to_string())
            .or_default()
            .push(now);

        // Copy the window length first to avoid simultaneous mutable and
        // immutable borrows of `predictor`.
        let analysis_window = predictor.analysis_window_secs;
        if let Some(history) = predictor.access_history.get_mut(cid) {
            // `checked_sub` avoids a panic when the process has been up for
            // less time than the analysis window.
            if let Some(cutoff) = now.checked_sub(Duration::from_secs(analysis_window)) {
                history.retain(|&access_time| access_time > cutoff);
            }
        }
    }

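    /// Triggers eviction once cached bytes exceed `eviction_threshold`
    /// (a fraction) of the combined data + compressed byte budget.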
    async fn check_and_evict(&self) -> Result<()> {
        let stats = self.stats.read().await;
        let current_usage = stats.total_bytes_cached as f64;
        let max_usage =
            (self.config.max_data_cache_size + self.config.max_compressed_cache_size) as f64;

        if current_usage / max_usage > self.config.eviction_threshold {
            // Release the read lock before eviction takes the write lock.
            drop(stats);
            self.intelligent_eviction().await?;
        }

        Ok(())
    }

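    /// Scores every hot-tier entry by a weighted mix of age since last
    /// access (0.4), inverse access frequency (0.3), and inverted priority
    /// (0.3), then evicts the highest-scoring ~20% of entries.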
    async fn intelligent_eviction(&self) -> Result<()> {
        debug!("Starting intelligent cache eviction");

        let candidates = {
            let data_cache = self.data_cache.read().await;
            data_cache
                .iter()
                .map(|(cid, entry)| {
                    let age_score =
                        Instant::now().duration_since(entry.last_accessed).as_secs() as f64;
                    let frequency_score = 1.0 / (entry.access_count as f64 + 1.0);
                    // Invert priority so low-priority entries score higher;
                    // saturating_sub guards against priorities above 10.
                    let priority_score = 10u8.saturating_sub(entry.priority) as f64;

                    let eviction_score =
                        age_score * 0.4 + frequency_score * 0.3 + priority_score * 0.3;

                    (cid.clone(), eviction_score, entry.data.len())
                })
                .collect::<Vec<_>>()
        };

        // Highest eviction score first.
        let mut sorted_candidates = candidates;
        sorted_candidates
            .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));

        // Evict the top 20% of candidates.
        let eviction_count = (sorted_candidates.len() as f64 * 0.2).ceil() as usize;
        let mut bytes_freed = 0u64;

        {
            let mut data_cache = self.data_cache.write().await;
            for (cid, _score, size) in sorted_candidates.iter().take(eviction_count) {
                if data_cache.pop(cid).is_some() {
                    bytes_freed += *size as u64;
                }
            }
        }

        {
            let mut stats = self.stats.write().await;
            stats.evictions_count += eviction_count as u64;
            stats.total_bytes_cached = stats.total_bytes_cached.saturating_sub(bytes_freed);
        }

        info!(
            "Eviction complete: {} entries removed, {} bytes freed",
            eviction_count, bytes_freed
        );

        Ok(())
    }

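    /// Returns a snapshot of the counters with `hit_rate` recomputed from
    /// the hit/miss totals across both tiers.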
    pub async fn get_stats(&self) -> CacheStats {
        let stats = self.stats.read().await;
        let mut stats_copy = stats.clone();

        let total_requests = stats_copy.data_cache_hits
            + stats_copy.data_cache_misses
            + stats_copy.compressed_cache_hits
            + stats_copy.compressed_cache_misses;
        let total_hits = stats_copy.data_cache_hits + stats_copy.compressed_cache_hits;

        if total_requests > 0 {
            stats_copy.hit_rate = total_hits as f64 / total_requests as f64;
        }

        stats_copy
    }

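    /// Empties all three tiers and resets the statistics counters.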
    pub async fn clear(&self) -> Result<()> {
        {
            let mut data_cache = self.data_cache.write().await;
            data_cache.clear();
        }

        {
            let mut compressed_cache = self.compressed_cache.write().await;
            compressed_cache.clear();
        }

        {
            let mut metadata_cache = self.metadata_cache.write().await;
            metadata_cache.clear();
        }

        {
            let mut stats = self.stats.write().await;
            *stats = CacheStats::default();
        }

        info!("Cache fully cleared");
        Ok(())
    }
}
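
#[cfg(test)]
mod tests {
    // A minimal round-trip smoke test; a sketch that assumes the `tokio`
    // "macros" and "rt" features are available in dev-dependencies.
    use super::*;

    #[tokio::test]
    async fn put_then_get_roundtrip() {
        let cache = OptimizedCache::new(CacheConfig::default());
        let payload = Bytes::from_static(b"hello world");

        // Below the 64 KiB threshold, so this lands in the hot tier.
        cache.put("test-cid", payload.clone()).await.unwrap();
        let fetched = cache.get("test-cid").await.expect("entry should be cached");
        assert_eq!(fetched, payload);

        let stats = cache.get_stats().await;
        assert!(stats.hit_rate > 0.0);
    }
}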