use crate::{PeerId, Result, P2PError};
use crate::bootstrap::{ContactEntry, QualityMetrics, QualityCalculator, CacheStats};
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::{Duration, SystemTime};
use std::sync::Arc;
use tokio::sync::RwLock;
use serde::{Deserialize, Serialize};
use tracing::{debug, info, warn, error};
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CacheConfig {
pub cache_dir: PathBuf,
pub max_contacts: usize,
pub merge_interval: Duration,
pub cleanup_interval: Duration,
pub quality_update_interval: Duration,
pub stale_threshold: Duration,
pub connectivity_check_interval: Duration,
pub connectivity_check_count: usize,
}
impl Default for CacheConfig {
fn default() -> Self {
Self {
cache_dir: PathBuf::from(".cache/p2p_foundation"),
max_contacts: crate::bootstrap::DEFAULT_MAX_CONTACTS,
merge_interval: crate::bootstrap::DEFAULT_MERGE_INTERVAL,
cleanup_interval: crate::bootstrap::DEFAULT_CLEANUP_INTERVAL,
quality_update_interval: crate::bootstrap::DEFAULT_QUALITY_UPDATE_INTERVAL,
stale_threshold: Duration::from_secs(86400 * 7), connectivity_check_interval: Duration::from_secs(900), connectivity_check_count: 100, }
}
}
/// Error categories for cache operations.
///
/// `Io` and `Serialization` convert automatically from their source errors
/// via `#[from]`; the other variants carry a free-form description.
#[derive(Debug, thiserror::Error)]
pub enum CacheError {
    /// A filesystem operation failed.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// JSON encoding or decoding failed.
    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_json::Error),

    /// A lock-related failure, described by the message.
    #[error("Lock error: {0}")]
    Lock(String),

    /// Cache contents were found to be corrupt.
    #[error("Cache corruption: {0}")]
    Corruption(String),

    /// The configuration was invalid.
    #[error("Configuration error: {0}")]
    Configuration(String),
}
/// In-memory contact cache backed by JSON files on disk.
///
/// Cloning is shallow for `contacts` and `stats`: both live behind `Arc`, so
/// every clone observes and mutates the same underlying data.
#[derive(Clone)]
pub struct BootstrapCache {
    /// Settings supplied at construction.
    config: CacheConfig,
    /// Known contacts keyed by peer id, shared between clones.
    contacts: Arc<RwLock<HashMap<PeerId, ContactEntry>>>,
    /// Identifier produced by `generate_instance_id` for this cache instance.
    instance_id: String,
    /// Shared cache file (`bootstrap_cache.json`).
    cache_file: PathBuf,
    /// This instance's own file (`instance_caches/<instance_id>.cache`).
    instance_cache_file: PathBuf,
    /// File opened by `FileLock` around shared-file access.
    lock_file: PathBuf,
    /// Bookkeeping file (`metadata.json`).
    metadata_file: PathBuf,
    /// Quality calculator created at construction; not called in this part of the file.
    quality_calculator: QualityCalculator,
    /// Aggregate statistics, shared between clones.
    stats: Arc<RwLock<CacheStats>>,
}
/// On-disk representation of a contact snapshot (serialized as JSON).
#[derive(Debug, Serialize, Deserialize)]
struct CacheData {
    /// Format version; writers in this file always emit `1`.
    version: u32,
    /// Instance id of the writer.
    instance_id: String,
    /// UTC time at which the snapshot was built.
    timestamp: chrono::DateTime<chrono::Utc>,
    /// The contacts captured in the snapshot.
    contacts: HashMap<PeerId, ContactEntry>,
    /// Value of `BootstrapCache::calculate_checksum` over `contacts`.
    checksum: u64,
}
/// Maintenance bookkeeping persisted to the metadata file.
#[derive(Debug, Serialize, Deserialize)]
struct CacheMetadata {
    /// Time of the last merge (not written in this part of the file).
    last_merge: chrono::DateTime<chrono::Utc>,
    /// Time of the last stale-entry cleanup.
    last_cleanup: chrono::DateTime<chrono::Utc>,
    /// Time of the last quality-score refresh.
    last_quality_update: chrono::DateTime<chrono::Utc>,
    /// Merge counter (not written in this part of the file).
    total_merges: u64,
    /// Number of cleanups performed.
    total_cleanups: u64,
    /// Number of times cache corruption was handled.
    corruption_count: u64,
    /// Instance counter (not written in this part of the file).
    instance_count: u64,
}
impl BootstrapCache {
/// Creates a cache rooted at `cache_dir` and loads any existing shared cache file.
///
/// The directory layout is derived from the `cache_dir` argument
/// (`bootstrap_cache.json`, `bootstrap_cache.lock`, `metadata.json` and
/// `instance_caches/<instance_id>.cache`); `config.cache_dir` is not consulted.
///
/// # Errors
///
/// Returns `P2PError::Bootstrap` when either directory cannot be created, and
/// propagates any error from `load_from_disk`.
pub async fn new(cache_dir: PathBuf, config: CacheConfig) -> Result<Self> {
    std::fs::create_dir_all(&cache_dir)
        .map_err(|e| P2PError::Bootstrap(format!("Failed to create cache directory: {}", e)))?;

    let instance_id = generate_instance_id();

    // Build the directory path first so no `parent().unwrap()` is needed.
    let instance_cache_dir = cache_dir.join("instance_caches");
    std::fs::create_dir_all(&instance_cache_dir)
        .map_err(|e| P2PError::Bootstrap(format!("Failed to create instance cache directory: {}", e)))?;

    let cache_file = cache_dir.join("bootstrap_cache.json");
    let instance_cache_file = instance_cache_dir.join(format!("{}.cache", instance_id));
    let lock_file = cache_dir.join("bootstrap_cache.lock");
    let metadata_file = cache_dir.join("metadata.json");

    let mut cache = Self {
        // `config` is owned and not used afterwards, so it is moved rather than cloned.
        config,
        contacts: Arc::new(RwLock::new(HashMap::new())),
        instance_id,
        cache_file,
        instance_cache_file,
        lock_file,
        metadata_file,
        quality_calculator: QualityCalculator::new(),
        stats: Arc::new(RwLock::new(CacheStats::default())),
    };

    cache.load_from_disk().await?;
    info!("Bootstrap cache initialized with {} contacts", cache.contacts.read().await.len());
    Ok(cache)
}
/// Returns up to `count` contacts, highest quality score first.
///
/// Also records `cache_hit_rate` in the shared stats: the fraction of the
/// satisfiable request (`min(count, cache size)`) that was returned, or `0.0`
/// when nothing could be requested (empty cache or `count == 0`).
///
/// Contacts whose score is NaN are ranked as the lowest quality so the sort
/// comparator stays a consistent total order.
pub async fn get_bootstrap_peers(&self, count: usize) -> Result<Vec<ContactEntry>> {
    let contacts = self.contacts.read().await;

    // Map NaN to the lowest possible rank; `partial_cmp` can then never fail.
    let rank = |score: f64| -> f64 {
        if score.is_nan() {
            f64::NEG_INFINITY
        } else {
            score
        }
    };

    let mut sorted_contacts: Vec<&ContactEntry> = contacts.values().collect();
    sorted_contacts.sort_by(|a, b| {
        rank(b.quality_metrics.quality_score)
            .partial_cmp(&rank(a.quality_metrics.quality_score))
            .unwrap_or(std::cmp::Ordering::Equal)
    });

    let selected: Vec<ContactEntry> = sorted_contacts
        .into_iter()
        .take(count)
        .cloned()
        .collect();

    {
        let mut stats = self.stats.write().await;
        let satisfiable = count.min(contacts.len());
        // Guard the division: `count == 0` used to yield 0.0 / 0.0 = NaN.
        stats.cache_hit_rate = if satisfiable == 0 {
            0.0
        } else {
            selected.len() as f64 / satisfiable as f64
        };
    }

    debug!("Selected {} bootstrap peers from {} available contacts", selected.len(), contacts.len());
    Ok(selected)
}
/// Inserts (or replaces) `contact` and persists the result to this
/// instance's cache file.
///
/// When the peer is new and the cache already holds `max_contacts` entries or
/// more, the lowest-quality contacts are evicted before the insert.
///
/// # Errors
///
/// Propagates failures from eviction and from writing the instance cache.
pub async fn add_contact(&mut self, contact: ContactEntry) -> Result<()> {
    {
        let mut guard = self.contacts.write().await;
        let is_new_peer = !guard.contains_key(&contact.peer_id);
        if is_new_peer && guard.len() >= self.config.max_contacts {
            self.evict_lowest_quality_contacts(&mut guard).await?;
        }
        guard.insert(contact.peer_id.clone(), contact.clone());
        // Write guard is released here; saving below takes a read lock.
    }

    self.save_to_instance_cache().await?;
    debug!("Added contact: {}", contact.summary());
    Ok(())
}
/// Replaces the quality metrics of `peer_id` and recomputes its score.
///
/// An unknown peer is a silent no-op. The change is made in memory only;
/// nothing is written to disk by this method.
pub async fn update_contact_metrics(&mut self, peer_id: &PeerId, metrics: QualityMetrics) -> Result<()> {
    let mut guard = self.contacts.write().await;
    if let Some(entry) = guard.get_mut(peer_id) {
        entry.quality_metrics = metrics;
        entry.recalculate_quality_score();
        debug!("Updated metrics for peer {}: {}", peer_id, entry.summary());
    }
    Ok(())
}
/// Applies age-based decay to every contact and recomputes its quality score.
///
/// The decay factor is `exp(-age_seconds / 86400)`, i.e. the age measured in
/// days. Afterwards `last_quality_update` is stamped in the metadata file.
///
/// The contacts write lock is released before the metadata file is touched so
/// readers are not blocked behind disk I/O.
///
/// # Errors
///
/// Propagates failures from updating the metadata file.
pub async fn update_quality_scores(&self) -> Result<()> {
    let updated_count = {
        let mut contacts = self.contacts.write().await;
        let mut updated = 0usize;
        for contact in contacts.values_mut() {
            let old_score = contact.quality_metrics.quality_score;
            let age_seconds = contact.age_seconds() as f64;
            let decay_factor = (-age_seconds / 86400.0).exp();
            contact.quality_metrics.apply_age_decay(decay_factor);
            contact.recalculate_quality_score();
            // Only count changes large enough to be meaningful.
            if (contact.quality_metrics.quality_score - old_score).abs() > 0.01 {
                updated += 1;
            }
        }
        updated
    };

    self.update_metadata(|meta| {
        meta.last_quality_update = chrono::Utc::now();
    }).await?;

    debug!("Updated quality scores for {} contacts", updated_count);
    Ok(())
}
/// Removes every contact that `is_stale` for the configured threshold.
///
/// When at least one contact was removed the shared cache file is rewritten.
/// The cleanup timestamp and counter in the metadata file are updated either
/// way. The contacts write lock is released before any file I/O on both
/// paths (previously it was only released when something had been removed).
///
/// # Errors
///
/// Propagates failures from saving the cache or updating the metadata file.
pub async fn cleanup_stale_entries(&self) -> Result<()> {
    let removed_count = {
        let mut contacts = self.contacts.write().await;
        let initial_count = contacts.len();
        contacts.retain(|_peer_id, contact| !contact.is_stale(self.config.stale_threshold));
        let removed = initial_count - contacts.len();
        removed
    };

    if removed_count > 0 {
        info!("Cleaned up {} stale contacts", removed_count);
        self.save_to_disk().await?;
    }

    self.update_metadata(|meta| {
        meta.last_cleanup = chrono::Utc::now();
        meta.total_cleanups += 1;
    }).await?;
    Ok(())
}
/// Returns a full copy of the current contact map.
pub async fn get_all_contacts(&self) -> HashMap<PeerId, ContactEntry> {
    let guard = self.contacts.read().await;
    (*guard).clone()
}
/// Replaces the whole in-memory contact map with `contacts`.
///
/// No capacity check is applied and nothing is written to disk here.
pub async fn set_all_contacts(&self, contacts: HashMap<PeerId, ContactEntry>) {
    let mut guard = self.contacts.write().await;
    *guard = contacts;
}
/// Refreshes the contact-derived statistics and returns a copy of them.
///
/// Recomputed fields: `total_contacts`, `high_quality_contacts` (score above
/// 0.7), `verified_contacts` and `average_quality_score`. The average is
/// reset to `0.0` for an empty cache instead of keeping a stale value from an
/// earlier call.
pub async fn get_stats(&self) -> Result<CacheStats> {
    let contacts = self.contacts.read().await;
    let mut stats = self.stats.write().await;

    stats.total_contacts = contacts.len();
    stats.high_quality_contacts = contacts
        .values()
        .filter(|c| c.quality_metrics.quality_score > 0.7)
        .count();
    stats.verified_contacts = contacts
        .values()
        .filter(|c| c.ipv6_identity_verified)
        .count();
    stats.average_quality_score = if contacts.is_empty() {
        0.0
    } else {
        contacts
            .values()
            .map(|c| c.quality_metrics.quality_score)
            .sum::<f64>()
            / contacts.len() as f64
    };

    Ok(stats.clone())
}
/// Populates the in-memory contacts from the shared cache file, if present.
///
/// A missing file is not an error. A file that cannot be read or parsed, or
/// whose checksum does not match, goes through `handle_cache_corruption` and
/// the cache starts empty.
///
/// # Errors
///
/// Propagates failures from acquiring the file lock and from corruption
/// handling.
async fn load_from_disk(&mut self) -> Result<()> {
    if !self.cache_file.exists() {
        debug!("No existing cache file found, starting with empty cache");
        return Ok(());
    }

    // Kept alive until the function returns.
    let _lock = self.acquire_file_lock().await?;

    let cache_data = match self.load_cache_data().await {
        Ok(data) => data,
        Err(e) => {
            warn!("Failed to load cache: {}, starting with empty cache", e);
            return self.handle_cache_corruption().await;
        }
    };

    if !self.verify_cache_integrity(&cache_data) {
        warn!("Cache integrity check failed, starting with empty cache");
        return self.handle_cache_corruption().await;
    }

    let mut guard = self.contacts.write().await;
    *guard = cache_data.contacts;
    info!("Loaded {} contacts from cache", guard.len());
    Ok(())
}
/// Writes the current contacts to the shared cache file.
///
/// The JSON is written to a sibling `.tmp` file and then renamed over the
/// real file, so the target is replaced in a single step. The contacts read
/// lock is held until the write finishes, which keeps writers from changing
/// the map while a snapshot is in flight.
///
/// # Errors
///
/// Returns `P2PError::Bootstrap` on serialization, write or rename failure,
/// and propagates failures from acquiring the file lock.
pub async fn save_to_disk(&self) -> Result<()> {
    let _lock = self.acquire_file_lock().await?;
    let guard = self.contacts.read().await;

    let snapshot = CacheData {
        version: 1,
        instance_id: self.instance_id.clone(),
        timestamp: chrono::Utc::now(),
        contacts: guard.clone(),
        checksum: self.calculate_checksum(&guard),
    };

    let temp_path = self.cache_file.with_extension("tmp");
    let serialized = serde_json::to_string_pretty(&snapshot)
        .map_err(|e| P2PError::Bootstrap(format!("Failed to serialize cache: {}", e)))?;

    std::fs::write(&temp_path, serialized)
        .map_err(|e| P2PError::Bootstrap(format!("Failed to write cache file: {}", e)))?;
    std::fs::rename(&temp_path, &self.cache_file)
        .map_err(|e| P2PError::Bootstrap(format!("Failed to rename cache file: {}", e)))?;

    debug!("Saved {} contacts to cache", guard.len());
    Ok(())
}
/// Writes the current contacts to this instance's own cache file.
///
/// Unlike `save_to_disk`, this writes compact JSON directly to the target
/// (no temp file, no file lock). The contacts read lock is held for the
/// duration of the write.
///
/// # Errors
///
/// Returns `P2PError::Bootstrap` on serialization or write failure.
async fn save_to_instance_cache(&self) -> Result<()> {
    let guard = self.contacts.read().await;

    let snapshot = CacheData {
        version: 1,
        instance_id: self.instance_id.clone(),
        timestamp: chrono::Utc::now(),
        contacts: guard.clone(),
        checksum: self.calculate_checksum(&guard),
    };

    let serialized = serde_json::to_string(&snapshot)
        .map_err(|e| P2PError::Bootstrap(format!("Failed to serialize instance cache: {}", e)))?;
    std::fs::write(&self.instance_cache_file, serialized)
        .map_err(|e| P2PError::Bootstrap(format!("Failed to write instance cache: {}", e)))?;

    Ok(())
}
/// Obtains a `FileLock` on this cache's lock file.
///
/// # Errors
///
/// Propagates the failure from `FileLock::acquire`.
async fn acquire_file_lock(&self) -> Result<FileLock> {
    let guard = FileLock::acquire(&self.lock_file).await?;
    Ok(guard)
}
/// Reads the shared cache file and decodes it as `CacheData`.
///
/// # Errors
///
/// Returns `P2PError::Bootstrap` when the file cannot be read or its JSON
/// cannot be parsed.
async fn load_cache_data(&self) -> Result<CacheData> {
    let raw = std::fs::read_to_string(&self.cache_file)
        .map_err(|e| P2PError::Bootstrap(format!("Failed to read cache file: {}", e)))?;

    let parsed: CacheData = serde_json::from_str(&raw)
        .map_err(|e| P2PError::Bootstrap(format!("Failed to parse cache file: {}", e)))?;

    Ok(parsed)
}
/// Returns `true` when the checksum stored in `cache_data` equals the one
/// recomputed from its contacts.
fn verify_cache_integrity(&self, cache_data: &CacheData) -> bool {
    let expected = cache_data.checksum;
    let actual = self.calculate_checksum(&cache_data.contacts);
    actual == expected
}
/// Computes the integrity checksum stored alongside a contact snapshot.
///
/// Contacts are visited in ascending peer-id order so the result does not
/// depend on `HashMap` iteration order. For each contact only the peer id,
/// the bit pattern of `success_rate` and the number of addresses are hashed;
/// other fields do not influence the checksum.
///
// NOTE(review): `DefaultHasher`'s algorithm is documented as unspecified and
// may change between Rust releases, and `usize` hashing differs by platform
// width, so a persisted checksum may stop matching after a toolchain or
// platform change — confirm this is acceptable.
fn calculate_checksum(&self, contacts: &HashMap<PeerId, ContactEntry>) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut ordered: Vec<_> = contacts.iter().collect();
    // Keys are unique, so an unstable sort yields the same order as a stable one.
    ordered.sort_unstable_by(|left, right| left.0.cmp(right.0));

