use crate::{PeerId, Result, P2PError};
use crate::bootstrap::{ContactEntry, BootstrapCache};
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::SystemTime;
use serde::{Deserialize, Serialize};
use tracing::{debug, info, warn};
/// Coordinates merging of per-instance bootstrap cache files into the main cache.
///
/// Instance cache files are looked up in `instance_cache_dir`; when the same peer
/// appears in both the main cache and an instance cache, the entry to keep is
/// chosen according to `merge_strategy`.
#[derive(Clone)]
pub struct MergeCoordinator {
/// Root cache directory passed to `new`; stored, but not read by any method in this file.
cache_dir: PathBuf,
/// `<cache_dir>/instance_caches`, created by `new` and scanned for `*.cache` files.
instance_cache_dir: PathBuf,
/// Strategy consulted by `resolve_conflict`; `new` sets it to `QualityBased`.
merge_strategy: MergeStrategy,
}
/// Policy used to decide which contact entry survives when the same peer is
/// present in both the main cache and an instance cache.
///
/// `PartialEq`/`Eq` are derived so callers can compare the configured strategy
/// directly instead of pattern matching on it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MergeStrategy {
    /// Keep the entry with the strictly higher `quality_metrics.quality_score`;
    /// the main-cache entry wins ties.
    QualityBased,
    /// Keep the entry with the strictly later `last_seen`; the main-cache entry
    /// wins ties.
    TimestampBased,
    /// Build a new entry that combines both sides (summed connection counters,
    /// union of addresses and capabilities, recomputed scores).
    MetricsCombined,
    /// Keep the entry with the strictly higher `quality_metrics.success_rate`;
    /// the main-cache entry wins ties.
    SuccessRateBased,
}
/// On-disk representation of one instance cache file, parsed from JSON by
/// `load_instance_cache`.
#[derive(Debug, Serialize, Deserialize)]
struct InstanceCacheData {
/// Identifier of the instance that wrote the file; not read by code in this file.
instance_id: String,
/// Time recorded in the cache; `validate_instance_cache` rejects the cache when
/// this is more than 24 whole hours in the past.
timestamp: chrono::DateTime<chrono::Utc>,
/// Process id stored inside the file; not read by code in this file (the liveness
/// checks use the pid parsed from the file name instead).
process_id: u32,
/// Contacts recorded by the instance, keyed by peer id.
contacts: HashMap<PeerId, ContactEntry>,
/// Format version; `validate_instance_cache` accepts only `1`.
version: u32,
}
/// Counters describing the outcome of a merge run.
///
/// `Clone`, `Default`, `PartialEq` and `Eq` are derived so results can be
/// copied, zero-initialised and compared (e.g. in tests) without boilerplate.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MergeResult {
    /// Number of instance-cache contacts examined (added, updated or left as-is).
    pub contacts_merged: usize,
    /// Number of existing contacts whose quality score changed as a result of the merge.
    pub contacts_updated: usize,
    /// Number of contacts that were not previously in the main cache.
    pub contacts_added: usize,
    /// Number of conflicts counted as resolved; incremented together with
    /// `contacts_updated`.
    pub conflicts_resolved: usize,
    /// Number of instance caches that were loaded, validated and merged.
    pub instances_processed: usize,
    /// Wall time of the whole merge run, in milliseconds.
    pub merge_duration_ms: u64,
}
/// Description of a single conflict between a main-cache entry and an
/// instance-cache entry for the same peer.
///
/// NOTE(review): this type is not constructed anywhere in the visible source;
/// confirm whether it is still needed.
#[derive(Debug)]
struct ConflictInfo {
/// Peer for which both caches hold an entry.
peer_id: PeerId,
/// Entry currently held by the main cache.
main_contact: ContactEntry,
/// Entry found in the instance cache.
instance_contact: ContactEntry,
/// Strategy associated with this conflict.
resolution_strategy: MergeStrategy,
}
impl MergeCoordinator {
/// Creates a coordinator rooted at `cache_dir`, using the `QualityBased` strategy.
///
/// The `instance_caches` sub-directory is created (including missing parents)
/// if it does not exist yet.
///
/// # Errors
///
/// Returns `P2PError::Bootstrap` when the instance cache directory cannot be created.
pub fn new(cache_dir: PathBuf) -> Result<Self> {
    let instance_cache_dir = cache_dir.join("instance_caches");

    let created = std::fs::create_dir_all(&instance_cache_dir);
    created.map_err(|err| {
        P2PError::Bootstrap(format!("Failed to create instance cache directory: {}", err))
    })?;

    Ok(Self {
        merge_strategy: MergeStrategy::QualityBased,
        instance_cache_dir,
        cache_dir,
    })
}
pub fn with_strategy(cache_dir: PathBuf, strategy: MergeStrategy) -> Result<Self> {
let mut coordinator = Self::new(cache_dir)?;
coordinator.merge_strategy = strategy;
Ok(coordinator)
}
/// Merges every discovered instance cache into `main_cache`.
///
/// Steps, in order: discover instance cache files, load and validate them,
/// merge their contacts into the main cache, then delete the cache files whose
/// owning process is no longer running.
///
/// Returns the merge counters with `merge_duration_ms` set to the elapsed time
/// of the whole run. When no instance cache files are found an all-zero result
/// is returned and the main cache is left untouched.
///
/// # Errors
///
/// Propagates errors from discovery, from merging/saving the main cache, and
/// from the cleanup step.
pub async fn merge_instance_caches(&self, main_cache: &BootstrapCache) -> Result<MergeResult> {
    // Use the monotonic clock: `SystemTime` can jump backwards when the wall clock
    // is adjusted, which would make the measured duration silently collapse to 0.
    let merge_start = std::time::Instant::now();
    debug!("Starting merge of instance caches");

    let instance_files = self.discover_instance_caches()?;
    if instance_files.is_empty() {
        debug!("No instance caches found to merge");
        return Ok(MergeResult::empty());
    }

    let instance_caches = self.load_instance_caches(instance_files).await?;
    let merge_result = self.perform_merge(main_cache, instance_caches).await?;
    self.cleanup_processed_caches().await?;

    // u128 -> u64 only truncates after ~584 million years of elapsed time.
    let merge_duration = merge_start.elapsed().as_millis() as u64;
    info!("Merge completed: {} contacts processed, {} conflicts resolved in {}ms",
        merge_result.contacts_merged, merge_result.conflicts_resolved, merge_duration);

    Ok(MergeResult {
        merge_duration_ms: merge_duration,
        ..merge_result
    })
}
/// Lists the instance cache files present in the instance cache directory.
///
/// A path is included when it is a regular file, has the extension `cache`,
/// and a process id can be parsed from its file name. Caches are included
/// whether or not their owning process is still running, so no liveness probe
/// is performed here (the previous code spawned one per file and then took the
/// same action in both branches).
///
/// Returns an empty list when the directory does not exist.
///
/// # Errors
///
/// Returns `P2PError::Bootstrap` when the directory cannot be read, and
/// propagates I/O errors hit while iterating its entries.
fn discover_instance_caches(&self) -> Result<Vec<PathBuf>> {
    let mut cache_files = Vec::new();
    if !self.instance_cache_dir.exists() {
        return Ok(cache_files);
    }

    let entries = std::fs::read_dir(&self.instance_cache_dir)
        .map_err(|e| P2PError::Bootstrap(format!("Failed to read instance cache directory: {}", e)))?;

    for entry in entries {
        let path = entry?.path();
        let is_cache_file =
            path.is_file() && path.extension().and_then(|ext| ext.to_str()) == Some("cache");
        // Files whose name does not start with a parseable pid are ignored.
        if is_cache_file && self.extract_process_id(&path).is_some() {
            cache_files.push(path);
        }
    }

    debug!("Discovered {} instance cache files", cache_files.len());
    Ok(cache_files)
}
/// Loads and validates every file in `cache_files`.
///
/// Files that cannot be read or parsed, and caches that fail validation, are
/// logged with a warning and skipped; they never cause an error. The returned
/// vector keeps the order of the valid inputs.
async fn load_instance_caches(&self, cache_files: Vec<PathBuf>) -> Result<Vec<InstanceCacheData>> {
    let mut loaded = Vec::new();

    for path in cache_files {
        let outcome = self.load_instance_cache(&path).await;
        match outcome {
            Err(e) => {
                warn!("Failed to load instance cache {:?}: {}", path, e);
            }
            Ok(data) if self.validate_instance_cache(&data) => {
                loaded.push(data);
            }
            Ok(_) => {
                warn!("Invalid instance cache found: {:?}", path);
            }
        }
    }

    debug!("Loaded {} valid instance caches", loaded.len());
    Ok(loaded)
}
async fn load_instance_cache(&self, cache_file: &PathBuf) -> Result<InstanceCacheData> {
let json_data = std::fs::read_to_string(cache_file)
.map_err(|e| P2PError::Bootstrap(format!("Failed to read instance cache: {}", e)))?;
let cache_data: InstanceCacheData = serde_json::from_str(&json_data)
.map_err(|e| P2PError::Bootstrap(format!("Failed to parse instance cache: {}", e)))?;
Ok(cache_data)
}
/// Returns `true` when `cache_data` is acceptable for merging.
///
/// A cache is accepted when its `version` is `1` and its timestamp is no more
/// than 24 whole hours in the past. The age is compared in whole hours, and a
/// timestamp in the future (negative age) is accepted.
fn validate_instance_cache(&self, cache_data: &InstanceCacheData) -> bool {
    if cache_data.version != 1 {
        return false;
    }

    let age_hours = chrono::Utc::now()
        .signed_duration_since(cache_data.timestamp)
        .num_hours();
    let fresh_enough = age_hours <= 24;
    if !fresh_enough {
        debug!("Instance cache too old: {} hours", age_hours);
    }
    fresh_enough
}
async fn perform_merge(&self, main_cache: &BootstrapCache, instance_caches: Vec<InstanceCacheData>) -> Result<MergeResult> {
let mut result = MergeResult::empty();
result.instances_processed = instance_caches.len();
let mut merged_contacts = main_cache.get_all_contacts().await;
for instance_cache in instance_caches {
let instance_result = self.merge_single_instance(&mut merged_contacts, instance_cache).await?;
result.combine(instance_result);
}
main_cache.set_all_contacts(merged_contacts).await;
main_cache.save_to_disk().await?;
Ok(result)
}
async fn merge_single_instance(&self, main_contacts: &mut HashMap<PeerId, ContactEntry>, instance_cache: InstanceCacheData) -> Result<MergeResult> {
let mut result = MergeResult::empty();
for (peer_id, instance_contact) in instance_cache.contacts {
match main_contacts.get(&peer_id) {
Some(main_contact) => {
let resolved_contact = self.resolve_conflict(main_contact, &instance_contact)?;
if resolved_contact.quality_metrics.quality_score != main_contact.quality_metrics.quality_score {
result.contacts_updated += 1;
result.conflicts_resolved += 1;
}
main_contacts.insert(peer_id, resolved_contact);
}
None => {
main_contacts.insert(peer_id, instance_contact);
result.contacts_added += 1;
}
}
result.contacts_merged += 1;
}
Ok(result)
}
/// Chooses (or builds) the entry to keep for a peer present in both caches,
/// according to the configured merge strategy.
///
/// For `QualityBased`, `TimestampBased` and `SuccessRateBased` a clone of the
/// instance entry is returned only when its compared value is strictly greater
/// than the main entry's; otherwise a clone of the main entry is returned.
/// `MetricsCombined` delegates to `combine_contact_metrics`.
fn resolve_conflict(&self, main_contact: &ContactEntry, instance_contact: &ContactEntry) -> Result<ContactEntry> {
    let prefer_instance = match &self.merge_strategy {
        MergeStrategy::MetricsCombined => {
            return self.combine_contact_metrics(main_contact, instance_contact);
        }
        MergeStrategy::QualityBased => {
            instance_contact.quality_metrics.quality_score > main_contact.quality_metrics.quality_score
        }
        MergeStrategy::TimestampBased => instance_contact.last_seen > main_contact.last_seen,
        MergeStrategy::SuccessRateBased => {
            instance_contact.quality_metrics.success_rate > main_contact.quality_metrics.success_rate
        }
    };

    let winner = if prefer_instance { instance_contact } else { main_contact };
    Ok(winner.clone())
}
/// Builds a new entry that combines `main_contact` and `instance_contact`.
///
/// Starting from a clone of the main entry:
/// - `last_seen` becomes the later of the two values;
/// - the connection counters (attempts, successes, failures) are summed,
///   saturating at the integer maximum instead of overflowing, since repeated
///   merges keep accumulating these counters;
/// - addresses and capabilities missing from the main entry are appended;
/// - `reputation_score` becomes the higher of the two values;
/// - `ipv6_identity_verified` is set if either side has it set;
/// - success rate and quality score are recomputed from the combined data.
fn combine_contact_metrics(&self, main_contact: &ContactEntry, instance_contact: &ContactEntry) -> Result<ContactEntry> {
    let mut combined_contact = main_contact.clone();

    if instance_contact.last_seen > main_contact.last_seen {
        combined_contact.last_seen = instance_contact.last_seen;
    }

    // Saturate rather than overflow (panic in debug builds, wrap in release builds).
    {
        let totals = &mut combined_contact.connection_history;
        let extra = &instance_contact.connection_history;
        totals.total_attempts = totals.total_attempts.saturating_add(extra.total_attempts);
        totals.successful_connections = totals
            .successful_connections
            .saturating_add(extra.successful_connections);
        totals.failed_connections = totals
            .failed_connections
            .saturating_add(extra.failed_connections);
    }

    for addr in &instance_contact.addresses {
        if !combined_contact.addresses.contains(addr) {
            combined_contact.addresses.push(addr.clone());
        }
    }
    for capability in &instance_contact.capabilities {
        if !combined_contact.capabilities.contains(capability) {
            combined_contact.capabilities.push(capability.clone());
        }
    }

    if instance_contact.reputation_score > combined_contact.reputation_score {
        combined_contact.reputation_score = instance_contact.reputation_score;
    }
    combined_contact.ipv6_identity_verified =
        combined_contact.ipv6_identity_verified || instance_contact.ipv6_identity_verified;

    // Derived values must be refreshed after the raw counters changed.
    combined_contact.update_success_rate();
    combined_contact.recalculate_quality_score();
    Ok(combined_contact)
}
/// Deletes instance cache files whose owning process is no longer running.
///
/// The cache directory is scanned again; for each file the process id is taken
/// from the file name and the file is removed only when that process is not
/// running. A failed removal is logged with a warning and does not abort the
/// cleanup.
///
/// # Errors
///
/// Propagates errors from `discover_instance_caches`.
async fn cleanup_processed_caches(&self) -> Result<()> {
    let mut removed = 0;

    for path in self.discover_instance_caches()? {
        let owner_alive = match self.extract_process_id(&path) {
            Some(pid) => self.is_process_running(pid),
            None => continue,
        };
        if owner_alive {
            continue;
        }

        match std::fs::remove_file(&path) {
            Ok(()) => {
                removed += 1;
            }
            Err(e) => {
                warn!("Failed to remove old instance cache {:?}: {}", path, e);
            }
        }
    }

    if removed > 0 {
        debug!("Cleaned up {} old instance cache files", removed);
    }
    Ok(())
}
/// Parses the process id from a cache file name of the form `<pid>_<rest>.<ext>`.
///
/// Returns `None` when the file stem is missing or not valid UTF-8, contains no
/// underscore, or when the text before the first underscore is not a valid `u32`.
fn extract_process_id(&self, cache_file: &PathBuf) -> Option<u32> {
    let stem = cache_file.file_stem()?.to_str()?;

    let mut pieces = stem.split('_');
    let pid_text = pieces.next()?;
    // At least one underscore-separated part must follow the pid.
    pieces.next()?;

    pid_text.parse().ok()
}
/// Reports whether a process with the given id appears to be running.
///
/// - Unix: runs `kill -0 <pid>` and returns `true` when the command exits
///   successfully.
/// - Windows: runs `tasklist /FI "PID eq <pid>"` and returns `true` when the
///   command's standard output contains the pid.
/// - Other platforms: always returns `true`.
///
/// Returns `false` when the helper command cannot be executed.
///
/// NOTE(review): `kill -0` presumably also fails for a live process owned by
/// another user — confirm whether that case matters here.
fn is_process_running(&self, process_id: u32) -> bool {
    #[cfg(unix)]
    {
        let pid = process_id.to_string();
        let probe = std::process::Command::new("kill")
            .arg("-0")
            .arg(&pid)
            .output();
        match probe {
            Ok(out) => out.status.success(),
            Err(_) => false,
        }
    }
    #[cfg(windows)]
    {
        let pid = process_id.to_string();
        let filter = format!("PID eq {}", process_id);
        let listing = std::process::Command::new("tasklist")
            .arg("/FI")
            .arg(&filter)
            .output();
        match listing {
            Ok(out) => String::from_utf8_lossy(&out.stdout).contains(&pid),
            Err(_) => false,
        }
    }
    #[cfg(not(any(unix, windows)))]
    {
        true
    }
}
/// Returns the merge strategy currently used to resolve conflicts.
pub fn get_strategy(&self) -> &MergeStrategy {
&self.merge_strategy
}
/// Replaces the merge strategy used to resolve conflicts in subsequent merges.
pub fn set_strategy(&mut self, strategy: MergeStrategy) {
self.merge_strategy = strategy;
}
}
impl MergeResult {
fn empty() -> Self {
Self {
contacts_merged: 0,
contacts_updated: 0,
contacts_added: 0,
conflicts_resolved: 0,
instances_processed: 0,
merge_duration_ms: 0,
}
}
fn combine(&mut self, other: MergeResult) {
self.contacts_merged += other.contacts_merged;
self.contacts_updated += other.contacts_updated;
self.contacts_added += other.contacts_added;
self.conflicts_resolved += other.conflicts_resolved;
}
}
/// Human-readable one-line summary of all merge counters.
impl std::fmt::Display for MergeResult {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            contacts_merged: merged,
            contacts_updated: updated,
            contacts_added: added,
            conflicts_resolved: conflicts,
            instances_processed: instances,
            merge_duration_ms: duration,
        } = self;

        f.write_fmt(format_args!(
            "MergeResult {{ merged: {}, updated: {}, added: {}, conflicts: {}, instances: {}, duration: {}ms }}",
            merged, updated, added, conflicts, instances, duration
        ))
    }
}
// Unit tests for the merge coordinator. Each test creates its coordinator in a
// fresh temporary directory so nothing is written outside of it.
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
// Constructing a coordinator in an empty directory succeeds.
#[tokio::test]
async fn test_merge_coordinator_creation() {
let temp_dir = TempDir::new().unwrap();
let coordinator = MergeCoordinator::new(temp_dir.path().to_path_buf());
assert!(coordinator.is_ok());
}
// With the quality-based strategy the entry with the higher quality score wins.
#[tokio::test]
async fn test_conflict_resolution_quality_based() {
let temp_dir = TempDir::new().unwrap();
let coordinator = MergeCoordinator::with_strategy(
temp_dir.path().to_path_buf(),
MergeStrategy::QualityBased
).unwrap();
let mut main_contact = ContactEntry::new(
PeerId::from("test-peer"),
vec!["/ip4/127.0.0.1/tcp/9000".to_string()]
);
main_contact.quality_metrics.quality_score = 0.5;
let mut instance_contact = ContactEntry::new(
PeerId::from("test-peer"),
vec!["/ip4/127.0.0.1/tcp/9001".to_string()]
);
instance_contact.quality_metrics.quality_score = 0.8;
let resolved = coordinator.resolve_conflict(&main_contact, &instance_contact).unwrap();
// The instance entry (score 0.8) must be chosen over the main entry (score 0.5).
assert_eq!(resolved.quality_metrics.quality_score, 0.8);
}
// Combining metrics sums the connection counters of both entries.
#[tokio::test]
async fn test_metrics_combination() {
let temp_dir = TempDir::new().unwrap();
let coordinator = MergeCoordinator::with_strategy(
temp_dir.path().to_path_buf(),
MergeStrategy::MetricsCombined
).unwrap();
let mut main_contact = ContactEntry::new(
PeerId::from("test-peer"),
vec!["/ip4/127.0.0.1/tcp/9000".to_string()]
);
main_contact.connection_history.total_attempts = 10;
main_contact.connection_history.successful_connections = 8;
let mut instance_contact = ContactEntry::new(
PeerId::from("test-peer"),
vec!["/ip4/127.0.0.1/tcp/9001".to_string()]
);
instance_contact.connection_history.total_attempts = 5;
instance_contact.connection_history.successful_connections = 4;
let combined = coordinator.combine_contact_metrics(&main_contact, &instance_contact).unwrap();
// 10 + 5 attempts and 8 + 4 successful connections.
assert_eq!(combined.connection_history.total_attempts, 15);
assert_eq!(combined.connection_history.successful_connections, 12);
}
// The process id is the numeric text before the first underscore of the file stem.
#[test]
fn test_process_id_extraction() {
let temp_dir = TempDir::new().unwrap();
let coordinator = MergeCoordinator::new(temp_dir.path().to_path_buf()).unwrap();
let cache_file = PathBuf::from("12345_1234567890.cache");
let process_id = coordinator.extract_process_id(&cache_file);
assert_eq!(process_id, Some(12345));
}
}