1use crate::{PeerId, Result, P2PError};
7use crate::bootstrap::{ContactEntry, BootstrapCache};
8use std::collections::HashMap;
9use std::path::PathBuf;
10use std::time::SystemTime;
11use serde::{Deserialize, Serialize};
12use tracing::{debug, info, warn};
13
14#[derive(Clone)]
16pub struct MergeCoordinator {
17 cache_dir: PathBuf,
19 instance_cache_dir: PathBuf,
21 merge_strategy: MergeStrategy,
23}
24
25#[derive(Debug, Clone)]
27pub enum MergeStrategy {
28 QualityBased,
30 TimestampBased,
32 MetricsCombined,
34 SuccessRateBased,
36}
37
38#[derive(Debug, Serialize, Deserialize)]
40struct InstanceCacheData {
41 instance_id: String,
42 timestamp: chrono::DateTime<chrono::Utc>,
43 process_id: u32,
44 contacts: HashMap<PeerId, ContactEntry>,
45 version: u32,
46}
47
48#[derive(Debug)]
50pub struct MergeResult {
51 pub contacts_merged: usize,
53 pub contacts_updated: usize,
55 pub contacts_added: usize,
57 pub conflicts_resolved: usize,
59 pub instances_processed: usize,
61 pub merge_duration_ms: u64,
63}
64
65#[derive(Debug)]
67struct ConflictInfo {
68 peer_id: PeerId,
69 main_contact: ContactEntry,
70 instance_contact: ContactEntry,
71 resolution_strategy: MergeStrategy,
72}
73
74impl MergeCoordinator {
75 pub fn new(cache_dir: PathBuf) -> Result<Self> {
77 let instance_cache_dir = cache_dir.join("instance_caches");
78
79 std::fs::create_dir_all(&instance_cache_dir)
81 .map_err(|e| P2PError::Bootstrap(format!("Failed to create instance cache directory: {}", e)))?;
82
83 Ok(Self {
84 cache_dir,
85 instance_cache_dir,
86 merge_strategy: MergeStrategy::QualityBased,
87 })
88 }
89
90 pub fn with_strategy(cache_dir: PathBuf, strategy: MergeStrategy) -> Result<Self> {
92 let mut coordinator = Self::new(cache_dir)?;
93 coordinator.merge_strategy = strategy;
94 Ok(coordinator)
95 }
96
97 pub async fn merge_instance_caches(&self, main_cache: &BootstrapCache) -> Result<MergeResult> {
99 let merge_start = SystemTime::now();
100
101 debug!("Starting merge of instance caches");
102
103 let instance_files = self.discover_instance_caches()?;
105
106 if instance_files.is_empty() {
107 debug!("No instance caches found to merge");
108 return Ok(MergeResult::empty());
109 }
110
111 let instance_caches = self.load_instance_caches(instance_files).await?;
113
114 let merge_result = self.perform_merge(main_cache, instance_caches).await?;
116
117 self.cleanup_processed_caches().await?;
119
120 let merge_duration = merge_start.elapsed()
121 .unwrap_or_default()
122 .as_millis() as u64;
123
124 info!("Merge completed: {} contacts processed, {} conflicts resolved in {}ms",
125 merge_result.contacts_merged, merge_result.conflicts_resolved, merge_duration);
126
127 Ok(MergeResult {
128 merge_duration_ms: merge_duration,
129 ..merge_result
130 })
131 }
132
133 fn discover_instance_caches(&self) -> Result<Vec<PathBuf>> {
135 let mut cache_files = Vec::new();
136
137 if !self.instance_cache_dir.exists() {
138 return Ok(cache_files);
139 }
140
141 let entries = std::fs::read_dir(&self.instance_cache_dir)
142 .map_err(|e| P2PError::Bootstrap(format!("Failed to read instance cache directory: {}", e)))?;
143
144 for entry in entries {
145 let entry = entry?;
146 let path = entry.path();
147
148 if path.is_file() && path.extension().and_then(|s| s.to_str()) == Some("cache") {
149 if let Some(process_id) = self.extract_process_id(&path) {
151 if self.is_process_running(process_id) {
152 cache_files.push(path);
153 } else {
154 cache_files.push(path);
156 }
157 }
158 }
159 }
160
161 debug!("Discovered {} instance cache files", cache_files.len());
162
163 Ok(cache_files)
164 }
165
166 async fn load_instance_caches(&self, cache_files: Vec<PathBuf>) -> Result<Vec<InstanceCacheData>> {
168 let mut instance_caches = Vec::new();
169
170 for cache_file in cache_files {
171 match self.load_instance_cache(&cache_file).await {
172 Ok(cache_data) => {
173 if self.validate_instance_cache(&cache_data) {
174 instance_caches.push(cache_data);
175 } else {
176 warn!("Invalid instance cache found: {:?}", cache_file);
177 }
178 }
179 Err(e) => {
180 warn!("Failed to load instance cache {:?}: {}", cache_file, e);
181 }
182 }
183 }
184
185 debug!("Loaded {} valid instance caches", instance_caches.len());
186
187 Ok(instance_caches)
188 }
189
190 async fn load_instance_cache(&self, cache_file: &PathBuf) -> Result<InstanceCacheData> {
192 let json_data = std::fs::read_to_string(cache_file)
193 .map_err(|e| P2PError::Bootstrap(format!("Failed to read instance cache: {}", e)))?;
194
195 let cache_data: InstanceCacheData = serde_json::from_str(&json_data)
196 .map_err(|e| P2PError::Bootstrap(format!("Failed to parse instance cache: {}", e)))?;
197
198 Ok(cache_data)
199 }
200
201 fn validate_instance_cache(&self, cache_data: &InstanceCacheData) -> bool {
203 if cache_data.version != 1 {
205 return false;
206 }
207
208 let now = chrono::Utc::now();
210 let age = now.signed_duration_since(cache_data.timestamp);
211
212 if age.num_hours() > 24 {
213 debug!("Instance cache too old: {} hours", age.num_hours());
214 return false;
215 }
216
217 true
218 }
219
220 async fn perform_merge(&self, main_cache: &BootstrapCache, instance_caches: Vec<InstanceCacheData>) -> Result<MergeResult> {
222 let mut result = MergeResult::empty();
223 result.instances_processed = instance_caches.len();
224
225 let mut merged_contacts = main_cache.get_all_contacts().await;
227
228 for instance_cache in instance_caches {
230 let instance_result = self.merge_single_instance(&mut merged_contacts, instance_cache).await?;
231 result.combine(instance_result);
232 }
233
234 main_cache.set_all_contacts(merged_contacts).await;
236
237 main_cache.save_to_disk().await?;
239
240 Ok(result)
241 }
242
243 async fn merge_single_instance(&self, main_contacts: &mut HashMap<PeerId, ContactEntry>, instance_cache: InstanceCacheData) -> Result<MergeResult> {
245 let mut result = MergeResult::empty();
246
247 for (peer_id, instance_contact) in instance_cache.contacts {
248 match main_contacts.get(&peer_id) {
249 Some(main_contact) => {
250 let resolved_contact = self.resolve_conflict(main_contact, &instance_contact)?;
252
253 if resolved_contact.quality_metrics.quality_score != main_contact.quality_metrics.quality_score {
254 result.contacts_updated += 1;
255 result.conflicts_resolved += 1;
256 }
257
258 main_contacts.insert(peer_id, resolved_contact);
259 }
260 None => {
261 main_contacts.insert(peer_id, instance_contact);
263 result.contacts_added += 1;
264 }
265 }
266
267 result.contacts_merged += 1;
268 }
269
270 Ok(result)
271 }
272
273 fn resolve_conflict(&self, main_contact: &ContactEntry, instance_contact: &ContactEntry) -> Result<ContactEntry> {
275 match self.merge_strategy {
276 MergeStrategy::QualityBased => {
277 if instance_contact.quality_metrics.quality_score > main_contact.quality_metrics.quality_score {
278 Ok(instance_contact.clone())
279 } else {
280 Ok(main_contact.clone())
281 }
282 }
283
284 MergeStrategy::TimestampBased => {
285 if instance_contact.last_seen > main_contact.last_seen {
286 Ok(instance_contact.clone())
287 } else {
288 Ok(main_contact.clone())
289 }
290 }
291
292 MergeStrategy::MetricsCombined => {
293 self.combine_contact_metrics(main_contact, instance_contact)
294 }
295
296 MergeStrategy::SuccessRateBased => {
297 if instance_contact.quality_metrics.success_rate > main_contact.quality_metrics.success_rate {
298 Ok(instance_contact.clone())
299 } else {
300 Ok(main_contact.clone())
301 }
302 }
303 }
304 }
305
306 fn combine_contact_metrics(&self, main_contact: &ContactEntry, instance_contact: &ContactEntry) -> Result<ContactEntry> {
308 let mut combined_contact = main_contact.clone();
309
310 if instance_contact.last_seen > main_contact.last_seen {
312 combined_contact.last_seen = instance_contact.last_seen;
313 }
314
315 combined_contact.connection_history.total_attempts +=
317 instance_contact.connection_history.total_attempts;
318 combined_contact.connection_history.successful_connections +=
319 instance_contact.connection_history.successful_connections;
320 combined_contact.connection_history.failed_connections +=
321 instance_contact.connection_history.failed_connections;
322
323 for addr in &instance_contact.addresses {
325 if !combined_contact.addresses.contains(addr) {
326 combined_contact.addresses.push(addr.clone());
327 }
328 }
329
330 for capability in &instance_contact.capabilities {
332 if !combined_contact.capabilities.contains(capability) {
333 combined_contact.capabilities.push(capability.clone());
334 }
335 }
336
337 if instance_contact.reputation_score > combined_contact.reputation_score {
339 combined_contact.reputation_score = instance_contact.reputation_score;
340 }
341
342 combined_contact.ipv6_identity_verified =
344 combined_contact.ipv6_identity_verified || instance_contact.ipv6_identity_verified;
345
346 combined_contact.update_success_rate();
348 combined_contact.recalculate_quality_score();
349
350 Ok(combined_contact)
351 }
352
353 async fn cleanup_processed_caches(&self) -> Result<()> {
355 let cache_files = self.discover_instance_caches()?;
356 let mut cleaned_count = 0;
357
358 for cache_file in cache_files {
359 if let Some(process_id) = self.extract_process_id(&cache_file) {
361 if !self.is_process_running(process_id) {
362 if let Err(e) = std::fs::remove_file(&cache_file) {
364 warn!("Failed to remove old instance cache {:?}: {}", cache_file, e);
365 } else {
366 cleaned_count += 1;
367 }
368 }
369 }
370 }
371
372 if cleaned_count > 0 {
373 debug!("Cleaned up {} old instance cache files", cleaned_count);
374 }
375
376 Ok(())
377 }
378
379 fn extract_process_id(&self, cache_file: &PathBuf) -> Option<u32> {
381 cache_file
382 .file_stem()
383 .and_then(|name| name.to_str())
384 .and_then(|name| {
385 let parts: Vec<&str> = name.split('_').collect();
386 if parts.len() >= 2 {
387 parts[0].parse().ok()
388 } else {
389 None
390 }
391 })
392 }
393
394 fn is_process_running(&self, process_id: u32) -> bool {
396 #[cfg(unix)]
397 {
398 use std::process::Command;
399 Command::new("kill")
400 .args(&["-0", &process_id.to_string()])
401 .output()
402 .map(|output| output.status.success())
403 .unwrap_or(false)
404 }
405
406 #[cfg(windows)]
407 {
408 use std::process::Command;
409 Command::new("tasklist")
410 .args(&["/FI", &format!("PID eq {}", process_id)])
411 .output()
412 .map(|output| {
413 String::from_utf8_lossy(&output.stdout)
414 .contains(&process_id.to_string())
415 })
416 .unwrap_or(false)
417 }
418
419 #[cfg(not(any(unix, windows)))]
420 {
421 true
423 }
424 }
425
426 pub fn get_strategy(&self) -> &MergeStrategy {
428 &self.merge_strategy
429 }
430
431 pub fn set_strategy(&mut self, strategy: MergeStrategy) {
433 self.merge_strategy = strategy;
434 }
435}
436
437impl MergeResult {
438 fn empty() -> Self {
440 Self {
441 contacts_merged: 0,
442 contacts_updated: 0,
443 contacts_added: 0,
444 conflicts_resolved: 0,
445 instances_processed: 0,
446 merge_duration_ms: 0,
447 }
448 }
449
450 fn combine(&mut self, other: MergeResult) {
452 self.contacts_merged += other.contacts_merged;
453 self.contacts_updated += other.contacts_updated;
454 self.contacts_added += other.contacts_added;
455 self.conflicts_resolved += other.conflicts_resolved;
456 }
457}
458
459impl std::fmt::Display for MergeResult {
460 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
461 write!(
462 f,
463 "MergeResult {{ merged: {}, updated: {}, added: {}, conflicts: {}, instances: {}, duration: {}ms }}",
464 self.contacts_merged,
465 self.contacts_updated,
466 self.contacts_added,
467 self.conflicts_resolved,
468 self.instances_processed,
469 self.merge_duration_ms
470 )
471 }
472}
473
474#[cfg(test)]
475mod tests {
476 use super::*;
477 use tempfile::TempDir;
478
479 #[tokio::test]
480 async fn test_merge_coordinator_creation() {
481 let temp_dir = TempDir::new().unwrap();
482 let coordinator = MergeCoordinator::new(temp_dir.path().to_path_buf());
483 assert!(coordinator.is_ok());
484 }
485
486 #[tokio::test]
487 async fn test_conflict_resolution_quality_based() {
488 let temp_dir = TempDir::new().unwrap();
489 let coordinator = MergeCoordinator::with_strategy(
490 temp_dir.path().to_path_buf(),
491 MergeStrategy::QualityBased
492 ).unwrap();
493
494 let mut main_contact = ContactEntry::new(
495 PeerId::from("test-peer"),
496 vec!["/ip4/127.0.0.1/tcp/9000".to_string()]
497 );
498 main_contact.quality_metrics.quality_score = 0.5;
499
500 let mut instance_contact = ContactEntry::new(
501 PeerId::from("test-peer"),
502 vec!["/ip4/127.0.0.1/tcp/9001".to_string()]
503 );
504 instance_contact.quality_metrics.quality_score = 0.8;
505
506 let resolved = coordinator.resolve_conflict(&main_contact, &instance_contact).unwrap();
507 assert_eq!(resolved.quality_metrics.quality_score, 0.8);
508 }
509
510 #[tokio::test]
511 async fn test_metrics_combination() {
512 let temp_dir = TempDir::new().unwrap();
513 let coordinator = MergeCoordinator::with_strategy(
514 temp_dir.path().to_path_buf(),
515 MergeStrategy::MetricsCombined
516 ).unwrap();
517
518 let mut main_contact = ContactEntry::new(
519 PeerId::from("test-peer"),
520 vec!["/ip4/127.0.0.1/tcp/9000".to_string()]
521 );
522 main_contact.connection_history.total_attempts = 10;
523 main_contact.connection_history.successful_connections = 8;
524
525 let mut instance_contact = ContactEntry::new(
526 PeerId::from("test-peer"),
527 vec!["/ip4/127.0.0.1/tcp/9001".to_string()]
528 );
529 instance_contact.connection_history.total_attempts = 5;
530 instance_contact.connection_history.successful_connections = 4;
531
532 let combined = coordinator.combine_contact_metrics(&main_contact, &instance_contact).unwrap();
533 assert_eq!(combined.connection_history.total_attempts, 15);
534 assert_eq!(combined.connection_history.successful_connections, 12);
535 }
536
537 #[test]
538 fn test_process_id_extraction() {
539 let temp_dir = TempDir::new().unwrap();
540 let coordinator = MergeCoordinator::new(temp_dir.path().to_path_buf()).unwrap();
541
542 let cache_file = PathBuf::from("12345_1234567890.cache");
543 let process_id = coordinator.extract_process_id(&cache_file);
544 assert_eq!(process_id, Some(12345));
545 }
546}