ant_core/bootstrap/
cache.rs

//! Bootstrap Cache Implementation
//!
//! Manages a persistent cache of peer contacts with quality-based selection,
//! automatic cleanup, and multi-instance coordination.
5
6use crate::{PeerId, Result, P2PError};
7use crate::bootstrap::{ContactEntry, QualityMetrics, QualityCalculator, CacheStats};
8use std::collections::HashMap;
9use std::path::PathBuf;
10use std::time::{Duration, SystemTime};
11use std::sync::Arc;
12use tokio::sync::RwLock;
13use serde::{Deserialize, Serialize};
14use tracing::{debug, info, warn, error};
15
16/// Bootstrap cache configuration
17#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
18pub struct CacheConfig {
19    /// Directory where cache files are stored
20    pub cache_dir: PathBuf,
21    /// Maximum number of contacts to keep in cache
22    pub max_contacts: usize,
23    /// Interval between cache merge operations
24    pub merge_interval: Duration,
25    /// Interval between cache cleanup operations
26    pub cleanup_interval: Duration,
27    /// Interval between quality score updates
28    pub quality_update_interval: Duration,
29    /// Age threshold for considering contacts stale
30    pub stale_threshold: Duration,
31    /// Interval between connectivity checks
32    pub connectivity_check_interval: Duration,
33    /// Number of peers to check connectivity with
34    pub connectivity_check_count: usize,
35}
36
37impl Default for CacheConfig {
38    fn default() -> Self {
39        Self {
40            cache_dir: PathBuf::from(".cache/p2p_foundation"),
41            max_contacts: crate::bootstrap::DEFAULT_MAX_CONTACTS,
42            merge_interval: crate::bootstrap::DEFAULT_MERGE_INTERVAL,
43            cleanup_interval: crate::bootstrap::DEFAULT_CLEANUP_INTERVAL,
44            quality_update_interval: crate::bootstrap::DEFAULT_QUALITY_UPDATE_INTERVAL,
45            stale_threshold: Duration::from_secs(86400 * 7), // 7 days
46            connectivity_check_interval: Duration::from_secs(900), // 15 minutes
47            connectivity_check_count: 100, // Check top 100 peers
48        }
49    }
50}
51
52/// Bootstrap cache errors
53#[derive(Debug, thiserror::Error)]
54pub enum CacheError {
55    /// File I/O operation failed
56    #[error("I/O error: {0}")]
57    Io(#[from] std::io::Error),
58    
59    /// JSON serialization/deserialization failed
60    #[error("Serialization error: {0}")]
61    Serialization(#[from] serde_json::Error),
62    
63    /// Failed to acquire lock on cache
64    #[error("Lock error: {0}")]
65    Lock(String),
66    
67    /// Cache file corruption detected
68    #[error("Cache corruption: {0}")]
69    Corruption(String),
70    
71    /// Configuration error
72    #[error("Configuration error: {0}")]
73    Configuration(String),
74}
75
76/// Main bootstrap cache implementation
77#[derive(Clone)]
78pub struct BootstrapCache {
79    config: CacheConfig,
80    contacts: Arc<RwLock<HashMap<PeerId, ContactEntry>>>,
81    instance_id: String,
82    cache_file: PathBuf,
83    instance_cache_file: PathBuf,
84    lock_file: PathBuf,
85    metadata_file: PathBuf,
86    quality_calculator: QualityCalculator,
87    stats: Arc<RwLock<CacheStats>>,
88}
89
90/// Cached data structure for persistence
91#[derive(Debug, Serialize, Deserialize)]
92struct CacheData {
93    version: u32,
94    instance_id: String,
95    timestamp: chrono::DateTime<chrono::Utc>,
96    contacts: HashMap<PeerId, ContactEntry>,
97    checksum: u64,
98}
99
100/// Cache metadata for health monitoring
101#[derive(Debug, Serialize, Deserialize)]
102struct CacheMetadata {
103    last_merge: chrono::DateTime<chrono::Utc>,
104    last_cleanup: chrono::DateTime<chrono::Utc>,
105    last_quality_update: chrono::DateTime<chrono::Utc>,
106    total_merges: u64,
107    total_cleanups: u64,
108    corruption_count: u64,
109    instance_count: u64,
110}
111
112impl BootstrapCache {
113    /// Create a new bootstrap cache
114    pub async fn new(cache_dir: PathBuf, config: CacheConfig) -> Result<Self> {
115        // Ensure cache directory exists
116        std::fs::create_dir_all(&cache_dir)
117            .map_err(|e| P2PError::Bootstrap(format!("Failed to create cache directory: {}", e)))?;
118        
119        let instance_id = generate_instance_id();
120        
121        let cache_file = cache_dir.join("bootstrap_cache.json");
122        let instance_cache_file = cache_dir.join("instance_caches").join(format!("{}.cache", instance_id));
123        let lock_file = cache_dir.join("bootstrap_cache.lock");
124        let metadata_file = cache_dir.join("metadata.json");
125        
126        // Ensure instance cache directory exists
127        std::fs::create_dir_all(instance_cache_file.parent().unwrap())
128            .map_err(|e| P2PError::Bootstrap(format!("Failed to create instance cache directory: {}", e)))?;
129        
130        let mut cache = Self {
131            config: config.clone(),
132            contacts: Arc::new(RwLock::new(HashMap::new())),
133            instance_id,
134            cache_file,
135            instance_cache_file,
136            lock_file,
137            metadata_file,
138            quality_calculator: QualityCalculator::new(),
139            stats: Arc::new(RwLock::new(CacheStats::default())),
140        };
141        
142        // Load existing cache
143        cache.load_from_disk().await?;
144        
145        info!("Bootstrap cache initialized with {} contacts", cache.contacts.read().await.len());
146        
147        Ok(cache)
148    }
149    
150    /// Get bootstrap peers for initial connection
151    pub async fn get_bootstrap_peers(&self, count: usize) -> Result<Vec<ContactEntry>> {
152        let contacts = self.contacts.read().await;
153        
154        let mut sorted_contacts: Vec<&ContactEntry> = contacts.values().collect();
155        
156        // Sort by quality score in descending order
157        sorted_contacts.sort_by(|a, b| {
158            b.quality_metrics.quality_score
159                .partial_cmp(&a.quality_metrics.quality_score)
160                .unwrap_or(std::cmp::Ordering::Equal)
161        });
162        
163        let selected: Vec<ContactEntry> = sorted_contacts
164            .into_iter()
165            .take(count)
166            .cloned()
167            .collect();
168        
169        // Update stats
170        {
171            let mut stats = self.stats.write().await;
172            stats.cache_hit_rate = if !contacts.is_empty() {
173                selected.len() as f64 / count.min(contacts.len()) as f64
174            } else {
175                0.0
176            };
177        }
178        
179        debug!("Selected {} bootstrap peers from {} available contacts", selected.len(), contacts.len());
180        
181        Ok(selected)
182    }
183    
184    /// Add or update a contact
185    pub async fn add_contact(&mut self, contact: ContactEntry) -> Result<()> {
186        let mut contacts = self.contacts.write().await;
187        
188        // Check if we need to evict contacts
189        if contacts.len() >= self.config.max_contacts && !contacts.contains_key(&contact.peer_id) {
190            self.evict_lowest_quality_contacts(&mut contacts).await?;
191        }
192        
193        contacts.insert(contact.peer_id.clone(), contact.clone());
194        drop(contacts);
195        
196        // Save to instance cache
197        self.save_to_instance_cache().await?;
198        
199        debug!("Added contact: {}", contact.summary());
200        
201        Ok(())
202    }
203    
204    /// Update contact metrics
205    pub async fn update_contact_metrics(&mut self, peer_id: &PeerId, metrics: QualityMetrics) -> Result<()> {
206        let mut contacts = self.contacts.write().await;
207        
208        if let Some(contact) = contacts.get_mut(peer_id) {
209            contact.quality_metrics = metrics;
210            contact.recalculate_quality_score();
211            
212            debug!("Updated metrics for peer {}: {}", peer_id, contact.summary());
213        }
214        
215        Ok(())
216    }
217    
218    /// Update quality scores for all contacts
219    pub async fn update_quality_scores(&self) -> Result<()> {
220        let mut contacts = self.contacts.write().await;
221        let mut updated_count = 0;
222        
223        for contact in contacts.values_mut() {
224            let old_score = contact.quality_metrics.quality_score;
225            
226            // Apply age decay
227            let age_seconds = contact.age_seconds() as f64;
228            let decay_factor = (-age_seconds / 86400.0).exp(); // 24 hour half-life
229            contact.quality_metrics.apply_age_decay(decay_factor);
230            
231            // Recalculate quality score
232            contact.recalculate_quality_score();
233            
234            if (contact.quality_metrics.quality_score - old_score).abs() > 0.01 {
235                updated_count += 1;
236            }
237        }
238        
239        // Update metadata
240        self.update_metadata(|meta| {
241            meta.last_quality_update = chrono::Utc::now();
242        }).await?;
243        
244        debug!("Updated quality scores for {} contacts", updated_count);
245        
246        Ok(())
247    }
248    
249    /// Clean up stale entries
250    pub async fn cleanup_stale_entries(&self) -> Result<()> {
251        let mut contacts = self.contacts.write().await;
252        let initial_count = contacts.len();
253        
254        // Remove stale contacts
255        contacts.retain(|_peer_id, contact| {
256            !contact.is_stale(self.config.stale_threshold)
257        });
258        
259        let removed_count = initial_count - contacts.len();
260        
261        if removed_count > 0 {
262            info!("Cleaned up {} stale contacts", removed_count);
263            
264            // Save updated cache
265            drop(contacts);
266            self.save_to_disk().await?;
267        }
268        
269        // Update metadata
270        self.update_metadata(|meta| {
271            meta.last_cleanup = chrono::Utc::now();
272            meta.total_cleanups += 1;
273        }).await?;
274        
275        Ok(())
276    }
277    
278    /// Get all contacts (for merge operations)
279    pub async fn get_all_contacts(&self) -> HashMap<PeerId, ContactEntry> {
280        self.contacts.read().await.clone()
281    }
282    
283    /// Set all contacts (for merge operations)
284    pub async fn set_all_contacts(&self, contacts: HashMap<PeerId, ContactEntry>) {
285        let mut current_contacts = self.contacts.write().await;
286        *current_contacts = contacts;
287    }
288    
289    /// Get cache statistics
290    pub async fn get_stats(&self) -> Result<CacheStats> {
291        let contacts = self.contacts.read().await;
292        let mut stats = self.stats.write().await;
293        
294        stats.total_contacts = contacts.len();
295        stats.high_quality_contacts = contacts.values()
296            .filter(|c| c.quality_metrics.quality_score > 0.7)
297            .count();
298        stats.verified_contacts = contacts.values()
299            .filter(|c| c.ipv6_identity_verified)
300            .count();
301        
302        if !contacts.is_empty() {
303            stats.average_quality_score = contacts.values()
304                .map(|c| c.quality_metrics.quality_score)
305                .sum::<f64>() / contacts.len() as f64;
306        }
307        
308        Ok(stats.clone())
309    }
310    
311    /// Load cache from disk
312    async fn load_from_disk(&mut self) -> Result<()> {
313        if !self.cache_file.exists() {
314            debug!("No existing cache file found, starting with empty cache");
315            return Ok(());
316        }
317        
318        let _lock = self.acquire_file_lock().await?;
319        
320        match self.load_cache_data().await {
321            Ok(cache_data) => {
322                if self.verify_cache_integrity(&cache_data) {
323                    let mut contacts = self.contacts.write().await;
324                    *contacts = cache_data.contacts;
325                    info!("Loaded {} contacts from cache", contacts.len());
326                } else {
327                    warn!("Cache integrity check failed, starting with empty cache");
328                    self.handle_cache_corruption().await?;
329                }
330            }
331            Err(e) => {
332                warn!("Failed to load cache: {}, starting with empty cache", e);
333                self.handle_cache_corruption().await?;
334            }
335        }
336        
337        Ok(())
338    }
339    
340    /// Save cache to disk
341    pub async fn save_to_disk(&self) -> Result<()> {
342        let _lock = self.acquire_file_lock().await?;
343        
344        let contacts = self.contacts.read().await;
345        let cache_data = CacheData {
346            version: 1,
347            instance_id: self.instance_id.clone(),
348            timestamp: chrono::Utc::now(),
349            contacts: contacts.clone(),
350            checksum: self.calculate_checksum(&contacts),
351        };
352        
353        // Write to temporary file first for atomic operation
354        let temp_file = self.cache_file.with_extension("tmp");
355        let json_data = serde_json::to_string_pretty(&cache_data)
356            .map_err(|e| P2PError::Bootstrap(format!("Failed to serialize cache: {}", e)))?;
357        
358        std::fs::write(&temp_file, json_data)
359            .map_err(|e| P2PError::Bootstrap(format!("Failed to write cache file: {}", e)))?;
360        
361        // Atomic rename
362        std::fs::rename(temp_file, &self.cache_file)
363            .map_err(|e| P2PError::Bootstrap(format!("Failed to rename cache file: {}", e)))?;
364        
365        debug!("Saved {} contacts to cache", contacts.len());
366        
367        Ok(())
368    }
369    
370    /// Save to instance-specific cache
371    async fn save_to_instance_cache(&self) -> Result<()> {
372        let contacts = self.contacts.read().await;
373        let cache_data = CacheData {
374            version: 1,
375            instance_id: self.instance_id.clone(),
376            timestamp: chrono::Utc::now(),
377            contacts: contacts.clone(),
378            checksum: self.calculate_checksum(&contacts),
379        };
380        
381        let json_data = serde_json::to_string(&cache_data)
382            .map_err(|e| P2PError::Bootstrap(format!("Failed to serialize instance cache: {}", e)))?;
383        
384        std::fs::write(&self.instance_cache_file, json_data)
385            .map_err(|e| P2PError::Bootstrap(format!("Failed to write instance cache: {}", e)))?;
386        
387        Ok(())
388    }
389    
390    /// Acquire file lock for atomic operations
391    async fn acquire_file_lock(&self) -> Result<FileLock> {
392        FileLock::acquire(&self.lock_file).await
393    }
394    
395    /// Load cache data from file
396    async fn load_cache_data(&self) -> Result<CacheData> {
397        let json_data = std::fs::read_to_string(&self.cache_file)
398            .map_err(|e| P2PError::Bootstrap(format!("Failed to read cache file: {}", e)))?;
399        
400        let cache_data: CacheData = serde_json::from_str(&json_data)
401            .map_err(|e| P2PError::Bootstrap(format!("Failed to parse cache file: {}", e)))?;
402        
403        Ok(cache_data)
404    }
405    
406    /// Verify cache integrity
407    fn verify_cache_integrity(&self, cache_data: &CacheData) -> bool {
408        let calculated_checksum = self.calculate_checksum(&cache_data.contacts);
409        cache_data.checksum == calculated_checksum
410    }
411    
412    /// Calculate checksum for cache integrity
413    fn calculate_checksum(&self, contacts: &HashMap<PeerId, ContactEntry>) -> u64 {
414        use std::collections::hash_map::DefaultHasher;
415        use std::hash::{Hash, Hasher};
416        
417        let mut hasher = DefaultHasher::new();
418        
419        // Sort by peer ID for consistent hashing
420        let mut sorted_contacts: Vec<_> = contacts.iter().collect();
421        sorted_contacts.sort_by_key(|(peer_id, _)| *peer_id);
422        
423        for (peer_id, contact) in sorted_contacts {
424            peer_id.hash(&mut hasher);
425            contact.quality_metrics.success_rate.to_bits().hash(&mut hasher);
426            contact.addresses.len().hash(&mut hasher);
427        }
428        
429        hasher.finish()
430    }
431    
432    /// Handle cache corruption
433    async fn handle_cache_corruption(&self) -> Result<()> {
434        warn!("Handling cache corruption, backing up corrupted file");
435        
436        if self.cache_file.exists() {
437            let backup_file = self.cache_file.with_extension("corrupted");
438            if let Err(e) = std::fs::rename(&self.cache_file, backup_file) {
439                error!("Failed to backup corrupted cache: {}", e);
440            }
441        }
442        
443        // Update corruption count in metadata
444        self.update_metadata(|meta| {
445            meta.corruption_count += 1;
446        }).await?;
447        
448        Ok(())
449    }
450    
451    /// Evict lowest quality contacts to make room
452    async fn evict_lowest_quality_contacts(&self, contacts: &mut HashMap<PeerId, ContactEntry>) -> Result<()> {
453        let eviction_count = (self.config.max_contacts / 10).max(1); // Evict 10% or at least 1
454        
455        let mut sorted_contacts: Vec<_> = contacts.iter().collect();
456        sorted_contacts.sort_by(|a, b| {
457            a.1.quality_metrics.quality_score
458                .partial_cmp(&b.1.quality_metrics.quality_score)
459                .unwrap_or(std::cmp::Ordering::Equal)
460        });
461        
462        let to_evict: Vec<PeerId> = sorted_contacts
463            .into_iter()
464            .take(eviction_count)
465            .map(|(peer_id, _)| peer_id.clone())
466            .collect();
467        
468        for peer_id in to_evict {
469            contacts.remove(&peer_id);
470        }
471        
472        debug!("Evicted {} lowest quality contacts", eviction_count);
473        
474        Ok(())
475    }
476    
477    /// Update metadata
478    async fn update_metadata<F>(&self, updater: F) -> Result<()>
479    where
480        F: FnOnce(&mut CacheMetadata),
481    {
482        let mut metadata = if self.metadata_file.exists() {
483            let json_data = std::fs::read_to_string(&self.metadata_file)?;
484            serde_json::from_str(&json_data).unwrap_or_default()
485        } else {
486            CacheMetadata::default()
487        };
488        
489        updater(&mut metadata);
490        
491        let json_data = serde_json::to_string_pretty(&metadata)?;
492        std::fs::write(&self.metadata_file, json_data)?;
493        
494        Ok(())
495    }
496}
497
498impl Default for CacheStats {
499    fn default() -> Self {
500        Self {
501            total_contacts: 0,
502            high_quality_contacts: 0,
503            verified_contacts: 0,
504            last_merge: chrono::Utc::now(),
505            last_cleanup: chrono::Utc::now(),
506            cache_hit_rate: 0.0,
507            average_quality_score: 0.0,
508        }
509    }
510}
511
512impl Default for CacheMetadata {
513    fn default() -> Self {
514        let now = chrono::Utc::now();
515        Self {
516            last_merge: now,
517            last_cleanup: now,
518            last_quality_update: now,
519            total_merges: 0,
520            total_cleanups: 0,
521            corruption_count: 0,
522            instance_count: 0,
523        }
524    }
525}
526
527/// File locking for atomic operations
528struct FileLock {
529    _file: std::fs::File,
530}
531
532impl FileLock {
533    async fn acquire(lock_file: &PathBuf) -> Result<Self> {
534        use std::fs::OpenOptions;
535        
536        let file = OpenOptions::new()
537            .create(true)
538            .write(true)
539            .open(lock_file)
540            .map_err(|e| P2PError::Bootstrap(format!("Failed to create lock file: {}", e)))?;
541        
542        // In a production system, you'd use proper file locking here
543        // For now, we'll rely on atomic file operations
544        
545        Ok(Self { _file: file })
546    }
547}
548
/// Generate an identifier for this cache instance.
///
/// The id has the form `<process id>_<milliseconds since the Unix epoch>`. If the system
/// clock reports a time before the epoch, the millisecond part is `0` rather than causing a
/// panic.
fn generate_instance_id() -> String {
    let millis = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis())
        .unwrap_or(0);

    format!("{}_{}", std::process::id(), millis)
}
553
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Configuration rooted at `dir` with the given contact limit; everything else default.
    fn config_for(dir: &TempDir, max_contacts: usize) -> CacheConfig {
        CacheConfig {
            cache_dir: dir.path().to_path_buf(),
            max_contacts,
            ..CacheConfig::default()
        }
    }

    /// Open a cache stored in `dir` using `config`.
    async fn open_cache(dir: &TempDir, config: CacheConfig) -> Result<BootstrapCache> {
        BootstrapCache::new(dir.path().to_path_buf(), config).await
    }

    /// A cache can be created in an empty directory.
    #[tokio::test]
    async fn test_cache_creation() {
        let temp_dir = TempDir::new().unwrap();
        let config = config_for(&temp_dir, 100);

        let cache = open_cache(&temp_dir, config).await;
        assert!(cache.is_ok());
    }

    /// A contact that was added is returned as a bootstrap peer.
    #[tokio::test]
    async fn test_add_and_retrieve_contacts() {
        let temp_dir = TempDir::new().unwrap();
        let config = config_for(&temp_dir, 100);

        let mut cache = open_cache(&temp_dir, config).await.unwrap();

        let contact = ContactEntry::new(
            PeerId::from("test-peer"),
            vec!["/ip4/127.0.0.1/tcp/9000".to_string()]
        );

        cache.add_contact(contact).await.unwrap();

        let bootstrap_peers = cache.get_bootstrap_peers(10).await.unwrap();
        assert_eq!(bootstrap_peers.len(), 1);
    }

    /// Contacts saved by one cache are loaded by a later cache on the same directory.
    #[tokio::test]
    async fn test_cache_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let config = config_for(&temp_dir, 100);

        // First instance: add a contact and persist it
        {
            let mut cache = open_cache(&temp_dir, config.clone()).await.unwrap();
            let contact = ContactEntry::new(
                PeerId::from("test-peer"),
                vec!["/ip4/127.0.0.1/tcp/9000".to_string()]
            );
            cache.add_contact(contact).await.unwrap();
            cache.save_to_disk().await.unwrap();
        }

        // Second instance: the contact must have been loaded from disk
        {
            let cache = open_cache(&temp_dir, config).await.unwrap();
            let bootstrap_peers = cache.get_bootstrap_peers(10).await.unwrap();
            assert_eq!(bootstrap_peers.len(), 1);
        }
    }

    /// Adding more contacts than the limit never leaves more than the limit cached.
    #[tokio::test]
    async fn test_cache_eviction() {
        let temp_dir = TempDir::new().unwrap();
        let config = config_for(&temp_dir, 5);

        let mut cache = open_cache(&temp_dir, config).await.unwrap();

        // Add twice as many contacts as the limit allows
        for i in 0..10 {
            let contact = ContactEntry::new(
                PeerId::from(format!("test-peer-{}", i)),
                vec![format!("/ip4/127.0.0.1/tcp/{}", 9000 + i)]
            );
            cache.add_contact(contact).await.unwrap();
        }

        let stats = cache.get_stats().await.unwrap();
        assert!(stats.total_contacts <= 5);
    }
}
645}