ipfrs_network/
connection_migration.rs

1//! QUIC Connection Migration for Mobile Support
2//!
3//! This module implements QUIC connection migration to handle network changes
4//! seamlessly, particularly important for mobile devices switching between
5//! WiFi, cellular, and other network interfaces.
6//!
7//! ## Features
8//!
9//! - Automatic detection of network interface changes
10//! - Seamless connection migration without data loss
11//! - State preservation during migration
12//! - Retry logic for failed migrations
13//! - Statistics tracking for migration events
14//!
15//! ## Use Cases
16//!
17//! - Mobile devices switching from WiFi to cellular
18//! - Laptops moving between networks
19//! - Devices with multiple network interfaces
20//! - Network interface failures requiring fallback
21
22use dashmap::DashMap;
23use libp2p::{Multiaddr, PeerId};
24use parking_lot::RwLock;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use thiserror::Error;
28use tracing::{debug, info, warn};
29
30/// Errors that can occur during connection migration
31#[derive(Debug, Error)]
32pub enum MigrationError {
33    #[error("No active connection to migrate")]
34    NoActiveConnection,
35
36    #[error("Migration already in progress")]
37    MigrationInProgress,
38
39    #[error("Migration failed: {0}")]
40    MigrationFailed(String),
41
42    #[error("Timeout during migration")]
43    MigrationTimeout,
44
45    #[error("Invalid migration state")]
46    InvalidState,
47
48    #[error("No suitable migration path available")]
49    NoMigrationPath,
50}
51
/// Tunables controlling how connection migrations are tracked and throttled.
///
/// Three presets are provided: [`MigrationConfig::default`],
/// [`MigrationConfig::mobile`] and [`MigrationConfig::conservative`].
#[derive(Debug, Clone)]
pub struct MigrationConfig {
    /// Whether migrations should be started automatically on network changes.
    // NOTE(review): not read by the manager in this file; presumably consumed by callers.
    pub auto_migrate: bool,

    /// Upper bound on how long a single migration may stay in progress.
    pub migration_timeout: Duration,

    /// How many times one migration attempt may be retried.
    pub max_retry_attempts: usize,

    /// Delay to wait between retries.
    // NOTE(review): not read by the manager in this file; presumably applied by callers.
    pub retry_backoff: Duration,

    /// Minimum spacing between two migrations of the same peer.
    pub migration_cooldown: Duration,

    /// Keep the old path alive while migrating (default: true for safety).
    pub keep_old_path: bool,

    /// Validate the new path before the old one is closed.
    pub validate_new_path: bool,
}

impl Default for MigrationConfig {
    /// Balanced settings: automatic migration, 30 s timeout, 3 retries with a
    /// 2 s backoff, and a 10 s cooldown.
    fn default() -> Self {
        MigrationConfig {
            auto_migrate: true,
            migration_timeout: Duration::from_secs(30),
            max_retry_attempts: 3,
            retry_backoff: Duration::from_secs(2),
            migration_cooldown: Duration::from_secs(10),
            keep_old_path: true,
            validate_new_path: true,
        }
    }
}

impl MigrationConfig {
    /// Preset for mobile devices that frequently hop between WiFi and cellular.
    ///
    /// Compared with the default it uses a shorter timeout (15 s), more
    /// retries (5), a shorter backoff (500 ms) and a shorter cooldown (5 s).
    pub fn mobile() -> Self {
        // Only the fields that differ from the default are spelled out.
        Self {
            migration_timeout: Duration::from_secs(15),
            max_retry_attempts: 5,
            retry_backoff: Duration::from_millis(500),
            migration_cooldown: Duration::from_secs(5),
            ..Self::default()
        }
    }

