saorsa_core/bootstrap/
merge.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Multi-Instance Cache Merge Coordination
15//!
16//! Handles conflict resolution and merging when multiple P2P Foundation instances
17//! are running locally and sharing the same bootstrap cache.
18
19use crate::bootstrap::{BootstrapCache, ContactEntry};
20use crate::error::BootstrapError;
21use crate::{P2PError, PeerId, Result};
22use serde::{Deserialize, Serialize};
23use std::collections::HashMap;
24use std::path::PathBuf;
25use std::time::SystemTime;
26use tracing::{debug, info, warn};
27
28/// Merge coordinator for handling multi-instance cache coordination
29#[derive(Clone)]
30pub struct MergeCoordinator {
31    /// Main cache directory path
32    _cache_dir: PathBuf,
33    /// Directory containing instance-specific cache files
34    instance_cache_dir: PathBuf,
35    /// Strategy used for resolving merge conflicts
36    merge_strategy: MergeStrategy,
37}
38
/// Strategy for resolving conflicts when the same peer appears in both the
/// main cache and an instance cache.
///
/// The default is [`MergeStrategy::QualityBased`], the same strategy selected by
/// [`MergeCoordinator::new`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum MergeStrategy {
    /// Keep the contact with the higher quality score (ties keep the main contact).
    #[default]
    QualityBased,
    /// Keep the contact with the more recent `last_seen` (ties keep the main contact).
    TimestampBased,
    /// Combine connection history, addresses, capabilities, reputation and
    /// verification status from both contacts, then recompute quality.
    MetricsCombined,
    /// Keep the contact with the higher success rate (ties keep the main contact).
    SuccessRateBased,
}
51
52/// Instance cache data structure
53#[derive(Debug, Serialize, Deserialize)]
54struct InstanceCacheData {
55    instance_id: String,
56    timestamp: chrono::DateTime<chrono::Utc>,
57    process_id: u32,
58    contacts: HashMap<PeerId, ContactEntry>,
59    version: u32,
60}
61
/// Summary of a single merge run, as returned by `merge_instance_caches`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MergeResult {
    /// Number of instance contacts processed (new ones plus conflicting ones).
    pub contacts_merged: usize,
    /// Number of existing main-cache contacts that were updated.
    pub contacts_updated: usize,
    /// Number of contacts that were new to the main cache and added.
    pub contacts_added: usize,
    /// Number of conflicts that were resolved during the merge.
    pub conflicts_resolved: usize,
    /// Number of valid instance cache files that were merged.
    pub instances_processed: usize,
    /// Total time taken for the merge operation in milliseconds.
    pub merge_duration_ms: u64,
}
78
/// Conflict resolution information
///
/// Describes one peer that appears in both the main cache and an instance
/// cache, together with the strategy used to resolve it.
///
/// NOTE(review): nothing in this module constructs or reads this type; the
/// `dead_code` allowance below is what keeps it from warning. Remove the type
/// if it stays unused.
#[derive(Debug)]
#[allow(dead_code)] // unused in this module; see the note above
struct ConflictInfo {
    /// Peer whose entries conflict.
    peer_id: PeerId,
    /// Entry held in the main cache.
    main_contact: ContactEntry,
    /// Entry supplied by the instance cache.
    instance_contact: ContactEntry,
    /// Strategy applied to resolve the conflict.
    resolution_strategy: MergeStrategy,
}
88
89impl MergeCoordinator {
90    /// Create a new merge coordinator
91    pub fn new(cache_dir: PathBuf) -> Result<Self> {
92        let instance_cache_dir = cache_dir.join("instance_caches");
93
94        // Ensure instance cache directory exists
95        std::fs::create_dir_all(&instance_cache_dir).map_err(|e| {
96            P2PError::Bootstrap(BootstrapError::CacheError(
97                format!("Failed to create instance cache directory: {e}").into(),
98            ))
99        })?;
100
101        Ok(Self {
102            _cache_dir: cache_dir,
103            instance_cache_dir,
104            merge_strategy: MergeStrategy::QualityBased,
105        })
106    }
107
108    /// Create coordinator with custom merge strategy
109    pub fn with_strategy(cache_dir: PathBuf, strategy: MergeStrategy) -> Result<Self> {
110        let mut coordinator = Self::new(cache_dir)?;
111        coordinator.merge_strategy = strategy;
112        Ok(coordinator)
113    }
114
115    /// Merge all instance caches into the main cache
116    pub async fn merge_instance_caches(&self, main_cache: &BootstrapCache) -> Result<MergeResult> {
117        let merge_start = SystemTime::now();
118
119        debug!("Starting merge of instance caches");
120
121        // Discover all instance cache files
122        let instance_files = self.discover_instance_caches()?;
123
124        if instance_files.is_empty() {
125            debug!("No instance caches found to merge");
126            return Ok(MergeResult::empty());
127        }
128
129        // Load all instance caches
130        let instance_caches = self.load_instance_caches(instance_files).await?;
131
132        // Perform merge operation
133        let merge_result = self.perform_merge(main_cache, instance_caches).await?;
134
135        // Cleanup old instance caches
136        self.cleanup_processed_caches().await?;
137
138        let merge_duration = merge_start.elapsed().unwrap_or_default().as_millis() as u64;
139
140        info!(
141            "Merge completed: {} contacts processed, {} conflicts resolved in {}ms",
142            merge_result.contacts_merged, merge_result.conflicts_resolved, merge_duration
143        );
144
145        Ok(MergeResult {
146            merge_duration_ms: merge_duration,
147            ..merge_result
148        })
149    }
150
151    /// Discover all instance cache files
152    fn discover_instance_caches(&self) -> Result<Vec<PathBuf>> {
153        let mut cache_files = Vec::new();
154
155        if !self.instance_cache_dir.exists() {
156            return Ok(cache_files);
157        }
158
159        let entries = std::fs::read_dir(&self.instance_cache_dir).map_err(|e| {
160            P2PError::Bootstrap(BootstrapError::CacheError(
161                format!("Failed to read instance cache directory: {e}").into(),
162            ))
163        })?;
164
165        for entry in entries {
166            let entry = entry?;
167            let path = entry.path();
168
169            if path.is_file() && path.extension().and_then(|s| s.to_str()) == Some("cache") {
170                // Check if the process is still running
171                if let Some(process_id) = self.extract_process_id(&path) {
172                    if self.is_process_running(process_id) {
173                        cache_files.push(path);
174                    } else {
175                        // Process is dead, we can safely include this cache
176                        cache_files.push(path);
177                    }
178                }
179            }
180        }
181
182        debug!("Discovered {} instance cache files", cache_files.len());
183
184        Ok(cache_files)
185    }
186
187    /// Load instance caches from files
188    async fn load_instance_caches(
189        &self,
190        cache_files: Vec<PathBuf>,
191    ) -> Result<Vec<InstanceCacheData>> {
192        let mut instance_caches = Vec::new();
193
194        for cache_file in cache_files {
195            match self.load_instance_cache(&cache_file).await {
196                Ok(cache_data) => {
197                    if self.validate_instance_cache(&cache_data) {
198                        instance_caches.push(cache_data);
199                    } else {
200                        warn!("Invalid instance cache found: {:?}", cache_file);
201                    }
202                }
203                Err(e) => {
204                    warn!("Failed to load instance cache {:?}: {}", cache_file, e);
205                }
206            }
207        }
208
209        debug!("Loaded {} valid instance caches", instance_caches.len());
210
211        Ok(instance_caches)
212    }
213
214    /// Load a single instance cache
215    async fn load_instance_cache(&self, cache_file: &PathBuf) -> Result<InstanceCacheData> {
216        let json_data = std::fs::read_to_string(cache_file).map_err(|e| {
217            P2PError::Bootstrap(BootstrapError::CacheError(
218                format!("Failed to read instance cache: {e}").into(),
219            ))
220        })?;
221
222        let cache_data: InstanceCacheData = serde_json::from_str(&json_data).map_err(|e| {
223            P2PError::Bootstrap(BootstrapError::InvalidData(
224                format!("Failed to parse instance cache: {e}").into(),
225            ))
226        })?;
227
228        Ok(cache_data)
229    }
230
231    /// Validate instance cache data
232    fn validate_instance_cache(&self, cache_data: &InstanceCacheData) -> bool {
233        // Check version compatibility
234        if cache_data.version != 1 {
235            return false;
236        }
237
238        // Check timestamp is reasonable (not too old)
239        let now = chrono::Utc::now();
240        let age = now.signed_duration_since(cache_data.timestamp);
241
242        if age.num_hours() > 24 {
243            debug!("Instance cache too old: {} hours", age.num_hours());
244            return false;
245        }
246
247        true
248    }
249
250    /// Perform the actual merge operation
251    async fn perform_merge(
252        &self,
253        main_cache: &BootstrapCache,
254        instance_caches: Vec<InstanceCacheData>,
255    ) -> Result<MergeResult> {
256        let mut result = MergeResult::empty();
257        result.instances_processed = instance_caches.len();
258
259        // Get current main cache contacts
260        let mut merged_contacts = main_cache.get_all_contacts().await;
261
262        // Process each instance cache
263        for instance_cache in instance_caches {
264            let instance_result = self
265                .merge_single_instance(&mut merged_contacts, instance_cache)
266                .await?;
267            result.combine(instance_result);
268        }
269
270        // Update main cache with merged contacts
271        main_cache.set_all_contacts(merged_contacts).await;
272
273        // Save updated cache
274        main_cache.save_to_disk().await?;
275
276        Ok(result)
277    }
278
279    /// Merge a single instance cache
280    async fn merge_single_instance(
281        &self,
282        main_contacts: &mut HashMap<PeerId, ContactEntry>,
283        instance_cache: InstanceCacheData,
284    ) -> Result<MergeResult> {
285        let mut result = MergeResult::empty();
286
287        for (peer_id, instance_contact) in instance_cache.contacts {
288            match main_contacts.get(&peer_id) {
289                Some(main_contact) => {
290                    // Contact exists in main cache, resolve conflict
291                    let resolved_contact =
292                        self.resolve_conflict(main_contact, &instance_contact)?;
293
294                    if resolved_contact.quality_metrics.quality_score
295                        != main_contact.quality_metrics.quality_score
296                    {
297                        result.contacts_updated += 1;
298                        result.conflicts_resolved += 1;
299                    }
300
301                    main_contacts.insert(peer_id, resolved_contact);
302                }
303                None => {
304                    // New contact, add to main cache
305                    main_contacts.insert(peer_id, instance_contact);
306                    result.contacts_added += 1;
307                }
308            }
309
310            result.contacts_merged += 1;
311        }
312
313        Ok(result)
314    }
315
316    /// Resolve conflict between two contact entries
317    fn resolve_conflict(
318        &self,
319        main_contact: &ContactEntry,
320        instance_contact: &ContactEntry,
321    ) -> Result<ContactEntry> {
322        match self.merge_strategy {
323            MergeStrategy::QualityBased => {
324                if instance_contact.quality_metrics.quality_score
325                    > main_contact.quality_metrics.quality_score
326                {
327                    Ok(instance_contact.clone())
328                } else {
329                    Ok(main_contact.clone())
330                }
331            }
332
333            MergeStrategy::TimestampBased => {
334                if instance_contact.last_seen > main_contact.last_seen {
335                    Ok(instance_contact.clone())
336                } else {
337                    Ok(main_contact.clone())
338                }
339            }
340
341            MergeStrategy::MetricsCombined => {
342                self.combine_contact_metrics(main_contact, instance_contact)
343            }
344
345            MergeStrategy::SuccessRateBased => {
346                if instance_contact.quality_metrics.success_rate
347                    > main_contact.quality_metrics.success_rate
348                {
349                    Ok(instance_contact.clone())
350                } else {
351                    Ok(main_contact.clone())
352                }
353            }
354        }
355    }
356
357    /// Combine metrics from two contacts
358    fn combine_contact_metrics(
359        &self,
360        main_contact: &ContactEntry,
361        instance_contact: &ContactEntry,
362    ) -> Result<ContactEntry> {
363        let mut combined_contact = main_contact.clone();
364
365        // Use the most recent timestamp
366        if instance_contact.last_seen > main_contact.last_seen {
367            combined_contact.last_seen = instance_contact.last_seen;
368        }
369
370        // Combine connection history
371        combined_contact.connection_history.total_attempts +=
372            instance_contact.connection_history.total_attempts;
373        combined_contact.connection_history.successful_connections +=
374            instance_contact.connection_history.successful_connections;
375        combined_contact.connection_history.failed_connections +=
376            instance_contact.connection_history.failed_connections;
377
378        // Update addresses (union of both sets)
379        for addr in &instance_contact.addresses {
380            if !combined_contact.addresses.contains(addr) {
381                combined_contact.addresses.push(*addr);
382            }
383        }
384
385        // Update capabilities (union of both sets)
386        for capability in &instance_contact.capabilities {
387            if !combined_contact.capabilities.contains(capability) {
388                combined_contact.capabilities.push(capability.clone());
389            }
390        }
391
392        // Use higher reputation score
393        if instance_contact.reputation_score > combined_contact.reputation_score {
394            combined_contact.reputation_score = instance_contact.reputation_score;
395        }
396
397        // Use verified status if either is verified
398        combined_contact.ipv6_identity_verified =
399            combined_contact.ipv6_identity_verified || instance_contact.ipv6_identity_verified;
400
401        // Recalculate quality metrics
402        combined_contact.update_success_rate();
403        combined_contact.recalculate_quality_score();
404
405        Ok(combined_contact)
406    }
407
408    /// Cleanup processed instance caches
409    async fn cleanup_processed_caches(&self) -> Result<()> {
410        let cache_files = self.discover_instance_caches()?;
411        let mut cleaned_count = 0;
412
413        for cache_file in cache_files {
414            // Check if process is still running
415            if let Some(process_id) = self.extract_process_id(&cache_file)
416                && !self.is_process_running(process_id)
417            {
418                // Process is dead, safe to remove cache
419                if let Err(e) = std::fs::remove_file(&cache_file) {
420                    warn!(
421                        "Failed to remove old instance cache {:?}: {}",
422                        cache_file, e
423                    );
424                } else {
425                    cleaned_count += 1;
426                }
427            }
428        }
429
430        if cleaned_count > 0 {
431            debug!("Cleaned up {} old instance cache files", cleaned_count);
432        }
433
434        Ok(())
435    }
436
437    /// Extract process ID from cache file name
438    fn extract_process_id(&self, cache_file: &std::path::Path) -> Option<u32> {
439        cache_file
440            .file_stem()
441            .and_then(|name| name.to_str())
442            .and_then(|name| {
443                let parts: Vec<&str> = name.split('_').collect();
444                if parts.len() >= 2 {
445                    parts[0].parse().ok()
446                } else {
447                    None
448                }
449            })
450    }
451
452    /// Check if a process is still running
453    fn is_process_running(&self, process_id: u32) -> bool {
454        #[cfg(unix)]
455        {
456            use std::process::Command;
457            Command::new("kill")
458                .args(["-0", &process_id.to_string()])
459                .output()
460                .map(|output| output.status.success())
461                .unwrap_or(false)
462        }
463
464        #[cfg(windows)]
465        {
466            use std::process::Command;
467            Command::new("tasklist")
468                .args(["/FI", &format!("PID eq {}", process_id)])
469                .output()
470                .map(|output| {
471                    String::from_utf8_lossy(&output.stdout).contains(&process_id.to_string())
472                })
473                .unwrap_or(false)
474        }
475
476        #[cfg(not(any(unix, windows)))]
477        {
478            // Assume process is still running if we can't check
479            true
480        }
481    }
482
483    /// Get merge strategy
484    pub fn get_strategy(&self) -> &MergeStrategy {
485        &self.merge_strategy
486    }
487
488    /// Set merge strategy
489    pub fn set_strategy(&mut self, strategy: MergeStrategy) {
490        self.merge_strategy = strategy;
491    }
492}
493
494impl MergeResult {
495    /// Create empty merge result
496    fn empty() -> Self {
497        Self {
498            contacts_merged: 0,
499            contacts_updated: 0,
500            contacts_added: 0,
501            conflicts_resolved: 0,
502            instances_processed: 0,
503            merge_duration_ms: 0,
504        }
505    }
506
507    /// Combine with another merge result
508    fn combine(&mut self, other: MergeResult) {
509        self.contacts_merged += other.contacts_merged;
510        self.contacts_updated += other.contacts_updated;
511        self.contacts_added += other.contacts_added;
512        self.conflicts_resolved += other.conflicts_resolved;
513    }
514}
515
516impl std::fmt::Display for MergeResult {
517    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
518        write!(
519            f,
520            "MergeResult {{ merged: {}, updated: {}, added: {}, conflicts: {}, instances: {}, duration: {}ms }}",
521            self.contacts_merged,
522            self.contacts_updated,
523            self.contacts_added,
524            self.conflicts_resolved,
525            self.instances_processed,
526            self.merge_duration_ms
527        )
528    }
529}
530
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build a coordinator in a fresh temporary directory. The returned
    /// `TempDir` guard must stay alive while the coordinator is used.
    fn coordinator_with(strategy: MergeStrategy) -> (TempDir, MergeCoordinator) {
        let temp_dir = TempDir::new().unwrap();
        let coordinator =
            MergeCoordinator::with_strategy(temp_dir.path().to_path_buf(), strategy).unwrap();
        (temp_dir, coordinator)
    }

    #[test]
    fn test_merge_coordinator_creation() {
        let temp_dir = TempDir::new().unwrap();
        let coordinator = MergeCoordinator::new(temp_dir.path().to_path_buf());
        assert!(coordinator.is_ok());
        assert!(temp_dir.path().join("instance_caches").is_dir());
    }

    #[test]
    fn test_conflict_resolution_quality_based() {
        let (_temp_dir, coordinator) = coordinator_with(MergeStrategy::QualityBased);

        let mut main_contact = ContactEntry::new(
            PeerId::from("test-peer"),
            vec!["127.0.0.1:9000".parse().unwrap()],
        );
        main_contact.quality_metrics.quality_score = 0.5;

        let mut instance_contact = ContactEntry::new(
            PeerId::from("test-peer"),
            vec!["127.0.0.1:9001".parse().unwrap()],
        );
        instance_contact.quality_metrics.quality_score = 0.8;

        let resolved = coordinator
            .resolve_conflict(&main_contact, &instance_contact)
            .unwrap();
        assert_eq!(resolved.quality_metrics.quality_score, 0.8);
    }

    #[test]
    fn test_conflict_resolution_tie_keeps_main() {
        let (_temp_dir, coordinator) = coordinator_with(MergeStrategy::QualityBased);

        let mut main_contact = ContactEntry::new(
            PeerId::from("test-peer"),
            vec!["127.0.0.1:9000".parse().unwrap()],
        );
        main_contact.quality_metrics.quality_score = 0.5;

        let mut instance_contact = ContactEntry::new(
            PeerId::from("test-peer"),
            vec!["127.0.0.1:9001".parse().unwrap()],
        );
        instance_contact.quality_metrics.quality_score = 0.5;

        let resolved = coordinator
            .resolve_conflict(&main_contact, &instance_contact)
            .unwrap();
        assert_eq!(resolved.addresses, main_contact.addresses);
    }

    #[test]
    fn test_metrics_combination() {
        let (_temp_dir, coordinator) = coordinator_with(MergeStrategy::MetricsCombined);

        let mut main_contact = ContactEntry::new(
            PeerId::from("test-peer"),
            vec!["127.0.0.1:9000".parse().unwrap()],
        );
        main_contact.connection_history.total_attempts = 10;
        main_contact.connection_history.successful_connections = 8;

        let mut instance_contact = ContactEntry::new(
            PeerId::from("test-peer"),
            vec!["127.0.0.1:9001".parse().unwrap()],
        );
        instance_contact.connection_history.total_attempts = 5;
        instance_contact.connection_history.successful_connections = 4;

        let combined = coordinator
            .combine_contact_metrics(&main_contact, &instance_contact)
            .unwrap();
        assert_eq!(combined.connection_history.total_attempts, 15);
        assert_eq!(combined.connection_history.successful_connections, 12);
        // Addresses are the union of both entries.
        assert_eq!(combined.addresses.len(), 2);
    }

    #[test]
    fn test_process_id_extraction() {
        let (_temp_dir, coordinator) = coordinator_with(MergeStrategy::QualityBased);

        let cache_file = PathBuf::from("12345_1234567890.cache");
        assert_eq!(coordinator.extract_process_id(&cache_file), Some(12345));

        // No separator, or a non-numeric PID part, yields None.
        assert_eq!(
            coordinator.extract_process_id(&PathBuf::from("12345.cache")),
            None
        );
        assert_eq!(
            coordinator.extract_process_id(&PathBuf::from("abc_123.cache")),
            None
        );
    }

    #[test]
    fn test_validate_instance_cache() {
        let (_temp_dir, coordinator) = coordinator_with(MergeStrategy::QualityBased);

        let make = |version: u32, age_hours: i64| InstanceCacheData {
            instance_id: "instance-a".to_string(),
            timestamp: chrono::Utc::now() - chrono::Duration::hours(age_hours),
            process_id: 1,
            contacts: HashMap::new(),
            version,
        };

        assert!(coordinator.validate_instance_cache(&make(1, 0)));
        assert!(!coordinator.validate_instance_cache(&make(2, 0)));
        assert!(!coordinator.validate_instance_cache(&make(1, 48)));
    }

    #[tokio::test]
    async fn test_merge_single_instance_counts() {
        let (_temp_dir, coordinator) = coordinator_with(MergeStrategy::QualityBased);

        let mut main_entry = ContactEntry::new(
            PeerId::from("shared-peer"),
            vec!["127.0.0.1:9000".parse().unwrap()],
        );
        main_entry.quality_metrics.quality_score = 0.5;
        let mut main_contacts = HashMap::new();
        main_contacts.insert(PeerId::from("shared-peer"), main_entry);

        let mut better = ContactEntry::new(
            PeerId::from("shared-peer"),
            vec!["127.0.0.1:9001".parse().unwrap()],
        );
        better.quality_metrics.quality_score = 0.8;
        let newcomer = ContactEntry::new(
            PeerId::from("fresh-peer"),
            vec!["127.0.0.1:9002".parse().unwrap()],
        );

        let mut contacts = HashMap::new();
        contacts.insert(PeerId::from("shared-peer"), better);
        contacts.insert(PeerId::from("fresh-peer"), newcomer);

        let instance = InstanceCacheData {
            instance_id: "instance-a".to_string(),
            timestamp: chrono::Utc::now(),
            process_id: std::process::id(),
            contacts,
            version: 1,
        };

        let result = coordinator
            .merge_single_instance(&mut main_contacts, instance)
            .await
            .unwrap();

        assert_eq!(result.contacts_merged, 2);
        assert_eq!(result.contacts_added, 1);
        assert_eq!(result.contacts_updated, 1);
        assert_eq!(result.conflicts_resolved, 1);
        assert_eq!(main_contacts.len(), 2);
        assert_eq!(
            main_contacts[&PeerId::from("shared-peer")]
                .quality_metrics
                .quality_score,
            0.8
        );
    }

    #[test]
    fn test_is_process_running_current_process() {
        let (_temp_dir, coordinator) = coordinator_with(MergeStrategy::QualityBased);

        // The test process itself must always be reported as running.
        let current_pid = std::process::id();
        assert!(
            coordinator.is_process_running(current_pid),
            "Current process should be detected as running"
        );
    }

    #[test]
    #[cfg(windows)]
    fn test_windows_tasklist_command_format() {
        use std::process::Command;

        let process_id = std::process::id();
        let filter_arg = format!("PID eq {process_id}");

        // tasklist may be unavailable in minimal CI images, so only check the
        // output when the command actually ran.
        if let Ok(output) = Command::new("tasklist").args(["/FI", &filter_arg]).output() {
            let stdout = String::from_utf8_lossy(&output.stdout);
            assert!(
                stdout.contains(&process_id.to_string()),
                "tasklist output should list the current process: {stdout}"
            );
        }
    }
}