guardian_db/ipfs_core_api/backends/
optimized_cache.rs

1/// Cache Layer Otimizado para Backend Iroh
2///
3/// Caching inteligente com compressão, TTL adaptativo e evicção preditiva
4/// para maximizar a performance do backend nativo Iroh.
5use crate::error::{GuardianError, Result};
6use blake3::Hasher;
7use bytes::Bytes;
8use iroh_blobs::BlobFormat;
9use lru::LruCache;
10use std::collections::HashMap;
11use std::num::NonZeroUsize;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::sync::{Mutex, RwLock};
15use tracing::{debug, info, instrument, warn};
16
17/// Cache Layer otimizado para operações IPFS/Iroh
18pub struct OptimizedCache {
19    /// Cache LRU para dados recentes
20    data_cache: Arc<RwLock<LruCache<String, CacheEntry>>>,
21    /// Cache de metadados para CIDs
22    metadata_cache: Arc<RwLock<HashMap<String, MetadataEntry>>>,
23    /// Cache de compressão para dados grandes
24    compressed_cache: Arc<RwLock<LruCache<String, CompressedEntry>>>,
25    /// Estatísticas de performance
26    stats: Arc<RwLock<CacheStats>>,
27    /// Configuração do cache
28    config: CacheConfig,
29    /// Predictor de acesso para evicção inteligente
30    access_predictor: Arc<Mutex<AccessPredictor>>,
31}
32
33/// Entrada no cache com metadados de performance
34#[derive(Debug, Clone)]
35pub struct CacheEntry {
36    /// Dados do blob
37    pub data: Bytes,
38    /// Timestamp de criação
39    pub created_at: Instant,
40    /// Último acesso
41    pub last_accessed: Instant,
42    /// Número de acessos
43    pub access_count: u64,
44    /// Prioridade (0-10, maior = mais importante)
45    pub priority: u8,
46    /// Tamanho original (antes da compressão, se aplicável)
47    pub original_size: usize,
48    /// Hash verificação de integridade
49    pub integrity_hash: [u8; 32],
50}
51
52/// Entrada comprimida para dados grandes
53#[derive(Debug, Clone)]
54pub struct CompressedEntry {
55    /// Dados comprimidos com zstd
56    pub compressed_data: Bytes,
57    /// Tamanho original
58    pub original_size: usize,
59    /// Nível de compressão usado
60    pub compression_level: i32,
61    /// Timestamp de compressão
62    pub compressed_at: Instant,
63    /// Ratio de compressão (0.0-1.0)
64    pub compression_ratio: f64,
65}
66
67/// Metadados para CIDs
68#[derive(Debug, Clone)]
69pub struct MetadataEntry {
70    /// Tamanho do blob
71    pub size: u64,
72    /// Formato do blob (Raw, DagCbor, etc.)
73    pub format: BlobFormat,
74    /// Peers que possuem o conteúdo
75    pub providers: Vec<String>,
76    /// Timestamp de descoberta
77    pub discovered_at: Instant,
78    /// Latência média de acesso (ms)
79    pub avg_access_latency_ms: f64,
80    /// Popularidade (frequência de acesso)
81    pub popularity_score: f64,
82}
83
84/// Estatísticas avançadas de cache
85#[derive(Debug, Clone, Default)]
86pub struct CacheStats {
87    /// Hits no cache de dados
88    pub data_cache_hits: u64,
89    /// Misses no cache de dados
90    pub data_cache_misses: u64,
91    /// Hits no cache comprimido
92    pub compressed_cache_hits: u64,
93    /// Misses no cache comprimido
94    pub compressed_cache_misses: u64,
95    /// Total de bytes armazenados
96    pub total_bytes_cached: u64,
97    /// Bytes economizados com compressão
98    pub bytes_saved_compression: u64,
99    /// Bytes economizados evitando downloads
100    pub bytes_saved_network: u64,
101    /// Tempo médio de acesso (microsegundos)
102    pub avg_access_time_us: f64,
103    /// Taxa de hit global
104    pub hit_rate: f64,
105    /// Número de evicções
106    pub evictions_count: u64,
107    /// Número de compressões realizadas
108    pub compressions_count: u64,
109}
110
111/// Configuração do cache otimizado
112#[derive(Debug, Clone)]
113pub struct CacheConfig {
114    /// Tamanho máximo do cache de dados (bytes)
115    pub max_data_cache_size: usize,
116    /// Número máximo de entradas no cache de dados
117    pub max_data_entries: usize,
118    /// Tamanho máximo do cache comprimido (bytes)
119    pub max_compressed_cache_size: usize,
120    /// Número máximo de entradas no cache comprimido
121    pub max_compressed_entries: usize,
122    /// TTL padrão para entradas (segundos)
123    pub default_ttl_secs: u64,
124    /// Threshold para ativar compressão (bytes)
125    pub compression_threshold: usize,
126    /// Nível de compressão zstd (1-22)
127    pub compression_level: i32,
128    /// Threshold para evicção (0.0-1.0)
129    pub eviction_threshold: f64,
130    /// Habilitar predictor de acesso
131    pub enable_access_prediction: bool,
132}
133
134/// Predictor de acesso usando padrões de uso
135#[derive(Debug)]
136pub struct AccessPredictor {
137    /// Histórico de acessos por CID
138    access_history: HashMap<String, Vec<Instant>>,
139    /// Padrões identificados
140    #[allow(dead_code)]
141    patterns: HashMap<String, AccessPattern>,
142    /// Janela de análise (segundos)
143    analysis_window_secs: u64,
144}
145
146/// Padrão de acesso identificado
147#[derive(Debug, Clone)]
148pub struct AccessPattern {
149    /// Frequência média de acesso (acessos por hora)
150    pub avg_frequency: f64,
151    /// Horários de pico
152    pub peak_hours: Vec<u8>,
153    /// Probabilidade de reacesso nas próximas horas
154    pub reaccess_probability: f64,
155    /// Tipo de padrão identificado
156    pub pattern_type: PatternType,
157}
158
159/// Tipos de padrões de acesso
160#[derive(Debug, Clone, PartialEq)]
161pub enum PatternType {
162    /// Acesso único (pouco provável de reacesso)
163    OneTime,
164    /// Acesso regular (padrão consistente)
165    Regular,
166    /// Acesso burst (picos intensos)
167    Burst,
168    /// Acesso sazonal (por horário/dia)
169    Seasonal,
170}
171
172impl Default for CacheConfig {
173    fn default() -> Self {
174        Self {
175            max_data_cache_size: 256 * 1024 * 1024, // 256MB
176            max_data_entries: 10_000,
177            max_compressed_cache_size: 1024 * 1024 * 1024, // 1GB
178            max_compressed_entries: 50_000,
179            default_ttl_secs: 3600,           // 1 hora
180            compression_threshold: 64 * 1024, // 64KB
181            compression_level: 6,             // Balanço entre speed/compression
182            eviction_threshold: 0.85,
183            enable_access_prediction: true,
184        }
185    }
186}
187
188impl OptimizedCache {
189    /// Cria nova instância do cache otimizado
190    pub fn new(config: CacheConfig) -> Self {
191        let data_cache_size = NonZeroUsize::new(config.max_data_entries)
192            .unwrap_or(NonZeroUsize::new(10_000).unwrap());
193        let compressed_cache_size = NonZeroUsize::new(config.max_compressed_entries)
194            .unwrap_or(NonZeroUsize::new(50_000).unwrap());
195
196        Self {
197            data_cache: Arc::new(RwLock::new(LruCache::new(data_cache_size))),
198            metadata_cache: Arc::new(RwLock::new(HashMap::new())),
199            compressed_cache: Arc::new(RwLock::new(LruCache::new(compressed_cache_size))),
200            stats: Arc::new(RwLock::new(CacheStats::default())),
201            config,
202            access_predictor: Arc::new(Mutex::new(AccessPredictor {
203                access_history: HashMap::new(),
204                patterns: HashMap::new(),
205                analysis_window_secs: 3600 * 24, // 24 horas
206            })),
207        }
208    }
209
210    /// Busca dados no cache com otimizações inteligentes
211    #[instrument(skip(self))]
212    pub async fn get(&self, cid: &str) -> Option<Bytes> {
213        let start_time = Instant::now();
214
215        // Atualiza histórico de acesso
216        if self.config.enable_access_prediction {
217            self.update_access_history(cid).await;
218        }
219
220        // Tenta cache de dados primeiro (mais rápido)
221        {
222            let mut cache = self.data_cache.write().await;
223            if let Some(entry) = cache.get_mut(cid) {
224                entry.last_accessed = Instant::now();
225                entry.access_count += 1;
226
227                // Atualiza estatísticas
228                let mut stats = self.stats.write().await;
229                stats.data_cache_hits += 1;
230                stats.avg_access_time_us =
231                    (stats.avg_access_time_us + start_time.elapsed().as_micros() as f64) / 2.0;
232
233                debug!("Cache hit (data): {} ({} bytes)", cid, entry.data.len());
234                return Some(entry.data.clone());
235            }
236        }
237
238        // Tenta cache comprimido
239        {
240            let mut compressed_cache = self.compressed_cache.write().await;
241            if let Some(compressed_entry) = compressed_cache.get_mut(cid) {
242                // Descomprime os dados
243                match self
244                    .decompress_data(
245                        &compressed_entry.compressed_data,
246                        compressed_entry.original_size,
247                    )
248                    .await
249                {
250                    Ok(decompressed) => {
251                        // Move para cache de dados para acesso mais rápido
252                        let cache_entry = CacheEntry {
253                            data: decompressed.clone(),
254                            created_at: compressed_entry.compressed_at,
255                            last_accessed: Instant::now(),
256                            access_count: 1,
257                            priority: 7, // Prioridade alta para dados descomprimidos
258                            original_size: compressed_entry.original_size,
259                            integrity_hash: self.calculate_hash(&decompressed),
260                        };
261
262                        {
263                            let mut data_cache = self.data_cache.write().await;
264                            data_cache.put(cid.to_string(), cache_entry);
265                        }
266
267                        // Atualiza estatísticas
268                        let mut stats = self.stats.write().await;
269                        stats.compressed_cache_hits += 1;
270                        stats.avg_access_time_us = (stats.avg_access_time_us
271                            + start_time.elapsed().as_micros() as f64)
272                            / 2.0;
273
274                        debug!(
275                            "Cache hit (compressed): {} ({} bytes decompressed)",
276                            cid,
277                            decompressed.len()
278                        );
279                        return Some(decompressed);
280                    }
281                    Err(e) => {
282                        warn!("Falha ao descomprimir dados do cache para {}: {}", cid, e);
283                        // Remove entrada corrompida
284                        compressed_cache.pop(cid);
285                    }
286                }
287            }
288        }
289
290        // Miss em ambos os caches
291        let mut stats = self.stats.write().await;
292        stats.data_cache_misses += 1;
293        stats.compressed_cache_misses += 1;
294
295        debug!("Cache miss: {}", cid);
296        None
297    }
298
299    /// Armazena dados no cache com otimização automática
300    #[instrument(skip(self, data))]
301    pub async fn put(&self, cid: &str, data: Bytes) -> Result<()> {
302        let data_size = data.len();
303        let integrity_hash = self.calculate_hash(&data);
304
305        // Decide se deve comprimir baseado no tamanho
306        let should_compress = data_size >= self.config.compression_threshold;
307
308        if should_compress {
309            // Tenta compressão
310            match self.compress_data(&data).await {
311                Ok((compressed_data, compression_ratio)) => {
312                    let compressed_entry = CompressedEntry {
313                        compressed_data,
314                        original_size: data_size,
315                        compression_level: self.config.compression_level,
316                        compressed_at: Instant::now(),
317                        compression_ratio,
318                    };
319
320                    // Armazena no cache comprimido
321                    {
322                        let mut compressed_cache = self.compressed_cache.write().await;
323                        compressed_cache.put(cid.to_string(), compressed_entry);
324                    }
325
326                    // Atualiza estatísticas
327                    let mut stats = self.stats.write().await;
328                    stats.compressions_count += 1;
329                    stats.bytes_saved_compression +=
330                        (data_size as f64 * (1.0 - compression_ratio)) as u64;
331                    stats.total_bytes_cached += data_size as u64;
332
333                    info!(
334                        "Dados comprimidos e armazenados: {} ({} bytes -> {} bytes, ratio: {:.2})",
335                        cid,
336                        data_size,
337                        (data_size as f64 * compression_ratio) as usize,
338                        compression_ratio
339                    );
340                }
341                Err(e) => {
342                    warn!(
343                        "Falha na compressão para {}: {}. Armazenando sem compressão.",
344                        cid, e
345                    );
346                    self.store_uncompressed(cid, data, integrity_hash).await?;
347                }
348            }
349        } else {
350            // Armazena sem compressão
351            self.store_uncompressed(cid, data, integrity_hash).await?;
352        }
353
354        // Verifica se precisa fazer evicção
355        self.check_and_evict().await?;
356
357        Ok(())
358    }
359
360    /// Armazena dados sem compressão
361    async fn store_uncompressed(
362        &self,
363        cid: &str,
364        data: Bytes,
365        integrity_hash: [u8; 32],
366    ) -> Result<()> {
367        let cache_entry = CacheEntry {
368            data: data.clone(),
369            created_at: Instant::now(),
370            last_accessed: Instant::now(),
371            access_count: 1,
372            priority: 5, // Prioridade padrão
373            original_size: data.len(),
374            integrity_hash,
375        };
376
377        {
378            let mut data_cache = self.data_cache.write().await;
379            data_cache.put(cid.to_string(), cache_entry);
380        }
381
382        // Atualiza estatísticas
383        let mut stats = self.stats.write().await;
384        stats.total_bytes_cached += data.len() as u64;
385
386        debug!(
387            "Dados armazenados (sem compressão): {} ({} bytes)",
388            cid,
389            data.len()
390        );
391        Ok(())
392    }
393
394    /// Comprime dados usando zstd
395    async fn compress_data(&self, data: &Bytes) -> Result<(Bytes, f64)> {
396        let original_size = data.len();
397
398        let compressed = tokio::task::spawn_blocking({
399            let data = data.clone();
400            let compression_level = self.config.compression_level;
401            move || {
402                zstd::bulk::compress(&data, compression_level)
403                    .map_err(|e| GuardianError::Other(format!("Falha na compressão: {}", e)))
404            }
405        })
406        .await
407        .map_err(|e| GuardianError::Other(format!("Task de compressão falhou: {}", e)))??;
408
409        let compressed_size = compressed.len();
410        let compression_ratio = compressed_size as f64 / original_size as f64;
411
412        Ok((Bytes::from(compressed), compression_ratio))
413    }
414
415    /// Descomprime dados usando zstd
416    async fn decompress_data(
417        &self,
418        compressed_data: &Bytes,
419        expected_size: usize,
420    ) -> Result<Bytes> {
421        let decompressed = tokio::task::spawn_blocking({
422            let compressed_data = compressed_data.clone();
423            move || {
424                zstd::bulk::decompress(&compressed_data, expected_size)
425                    .map_err(|e| GuardianError::Other(format!("Falha na descompressão: {}", e)))
426            }
427        })
428        .await
429        .map_err(|e| GuardianError::Other(format!("Task de descompressão falhou: {}", e)))??;
430
431        Ok(Bytes::from(decompressed))
432    }
433
434    /// Calcula hash de integridade
435    fn calculate_hash(&self, data: &Bytes) -> [u8; 32] {
436        let mut hasher = Hasher::new();
437        hasher.update(data);
438        hasher.finalize().into()
439    }
440
441    /// Atualiza histórico de acesso para predição
442    async fn update_access_history(&self, cid: &str) {
443        let mut predictor = self.access_predictor.lock().await;
444        let now = Instant::now();
445
446        predictor
447            .access_history
448            .entry(cid.to_string())
449            .or_insert_with(Vec::new)
450            .push(now);
451
452        // Limita o histórico para não crescer indefinidamente
453        let analysis_window = predictor.analysis_window_secs; // Copia valor antes do empréstimo
454        if let Some(history) = predictor.access_history.get_mut(cid) {
455            let cutoff = now - Duration::from_secs(analysis_window);
456            history.retain(|&access_time| access_time > cutoff);
457        }
458    }
459
460    /// Verifica se precisa fazer evicção e executa se necessário
461    async fn check_and_evict(&self) -> Result<()> {
462        let stats = self.stats.read().await;
463        let current_usage = stats.total_bytes_cached as f64;
464        let max_usage =
465            (self.config.max_data_cache_size + self.config.max_compressed_cache_size) as f64;
466
467        if current_usage / max_usage > self.config.eviction_threshold {
468            drop(stats); // Libera o lock
469            self.intelligent_eviction().await?;
470        }
471
472        Ok(())
473    }
474
475    /// Executa evicção inteligente baseada em padrões de acesso
476    async fn intelligent_eviction(&self) -> Result<()> {
477        debug!("Iniciando evicção inteligente do cache");
478
479        // Coleta candidatos para evicção do cache de dados
480        let candidates = {
481            let data_cache = self.data_cache.read().await;
482            data_cache
483                .iter()
484                .map(|(cid, entry)| {
485                    let age_score =
486                        Instant::now().duration_since(entry.last_accessed).as_secs() as f64;
487                    let frequency_score = 1.0 / (entry.access_count as f64 + 1.0);
488                    let priority_score = (10 - entry.priority) as f64;
489
490                    // Score maior = melhor candidato para evicção
491                    let eviction_score =
492                        age_score * 0.4 + frequency_score * 0.3 + priority_score * 0.3;
493
494                    (cid.clone(), eviction_score, entry.data.len())
495                })
496                .collect::<Vec<_>>()
497        };
498
499        // Ordena por score de evicção (maior primeiro)
500        let mut sorted_candidates = candidates;
501        sorted_candidates
502            .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
503
504        // Remove 20% dos candidatos
505        let eviction_count = (sorted_candidates.len() as f64 * 0.2).ceil() as usize;
506        let mut bytes_freed = 0u64;
507
508        {
509            let mut data_cache = self.data_cache.write().await;
510            for (cid, _score, size) in sorted_candidates.iter().take(eviction_count) {
511                if data_cache.pop(cid).is_some() {
512                    bytes_freed += *size as u64;
513                }
514            }
515        }
516
517        // Atualiza estatísticas
518        {
519            let mut stats = self.stats.write().await;
520            stats.evictions_count += eviction_count as u64;
521            stats.total_bytes_cached = stats.total_bytes_cached.saturating_sub(bytes_freed);
522        }
523
524        info!(
525            "Evicção concluída: {} entradas removidas, {} bytes liberados",
526            eviction_count, bytes_freed
527        );
528
529        Ok(())
530    }
531
532    /// Obtém estatísticas atuais do cache
533    pub async fn get_stats(&self) -> CacheStats {
534        let stats = self.stats.read().await;
535        let mut stats_copy = stats.clone();
536
537        // Calcula taxa de hit
538        let total_requests = stats_copy.data_cache_hits
539            + stats_copy.data_cache_misses
540            + stats_copy.compressed_cache_hits
541            + stats_copy.compressed_cache_misses;
542        let total_hits = stats_copy.data_cache_hits + stats_copy.compressed_cache_hits;
543
544        if total_requests > 0 {
545            stats_copy.hit_rate = total_hits as f64 / total_requests as f64;
546        }
547
548        stats_copy
549    }
550
551    /// Limpa todo o cache
552    pub async fn clear(&self) -> Result<()> {
553        {
554            let mut data_cache = self.data_cache.write().await;
555            data_cache.clear();
556        }
557
558        {
559            let mut compressed_cache = self.compressed_cache.write().await;
560            compressed_cache.clear();
561        }
562
563        {
564            let mut metadata_cache = self.metadata_cache.write().await;
565            metadata_cache.clear();
566        }
567
568        {
569            let mut stats = self.stats.write().await;
570            *stats = CacheStats::default();
571        }
572
573        info!("Cache limpo completamente");
574        Ok(())
575    }
576}