1use crate::{PeerId, Result, P2PError};
7use crate::bootstrap::{ContactEntry, QualityMetrics, QualityCalculator, CacheStats};
8use std::collections::HashMap;
9use std::path::PathBuf;
10use std::time::{Duration, SystemTime};
11use std::sync::Arc;
12use tokio::sync::RwLock;
13use serde::{Deserialize, Serialize};
14use tracing::{debug, info, warn, error};
15
16#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
18pub struct CacheConfig {
19 pub cache_dir: PathBuf,
21 pub max_contacts: usize,
23 pub merge_interval: Duration,
25 pub cleanup_interval: Duration,
27 pub quality_update_interval: Duration,
29 pub stale_threshold: Duration,
31 pub connectivity_check_interval: Duration,
33 pub connectivity_check_count: usize,
35}
36
37impl Default for CacheConfig {
38 fn default() -> Self {
39 Self {
40 cache_dir: PathBuf::from(".cache/p2p_foundation"),
41 max_contacts: crate::bootstrap::DEFAULT_MAX_CONTACTS,
42 merge_interval: crate::bootstrap::DEFAULT_MERGE_INTERVAL,
43 cleanup_interval: crate::bootstrap::DEFAULT_CLEANUP_INTERVAL,
44 quality_update_interval: crate::bootstrap::DEFAULT_QUALITY_UPDATE_INTERVAL,
45 stale_threshold: Duration::from_secs(86400 * 7), connectivity_check_interval: Duration::from_secs(900), connectivity_check_count: 100, }
49 }
50}
51
52#[derive(Debug, thiserror::Error)]
54pub enum CacheError {
55 #[error("I/O error: {0}")]
57 Io(#[from] std::io::Error),
58
59 #[error("Serialization error: {0}")]
61 Serialization(#[from] serde_json::Error),
62
63 #[error("Lock error: {0}")]
65 Lock(String),
66
67 #[error("Cache corruption: {0}")]
69 Corruption(String),
70
71 #[error("Configuration error: {0}")]
73 Configuration(String),
74}
75
76#[derive(Clone)]
78pub struct BootstrapCache {
79 config: CacheConfig,
80 contacts: Arc<RwLock<HashMap<PeerId, ContactEntry>>>,
81 instance_id: String,
82 cache_file: PathBuf,
83 instance_cache_file: PathBuf,
84 lock_file: PathBuf,
85 metadata_file: PathBuf,
86 quality_calculator: QualityCalculator,
87 stats: Arc<RwLock<CacheStats>>,
88}
89
90#[derive(Debug, Serialize, Deserialize)]
92struct CacheData {
93 version: u32,
94 instance_id: String,
95 timestamp: chrono::DateTime<chrono::Utc>,
96 contacts: HashMap<PeerId, ContactEntry>,
97 checksum: u64,
98}
99
100#[derive(Debug, Serialize, Deserialize)]
102struct CacheMetadata {
103 last_merge: chrono::DateTime<chrono::Utc>,
104 last_cleanup: chrono::DateTime<chrono::Utc>,
105 last_quality_update: chrono::DateTime<chrono::Utc>,
106 total_merges: u64,
107 total_cleanups: u64,
108 corruption_count: u64,
109 instance_count: u64,
110}
111
112impl BootstrapCache {
113 pub async fn new(cache_dir: PathBuf, config: CacheConfig) -> Result<Self> {
115 std::fs::create_dir_all(&cache_dir)
117 .map_err(|e| P2PError::Bootstrap(format!("Failed to create cache directory: {}", e)))?;
118
119 let instance_id = generate_instance_id();
120
121 let cache_file = cache_dir.join("bootstrap_cache.json");
122 let instance_cache_file = cache_dir.join("instance_caches").join(format!("{}.cache", instance_id));
123 let lock_file = cache_dir.join("bootstrap_cache.lock");
124 let metadata_file = cache_dir.join("metadata.json");
125
126 std::fs::create_dir_all(instance_cache_file.parent().unwrap())
128 .map_err(|e| P2PError::Bootstrap(format!("Failed to create instance cache directory: {}", e)))?;
129
130 let mut cache = Self {
131 config: config.clone(),
132 contacts: Arc::new(RwLock::new(HashMap::new())),
133 instance_id,
134 cache_file,
135 instance_cache_file,
136 lock_file,
137 metadata_file,
138 quality_calculator: QualityCalculator::new(),
139 stats: Arc::new(RwLock::new(CacheStats::default())),
140 };
141
142 cache.load_from_disk().await?;
144
145 info!("Bootstrap cache initialized with {} contacts", cache.contacts.read().await.len());
146
147 Ok(cache)
148 }
149
150 pub async fn get_bootstrap_peers(&self, count: usize) -> Result<Vec<ContactEntry>> {
152 let contacts = self.contacts.read().await;
153
154 let mut sorted_contacts: Vec<&ContactEntry> = contacts.values().collect();
155
156 sorted_contacts.sort_by(|a, b| {
158 b.quality_metrics.quality_score
159 .partial_cmp(&a.quality_metrics.quality_score)
160 .unwrap_or(std::cmp::Ordering::Equal)
161 });
162
163 let selected: Vec<ContactEntry> = sorted_contacts
164 .into_iter()
165 .take(count)
166 .cloned()
167 .collect();
168
169 {
171 let mut stats = self.stats.write().await;
172 stats.cache_hit_rate = if !contacts.is_empty() {
173 selected.len() as f64 / count.min(contacts.len()) as f64
174 } else {
175 0.0
176 };
177 }
178
179 debug!("Selected {} bootstrap peers from {} available contacts", selected.len(), contacts.len());
180
181 Ok(selected)
182 }
183
184 pub async fn add_contact(&mut self, contact: ContactEntry) -> Result<()> {
186 let mut contacts = self.contacts.write().await;
187
188 if contacts.len() >= self.config.max_contacts && !contacts.contains_key(&contact.peer_id) {
190 self.evict_lowest_quality_contacts(&mut contacts).await?;
191 }
192
193 contacts.insert(contact.peer_id.clone(), contact.clone());
194 drop(contacts);
195
196 self.save_to_instance_cache().await?;
198
199 debug!("Added contact: {}", contact.summary());
200
201 Ok(())
202 }
203
204 pub async fn update_contact_metrics(&mut self, peer_id: &PeerId, metrics: QualityMetrics) -> Result<()> {
206 let mut contacts = self.contacts.write().await;
207
208 if let Some(contact) = contacts.get_mut(peer_id) {
209 contact.quality_metrics = metrics;
210 contact.recalculate_quality_score();
211
212 debug!("Updated metrics for peer {}: {}", peer_id, contact.summary());
213 }
214
215 Ok(())
216 }
217
218 pub async fn update_quality_scores(&self) -> Result<()> {
220 let mut contacts = self.contacts.write().await;
221 let mut updated_count = 0;
222
223 for contact in contacts.values_mut() {
224 let old_score = contact.quality_metrics.quality_score;
225
226 let age_seconds = contact.age_seconds() as f64;
228 let decay_factor = (-age_seconds / 86400.0).exp(); contact.quality_metrics.apply_age_decay(decay_factor);
230
231 contact.recalculate_quality_score();
233
234 if (contact.quality_metrics.quality_score - old_score).abs() > 0.01 {
235 updated_count += 1;
236 }
237 }
238
239 self.update_metadata(|meta| {
241 meta.last_quality_update = chrono::Utc::now();
242 }).await?;
243
244 debug!("Updated quality scores for {} contacts", updated_count);
245
246 Ok(())
247 }
248
249 pub async fn cleanup_stale_entries(&self) -> Result<()> {
251 let mut contacts = self.contacts.write().await;
252 let initial_count = contacts.len();
253
254 contacts.retain(|_peer_id, contact| {
256 !contact.is_stale(self.config.stale_threshold)
257 });
258
259 let removed_count = initial_count - contacts.len();
260
261 if removed_count > 0 {
262 info!("Cleaned up {} stale contacts", removed_count);
263
264 drop(contacts);
266 self.save_to_disk().await?;
267 }
268
269 self.update_metadata(|meta| {
271 meta.last_cleanup = chrono::Utc::now();
272 meta.total_cleanups += 1;
273 }).await?;
274
275 Ok(())
276 }
277
278 pub async fn get_all_contacts(&self) -> HashMap<PeerId, ContactEntry> {
280 self.contacts.read().await.clone()
281 }
282
283 pub async fn set_all_contacts(&self, contacts: HashMap<PeerId, ContactEntry>) {
285 let mut current_contacts = self.contacts.write().await;
286 *current_contacts = contacts;
287 }
288
289 pub async fn get_stats(&self) -> Result<CacheStats> {
291 let contacts = self.contacts.read().await;
292 let mut stats = self.stats.write().await;
293
294 stats.total_contacts = contacts.len();
295 stats.high_quality_contacts = contacts.values()
296 .filter(|c| c.quality_metrics.quality_score > 0.7)
297 .count();
298 stats.verified_contacts = contacts.values()
299 .filter(|c| c.ipv6_identity_verified)
300 .count();
301
302 if !contacts.is_empty() {
303 stats.average_quality_score = contacts.values()
304 .map(|c| c.quality_metrics.quality_score)
305 .sum::<f64>() / contacts.len() as f64;
306 }
307
308 Ok(stats.clone())
309 }
310
311 async fn load_from_disk(&mut self) -> Result<()> {
313 if !self.cache_file.exists() {
314 debug!("No existing cache file found, starting with empty cache");
315 return Ok(());
316 }
317
318 let _lock = self.acquire_file_lock().await?;
319
320 match self.load_cache_data().await {
321 Ok(cache_data) => {
322 if self.verify_cache_integrity(&cache_data) {
323 let mut contacts = self.contacts.write().await;
324 *contacts = cache_data.contacts;
325 info!("Loaded {} contacts from cache", contacts.len());
326 } else {
327 warn!("Cache integrity check failed, starting with empty cache");
328 self.handle_cache_corruption().await?;
329 }
330 }
331 Err(e) => {
332 warn!("Failed to load cache: {}, starting with empty cache", e);
333 self.handle_cache_corruption().await?;
334 }
335 }
336
337 Ok(())
338 }
339
340 pub async fn save_to_disk(&self) -> Result<()> {
342 let _lock = self.acquire_file_lock().await?;
343
344 let contacts = self.contacts.read().await;
345 let cache_data = CacheData {
346 version: 1,
347 instance_id: self.instance_id.clone(),
348 timestamp: chrono::Utc::now(),
349 contacts: contacts.clone(),
350 checksum: self.calculate_checksum(&contacts),
351 };
352
353 let temp_file = self.cache_file.with_extension("tmp");
355 let json_data = serde_json::to_string_pretty(&cache_data)
356 .map_err(|e| P2PError::Bootstrap(format!("Failed to serialize cache: {}", e)))?;
357
358 std::fs::write(&temp_file, json_data)
359 .map_err(|e| P2PError::Bootstrap(format!("Failed to write cache file: {}", e)))?;
360
361 std::fs::rename(temp_file, &self.cache_file)
363 .map_err(|e| P2PError::Bootstrap(format!("Failed to rename cache file: {}", e)))?;
364
365 debug!("Saved {} contacts to cache", contacts.len());
366
367 Ok(())
368 }
369
370 async fn save_to_instance_cache(&self) -> Result<()> {
372 let contacts = self.contacts.read().await;
373 let cache_data = CacheData {
374 version: 1,
375 instance_id: self.instance_id.clone(),
376 timestamp: chrono::Utc::now(),
377 contacts: contacts.clone(),
378 checksum: self.calculate_checksum(&contacts),
379 };
380
381 let json_data = serde_json::to_string(&cache_data)
382 .map_err(|e| P2PError::Bootstrap(format!("Failed to serialize instance cache: {}", e)))?;
383
384 std::fs::write(&self.instance_cache_file, json_data)
385 .map_err(|e| P2PError::Bootstrap(format!("Failed to write instance cache: {}", e)))?;
386
387 Ok(())
388 }
389
390 async fn acquire_file_lock(&self) -> Result<FileLock> {
392 FileLock::acquire(&self.lock_file).await
393 }
394
395 async fn load_cache_data(&self) -> Result<CacheData> {
397 let json_data = std::fs::read_to_string(&self.cache_file)
398 .map_err(|e| P2PError::Bootstrap(format!("Failed to read cache file: {}", e)))?;
399
400 let cache_data: CacheData = serde_json::from_str(&json_data)
401 .map_err(|e| P2PError::Bootstrap(format!("Failed to parse cache file: {}", e)))?;
402
403 Ok(cache_data)
404 }
405
406 fn verify_cache_integrity(&self, cache_data: &CacheData) -> bool {
408 let calculated_checksum = self.calculate_checksum(&cache_data.contacts);
409 cache_data.checksum == calculated_checksum
410 }
411
412 fn calculate_checksum(&self, contacts: &HashMap<PeerId, ContactEntry>) -> u64 {
414 use std::collections::hash_map::DefaultHasher;
415 use std::hash::{Hash, Hasher};
416
417 let mut hasher = DefaultHasher::new();
418
419 let mut sorted_contacts: Vec<_> = contacts.iter().collect();
421 sorted_contacts.sort_by_key(|(peer_id, _)| *peer_id);
422
423 for (peer_id, contact) in sorted_contacts {
424 peer_id.hash(&mut hasher);
425 contact.quality_metrics.success_rate.to_bits().hash(&mut hasher);
426 contact.addresses.len().hash(&mut hasher);
427 }
428
429 hasher.finish()
430 }
431
432 async fn handle_cache_corruption(&self) -> Result<()> {
434 warn!("Handling cache corruption, backing up corrupted file");
435
436 if self.cache_file.exists() {
437 let backup_file = self.cache_file.with_extension("corrupted");
438 if let Err(e) = std::fs::rename(&self.cache_file, backup_file) {
439 error!("Failed to backup corrupted cache: {}", e);
440 }
441 }
442
443 self.update_metadata(|meta| {
445 meta.corruption_count += 1;
446 }).await?;
447
448 Ok(())
449 }
450
451 async fn evict_lowest_quality_contacts(&self, contacts: &mut HashMap<PeerId, ContactEntry>) -> Result<()> {
453 let eviction_count = (self.config.max_contacts / 10).max(1); let mut sorted_contacts: Vec<_> = contacts.iter().collect();
456 sorted_contacts.sort_by(|a, b| {
457 a.1.quality_metrics.quality_score
458 .partial_cmp(&b.1.quality_metrics.quality_score)
459 .unwrap_or(std::cmp::Ordering::Equal)
460 });
461
462 let to_evict: Vec<PeerId> = sorted_contacts
463 .into_iter()
464 .take(eviction_count)
465 .map(|(peer_id, _)| peer_id.clone())
466 .collect();
467
468 for peer_id in to_evict {
469 contacts.remove(&peer_id);
470 }
471
472 debug!("Evicted {} lowest quality contacts", eviction_count);
473
474 Ok(())
475 }
476
477 async fn update_metadata<F>(&self, updater: F) -> Result<()>
479 where
480 F: FnOnce(&mut CacheMetadata),
481 {
482 let mut metadata = if self.metadata_file.exists() {
483 let json_data = std::fs::read_to_string(&self.metadata_file)?;
484 serde_json::from_str(&json_data).unwrap_or_default()
485 } else {
486 CacheMetadata::default()
487 };
488
489 updater(&mut metadata);
490
491 let json_data = serde_json::to_string_pretty(&metadata)?;
492 std::fs::write(&self.metadata_file, json_data)?;
493
494 Ok(())
495 }
496}
497
498impl Default for CacheStats {
499 fn default() -> Self {
500 Self {
501 total_contacts: 0,
502 high_quality_contacts: 0,
503 verified_contacts: 0,
504 last_merge: chrono::Utc::now(),
505 last_cleanup: chrono::Utc::now(),
506 cache_hit_rate: 0.0,
507 average_quality_score: 0.0,
508 }
509 }
510}
511
512impl Default for CacheMetadata {
513 fn default() -> Self {
514 let now = chrono::Utc::now();
515 Self {
516 last_merge: now,
517 last_cleanup: now,
518 last_quality_update: now,
519 total_merges: 0,
520 total_cleanups: 0,
521 corruption_count: 0,
522 instance_count: 0,
523 }
524 }
525}
526
527struct FileLock {
529 _file: std::fs::File,
530}
531
532impl FileLock {
533 async fn acquire(lock_file: &PathBuf) -> Result<Self> {
534 use std::fs::OpenOptions;
535
536 let file = OpenOptions::new()
537 .create(true)
538 .write(true)
539 .open(lock_file)
540 .map_err(|e| P2PError::Bootstrap(format!("Failed to create lock file: {}", e)))?;
541
542 Ok(Self { _file: file })
546 }
547}
548
549fn generate_instance_id() -> String {
551 format!("{}_{}", std::process::id(), SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis())
552}
553
554#[cfg(test)]
555mod tests {
556 use super::*;
557 use tempfile::TempDir;
558
559 #[tokio::test]
560 async fn test_cache_creation() {
561 let temp_dir = TempDir::new().unwrap();
562 let config = CacheConfig {
563 cache_dir: temp_dir.path().to_path_buf(),
564 max_contacts: 100,
565 ..CacheConfig::default()
566 };
567
568 let cache = BootstrapCache::new(temp_dir.path().to_path_buf(), config).await;
569 assert!(cache.is_ok());
570 }
571
572 #[tokio::test]
573 async fn test_add_and_retrieve_contacts() {
574 let temp_dir = TempDir::new().unwrap();
575 let config = CacheConfig {
576 cache_dir: temp_dir.path().to_path_buf(),
577 max_contacts: 100,
578 ..CacheConfig::default()
579 };
580
581 let mut cache = BootstrapCache::new(temp_dir.path().to_path_buf(), config).await.unwrap();
582
583 let contact = ContactEntry::new(
584 PeerId::from("test-peer"),
585 vec!["/ip4/127.0.0.1/tcp/9000".to_string()]
586 );
587
588 cache.add_contact(contact).await.unwrap();
589
590 let bootstrap_peers = cache.get_bootstrap_peers(10).await.unwrap();
591 assert_eq!(bootstrap_peers.len(), 1);
592 }
593
594 #[tokio::test]
595 async fn test_cache_persistence() {
596 let temp_dir = TempDir::new().unwrap();
597 let config = CacheConfig {
598 cache_dir: temp_dir.path().to_path_buf(),
599 max_contacts: 100,
600 ..CacheConfig::default()
601 };
602
603 {
605 let mut cache = BootstrapCache::new(temp_dir.path().to_path_buf(), config.clone()).await.unwrap();
606 let contact = ContactEntry::new(
607 PeerId::from("test-peer"),
608 vec!["/ip4/127.0.0.1/tcp/9000".to_string()]
609 );
610 cache.add_contact(contact).await.unwrap();
611 cache.save_to_disk().await.unwrap();
612 }
613
614 {
616 let cache = BootstrapCache::new(temp_dir.path().to_path_buf(), config).await.unwrap();
617 let bootstrap_peers = cache.get_bootstrap_peers(10).await.unwrap();
618 assert_eq!(bootstrap_peers.len(), 1);
619 }
620 }
621
622 #[tokio::test]
623 async fn test_cache_eviction() {
624 let temp_dir = TempDir::new().unwrap();
625 let config = CacheConfig {
626 cache_dir: temp_dir.path().to_path_buf(),
627 max_contacts: 5,
628 ..CacheConfig::default()
629 };
630
631 let mut cache = BootstrapCache::new(temp_dir.path().to_path_buf(), config).await.unwrap();
632
633 for i in 0..10 {
635 let contact = ContactEntry::new(
636 PeerId::from(format!("test-peer-{}", i)),
637 vec![format!("/ip4/127.0.0.1/tcp/{}", 9000 + i)]
638 );
639 cache.add_contact(contact).await.unwrap();
640 }
641
642 let stats = cache.get_stats().await.unwrap();
643 assert!(stats.total_contacts <= 5);
644 }
645}