1use crate::bootstrap::{BootstrapCache, ContactEntry};
20use crate::error::BootstrapError;
21use crate::{P2PError, PeerId, Result};
22use serde::{Deserialize, Serialize};
23use std::collections::HashMap;
24use std::path::PathBuf;
25use std::time::SystemTime;
26use tracing::{debug, info, warn};
27
28#[derive(Clone)]
30pub struct MergeCoordinator {
31 _cache_dir: PathBuf,
33 instance_cache_dir: PathBuf,
35 merge_strategy: MergeStrategy,
37}
38
39#[derive(Debug, Clone)]
41pub enum MergeStrategy {
42 QualityBased,
44 TimestampBased,
46 MetricsCombined,
48 SuccessRateBased,
50}
51
52#[derive(Debug, Serialize, Deserialize)]
54struct InstanceCacheData {
55 instance_id: String,
56 timestamp: chrono::DateTime<chrono::Utc>,
57 process_id: u32,
58 contacts: HashMap<PeerId, ContactEntry>,
59 version: u32,
60}
61
62#[derive(Debug)]
64pub struct MergeResult {
65 pub contacts_merged: usize,
67 pub contacts_updated: usize,
69 pub contacts_added: usize,
71 pub conflicts_resolved: usize,
73 pub instances_processed: usize,
75 pub merge_duration_ms: u64,
77}
78
79#[derive(Debug)]
81#[allow(dead_code)]
82struct ConflictInfo {
83 peer_id: PeerId,
84 main_contact: ContactEntry,
85 instance_contact: ContactEntry,
86 resolution_strategy: MergeStrategy,
87}
88
89impl MergeCoordinator {
90 pub fn new(cache_dir: PathBuf) -> Result<Self> {
92 let instance_cache_dir = cache_dir.join("instance_caches");
93
94 std::fs::create_dir_all(&instance_cache_dir).map_err(|e| {
96 P2PError::Bootstrap(BootstrapError::CacheError(
97 format!("Failed to create instance cache directory: {e}").into(),
98 ))
99 })?;
100
101 Ok(Self {
102 _cache_dir: cache_dir,
103 instance_cache_dir,
104 merge_strategy: MergeStrategy::QualityBased,
105 })
106 }
107
108 pub fn with_strategy(cache_dir: PathBuf, strategy: MergeStrategy) -> Result<Self> {
110 let mut coordinator = Self::new(cache_dir)?;
111 coordinator.merge_strategy = strategy;
112 Ok(coordinator)
113 }
114
115 pub async fn merge_instance_caches(&self, main_cache: &BootstrapCache) -> Result<MergeResult> {
117 let merge_start = SystemTime::now();
118
119 debug!("Starting merge of instance caches");
120
121 let instance_files = self.discover_instance_caches()?;
123
124 if instance_files.is_empty() {
125 debug!("No instance caches found to merge");
126 return Ok(MergeResult::empty());
127 }
128
129 let instance_caches = self.load_instance_caches(instance_files).await?;
131
132 let merge_result = self.perform_merge(main_cache, instance_caches).await?;
134
135 self.cleanup_processed_caches().await?;
137
138 let merge_duration = merge_start.elapsed().unwrap_or_default().as_millis() as u64;
139
140 info!(
141 "Merge completed: {} contacts processed, {} conflicts resolved in {}ms",
142 merge_result.contacts_merged, merge_result.conflicts_resolved, merge_duration
143 );
144
145 Ok(MergeResult {
146 merge_duration_ms: merge_duration,
147 ..merge_result
148 })
149 }
150
151 fn discover_instance_caches(&self) -> Result<Vec<PathBuf>> {
153 let mut cache_files = Vec::new();
154
155 if !self.instance_cache_dir.exists() {
156 return Ok(cache_files);
157 }
158
159 let entries = std::fs::read_dir(&self.instance_cache_dir).map_err(|e| {
160 P2PError::Bootstrap(BootstrapError::CacheError(
161 format!("Failed to read instance cache directory: {e}").into(),
162 ))
163 })?;
164
165 for entry in entries {
166 let entry = entry?;
167 let path = entry.path();
168
169 if path.is_file() && path.extension().and_then(|s| s.to_str()) == Some("cache") {
170 if let Some(process_id) = self.extract_process_id(&path) {
172 if self.is_process_running(process_id) {
173 cache_files.push(path);
174 } else {
175 cache_files.push(path);
177 }
178 }
179 }
180 }
181
182 debug!("Discovered {} instance cache files", cache_files.len());
183
184 Ok(cache_files)
185 }
186
187 async fn load_instance_caches(
189 &self,
190 cache_files: Vec<PathBuf>,
191 ) -> Result<Vec<InstanceCacheData>> {
192 let mut instance_caches = Vec::new();
193
194 for cache_file in cache_files {
195 match self.load_instance_cache(&cache_file).await {
196 Ok(cache_data) => {
197 if self.validate_instance_cache(&cache_data) {
198 instance_caches.push(cache_data);
199 } else {
200 warn!("Invalid instance cache found: {:?}", cache_file);
201 }
202 }
203 Err(e) => {
204 warn!("Failed to load instance cache {:?}: {}", cache_file, e);
205 }
206 }
207 }
208
209 debug!("Loaded {} valid instance caches", instance_caches.len());
210
211 Ok(instance_caches)
212 }
213
214 async fn load_instance_cache(&self, cache_file: &PathBuf) -> Result<InstanceCacheData> {
216 let json_data = std::fs::read_to_string(cache_file).map_err(|e| {
217 P2PError::Bootstrap(BootstrapError::CacheError(
218 format!("Failed to read instance cache: {e}").into(),
219 ))
220 })?;
221
222 let cache_data: InstanceCacheData = serde_json::from_str(&json_data).map_err(|e| {
223 P2PError::Bootstrap(BootstrapError::InvalidData(
224 format!("Failed to parse instance cache: {e}").into(),
225 ))
226 })?;
227
228 Ok(cache_data)
229 }
230
231 fn validate_instance_cache(&self, cache_data: &InstanceCacheData) -> bool {
233 if cache_data.version != 1 {
235 return false;
236 }
237
238 let now = chrono::Utc::now();
240 let age = now.signed_duration_since(cache_data.timestamp);
241
242 if age.num_hours() > 24 {
243 debug!("Instance cache too old: {} hours", age.num_hours());
244 return false;
245 }
246
247 true
248 }
249
250 async fn perform_merge(
252 &self,
253 main_cache: &BootstrapCache,
254 instance_caches: Vec<InstanceCacheData>,
255 ) -> Result<MergeResult> {
256 let mut result = MergeResult::empty();
257 result.instances_processed = instance_caches.len();
258
259 let mut merged_contacts = main_cache.get_all_contacts().await;
261
262 for instance_cache in instance_caches {
264 let instance_result = self
265 .merge_single_instance(&mut merged_contacts, instance_cache)
266 .await?;
267 result.combine(instance_result);
268 }
269
270 main_cache.set_all_contacts(merged_contacts).await;
272
273 main_cache.save_to_disk().await?;
275
276 Ok(result)
277 }
278
279 async fn merge_single_instance(
281 &self,
282 main_contacts: &mut HashMap<PeerId, ContactEntry>,
283 instance_cache: InstanceCacheData,
284 ) -> Result<MergeResult> {
285 let mut result = MergeResult::empty();
286
287 for (peer_id, instance_contact) in instance_cache.contacts {
288 match main_contacts.get(&peer_id) {
289 Some(main_contact) => {
290 let resolved_contact =
292 self.resolve_conflict(main_contact, &instance_contact)?;
293
294 if resolved_contact.quality_metrics.quality_score
295 != main_contact.quality_metrics.quality_score
296 {
297 result.contacts_updated += 1;
298 result.conflicts_resolved += 1;
299 }
300
301 main_contacts.insert(peer_id, resolved_contact);
302 }
303 None => {
304 main_contacts.insert(peer_id, instance_contact);
306 result.contacts_added += 1;
307 }
308 }
309
310 result.contacts_merged += 1;
311 }
312
313 Ok(result)
314 }
315
316 fn resolve_conflict(
318 &self,
319 main_contact: &ContactEntry,
320 instance_contact: &ContactEntry,
321 ) -> Result<ContactEntry> {
322 match self.merge_strategy {
323 MergeStrategy::QualityBased => {
324 if instance_contact.quality_metrics.quality_score
325 > main_contact.quality_metrics.quality_score
326 {
327 Ok(instance_contact.clone())
328 } else {
329 Ok(main_contact.clone())
330 }
331 }
332
333 MergeStrategy::TimestampBased => {
334 if instance_contact.last_seen > main_contact.last_seen {
335 Ok(instance_contact.clone())
336 } else {
337 Ok(main_contact.clone())
338 }
339 }
340
341 MergeStrategy::MetricsCombined => {
342 self.combine_contact_metrics(main_contact, instance_contact)
343 }
344
345 MergeStrategy::SuccessRateBased => {
346 if instance_contact.quality_metrics.success_rate
347 > main_contact.quality_metrics.success_rate
348 {
349 Ok(instance_contact.clone())
350 } else {
351 Ok(main_contact.clone())
352 }
353 }
354 }
355 }
356
357 fn combine_contact_metrics(
359 &self,
360 main_contact: &ContactEntry,
361 instance_contact: &ContactEntry,
362 ) -> Result<ContactEntry> {
363 let mut combined_contact = main_contact.clone();
364
365 if instance_contact.last_seen > main_contact.last_seen {
367 combined_contact.last_seen = instance_contact.last_seen;
368 }
369
370 combined_contact.connection_history.total_attempts +=
372 instance_contact.connection_history.total_attempts;
373 combined_contact.connection_history.successful_connections +=
374 instance_contact.connection_history.successful_connections;
375 combined_contact.connection_history.failed_connections +=
376 instance_contact.connection_history.failed_connections;
377
378 for addr in &instance_contact.addresses {
380 if !combined_contact.addresses.contains(addr) {
381 combined_contact.addresses.push(*addr);
382 }
383 }
384
385 for capability in &instance_contact.capabilities {
387 if !combined_contact.capabilities.contains(capability) {
388 combined_contact.capabilities.push(capability.clone());
389 }
390 }
391
392 if instance_contact.reputation_score > combined_contact.reputation_score {
394 combined_contact.reputation_score = instance_contact.reputation_score;
395 }
396
397 combined_contact.ipv6_identity_verified =
399 combined_contact.ipv6_identity_verified || instance_contact.ipv6_identity_verified;
400
401 combined_contact.update_success_rate();
403 combined_contact.recalculate_quality_score();
404
405 Ok(combined_contact)
406 }
407
408 async fn cleanup_processed_caches(&self) -> Result<()> {
410 let cache_files = self.discover_instance_caches()?;
411 let mut cleaned_count = 0;
412
413 for cache_file in cache_files {
414 if let Some(process_id) = self.extract_process_id(&cache_file)
416 && !self.is_process_running(process_id)
417 {
418 if let Err(e) = std::fs::remove_file(&cache_file) {
420 warn!(
421 "Failed to remove old instance cache {:?}: {}",
422 cache_file, e
423 );
424 } else {
425 cleaned_count += 1;
426 }
427 }
428 }
429
430 if cleaned_count > 0 {
431 debug!("Cleaned up {} old instance cache files", cleaned_count);
432 }
433
434 Ok(())
435 }
436
437 fn extract_process_id(&self, cache_file: &std::path::Path) -> Option<u32> {
439 cache_file
440 .file_stem()
441 .and_then(|name| name.to_str())
442 .and_then(|name| {
443 let parts: Vec<&str> = name.split('_').collect();
444 if parts.len() >= 2 {
445 parts[0].parse().ok()
446 } else {
447 None
448 }
449 })
450 }
451
452 fn is_process_running(&self, process_id: u32) -> bool {
454 #[cfg(unix)]
455 {
456 use std::process::Command;
457 Command::new("kill")
458 .args(["-0", &process_id.to_string()])
459 .output()
460 .map(|output| output.status.success())
461 .unwrap_or(false)
462 }
463
464 #[cfg(windows)]
465 {
466 use std::process::Command;
467 Command::new("tasklist")
468 .args(["/FI", &format!("PID eq {}", process_id)])
469 .output()
470 .map(|output| {
471 String::from_utf8_lossy(&output.stdout).contains(&process_id.to_string())
472 })
473 .unwrap_or(false)
474 }
475
476 #[cfg(not(any(unix, windows)))]
477 {
478 true
480 }
481 }
482
483 pub fn get_strategy(&self) -> &MergeStrategy {
485 &self.merge_strategy
486 }
487
488 pub fn set_strategy(&mut self, strategy: MergeStrategy) {
490 self.merge_strategy = strategy;
491 }
492}
493
494impl MergeResult {
495 fn empty() -> Self {
497 Self {
498 contacts_merged: 0,
499 contacts_updated: 0,
500 contacts_added: 0,
501 conflicts_resolved: 0,
502 instances_processed: 0,
503 merge_duration_ms: 0,
504 }
505 }
506
507 fn combine(&mut self, other: MergeResult) {
509 self.contacts_merged += other.contacts_merged;
510 self.contacts_updated += other.contacts_updated;
511 self.contacts_added += other.contacts_added;
512 self.conflicts_resolved += other.conflicts_resolved;
513 }
514}
515
516impl std::fmt::Display for MergeResult {
517 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
518 write!(
519 f,
520 "MergeResult {{ merged: {}, updated: {}, added: {}, conflicts: {}, instances: {}, duration: {}ms }}",
521 self.contacts_merged,
522 self.contacts_updated,
523 self.contacts_added,
524 self.conflicts_resolved,
525 self.instances_processed,
526 self.merge_duration_ms
527 )
528 }
529}
530
531#[cfg(test)]
532mod tests {
533 use super::*;
534 use tempfile::TempDir;
535
536 #[tokio::test]
537 async fn test_merge_coordinator_creation() {
538 let temp_dir = TempDir::new().unwrap();
539 let coordinator = MergeCoordinator::new(temp_dir.path().to_path_buf());
540 assert!(coordinator.is_ok());
541 }
542
543 #[tokio::test]
544 async fn test_conflict_resolution_quality_based() {
545 let temp_dir = TempDir::new().unwrap();
546 let coordinator = MergeCoordinator::with_strategy(
547 temp_dir.path().to_path_buf(),
548 MergeStrategy::QualityBased,
549 )
550 .unwrap();
551
552 let mut main_contact = ContactEntry::new(
553 PeerId::from("test-peer"),
554 vec!["127.0.0.1:9000".parse().unwrap()],
555 );
556 main_contact.quality_metrics.quality_score = 0.5;
557
558 let mut instance_contact = ContactEntry::new(
559 PeerId::from("test-peer"),
560 vec!["127.0.0.1:9001".parse().unwrap()],
561 );
562 instance_contact.quality_metrics.quality_score = 0.8;
563
564 let resolved = coordinator
565 .resolve_conflict(&main_contact, &instance_contact)
566 .unwrap();
567 assert_eq!(resolved.quality_metrics.quality_score, 0.8);
568 }
569
570 #[tokio::test]
571 async fn test_metrics_combination() {
572 let temp_dir = TempDir::new().unwrap();
573 let coordinator = MergeCoordinator::with_strategy(
574 temp_dir.path().to_path_buf(),
575 MergeStrategy::MetricsCombined,
576 )
577 .unwrap();
578
579 let mut main_contact = ContactEntry::new(
580 PeerId::from("test-peer"),
581 vec!["127.0.0.1:9000".parse().unwrap()],
582 );
583 main_contact.connection_history.total_attempts = 10;
584 main_contact.connection_history.successful_connections = 8;
585
586 let mut instance_contact = ContactEntry::new(
587 PeerId::from("test-peer"),
588 vec!["127.0.0.1:9001".parse().unwrap()],
589 );
590 instance_contact.connection_history.total_attempts = 5;
591 instance_contact.connection_history.successful_connections = 4;
592
593 let combined = coordinator
594 .combine_contact_metrics(&main_contact, &instance_contact)
595 .unwrap();
596 assert_eq!(combined.connection_history.total_attempts, 15);
597 assert_eq!(combined.connection_history.successful_connections, 12);
598 }
599
600 #[test]
601 fn test_process_id_extraction() {
602 let temp_dir = TempDir::new().unwrap();
603 let coordinator = MergeCoordinator::new(temp_dir.path().to_path_buf()).unwrap();
604
605 let cache_file = PathBuf::from("12345_1234567890.cache");
606 let process_id = coordinator.extract_process_id(&cache_file);
607 assert_eq!(process_id, Some(12345));
608 }
609
610 #[test]
611 fn test_is_process_running_compilation() {
612 let temp_dir = TempDir::new().unwrap();
614 let coordinator = MergeCoordinator::new(temp_dir.path().to_path_buf()).unwrap();
615
616 let current_pid = std::process::id();
618 let is_running = coordinator.is_process_running(current_pid);
619
620 assert!(is_running, "Current process should be detected as running");
622
623 let non_existent_pid = 999999;
625 let is_not_running = coordinator.is_process_running(non_existent_pid);
626
627 let _ = is_not_running; }
631
632 #[test]
633 #[cfg(windows)]
634 fn test_windows_tasklist_command_format() {
635 use std::process::Command;
637
638 let process_id = std::process::id();
641 let filter_arg = format!("PID eq {}", process_id);
642
643 let result = Command::new("tasklist")
645 .args(["/FI", &filter_arg])
646 .output();
647
648 assert!(result.is_ok() || result.is_err(), "Command should either succeed or fail gracefully");
650 }
651}