    /// Preset for stable networks where migrations are rare.
    ///
    /// Automatic migration is off; the timeout (60 s), backoff (5 s) and
    /// cooldown (30 s) are longer, and only 2 retries are allowed.
    pub fn conservative() -> Self {
        // Only the fields that differ from the default are spelled out.
        Self {
            auto_migrate: false,
            migration_timeout: Duration::from_secs(60),
            max_retry_attempts: 2,
            retry_backoff: Duration::from_secs(5),
            migration_cooldown: Duration::from_secs(30),
            ..Self::default()
        }
    }
}
124
/// Lifecycle phase of a single connection migration.
///
/// New attempts start in [`MigrationState::Initiated`]; a retry puts the
/// attempt back into that phase. Later phases are assigned by whoever drives
/// the migration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MigrationState {
    /// Nothing is happening for this connection.
    Idle,
    /// The migration has been requested but no work has been done yet.
    Initiated,
    /// The new network path is being validated.
    Validating,
    /// Connection state is being moved onto the new path.
    Migrating,
    /// The migration finished successfully.
    Completed,
    /// The migration ended in failure.
    Failed,
}
141
142/// Information about a connection migration attempt
143#[derive(Debug, Clone)]
144pub struct MigrationAttempt {
145    /// Peer being migrated
146    pub peer_id: PeerId,
147    /// Old network address
148    pub old_address: Multiaddr,
149    /// New network address
150    pub new_address: Multiaddr,
151    /// Current migration state
152    pub state: MigrationState,
153    /// When the migration started
154    pub started_at: Instant,
155    /// Number of retry attempts
156    pub retry_count: usize,
157    /// Error message if failed
158    pub error: Option<String>,
159}
160
/// Aggregate counters describing migration activity.
///
/// All counters start at zero (`Default`).
#[derive(Debug, Clone, Default)]
pub struct MigrationStats {
    /// Count of migrations that were ever initiated.
    pub total_attempts: usize,
    /// Count of migrations that finished successfully.
    pub successful_migrations: usize,
    /// Count of migrations that ended in failure.
    pub failed_migrations: usize,
    /// Count of migrations that are currently underway.
    pub in_progress: usize,
    /// Mean duration of recent successful migrations, in milliseconds.
    pub avg_duration_ms: u64,
    /// Sum of the retry counts of all finished (completed or failed) attempts.
    pub total_retries: usize,
}
177
178/// Manages QUIC connection migration
179pub struct ConnectionMigrationManager {
180    /// Configuration
181    config: MigrationConfig,
182    /// Active migration attempts
183    active_migrations: Arc<DashMap<PeerId, MigrationAttempt>>,
184    /// Last migration time per peer (for cooldown)
185    last_migration: Arc<DashMap<PeerId, Instant>>,
186    /// Migration statistics
187    stats: Arc<RwLock<MigrationStats>>,
188    /// Migration history (for tracking durations)
189    migration_durations: Arc<RwLock<Vec<u64>>>,
190}
191
192impl ConnectionMigrationManager {
193    /// Create a new connection migration manager
194    pub fn new(config: MigrationConfig) -> Self {
195        Self {
196            config,
197            active_migrations: Arc::new(DashMap::new()),
198            last_migration: Arc::new(DashMap::new()),
199            stats: Arc::new(RwLock::new(MigrationStats::default())),
200            migration_durations: Arc::new(RwLock::new(Vec::new())),
201        }
202    }
203
204    /// Create with mobile-optimized configuration
205    pub fn mobile() -> Self {
206        Self::new(MigrationConfig::mobile())
207    }
208
209    /// Create with conservative configuration
210    pub fn conservative() -> Self {
211        Self::new(MigrationConfig::conservative())
212    }
213
214    /// Initiate migration for a peer connection
215    ///
216    /// # Arguments
217    ///
218    /// * `peer_id` - The peer to migrate
219    /// * `old_address` - Current network address
220    /// * `new_address` - New network address to migrate to
221    ///
222    /// # Returns
223    ///
224    /// Ok if migration was initiated, Err if migration cannot start
225    pub fn initiate_migration(
226        &self,
227        peer_id: PeerId,
228        old_address: Multiaddr,
229        new_address: Multiaddr,
230    ) -> Result<(), MigrationError> {
231        // Check if migration is already in progress
232        if self.active_migrations.contains_key(&peer_id) {
233            return Err(MigrationError::MigrationInProgress);
234        }
235
236        // Check cooldown period
237        if let Some(last) = self.last_migration.get(&peer_id) {
238            if last.elapsed() < self.config.migration_cooldown {
239                debug!(
240                    "Migration cooldown active for peer {} ({:?} remaining)",
241                    peer_id,
242                    self.config.migration_cooldown - last.elapsed()
243                );
244                return Err(MigrationError::InvalidState);
245            }
246        }
247
248        info!(
249            "Initiating connection migration for peer {} from {} to {}",
250            peer_id, old_address, new_address
251        );
252
253        // Create migration attempt
254        let attempt = MigrationAttempt {
255            peer_id,
256            old_address,
257            new_address,
258            state: MigrationState::Initiated,
259            started_at: Instant::now(),
260            retry_count: 0,
261            error: None,
262        };
263
264        // Record the attempt
265        self.active_migrations.insert(peer_id, attempt);
266
267        // Update statistics
268        let mut stats = self.stats.write();
269        stats.total_attempts += 1;
270        stats.in_progress += 1;
271
272        Ok(())
273    }
274
275    /// Update the state of an ongoing migration
276    pub fn update_migration_state(
277        &self,
278        peer_id: &PeerId,
279        new_state: MigrationState,
280    ) -> Result<(), MigrationError> {
281        if let Some(mut attempt) = self.active_migrations.get_mut(peer_id) {
282            debug!(
283                "Migration state change for peer {}: {:?} -> {:?}",
284                peer_id, attempt.state, new_state
285            );
286            attempt.state = new_state;
287            Ok(())
288        } else {
289            Err(MigrationError::NoActiveConnection)
290        }
291    }
292
293    /// Mark a migration as completed successfully
294    pub fn complete_migration(&self, peer_id: &PeerId) -> Result<(), MigrationError> {
295        if let Some((_, attempt)) = self.active_migrations.remove(peer_id) {
296            let duration_ms = attempt.started_at.elapsed().as_millis() as u64;
297
298            info!(
299                "Migration completed for peer {} in {} ms",
300                peer_id, duration_ms
301            );
302
303            // Update last migration time
304            self.last_migration.insert(*peer_id, Instant::now());
305
306            // Update statistics
307            {
308                let mut stats = self.stats.write();
309                stats.successful_migrations += 1;
310                stats.in_progress = stats.in_progress.saturating_sub(1);
311                stats.total_retries += attempt.retry_count;
312            }
313
314            // Record duration for average calculation
315            {
316                let mut durations = self.migration_durations.write();
317                durations.push(duration_ms);
318
319                // Keep only last 100 durations to prevent unbounded growth
320                if durations.len() > 100 {
321                    durations.remove(0);
322                }
323
324                // Update average
325                let avg = durations.iter().sum::<u64>() / durations.len() as u64;
326                self.stats.write().avg_duration_ms = avg;
327            }
328
329            Ok(())
330        } else {
331            Err(MigrationError::NoActiveConnection)
332        }
333    }
334
335    /// Mark a migration as failed
336    pub fn fail_migration(&self, peer_id: &PeerId, error: String) -> Result<(), MigrationError> {
337        if let Some((_, mut attempt)) = self.active_migrations.remove(peer_id) {
338            warn!("Migration failed for peer {}: {}", peer_id, error);
339
340            attempt.error = Some(error);
341            attempt.state = MigrationState::Failed;
342
343            // Update statistics
344            let mut stats = self.stats.write();
345            stats.failed_migrations += 1;
346            stats.in_progress = stats.in_progress.saturating_sub(1);
347            stats.total_retries += attempt.retry_count;
348
349            Ok(())
350        } else {
351            Err(MigrationError::NoActiveConnection)
352        }
353    }
354
355    /// Retry a failed migration
356    pub fn retry_migration(&self, peer_id: &PeerId) -> Result<(), MigrationError> {
357        if let Some(mut attempt) = self.active_migrations.get_mut(peer_id) {
358            if attempt.retry_count >= self.config.max_retry_attempts {
359                return Err(MigrationError::MigrationFailed(
360                    "Max retry attempts reached".to_string(),
361                ));
362            }
363
364            attempt.retry_count += 1;
365            attempt.state = MigrationState::Initiated;
366            attempt.error = None;
367
368            info!(
369                "Retrying migration for peer {} (attempt {})",
370                peer_id,
371                attempt.retry_count + 1
372            );
373
374            Ok(())
375        } else {
376            Err(MigrationError::NoActiveConnection)
377        }
378    }
379
380    /// Check if a migration is in progress for a peer
381    pub fn is_migrating(&self, peer_id: &PeerId) -> bool {
382        self.active_migrations.contains_key(peer_id)
383    }
384
385    /// Get the current migration state for a peer
386    pub fn get_migration_state(&self, peer_id: &PeerId) -> Option<MigrationState> {
387        self.active_migrations
388            .get(peer_id)
389            .map(|attempt| attempt.state)
390    }
391
392    /// Get all active migration attempts
393    pub fn get_active_migrations(&self) -> Vec<MigrationAttempt> {
394        self.active_migrations
395            .iter()
396            .map(|entry| entry.value().clone())
397            .collect()
398    }
399
400    /// Get migration statistics
401    pub fn stats(&self) -> MigrationStats {
402        self.stats.read().clone()
403    }
404
405    /// Check if a peer can be migrated (respects cooldown)
406    pub fn can_migrate(&self, peer_id: &PeerId) -> bool {
407        if self.active_migrations.contains_key(peer_id) {
408            return false;
409        }
410
411        if let Some(last) = self.last_migration.get(peer_id) {
412            last.elapsed() >= self.config.migration_cooldown
413        } else {
414            true
415        }
416    }
417
418    /// Get the configuration
419    pub fn config(&self) -> &MigrationConfig {
420        &self.config
421    }
422
423    /// Clean up timed-out migrations
424    pub fn cleanup_timeouts(&self) {
425        let timeout = self.config.migration_timeout;
426        let mut timed_out = Vec::new();
427
428        for entry in self.active_migrations.iter() {
429            if entry.value().started_at.elapsed() > timeout {
430                timed_out.push(*entry.key());
431            }
432        }
433
434        for peer_id in timed_out {
435            self.fail_migration(&peer_id, "Migration timeout".to_string())
436                .ok();
437        }
438    }
439}
440
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;

    /// Fresh random peer id for each test.
    fn test_peer_id() -> PeerId {
        PeerId::random()
    }

    /// Address used as the "old" path in tests.
    fn test_addr() -> Multiaddr {
        Multiaddr::from_str("/ip4/127.0.0.1/tcp/4001").unwrap()
    }

    /// Address used as the "new" path in tests.
    fn test_addr2() -> Multiaddr {
        Multiaddr::from_str("/ip4/192.168.1.1/tcp/4001").unwrap()
    }

    #[test]
    fn test_migration_config_default() {
        let config = MigrationConfig::default();
        assert!(config.auto_migrate);
        assert!(config.keep_old_path);
        assert!(config.validate_new_path);
    }

    #[test]
    fn test_migration_config_mobile() {
        let config = MigrationConfig::mobile();
        assert!(config.auto_migrate);
        assert_eq!(config.max_retry_attempts, 5);
        assert_eq!(config.migration_cooldown, Duration::from_secs(5));
    }

    #[test]
    fn test_migration_config_conservative() {
        let config = MigrationConfig::conservative();
        assert!(!config.auto_migrate);
        assert_eq!(config.max_retry_attempts, 2);
        assert_eq!(config.migration_cooldown, Duration::from_secs(30));
    }

    #[test]
    fn test_initiate_migration() {
        let manager = ConnectionMigrationManager::new(MigrationConfig::default());
        let peer = test_peer_id();
        let old_addr = test_addr();
        let new_addr = test_addr2();

        let result = manager.initiate_migration(peer, old_addr, new_addr);
        assert!(result.is_ok());

        let stats = manager.stats();
        assert_eq!(stats.total_attempts, 1);
        assert_eq!(stats.in_progress, 1);
    }

    #[test]
    fn test_migration_in_progress_error() {
        let manager = ConnectionMigrationManager::new(MigrationConfig::default());
        let peer = test_peer_id();
        let old_addr = test_addr();
        let new_addr = test_addr2();

        manager
            .initiate_migration(peer, old_addr.clone(), new_addr.clone())
            .unwrap();

        let result = manager.initiate_migration(peer, old_addr, new_addr);
        assert!(matches!(result, Err(MigrationError::MigrationInProgress)));

        // The rejected request must not be counted.
        let stats = manager.stats();
        assert_eq!(stats.total_attempts, 1);
        assert_eq!(stats.in_progress, 1);
    }

    #[test]
    fn test_initiate_rejected_during_cooldown() {
        let manager = ConnectionMigrationManager::new(MigrationConfig::default());
        let peer = test_peer_id();

        manager
            .initiate_migration(peer, test_addr(), test_addr2())
            .unwrap();
        manager.complete_migration(&peer).unwrap();

        // Default cooldown is 10 s, so an immediate re-initiation is refused.
        let result = manager.initiate_migration(peer, test_addr2(), test_addr());
        assert!(matches!(result, Err(MigrationError::InvalidState)));
        assert!(!manager.is_migrating(&peer));

        let stats = manager.stats();
        assert_eq!(stats.total_attempts, 1);
        assert_eq!(stats.in_progress, 0);
    }

    #[test]
    fn test_complete_migration() {
        let manager = ConnectionMigrationManager::new(MigrationConfig::default());
        let peer = test_peer_id();

        manager
            .initiate_migration(peer, test_addr(), test_addr2())
            .unwrap();
        let result = manager.complete_migration(&peer);

        assert!(result.is_ok());

        let stats = manager.stats();
        assert_eq!(stats.successful_migrations, 1);
        assert_eq!(stats.in_progress, 0);
    }

    #[test]
    fn test_fail_migration() {
        let manager = ConnectionMigrationManager::new(MigrationConfig::default());
        let peer = test_peer_id();

        manager
            .initiate_migration(peer, test_addr(), test_addr2())
            .unwrap();
        let result = manager.fail_migration(&peer, "Test error".to_string());

        assert!(result.is_ok());

        let stats = manager.stats();
        assert_eq!(stats.failed_migrations, 1);
        assert_eq!(stats.in_progress, 0);
    }

    #[test]
    fn test_unknown_peer_errors() {
        let manager = ConnectionMigrationManager::new(MigrationConfig::default());
        let peer = test_peer_id();

        assert!(matches!(
            manager.update_migration_state(&peer, MigrationState::Validating),
            Err(MigrationError::NoActiveConnection)
        ));
        assert!(matches!(
            manager.complete_migration(&peer),
            Err(MigrationError::NoActiveConnection)
        ));
        assert!(matches!(
            manager.fail_migration(&peer, "nope".to_string()),
            Err(MigrationError::NoActiveConnection)
        ));
        assert!(matches!(
            manager.retry_migration(&peer),
            Err(MigrationError::NoActiveConnection)
        ));
        assert_eq!(manager.get_migration_state(&peer), None);

        // None of the rejected calls may touch the counters.
        let stats = manager.stats();
        assert_eq!(stats.total_attempts, 0);
        assert_eq!(stats.successful_migrations, 0);
        assert_eq!(stats.failed_migrations, 0);
        assert_eq!(stats.in_progress, 0);
    }

    #[test]
    fn test_retry_migration() {
        let manager = ConnectionMigrationManager::new(MigrationConfig::default());
        let peer = test_peer_id();

        manager
            .initiate_migration(peer, test_addr(), test_addr2())
            .unwrap();

        let result = manager.retry_migration(&peer);
        assert!(result.is_ok());

        let attempt = manager.active_migrations.get(&peer).unwrap();
        assert_eq!(attempt.retry_count, 1);
    }

    #[test]
    fn test_retry_limit() {
        let config = MigrationConfig {
            max_retry_attempts: 2,
            ..Default::default()
        };
        let manager = ConnectionMigrationManager::new(config);
        let peer = test_peer_id();

        manager
            .initiate_migration(peer, test_addr(), test_addr2())
            .unwrap();

        // First retry should succeed
        assert!(manager.retry_migration(&peer).is_ok());
        // Second retry should succeed
        assert!(manager.retry_migration(&peer).is_ok());
        // Third retry should fail (max reached)
        assert!(matches!(
            manager.retry_migration(&peer),
            Err(MigrationError::MigrationFailed(_))
        ));
    }

    #[test]
    fn test_retry_after_fail_is_rejected() {
        let manager = ConnectionMigrationManager::new(MigrationConfig::default());
        let peer = test_peer_id();

        manager
            .initiate_migration(peer, test_addr(), test_addr2())
            .unwrap();
        manager
            .fail_migration(&peer, "Test error".to_string())
            .unwrap();

        // Failing removes the attempt, so there is nothing left to retry.
        assert!(matches!(
            manager.retry_migration(&peer),
            Err(MigrationError::NoActiveConnection)
        ));
        // A failure does not start a cooldown.
        assert!(manager.can_migrate(&peer));
    }

    #[test]
    fn test_is_migrating() {
        let manager = ConnectionMigrationManager::new(MigrationConfig::default());
        let peer = test_peer_id();

        assert!(!manager.is_migrating(&peer));

        manager
            .initiate_migration(peer, test_addr(), test_addr2())
            .unwrap();
        assert!(manager.is_migrating(&peer));

        manager.complete_migration(&peer).unwrap();
        assert!(!manager.is_migrating(&peer));
    }

    #[test]
    fn test_migration_state_updates() {
        let manager = ConnectionMigrationManager::new(MigrationConfig::default());
        let peer = test_peer_id();

        manager
            .initiate_migration(peer, test_addr(), test_addr2())
            .unwrap();

        assert_eq!(
            manager.get_migration_state(&peer),
            Some(MigrationState::Initiated)
        );

        manager
            .update_migration_state(&peer, MigrationState::Validating)
            .unwrap();
        assert_eq!(
            manager.get_migration_state(&peer),
            Some(MigrationState::Validating)
        );

        manager
            .update_migration_state(&peer, MigrationState::Migrating)
            .unwrap();
        assert_eq!(
            manager.get_migration_state(&peer),
            Some(MigrationState::Migrating)
        );
    }

    #[test]
    fn test_can_migrate() {
        let config = MigrationConfig {
            migration_cooldown: Duration::from_millis(100),
            ..Default::default()
        };
        let manager = ConnectionMigrationManager::new(config);
        let peer = test_peer_id();

        assert!(manager.can_migrate(&peer));

        manager
            .initiate_migration(peer, test_addr(), test_addr2())
            .unwrap();
        assert!(!manager.can_migrate(&peer));

        manager.complete_migration(&peer).unwrap();
        assert!(!manager.can_migrate(&peer)); // Cooldown active

        std::thread::sleep(Duration::from_millis(150));
        assert!(manager.can_migrate(&peer)); // Cooldown expired
    }

    #[test]
    fn test_get_active_migrations() {
        let manager = ConnectionMigrationManager::new(MigrationConfig::default());
        let peer1 = test_peer_id();
        let peer2 = test_peer_id();

        manager
            .initiate_migration(peer1, test_addr(), test_addr2())
            .unwrap();
        manager
            .initiate_migration(peer2, test_addr(), test_addr2())
            .unwrap();

        let active = manager.get_active_migrations();
        assert_eq!(active.len(), 2);
    }

    #[test]
    fn test_cleanup_timeouts() {
        let config = MigrationConfig {
            migration_timeout: Duration::from_millis(1),
            ..Default::default()
        };
        let manager = ConnectionMigrationManager::new(config);
        let peer = test_peer_id();

        manager
            .initiate_migration(peer, test_addr(), test_addr2())
            .unwrap();
        std::thread::sleep(Duration::from_millis(20));

        manager.cleanup_timeouts();

        assert!(!manager.is_migrating(&peer));
        let stats = manager.stats();
        assert_eq!(stats.failed_migrations, 1);
        assert_eq!(stats.in_progress, 0);
    }

    #[test]
    fn test_cleanup_keeps_fresh_migrations() {
        // Default timeout is 30 s, so a just-started attempt must survive.
        let manager = ConnectionMigrationManager::new(MigrationConfig::default());
        let peer = test_peer_id();

        manager
            .initiate_migration(peer, test_addr(), test_addr2())
            .unwrap();
        manager.cleanup_timeouts();

        assert!(manager.is_migrating(&peer));
        let stats = manager.stats();
        assert_eq!(stats.failed_migrations, 0);
        assert_eq!(stats.in_progress, 1);
    }

    #[test]
    fn test_average_duration_calculation() {
        let manager = ConnectionMigrationManager::new(MigrationConfig::default());
        let peer1 = test_peer_id();
        let peer2 = test_peer_id();

        manager
            .initiate_migration(peer1, test_addr(), test_addr2())
            .unwrap();
        std::thread::sleep(Duration::from_millis(10));
        manager.complete_migration(&peer1).unwrap();

        manager
            .initiate_migration(peer2, test_addr(), test_addr2())
            .unwrap();
        std::thread::sleep(Duration::from_millis(10));
        manager.complete_migration(&peer2).unwrap();

        let stats = manager.stats();
        assert!(stats.avg_duration_ms > 0);
    }
}