    let mut state = DefaultHasher::new();
    for (peer_id, entry) in ordered {
        peer_id.hash(&mut state);
        entry.quality_metrics.success_rate.to_bits().hash(&mut state);
        entry.addresses.len().hash(&mut state);
    }
    state.finish()
}
/// Moves a corrupt shared cache file aside and records the incident.
///
/// The file is renamed to the same path with a `.corrupted` extension. A
/// failed rename is logged but not treated as an error. The metadata
/// `corruption_count` is then incremented.
///
/// # Errors
///
/// Propagates failures from updating the metadata file.
async fn handle_cache_corruption(&self) -> Result<()> {
    warn!("Handling cache corruption, backing up corrupted file");

    let damaged = &self.cache_file;
    if damaged.exists() {
        let backup_path = damaged.with_extension("corrupted");
        match std::fs::rename(damaged, backup_path) {
            Ok(()) => {}
            Err(e) => {
                error!("Failed to backup corrupted cache: {}", e);
            }
        }
    }

    self.update_metadata(|meta| {
        meta.corruption_count += 1;
    }).await?;
    Ok(())
}
/// Removes the lowest-quality contacts from `contacts` to make room.
///
/// Up to `max(max_contacts / 10, 1)` entries are removed, lowest quality
/// score first. Contacts whose score is NaN are treated as the lowest quality
/// so the sort comparator stays a consistent total order. The debug log
/// reports the number actually removed, which can be smaller than the target
/// when the map holds fewer entries.
async fn evict_lowest_quality_contacts(&self, contacts: &mut HashMap<PeerId, ContactEntry>) -> Result<()> {
    let eviction_count = (self.config.max_contacts / 10).max(1);

    // Map NaN to the lowest possible rank; `partial_cmp` can then never fail.
    let rank = |score: f64| -> f64 {
        if score.is_nan() {
            f64::NEG_INFINITY
        } else {
            score
        }
    };

    let mut sorted_contacts: Vec<_> = contacts.iter().collect();
    sorted_contacts.sort_by(|a, b| {
        rank(a.1.quality_metrics.quality_score)
            .partial_cmp(&rank(b.1.quality_metrics.quality_score))
            .unwrap_or(std::cmp::Ordering::Equal)
    });

    // Clone the ids so the immutable borrow ends before the map is mutated.
    let to_evict: Vec<PeerId> = sorted_contacts
        .into_iter()
        .take(eviction_count)
        .map(|(peer_id, _)| peer_id.clone())
        .collect();

    let evicted = to_evict.len();
    for peer_id in &to_evict {
        contacts.remove(peer_id);
    }

    debug!("Evicted {} lowest quality contacts", evicted);
    Ok(())
}
async fn update_metadata<F>(&self, updater: F) -> Result<()>
where
F: FnOnce(&mut CacheMetadata),
{
let mut metadata = if self.metadata_file.exists() {
let json_data = std::fs::read_to_string(&self.metadata_file)?;
serde_json::from_str(&json_data).unwrap_or_default()
} else {
CacheMetadata::default()
};
updater(&mut metadata);
let json_data = serde_json::to_string_pretty(&metadata)?;
std::fs::write(&self.metadata_file, json_data)?;
Ok(())
}
}
impl Default for CacheStats {
    /// Returns zeroed statistics with both timestamps set to the current UTC time.
    ///
    /// The clock is read once so `last_merge` and `last_cleanup` are exactly
    /// equal, matching `CacheMetadata::default`.
    fn default() -> Self {
        let now = chrono::Utc::now();
        Self {
            total_contacts: 0,
            high_quality_contacts: 0,
            verified_contacts: 0,
            last_merge: now,
            last_cleanup: now,
            cache_hit_rate: 0.0,
            average_quality_score: 0.0,
        }
    }
}
impl Default for CacheMetadata {
fn default() -> Self {
let now = chrono::Utc::now();
Self {
last_merge: now,
last_cleanup: now,
last_quality_update: now,
total_merges: 0,
total_cleanups: 0,
corruption_count: 0,
instance_count: 0,
}
}
}
/// Handle on the cache lock file; the file stays open for as long as this
/// value is alive and is closed when it is dropped.
struct FileLock {
    /// Open handle, kept only for its lifetime (never read).
    _file: std::fs::File,
}
impl FileLock {
    /// Opens `lock_file` for writing, creating it if necessary, and wraps the
    /// handle in a `FileLock`.
    ///
    /// Takes `&Path` rather than `&PathBuf`; existing callers passing
    /// `&PathBuf` keep working through deref coercion.
    ///
    // NOTE(review): this only opens the file. No OS-level advisory lock is
    // requested here, so by itself it does not exclude other processes or
    // tasks from the cache files — confirm whether real locking is intended.
    ///
    /// # Errors
    ///
    /// Returns `P2PError::Bootstrap` when the file cannot be opened or created.
    async fn acquire(lock_file: &std::path::Path) -> Result<Self> {
        use std::fs::OpenOptions;

        let file = OpenOptions::new()
            .create(true)
            .write(true)
            // Explicitly keep existing contents (this is also the default).
            .truncate(false)
            .open(lock_file)
            .map_err(|e| P2PError::Bootstrap(format!("Failed to create lock file: {}", e)))?;

        Ok(Self { _file: file })
    }
}
/// Builds an instance identifier of the form `<process id>_<unix millis>`.
///
/// If the system clock reports a time before the Unix epoch the millisecond
/// part falls back to `0` instead of panicking.
fn generate_instance_id() -> String {
    let millis = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis())
        .unwrap_or(0);
    format!("{}_{}", std::process::id(), millis)
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Config pointing at `dir` with the given capacity; everything else default.
    fn config_for(dir: &TempDir, max_contacts: usize) -> CacheConfig {
        CacheConfig {
            cache_dir: dir.path().to_path_buf(),
            max_contacts,
            ..CacheConfig::default()
        }
    }

    /// Opens a cache rooted at `dir`, panicking on failure.
    async fn open_cache(dir: &TempDir, config: CacheConfig) -> BootstrapCache {
        BootstrapCache::new(dir.path().to_path_buf(), config).await.unwrap()
    }

    /// Constructing a cache in a fresh directory succeeds.
    #[tokio::test]
    async fn test_cache_creation() {
        let temp_dir = TempDir::new().unwrap();
        let config = config_for(&temp_dir, 100);

        let cache = BootstrapCache::new(temp_dir.path().to_path_buf(), config).await;
        assert!(cache.is_ok());
    }

    /// A contact that was added is returned as a bootstrap peer.
    #[tokio::test]
    async fn test_add_and_retrieve_contacts() {
        let temp_dir = TempDir::new().unwrap();
        let mut cache = open_cache(&temp_dir, config_for(&temp_dir, 100)).await;

        let contact = ContactEntry::new(
            PeerId::from("test-peer"),
            vec!["/ip4/127.0.0.1/tcp/9000".to_string()]
        );
        cache.add_contact(contact).await.unwrap();

        let bootstrap_peers = cache.get_bootstrap_peers(10).await.unwrap();
        assert_eq!(bootstrap_peers.len(), 1);
    }

    /// Contacts saved by one cache instance are loaded by the next one.
    #[tokio::test]
    async fn test_cache_persistence() {
        let temp_dir = TempDir::new().unwrap();
        let config = config_for(&temp_dir, 100);

        // First instance: add a contact and persist it.
        {
            let mut cache = open_cache(&temp_dir, config.clone()).await;
            let contact = ContactEntry::new(
                PeerId::from("test-peer"),
                vec!["/ip4/127.0.0.1/tcp/9000".to_string()]
            );
            cache.add_contact(contact).await.unwrap();
            cache.save_to_disk().await.unwrap();
        }

        // Second instance: the contact must be read back from disk.
        {
            let cache = open_cache(&temp_dir, config).await;
            let bootstrap_peers = cache.get_bootstrap_peers(10).await.unwrap();
            assert_eq!(bootstrap_peers.len(), 1);
        }
    }

    /// Adding more contacts than the capacity never exceeds `max_contacts`.
    #[tokio::test]
    async fn test_cache_eviction() {
        let temp_dir = TempDir::new().unwrap();
        let mut cache = open_cache(&temp_dir, config_for(&temp_dir, 5)).await;

        for i in 0..10 {
            let contact = ContactEntry::new(
                PeerId::from(format!("test-peer-{}", i)),
                vec![format!("/ip4/127.0.0.1/tcp/{}", 9000 + i)]
            );
            cache.add_contact(contact).await.unwrap();
        }

        let stats = cache.get_stats().await.unwrap();
        assert!(stats.total_contacts <= 5);
    }
}