ant_core/bootstrap/
merge.rs

1//! Multi-Instance Cache Merge Coordination
2//!
3//! Handles conflict resolution and merging when multiple P2P Foundation instances
4//! are running locally and sharing the same bootstrap cache.
5
6use crate::{PeerId, Result, P2PError};
7use crate::bootstrap::{ContactEntry, BootstrapCache};
8use std::collections::HashMap;
9use std::path::PathBuf;
10use std::time::SystemTime;
11use serde::{Deserialize, Serialize};
12use tracing::{debug, info, warn};
13
14/// Merge coordinator for handling multi-instance cache coordination
15#[derive(Clone)]
16pub struct MergeCoordinator {
17    /// Main cache directory path
18    cache_dir: PathBuf,
19    /// Directory containing instance-specific cache files
20    instance_cache_dir: PathBuf,
21    /// Strategy used for resolving merge conflicts
22    merge_strategy: MergeStrategy,
23}
24
/// Strategy for resolving conflicts during merge operations.
///
/// A conflict occurs when the same peer is present both in the main cache and
/// in an instance cache. The variants carry no data, so the type can be compared
/// and used as a hash key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum MergeStrategy {
    /// Keep the contact with the higher quality score.
    QualityBased,
    /// Keep the contact with the most recent `last_seen` timestamp.
    TimestampBased,
    /// Combine metrics from both contacts into a single entry.
    MetricsCombined,
    /// Keep the contact with the higher success rate.
    SuccessRateBased,
}
37
/// Instance cache data structure.
///
/// This is the shape deserialized from each per-instance cache file (JSON) before
/// its contacts are merged into the main cache.
#[derive(Debug, Serialize, Deserialize)]
struct InstanceCacheData {
    /// Identifier of the instance that wrote the cache.
    /// NOTE(review): not read by any code in this file.
    instance_id: String,
    /// Timestamp stored in the cache; validation rejects caches whose timestamp is
    /// more than 24 hours in the past.
    timestamp: chrono::DateTime<chrono::Utc>,
    /// Process ID stored in the cache body.
    /// NOTE(review): liveness checks in this file use the PID parsed from the file
    /// name, not this field — confirm the two always agree.
    process_id: u32,
    /// Contacts recorded by the instance, keyed by peer.
    contacts: HashMap<PeerId, ContactEntry>,
    /// Cache format version; validation only accepts version `1`.
    version: u32,
}
47
/// Merge operation result.
///
/// All counters start at zero (see `Default`) and describe one merge run.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MergeResult {
    /// Number of contacts that were merged from other instances
    pub contacts_merged: usize,
    /// Number of existing contacts that were updated
    pub contacts_updated: usize,
    /// Number of new contacts that were added
    pub contacts_added: usize,
    /// Number of conflicts that were resolved during merge
    pub conflicts_resolved: usize,
    /// Number of instance cache files that were processed
    pub instances_processed: usize,
    /// Total time taken for the merge operation in milliseconds
    pub merge_duration_ms: u64,
}
64
/// Conflict resolution information.
///
/// Bundles the two competing entries for one peer with the strategy chosen to
/// resolve them.
/// NOTE(review): this type is not constructed or referenced anywhere in the
/// visible file — confirm whether it is still needed.
#[derive(Debug)]
struct ConflictInfo {
    /// Peer whose entries are in conflict.
    peer_id: PeerId,
    /// Entry currently held by the main cache.
    main_contact: ContactEntry,
    /// Entry coming from the instance cache.
    instance_contact: ContactEntry,
    /// Strategy selected to resolve the conflict.
    resolution_strategy: MergeStrategy,
}
73
74impl MergeCoordinator {
75    /// Create a new merge coordinator
76    pub fn new(cache_dir: PathBuf) -> Result<Self> {
77        let instance_cache_dir = cache_dir.join("instance_caches");
78        
79        // Ensure instance cache directory exists
80        std::fs::create_dir_all(&instance_cache_dir)
81            .map_err(|e| P2PError::Bootstrap(format!("Failed to create instance cache directory: {}", e)))?;
82        
83        Ok(Self {
84            cache_dir,
85            instance_cache_dir,
86            merge_strategy: MergeStrategy::QualityBased,
87        })
88    }
89    
90    /// Create coordinator with custom merge strategy
91    pub fn with_strategy(cache_dir: PathBuf, strategy: MergeStrategy) -> Result<Self> {
92        let mut coordinator = Self::new(cache_dir)?;
93        coordinator.merge_strategy = strategy;
94        Ok(coordinator)
95    }
96    
97    /// Merge all instance caches into the main cache
98    pub async fn merge_instance_caches(&self, main_cache: &BootstrapCache) -> Result<MergeResult> {
99        let merge_start = SystemTime::now();
100        
101        debug!("Starting merge of instance caches");
102        
103        // Discover all instance cache files
104        let instance_files = self.discover_instance_caches()?;
105        
106        if instance_files.is_empty() {
107            debug!("No instance caches found to merge");
108            return Ok(MergeResult::empty());
109        }
110        
111        // Load all instance caches
112        let instance_caches = self.load_instance_caches(instance_files).await?;
113        
114        // Perform merge operation
115        let merge_result = self.perform_merge(main_cache, instance_caches).await?;
116        
117        // Cleanup old instance caches
118        self.cleanup_processed_caches().await?;
119        
120        let merge_duration = merge_start.elapsed()
121            .unwrap_or_default()
122            .as_millis() as u64;
123        
124        info!("Merge completed: {} contacts processed, {} conflicts resolved in {}ms",
125              merge_result.contacts_merged, merge_result.conflicts_resolved, merge_duration);
126        
127        Ok(MergeResult {
128            merge_duration_ms: merge_duration,
129            ..merge_result
130        })
131    }
132    
133    /// Discover all instance cache files
134    fn discover_instance_caches(&self) -> Result<Vec<PathBuf>> {
135        let mut cache_files = Vec::new();
136        
137        if !self.instance_cache_dir.exists() {
138            return Ok(cache_files);
139        }
140        
141        let entries = std::fs::read_dir(&self.instance_cache_dir)
142            .map_err(|e| P2PError::Bootstrap(format!("Failed to read instance cache directory: {}", e)))?;
143        
144        for entry in entries {
145            let entry = entry?;
146            let path = entry.path();
147            
148            if path.is_file() && path.extension().and_then(|s| s.to_str()) == Some("cache") {
149                // Check if the process is still running
150                if let Some(process_id) = self.extract_process_id(&path) {
151                    if self.is_process_running(process_id) {
152                        cache_files.push(path);
153                    } else {
154                        // Process is dead, we can safely include this cache
155                        cache_files.push(path);
156                    }
157                }
158            }
159        }
160        
161        debug!("Discovered {} instance cache files", cache_files.len());
162        
163        Ok(cache_files)
164    }
165    
166    /// Load instance caches from files
167    async fn load_instance_caches(&self, cache_files: Vec<PathBuf>) -> Result<Vec<InstanceCacheData>> {
168        let mut instance_caches = Vec::new();
169        
170        for cache_file in cache_files {
171            match self.load_instance_cache(&cache_file).await {
172                Ok(cache_data) => {
173                    if self.validate_instance_cache(&cache_data) {
174                        instance_caches.push(cache_data);
175                    } else {
176                        warn!("Invalid instance cache found: {:?}", cache_file);
177                    }
178                }
179                Err(e) => {
180                    warn!("Failed to load instance cache {:?}: {}", cache_file, e);
181                }
182            }
183        }
184        
185        debug!("Loaded {} valid instance caches", instance_caches.len());
186        
187        Ok(instance_caches)
188    }
189    
190    /// Load a single instance cache
191    async fn load_instance_cache(&self, cache_file: &PathBuf) -> Result<InstanceCacheData> {
192        let json_data = std::fs::read_to_string(cache_file)
193            .map_err(|e| P2PError::Bootstrap(format!("Failed to read instance cache: {}", e)))?;
194        
195        let cache_data: InstanceCacheData = serde_json::from_str(&json_data)
196            .map_err(|e| P2PError::Bootstrap(format!("Failed to parse instance cache: {}", e)))?;
197        
198        Ok(cache_data)
199    }
200    
201    /// Validate instance cache data
202    fn validate_instance_cache(&self, cache_data: &InstanceCacheData) -> bool {
203        // Check version compatibility
204        if cache_data.version != 1 {
205            return false;
206        }
207        
208        // Check timestamp is reasonable (not too old)
209        let now = chrono::Utc::now();
210        let age = now.signed_duration_since(cache_data.timestamp);
211        
212        if age.num_hours() > 24 {
213            debug!("Instance cache too old: {} hours", age.num_hours());
214            return false;
215        }
216        
217        true
218    }
219    
220    /// Perform the actual merge operation
221    async fn perform_merge(&self, main_cache: &BootstrapCache, instance_caches: Vec<InstanceCacheData>) -> Result<MergeResult> {
222        let mut result = MergeResult::empty();
223        result.instances_processed = instance_caches.len();
224        
225        // Get current main cache contacts
226        let mut merged_contacts = main_cache.get_all_contacts().await;
227        
228        // Process each instance cache
229        for instance_cache in instance_caches {
230            let instance_result = self.merge_single_instance(&mut merged_contacts, instance_cache).await?;
231            result.combine(instance_result);
232        }
233        
234        // Update main cache with merged contacts
235        main_cache.set_all_contacts(merged_contacts).await;
236        
237        // Save updated cache
238        main_cache.save_to_disk().await?;
239        
240        Ok(result)
241    }
242    
243    /// Merge a single instance cache
244    async fn merge_single_instance(&self, main_contacts: &mut HashMap<PeerId, ContactEntry>, instance_cache: InstanceCacheData) -> Result<MergeResult> {
245        let mut result = MergeResult::empty();
246        
247        for (peer_id, instance_contact) in instance_cache.contacts {
248            match main_contacts.get(&peer_id) {
249                Some(main_contact) => {
250                    // Contact exists in main cache, resolve conflict
251                    let resolved_contact = self.resolve_conflict(main_contact, &instance_contact)?;
252                    
253                    if resolved_contact.quality_metrics.quality_score != main_contact.quality_metrics.quality_score {
254                        result.contacts_updated += 1;
255                        result.conflicts_resolved += 1;
256                    }
257                    
258                    main_contacts.insert(peer_id, resolved_contact);
259                }
260                None => {
261                    // New contact, add to main cache
262                    main_contacts.insert(peer_id, instance_contact);
263                    result.contacts_added += 1;
264                }
265            }
266            
267            result.contacts_merged += 1;
268        }
269        
270        Ok(result)
271    }
272    
273    /// Resolve conflict between two contact entries
274    fn resolve_conflict(&self, main_contact: &ContactEntry, instance_contact: &ContactEntry) -> Result<ContactEntry> {
275        match self.merge_strategy {
276            MergeStrategy::QualityBased => {
277                if instance_contact.quality_metrics.quality_score > main_contact.quality_metrics.quality_score {
278                    Ok(instance_contact.clone())
279                } else {
280                    Ok(main_contact.clone())
281                }
282            }
283            
284            MergeStrategy::TimestampBased => {
285                if instance_contact.last_seen > main_contact.last_seen {
286                    Ok(instance_contact.clone())
287                } else {
288                    Ok(main_contact.clone())
289                }
290            }
291            
292            MergeStrategy::MetricsCombined => {
293                self.combine_contact_metrics(main_contact, instance_contact)
294            }
295            
296            MergeStrategy::SuccessRateBased => {
297                if instance_contact.quality_metrics.success_rate > main_contact.quality_metrics.success_rate {
298                    Ok(instance_contact.clone())
299                } else {
300                    Ok(main_contact.clone())
301                }
302            }
303        }
304    }
305    
306    /// Combine metrics from two contacts
307    fn combine_contact_metrics(&self, main_contact: &ContactEntry, instance_contact: &ContactEntry) -> Result<ContactEntry> {
308        let mut combined_contact = main_contact.clone();
309        
310        // Use the most recent timestamp
311        if instance_contact.last_seen > main_contact.last_seen {
312            combined_contact.last_seen = instance_contact.last_seen;
313        }
314        
315        // Combine connection history
316        combined_contact.connection_history.total_attempts += 
317            instance_contact.connection_history.total_attempts;
318        combined_contact.connection_history.successful_connections += 
319            instance_contact.connection_history.successful_connections;
320        combined_contact.connection_history.failed_connections += 
321            instance_contact.connection_history.failed_connections;
322        
323        // Update addresses (union of both sets)
324        for addr in &instance_contact.addresses {
325            if !combined_contact.addresses.contains(addr) {
326                combined_contact.addresses.push(addr.clone());
327            }
328        }
329        
330        // Update capabilities (union of both sets)
331        for capability in &instance_contact.capabilities {
332            if !combined_contact.capabilities.contains(capability) {
333                combined_contact.capabilities.push(capability.clone());
334            }
335        }
336        
337        // Use higher reputation score
338        if instance_contact.reputation_score > combined_contact.reputation_score {
339            combined_contact.reputation_score = instance_contact.reputation_score;
340        }
341        
342        // Use verified status if either is verified
343        combined_contact.ipv6_identity_verified = 
344            combined_contact.ipv6_identity_verified || instance_contact.ipv6_identity_verified;
345        
346        // Recalculate quality metrics
347        combined_contact.update_success_rate();
348        combined_contact.recalculate_quality_score();
349        
350        Ok(combined_contact)
351    }
352    
353    /// Cleanup processed instance caches
354    async fn cleanup_processed_caches(&self) -> Result<()> {
355        let cache_files = self.discover_instance_caches()?;
356        let mut cleaned_count = 0;
357        
358        for cache_file in cache_files {
359            // Check if process is still running
360            if let Some(process_id) = self.extract_process_id(&cache_file) {
361                if !self.is_process_running(process_id) {
362                    // Process is dead, safe to remove cache
363                    if let Err(e) = std::fs::remove_file(&cache_file) {
364                        warn!("Failed to remove old instance cache {:?}: {}", cache_file, e);
365                    } else {
366                        cleaned_count += 1;
367                    }
368                }
369            }
370        }
371        
372        if cleaned_count > 0 {
373            debug!("Cleaned up {} old instance cache files", cleaned_count);
374        }
375        
376        Ok(())
377    }
378    
379    /// Extract process ID from cache file name
380    fn extract_process_id(&self, cache_file: &PathBuf) -> Option<u32> {
381        cache_file
382            .file_stem()
383            .and_then(|name| name.to_str())
384            .and_then(|name| {
385                let parts: Vec<&str> = name.split('_').collect();
386                if parts.len() >= 2 {
387                    parts[0].parse().ok()
388                } else {
389                    None
390                }
391            })
392    }
393    
394    /// Check if a process is still running
395    fn is_process_running(&self, process_id: u32) -> bool {
396        #[cfg(unix)]
397        {
398            use std::process::Command;
399            Command::new("kill")
400                .args(&["-0", &process_id.to_string()])
401                .output()
402                .map(|output| output.status.success())
403                .unwrap_or(false)
404        }
405        
406        #[cfg(windows)]
407        {
408            use std::process::Command;
409            Command::new("tasklist")
410                .args(&["/FI", &format!("PID eq {}", process_id)])
411                .output()
412                .map(|output| {
413                    String::from_utf8_lossy(&output.stdout)
414                        .contains(&process_id.to_string())
415                })
416                .unwrap_or(false)
417        }
418        
419        #[cfg(not(any(unix, windows)))]
420        {
421            // Assume process is still running if we can't check
422            true
423        }
424    }
425    
    /// Get merge strategy.
    ///
    /// Returns a reference to the strategy currently used to resolve conflicts
    /// between a main-cache entry and an instance-cache entry.
    pub fn get_strategy(&self) -> &MergeStrategy {
        &self.merge_strategy
    }
430    
    /// Set merge strategy.
    ///
    /// Replaces the strategy used by subsequent conflict resolutions on this
    /// coordinator.
    pub fn set_strategy(&mut self, strategy: MergeStrategy) {
        self.merge_strategy = strategy;
    }
435}
436
437impl MergeResult {
438    /// Create empty merge result
439    fn empty() -> Self {
440        Self {
441            contacts_merged: 0,
442            contacts_updated: 0,
443            contacts_added: 0,
444            conflicts_resolved: 0,
445            instances_processed: 0,
446            merge_duration_ms: 0,
447        }
448    }
449    
450    /// Combine with another merge result
451    fn combine(&mut self, other: MergeResult) {
452        self.contacts_merged += other.contacts_merged;
453        self.contacts_updated += other.contacts_updated;
454        self.contacts_added += other.contacts_added;
455        self.conflicts_resolved += other.conflicts_resolved;
456    }
457}
458
459impl std::fmt::Display for MergeResult {
460    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
461        write!(
462            f,
463            "MergeResult {{ merged: {}, updated: {}, added: {}, conflicts: {}, instances: {}, duration: {}ms }}",
464            self.contacts_merged,
465            self.contacts_updated,
466            self.contacts_added,
467            self.conflicts_resolved,
468            self.instances_processed,
469            self.merge_duration_ms
470        )
471    }
472}
473
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a contact for the shared test peer reachable at `addr`.
    fn contact_at(addr: &str) -> ContactEntry {
        ContactEntry::new(PeerId::from("test-peer"), vec![addr.to_string()])
    }

    /// A coordinator can be created inside a fresh temporary directory.
    #[tokio::test]
    async fn test_merge_coordinator_creation() {
        let temp_dir = TempDir::new().unwrap();
        let cache_root = temp_dir.path().to_path_buf();
        assert!(MergeCoordinator::new(cache_root).is_ok());
    }

    /// With the quality-based strategy the higher-scored entry wins.
    #[tokio::test]
    async fn test_conflict_resolution_quality_based() {
        let temp_dir = TempDir::new().unwrap();
        let coordinator = MergeCoordinator::with_strategy(
            temp_dir.path().to_path_buf(),
            MergeStrategy::QualityBased,
        )
        .unwrap();

        let mut main_contact = contact_at("/ip4/127.0.0.1/tcp/9000");
        main_contact.quality_metrics.quality_score = 0.5;

        let mut instance_contact = contact_at("/ip4/127.0.0.1/tcp/9001");
        instance_contact.quality_metrics.quality_score = 0.8;

        let resolved = coordinator
            .resolve_conflict(&main_contact, &instance_contact)
            .unwrap();
        assert_eq!(resolved.quality_metrics.quality_score, 0.8);
    }

    /// Combining two entries sums their connection counters.
    #[tokio::test]
    async fn test_metrics_combination() {
        let temp_dir = TempDir::new().unwrap();
        let coordinator = MergeCoordinator::with_strategy(
            temp_dir.path().to_path_buf(),
            MergeStrategy::MetricsCombined,
        )
        .unwrap();

        let mut main_contact = contact_at("/ip4/127.0.0.1/tcp/9000");
        main_contact.connection_history.total_attempts = 10;
        main_contact.connection_history.successful_connections = 8;

        let mut instance_contact = contact_at("/ip4/127.0.0.1/tcp/9001");
        instance_contact.connection_history.total_attempts = 5;
        instance_contact.connection_history.successful_connections = 4;

        let combined = coordinator
            .combine_contact_metrics(&main_contact, &instance_contact)
            .unwrap();
        assert_eq!(combined.connection_history.total_attempts, 15);
        assert_eq!(combined.connection_history.successful_connections, 12);
    }

    /// The process ID is the part of the file stem before the first underscore.
    #[test]
    fn test_process_id_extraction() {
        let temp_dir = TempDir::new().unwrap();
        let coordinator = MergeCoordinator::new(temp_dir.path().to_path_buf()).unwrap();

        let cache_file = PathBuf::from("12345_1234567890.cache");
        assert_eq!(coordinator.extract_process_id(&cache_file), Some(12345));
    }
}