1use crate::bootstrap::{CacheStats, ContactEntry, QualityCalculator, QualityMetrics};
20use crate::error::BootstrapError;
21use crate::{P2PError, PeerId, Result};
22use serde::{Deserialize, Serialize};
23use std::collections::HashMap;
24use std::net::SocketAddr;
25use std::path::PathBuf;
26use std::sync::Arc;
27use std::time::{Duration, SystemTime};
28use tokio::sync::RwLock;
29use tracing::{debug, error, info, warn};
30
31#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
33pub struct CacheConfig {
34 pub cache_dir: PathBuf,
36 pub max_contacts: usize,
38 pub merge_interval: Duration,
40 pub cleanup_interval: Duration,
42 pub quality_update_interval: Duration,
44 pub stale_threshold: Duration,
46 pub connectivity_check_interval: Duration,
48 pub connectivity_check_count: usize,
50}
51
52impl Default for CacheConfig {
53 fn default() -> Self {
54 Self {
55 cache_dir: PathBuf::from(".cache/p2p_foundation"),
56 max_contacts: crate::bootstrap::DEFAULT_MAX_CONTACTS,
57 merge_interval: crate::bootstrap::DEFAULT_MERGE_INTERVAL,
58 cleanup_interval: crate::bootstrap::DEFAULT_CLEANUP_INTERVAL,
59 quality_update_interval: crate::bootstrap::DEFAULT_QUALITY_UPDATE_INTERVAL,
60 stale_threshold: Duration::from_secs(86400 * 7), connectivity_check_interval: Duration::from_secs(900), connectivity_check_count: 100, }
64 }
65}
66
67#[derive(Debug, thiserror::Error)]
69pub enum CacheError {
70 #[error("I/O error: {0}")]
72 Io(#[from] std::io::Error),
73
74 #[error("Serialization error: {0}")]
76 Serialization(#[from] serde_json::Error),
77
78 #[error("Lock error: {0}")]
80 Lock(String),
81
82 #[error("Cache corruption: {0}")]
84 Corruption(String),
85
86 #[error("Configuration error: {0}")]
88 Configuration(String),
89}
90
91#[derive(Clone)]
93pub struct BootstrapCache {
94 config: CacheConfig,
95 contacts: Arc<RwLock<HashMap<PeerId, ContactEntry>>>,
96 instance_id: String,
97 cache_file: PathBuf,
98 instance_cache_file: PathBuf,
99 lock_file: PathBuf,
100 metadata_file: PathBuf,
101 _quality_calculator: QualityCalculator,
102 stats: Arc<RwLock<CacheStats>>,
103}
104
105#[derive(Debug, Serialize, Deserialize)]
107struct CacheData {
108 version: u32,
109 instance_id: String,
110 timestamp: chrono::DateTime<chrono::Utc>,
111 contacts: HashMap<PeerId, ContactEntry>,
112 checksum: u64,
113}
114
115#[derive(Debug, Serialize, Deserialize)]
117struct CacheMetadata {
118 last_merge: chrono::DateTime<chrono::Utc>,
119 last_cleanup: chrono::DateTime<chrono::Utc>,
120 last_quality_update: chrono::DateTime<chrono::Utc>,
121 total_merges: u64,
122 total_cleanups: u64,
123 corruption_count: u64,
124 instance_count: u64,
125}
126
127impl BootstrapCache {
128 pub async fn new(cache_dir: PathBuf, config: CacheConfig) -> Result<Self> {
130 std::fs::create_dir_all(&cache_dir).map_err(|e| {
132 P2PError::Bootstrap(BootstrapError::CacheError(
133 format!("Failed to create cache directory: {e}").into(),
134 ))
135 })?;
136
137 let instance_id = generate_instance_id();
138
139 let cache_file = cache_dir.join("bootstrap_cache.json");
140 let instance_cache_file = cache_dir
141 .join("instance_caches")
142 .join(format!("{instance_id}.cache"));
143 let lock_file = cache_dir.join("bootstrap_cache.lock");
144 let metadata_file = cache_dir.join("metadata.json");
145
146 if let Some(parent) = instance_cache_file.parent() {
148 std::fs::create_dir_all(parent).map_err(|e| {
149 P2PError::Bootstrap(BootstrapError::CacheError(
150 format!("Failed to create instance cache directory: {e}").into(),
151 ))
152 })?;
153 } else {
154 return Err(P2PError::Bootstrap(BootstrapError::CacheError(
155 "Cache file has no parent directory".to_string().into(),
156 )));
157 }
158
159 let mut cache = Self {
160 config: config.clone(),
161 contacts: Arc::new(RwLock::new(HashMap::new())),
162 instance_id,
163 cache_file,
164 instance_cache_file,
165 lock_file,
166 metadata_file,
167 _quality_calculator: QualityCalculator::new(),
168 stats: Arc::new(RwLock::new(CacheStats::default())),
169 };
170
171 cache.load_from_disk().await?;
173
174 info!(
175 "Bootstrap cache initialized with {} contacts",
176 cache.contacts.read().await.len()
177 );
178
179 Ok(cache)
180 }
181
182 pub async fn get_bootstrap_peers(&self, count: usize) -> Result<Vec<ContactEntry>> {
184 let contacts = self.contacts.read().await;
185
186 let mut sorted_contacts: Vec<&ContactEntry> = contacts.values().collect();
187
188 sorted_contacts.sort_by(|a, b| {
190 b.quality_metrics
191 .quality_score
192 .partial_cmp(&a.quality_metrics.quality_score)
193 .unwrap_or(std::cmp::Ordering::Equal)
194 });
195
196 let selected: Vec<ContactEntry> =
197 sorted_contacts.into_iter().take(count).cloned().collect();
198
199 {
201 let mut stats = self.stats.write().await;
202 stats.cache_hit_rate = if !contacts.is_empty() {
203 selected.len() as f64 / count.min(contacts.len()) as f64
204 } else {
205 0.0
206 };
207 }
208
209 debug!(
210 "Selected {} bootstrap peers from {} available contacts",
211 selected.len(),
212 contacts.len()
213 );
214
215 Ok(selected)
216 }
217
218 pub async fn get_quic_bootstrap_peers(&self, count: usize) -> Result<Vec<ContactEntry>> {
220 let contacts = self.contacts.read().await;
221
222 let mut quic_contacts: Vec<&ContactEntry> = contacts
224 .values()
225 .filter(|contact| contact.quic_contact.is_some())
226 .collect();
227
228 quic_contacts.sort_by(|a, b| {
230 let score_a = a.quality_metrics.quality_score + a.quic_quality_score() * 0.3;
231 let score_b = b.quality_metrics.quality_score + b.quic_quality_score() * 0.3;
232 score_b
233 .partial_cmp(&score_a)
234 .unwrap_or(std::cmp::Ordering::Equal)
235 });
236
237 let selected: Vec<ContactEntry> = quic_contacts.into_iter().take(count).cloned().collect();
238
239 debug!(
240 "Selected {} QUIC bootstrap peers from {} available QUIC contacts",
241 selected.len(),
242 contacts
243 .values()
244 .filter(|c| c.quic_contact.is_some())
245 .count()
246 );
247
248 Ok(selected)
249 }
250
251 pub async fn get_contact_by_addresses(
253 &self,
254 target_addresses: &[SocketAddr],
255 ) -> Option<ContactEntry> {
256 let contacts = self.contacts.read().await;
257 contacts
258 .values()
259 .find(|contact| {
260 if let Some(quic_addrs) = contact.quic_direct_addresses() {
261 quic_addrs
262 .iter()
263 .any(|addr| target_addresses.contains(addr))
264 } else {
265 false
266 }
267 })
268 .cloned()
269 }
270
271 pub async fn update_quic_metrics(
273 &mut self,
274 peer_id: &PeerId,
275 connection_type: crate::bootstrap::contact::QuicConnectionType,
276 success: bool,
277 setup_time_ms: Option<u64>,
278 ) -> Result<()> {
279 let mut contacts = self.contacts.write().await;
280
281 if let Some(contact) = contacts.get_mut(peer_id) {
282 contact.update_quic_connection_result(connection_type, success, setup_time_ms);
283
284 debug!(
285 "Updated QUIC metrics for peer {}: {}",
286 peer_id,
287 contact.summary()
288 );
289 }
290
291 Ok(())
292 }
293
294 pub async fn add_contact(&mut self, contact: ContactEntry) -> Result<()> {
296 let mut contacts = self.contacts.write().await;
297
298 if contacts.len() >= self.config.max_contacts && !contacts.contains_key(&contact.peer_id) {
300 self.evict_lowest_quality_contacts(&mut contacts).await?;
301 }
302
303 contacts.insert(contact.peer_id.clone(), contact.clone());
304 drop(contacts);
305
306 self.save_to_instance_cache().await?;
308
309 debug!("Added contact: {}", contact.summary());
310
311 Ok(())
312 }
313
314 pub async fn update_contact_metrics(
316 &mut self,
317 peer_id: &PeerId,
318 metrics: QualityMetrics,
319 ) -> Result<()> {
320 let mut contacts = self.contacts.write().await;
321
322 if let Some(contact) = contacts.get_mut(peer_id) {
323 contact.quality_metrics = metrics;
324 contact.recalculate_quality_score();
325
326 debug!(
327 "Updated metrics for peer {}: {}",
328 peer_id,
329 contact.summary()
330 );
331 }
332
333 Ok(())
334 }
335
336 pub async fn update_quality_scores(&self) -> Result<()> {
338 let mut contacts = self.contacts.write().await;
339 let mut updated_count = 0;
340
341 for contact in contacts.values_mut() {
342 let old_score = contact.quality_metrics.quality_score;
343
344 let age_seconds = contact.age_seconds() as f64;
346 let decay_factor = (-age_seconds / 86400.0).exp(); contact.quality_metrics.apply_age_decay(decay_factor);
348
349 contact.recalculate_quality_score();
351
352 if (contact.quality_metrics.quality_score - old_score).abs() > 0.01 {
353 updated_count += 1;
354 }
355 }
356
357 self.update_metadata(|meta| {
359 meta.last_quality_update = chrono::Utc::now();
360 })
361 .await?;
362
363 debug!("Updated quality scores for {} contacts", updated_count);
364
365 Ok(())
366 }
367
368 pub async fn cleanup_stale_entries(&self) -> Result<()> {
370 let mut contacts = self.contacts.write().await;
371 let initial_count = contacts.len();
372
373 contacts.retain(|_peer_id, contact| !contact.is_stale(self.config.stale_threshold));
375
376 let removed_count = initial_count - contacts.len();
377
378 if removed_count > 0 {
379 info!("Cleaned up {} stale contacts", removed_count);
380
381 drop(contacts);
383 self.save_to_disk().await?;
384 }
385
386 self.update_metadata(|meta| {
388 meta.last_cleanup = chrono::Utc::now();
389 meta.total_cleanups += 1;
390 })
391 .await?;
392
393 Ok(())
394 }
395
396 pub async fn get_all_contacts(&self) -> HashMap<PeerId, ContactEntry> {
398 self.contacts.read().await.clone()
399 }
400
401 pub async fn set_all_contacts(&self, contacts: HashMap<PeerId, ContactEntry>) {
403 let mut current_contacts = self.contacts.write().await;
404 *current_contacts = contacts;
405 }
406
407 pub async fn get_stats(&self) -> Result<CacheStats> {
409 let contacts = self.contacts.read().await;
410 let mut stats = self.stats.write().await;
411
412 stats.total_contacts = contacts.len();
413 stats.high_quality_contacts = contacts
414 .values()
415 .filter(|c| c.quality_metrics.quality_score > 0.7)
416 .count();
417 stats.verified_contacts = contacts
418 .values()
419 .filter(|c| c.ipv6_identity_verified)
420 .count();
421
422 stats.iroh_contacts = contacts
424 .values()
425 .filter(|c| c.quic_contact.is_some())
426 .count();
427 stats.nat_traversal_contacts = 0; let quic_setup_times: Vec<f64> = contacts
431 .values()
432 .filter_map(|c| c.quic_contact.as_ref())
433 .filter(|quic| quic.quic_quality.avg_connection_setup_time_ms > 0.0)
434 .map(|quic| quic.quic_quality.avg_connection_setup_time_ms)
435 .collect();
436 stats.avg_iroh_setup_time_ms = if !quic_setup_times.is_empty() {
437 quic_setup_times.iter().sum::<f64>() / quic_setup_times.len() as f64
438 } else {
439 0.0
440 };
441
442 let mut connection_type_counts = std::collections::HashMap::new();
444 for contact in contacts.values() {
445 if let Some(ref quic) = contact.quic_contact {
446 for conn_type in &quic.successful_connection_types {
447 *connection_type_counts
448 .entry(format!("{conn_type:?}"))
449 .or_insert(0) += 1;
450 }
451 }
452 }
453 stats.preferred_iroh_connection_type = connection_type_counts
454 .into_iter()
455 .max_by_key(|(_, count)| *count)
456 .map(|(conn_type, _)| conn_type);
457
458 if !contacts.is_empty() {
459 stats.average_quality_score = contacts
460 .values()
461 .map(|c| c.quality_metrics.quality_score)
462 .sum::<f64>()
463 / contacts.len() as f64;
464 }
465
466 Ok(stats.clone())
467 }
468
469 async fn load_from_disk(&mut self) -> Result<()> {
471 if !self.cache_file.exists() {
472 debug!("No existing cache file found, starting with empty cache");
473 return Ok(());
474 }
475
476 let _lock = self.acquire_file_lock().await?;
477
478 match self.load_cache_data().await {
479 Ok(cache_data) => {
480 if self.verify_cache_integrity(&cache_data) {
481 let mut contacts = self.contacts.write().await;
482 *contacts = cache_data.contacts;
483 info!("Loaded {} contacts from cache", contacts.len());
484 } else {
485 warn!("Cache integrity check failed, starting with empty cache");
486 self.handle_cache_corruption().await?;
487 }
488 }
489 Err(e) => {
490 warn!("Failed to load cache: {}, starting with empty cache", e);
491 self.handle_cache_corruption().await?;
492 }
493 }
494
495 Ok(())
496 }
497
498 pub async fn save_to_disk(&self) -> Result<()> {
500 let _lock = self.acquire_file_lock().await?;
501
502 let contacts = self.contacts.read().await;
503 let cache_data = CacheData {
504 version: 1,
505 instance_id: self.instance_id.clone(),
506 timestamp: chrono::Utc::now(),
507 contacts: contacts.clone(),
508 checksum: self.calculate_checksum(&contacts),
509 };
510
511 let temp_file = self.cache_file.with_extension("tmp");
513 let json_data = serde_json::to_string_pretty(&cache_data).map_err(|e| {
514 P2PError::Bootstrap(BootstrapError::CacheError(
515 format!("Failed to serialize cache: {e}").into(),
516 ))
517 })?;
518
519 std::fs::write(&temp_file, json_data).map_err(|e| {
520 P2PError::Bootstrap(BootstrapError::CacheError(
521 format!("Failed to write cache file: {e}").into(),
522 ))
523 })?;
524
525 std::fs::rename(temp_file, &self.cache_file).map_err(|e| {
527 P2PError::Bootstrap(BootstrapError::CacheError(
528 format!("Failed to rename cache file: {e}").into(),
529 ))
530 })?;
531
532 debug!("Saved {} contacts to cache", contacts.len());
533
534 Ok(())
535 }
536
537 async fn save_to_instance_cache(&self) -> Result<()> {
539 let contacts = self.contacts.read().await;
540 let cache_data = CacheData {
541 version: 1,
542 instance_id: self.instance_id.clone(),
543 timestamp: chrono::Utc::now(),
544 contacts: contacts.clone(),
545 checksum: self.calculate_checksum(&contacts),
546 };
547
548 let json_data = serde_json::to_string(&cache_data).map_err(|e| {
549 P2PError::Bootstrap(BootstrapError::CacheError(
550 format!("Failed to serialize instance cache: {e}").into(),
551 ))
552 })?;
553
554 std::fs::write(&self.instance_cache_file, json_data).map_err(|e| {
555 P2PError::Bootstrap(BootstrapError::CacheError(
556 format!("Failed to write instance cache: {e}").into(),
557 ))
558 })?;
559
560 Ok(())
561 }
562
563 async fn acquire_file_lock(&self) -> Result<FileLock> {
565 FileLock::acquire(&self.lock_file).await
566 }
567
568 async fn load_cache_data(&self) -> Result<CacheData> {
570 let json_data = std::fs::read_to_string(&self.cache_file).map_err(|e| {
571 P2PError::Bootstrap(BootstrapError::CacheError(
572 format!("Failed to read cache file: {e}").into(),
573 ))
574 })?;
575
576 let cache_data: CacheData = serde_json::from_str(&json_data).map_err(|e| {
577 P2PError::Bootstrap(BootstrapError::InvalidData(
578 format!("Failed to parse cache file: {e}").into(),
579 ))
580 })?;
581
582 Ok(cache_data)
583 }
584
585 fn verify_cache_integrity(&self, cache_data: &CacheData) -> bool {
587 let calculated_checksum = self.calculate_checksum(&cache_data.contacts);
588 cache_data.checksum == calculated_checksum
589 }
590
591 fn calculate_checksum(&self, contacts: &HashMap<PeerId, ContactEntry>) -> u64 {
593 use std::collections::hash_map::DefaultHasher;
594 use std::hash::{Hash, Hasher};
595
596 let mut hasher = DefaultHasher::new();
597
598 let mut sorted_contacts: Vec<_> = contacts.iter().collect();
600 sorted_contacts.sort_by_key(|(peer_id, _)| *peer_id);
601
602 for (peer_id, contact) in sorted_contacts {
603 peer_id.hash(&mut hasher);
604 contact
605 .quality_metrics
606 .success_rate
607 .to_bits()
608 .hash(&mut hasher);
609 contact.addresses.len().hash(&mut hasher);
610 }
611
612 hasher.finish()
613 }
614
615 async fn handle_cache_corruption(&self) -> Result<()> {
617 warn!("Handling cache corruption, backing up corrupted file");
618
619 if self.cache_file.exists() {
620 let backup_file = self.cache_file.with_extension("corrupted");
621 if let Err(e) = std::fs::rename(&self.cache_file, backup_file) {
622 error!("Failed to backup corrupted cache: {}", e);
623 }
624 }
625
626 self.update_metadata(|meta| {
628 meta.corruption_count += 1;
629 })
630 .await?;
631
632 Ok(())
633 }
634
635 async fn evict_lowest_quality_contacts(
637 &self,
638 contacts: &mut HashMap<PeerId, ContactEntry>,
639 ) -> Result<()> {
640 let eviction_count = (self.config.max_contacts / 10).max(1); let mut sorted_contacts: Vec<_> = contacts.iter().collect();
643 sorted_contacts.sort_by(|a, b| {
644 a.1.quality_metrics
645 .quality_score
646 .partial_cmp(&b.1.quality_metrics.quality_score)
647 .unwrap_or(std::cmp::Ordering::Equal)
648 });
649
650 let to_evict: Vec<PeerId> = sorted_contacts
651 .into_iter()
652 .take(eviction_count)
653 .map(|(peer_id, _)| peer_id.clone())
654 .collect();
655
656 for peer_id in to_evict {
657 contacts.remove(&peer_id);
658 }
659
660 debug!("Evicted {} lowest quality contacts", eviction_count);
661
662 Ok(())
663 }
664
665 async fn update_metadata<F>(&self, updater: F) -> Result<()>
667 where
668 F: FnOnce(&mut CacheMetadata),
669 {
670 let mut metadata = if self.metadata_file.exists() {
671 let json_data = std::fs::read_to_string(&self.metadata_file)?;
672 serde_json::from_str(&json_data).unwrap_or_default()
673 } else {
674 CacheMetadata::default()
675 };
676
677 updater(&mut metadata);
678
679 let json_data = serde_json::to_string_pretty(&metadata)?;
680 std::fs::write(&self.metadata_file, json_data)?;
681
682 Ok(())
683 }
684}
685
686impl Default for CacheStats {
687 fn default() -> Self {
688 Self {
689 total_contacts: 0,
690 high_quality_contacts: 0,
691 verified_contacts: 0,
692 last_merge: chrono::Utc::now(),
693 last_cleanup: chrono::Utc::now(),
694 cache_hit_rate: 0.0,
695 average_quality_score: 0.0,
696 iroh_contacts: 0,
697 nat_traversal_contacts: 0,
698 avg_iroh_setup_time_ms: 0.0,
699 preferred_iroh_connection_type: None,
700 }
701 }
702}
703
704impl Default for CacheMetadata {
705 fn default() -> Self {
706 let now = chrono::Utc::now();
707 Self {
708 last_merge: now,
709 last_cleanup: now,
710 last_quality_update: now,
711 total_merges: 0,
712 total_cleanups: 0,
713 corruption_count: 0,
714 instance_count: 0,
715 }
716 }
717}
718
/// Guard returned while a cache file operation is in progress.
///
/// NOTE(review): the guard only keeps the lock file's handle open; no
/// OS-level (advisory or mandatory) lock is taken anywhere in this file, so it
/// does not by itself exclude other processes.
struct FileLock {
    /// Open handle to the lock file; closed when the guard is dropped.
    _file: std::fs::File,
}
723
724impl FileLock {
725 async fn acquire(lock_file: &PathBuf) -> Result<Self> {
726 use std::fs::OpenOptions;
727
728 let file = OpenOptions::new()
729 .create(true)
730 .truncate(true)
731 .write(true)
732 .open(lock_file)
733 .map_err(|e| {
734 P2PError::Bootstrap(BootstrapError::CacheError(
735 format!("Failed to create lock file: {e}").into(),
736 ))
737 })?;
738
739 Ok(Self { _file: file })
743 }
744}
745
/// Builds an identifier of the form `<process id>_<milliseconds since the Unix
/// epoch>`.
///
/// If the system clock reports a time before the epoch, the millisecond part
/// is `0`.
fn generate_instance_id() -> String {
    let pid = std::process::id();
    let millis = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    };
    format!("{}_{}", pid, millis)
}
757
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a configuration rooted in `dir` with the given capacity and
    /// defaults for everything else.
    fn config_for(dir: &TempDir, max_contacts: usize) -> CacheConfig {
        CacheConfig {
            cache_dir: dir.path().to_path_buf(),
            max_contacts,
            ..CacheConfig::default()
        }
    }

    /// Constructing a cache in an empty directory succeeds.
    #[tokio::test]
    async fn test_cache_creation() {
        let temp_dir = TempDir::new().unwrap();
        let config = config_for(&temp_dir, 100);

        let cache = BootstrapCache::new(temp_dir.path().to_path_buf(), config).await;
        assert!(cache.is_ok());
    }

    /// A contact that was added is returned as a bootstrap peer.
    #[tokio::test]
    async fn test_add_and_retrieve_contacts() {
        let temp_dir = TempDir::new().unwrap();
        let config = config_for(&temp_dir, 100);

        let mut cache = BootstrapCache::new(temp_dir.path().to_path_buf(), config)
            .await
            .unwrap();

        let contact = ContactEntry::new(
            PeerId::from("test-peer"),
            vec!["127.0.0.1:9000".parse().unwrap()],
        );

        cache.add_contact(contact).await.unwrap();

        let bootstrap_peers = cache.get_bootstrap_peers(10).await.unwrap();
        assert_eq!(bootstrap_peers.len(), 1);
    }

    /// Contacts saved by one cache instance are loaded by the next one opened
    /// on the same directory.
    #[tokio::test]
    async fn test_cache_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let config = config_for(&temp_dir, 100);

        {
            // First instance: add a contact and persist it.
            let mut cache = BootstrapCache::new(temp_dir.path().to_path_buf(), config.clone())
                .await
                .unwrap();
            let contact = ContactEntry::new(
                PeerId::from("test-peer"),
                vec!["127.0.0.1:9000".parse().unwrap()],
            );
            cache.add_contact(contact).await.unwrap();
            cache.save_to_disk().await.unwrap();
        }

        {
            // Second instance: the contact must be there again.
            let cache = BootstrapCache::new(temp_dir.path().to_path_buf(), config)
                .await
                .unwrap();
            let bootstrap_peers = cache.get_bootstrap_peers(10).await.unwrap();
            assert_eq!(bootstrap_peers.len(), 1);
        }
    }

    /// Adding more contacts than the capacity never leaves more than the
    /// capacity stored.
    #[tokio::test]
    async fn test_cache_eviction() {
        let temp_dir = TempDir::new().unwrap();
        let config = config_for(&temp_dir, 5);

        let mut cache = BootstrapCache::new(temp_dir.path().to_path_buf(), config)
            .await
            .unwrap();

        for i in 0..10 {
            let contact = ContactEntry::new(
                PeerId::from(format!("test-peer-{}", i)),
                vec![format!("127.0.0.1:{}", 9000 + i).parse().unwrap()],
            );
            cache.add_contact(contact).await.unwrap();
        }

        let stats = cache.get_stats().await.unwrap();
        assert!(stats.total_contacts <= 5);
    }
}