ant_quic/masque/
migration.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8//! Connection Migration for MASQUE Relay
9//!
10//! Provides relay-to-direct path upgrade functionality. When a connection
11//! is established through a relay, this module coordinates attempts to
12//! establish a direct path and migrate the connection.
13//!
14//! # Migration Flow
15//!
16//! 1. Data flows via relay (RelayOnly state)
17//! 2. Exchange ADD_ADDRESS frames through relay
18//! 3. Coordinate PUNCH_ME_NOW timing
19//! 4. Both peers send PATH_CHALLENGE to candidates
20//! 5. On PATH_RESPONSE, QUIC migrates to direct path
21//! 6. Relay kept as fallback
22//!
23//! # Example
24//!
25//! ```rust,ignore
//! use ant_quic::masque::migration::{MigrationConfig, MigrationCoordinator, MigrationState};
//!
//! let config = MigrationConfig::default();
//! let coordinator = MigrationCoordinator::new(config);
//!
//! // Start migration attempt
//! coordinator.start_migration(peer_addr).await;
//!
//! // Check migration state for that peer
//! match coordinator.state(peer_addr).await {
//!     MigrationState::DirectEstablished { .. } => println!("Direct path active!"),
//!     MigrationState::RelayOnly => println!("Still using relay"),
//!     _ => {}
//! }
40//! ```
41
42use std::collections::HashMap;
43use std::net::SocketAddr;
44use std::sync::Arc;
45use std::sync::atomic::{AtomicU64, Ordering};
46use std::time::{Duration, Instant};
47use tokio::sync::RwLock;
48
/// Tunable parameters for relay-to-direct connection migration
#[derive(Debug, Clone)]
pub struct MigrationConfig {
    /// Pause between successive migration attempts
    pub probe_interval: Duration,
    /// Longest time path validation is allowed to take
    pub validation_timeout: Duration,
    /// Upper bound on the number of paths probed at once
    pub max_concurrent_probes: usize,
    /// How long to wait after the relay is established before attempting migration
    pub initial_delay: Duration,
    /// Number of migration attempts allowed before giving up
    pub max_attempts: u32,
    /// When `true`, migration is attempted automatically
    pub auto_migrate: bool,
}

impl Default for MigrationConfig {
    /// Builds the default configuration: 5 s probe interval, 3 s validation
    /// timeout, 4 concurrent probes, 2 s initial delay, 5 attempts, and
    /// automatic migration enabled.
    fn default() -> Self {
        let probe_interval = Duration::from_secs(5);
        let validation_timeout = Duration::from_secs(3);
        let initial_delay = Duration::from_secs(2);

        Self {
            probe_interval,
            validation_timeout,
            max_concurrent_probes: 4,
            initial_delay,
            max_attempts: 5,
            auto_migrate: true,
        }
    }
}
78
/// Phases a relayed connection passes through while upgrading to a direct path
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationState {
    /// Traffic uses the relay only; no migration has been attempted
    RelayOnly,
    /// A probe is scheduled but the initial delay has not yet elapsed
    WaitingToProbe {
        /// Instant at which probing is due to start
        probe_at: Instant,
    },
    /// Candidate addresses are currently being probed
    ProbeInProgress {
        /// Addresses under probe
        candidates: Vec<SocketAddr>,
        /// Instant at which this probe round began
        started_at: Instant,
    },
    /// A direct path was validated and is waiting to be switched to
    MigrationPending {
        /// Address of the validated direct path
        verified_path: SocketAddr,
        /// Round-trip time measured on that path
        measured_rtt: Duration,
    },
    /// The connection now runs over a direct path
    DirectEstablished {
        /// Address of the direct path in use
        direct_path: SocketAddr,
        /// Instant at which the migration finished
        migrated_at: Instant,
    },
    /// Migration was abandoned and the relay remains in use
    FallbackToRelay {
        /// Why the fallback happened
        reason: String,
        /// How many attempts were made
        attempts: u32,
    },
}

impl MigrationState {
    /// Returns `true` for every state except [`MigrationState::DirectEstablished`]
    pub fn is_relayed(&self) -> bool {
        !self.is_direct()
    }

    /// Returns `true` while a probe is scheduled, running, or has produced a
    /// validated path that is not yet in use
    pub fn is_migrating(&self) -> bool {
        match self {
            Self::WaitingToProbe { .. }
            | Self::ProbeInProgress { .. }
            | Self::MigrationPending { .. } => true,
            Self::RelayOnly | Self::DirectEstablished { .. } | Self::FallbackToRelay { .. } => {
                false
            }
        }
    }

    /// Returns `true` only for [`MigrationState::DirectEstablished`]
    pub fn is_direct(&self) -> bool {
        matches!(self, Self::DirectEstablished { .. })
    }
}
140
/// Statistics for migration operations
///
/// Every field is an atomic counter, so the statistics can be updated through
/// a shared reference.
#[derive(Debug, Default)]
pub struct MigrationStats {
    /// Total migration attempts
    pub attempts: AtomicU64,
    /// Successful migrations
    pub successful: AtomicU64,
    /// Failed migrations
    pub failed: AtomicU64,
    /// Paths probed
    pub paths_probed: AtomicU64,
    /// Average migration time (ms), taken over successful migrations only
    pub avg_migration_time_ms: AtomicU64,
}

impl MigrationStats {
    /// Create new statistics with every counter at zero
    pub fn new() -> Self {
        Self::default()
    }

    /// Record a migration attempt result
    ///
    /// Always increments `attempts`. A failure increments `failed` and nothing
    /// else. A success increments `successful` and folds `duration` into
    /// `avg_migration_time_ms` as an integer running mean:
    /// `(prev_avg * earlier_successes + duration_ms) / (earlier_successes + 1)`.
    ///
    /// The mean is computed in 128-bit saturating arithmetic and clamped to
    /// `u64::MAX`, so extreme durations can neither overflow nor be silently
    /// truncated.
    pub fn record_attempt(&self, success: bool, duration: Duration) {
        self.attempts.fetch_add(1, Ordering::Relaxed);

        if !success {
            self.failed.fetch_add(1, Ordering::Relaxed);
            return;
        }

        // `fetch_add` returns the value before the increment, i.e. how many
        // successes preceded this one. Using it (rather than re-loading the
        // counter) keeps the weight tied to this call even if other threads
        // record successes in between.
        let earlier = self.successful.fetch_add(1, Ordering::Relaxed);
        let count = u128::from(earlier) + 1;
        let sample_ms = duration.as_millis();

        // The closure always returns `Some`, so `fetch_update` cannot fail; it
        // only retries when another thread changed the average concurrently.
        let _ = self.avg_migration_time_ms.fetch_update(
            Ordering::Relaxed,
            Ordering::Relaxed,
            |prev_avg| {
                let weighted = u128::from(prev_avg).saturating_mul(u128::from(earlier));
                let total = weighted.saturating_add(sample_ms);
                Some(u64::try_from(total / count).unwrap_or(u64::MAX))
            },
        );
    }

    /// Record a path probe
    pub fn record_probe(&self) {
        self.paths_probed.fetch_add(1, Ordering::Relaxed);
    }

    /// Get success rate as percentage
    ///
    /// Returns `0.0` when no attempts have been recorded.
    pub fn success_rate(&self) -> f64 {
        let attempts = self.attempts.load(Ordering::Relaxed);
        if attempts == 0 {
            return 0.0;
        }
        let successful = self.successful.load(Ordering::Relaxed);
        (successful as f64 / attempts as f64) * 100.0
    }
}
195
/// Bookkeeping for one candidate direct path to a peer
#[derive(Debug, Clone)]
#[allow(dead_code)] // Fields reserved for future path management
struct CandidatePath {
    /// Candidate socket address
    address: SocketAddr,
    /// Instant probing of this candidate began, if it has begun
    probe_started: Option<Instant>,
    /// Round-trip time recorded once the path is validated
    rtt: Option<Duration>,
    /// Set once the path has been validated
    validated: bool,
    /// How many times this candidate has been probed
    probe_count: u32,
}

impl CandidatePath {
    /// Creates a candidate for `address` that has not been probed or validated.
    fn new(address: SocketAddr) -> Self {
        Self {
            probe_count: 0,
            validated: false,
            rtt: None,
            probe_started: None,
            address,
        }
    }
}
223
224/// Coordinates connection migration from relay to direct path
225#[derive(Debug)]
226pub struct MigrationCoordinator {
227    /// Configuration
228    config: MigrationConfig,
229    /// Current migration state per peer
230    states: RwLock<HashMap<SocketAddr, MigrationState>>,
231    /// Candidate paths per peer
232    candidates: RwLock<HashMap<SocketAddr, Vec<CandidatePath>>>,
233    /// Statistics
234    stats: Arc<MigrationStats>,
235    /// Relay address (for fallback)
236    relay_address: RwLock<Option<SocketAddr>>,
237}
238
239impl MigrationCoordinator {
240    /// Create a new migration coordinator
241    pub fn new(config: MigrationConfig) -> Self {
242        Self {
243            config,
244            states: RwLock::new(HashMap::new()),
245            candidates: RwLock::new(HashMap::new()),
246            stats: Arc::new(MigrationStats::new()),
247            relay_address: RwLock::new(None),
248        }
249    }
250
251    /// Get statistics
252    pub fn stats(&self) -> Arc<MigrationStats> {
253        Arc::clone(&self.stats)
254    }
255
256    /// Set the relay address for this coordinator
257    pub async fn set_relay(&self, relay: SocketAddr) {
258        let mut relay_addr = self.relay_address.write().await;
259        *relay_addr = Some(relay);
260    }
261
262    /// Get current migration state for a peer
263    pub async fn state(&self, peer: SocketAddr) -> MigrationState {
264        let states = self.states.read().await;
265        states
266            .get(&peer)
267            .cloned()
268            .unwrap_or(MigrationState::RelayOnly)
269    }
270
271    /// Register candidate addresses for a peer
272    pub async fn add_candidates(&self, peer: SocketAddr, addrs: Vec<SocketAddr>) {
273        let mut candidates = self.candidates.write().await;
274        let peer_candidates = candidates.entry(peer).or_default();
275
276        for addr in addrs {
277            if !peer_candidates.iter().any(|c| c.address == addr) {
278                peer_candidates.push(CandidatePath::new(addr));
279            }
280        }
281    }
282
283    /// Start migration attempt for a peer
284    pub async fn start_migration(&self, peer: SocketAddr) {
285        if !self.config.auto_migrate {
286            return;
287        }
288
289        let mut states = self.states.write().await;
290
291        // Only start if in relay-only state
292        if let Some(state) = states.get(&peer) {
293            if !matches!(state, MigrationState::RelayOnly) {
294                return;
295            }
296        }
297
298        // Set waiting state with initial delay
299        states.insert(
300            peer,
301            MigrationState::WaitingToProbe {
302                probe_at: Instant::now() + self.config.initial_delay,
303            },
304        );
305
306        tracing::debug!(peer = %peer, "Scheduled migration probe");
307    }
308
309    /// Poll migration progress - should be called periodically
310    pub async fn poll(&self, peer: SocketAddr) -> MigrationState {
311        let state = self.state(peer).await;
312
313        match &state {
314            MigrationState::WaitingToProbe { probe_at } => {
315                if Instant::now() >= *probe_at {
316                    // Time to start probing
317                    self.begin_probing(peer).await;
318                }
319            }
320            MigrationState::ProbeInProgress {
321                candidates: _,
322                started_at,
323            } => {
324                if started_at.elapsed() > self.config.validation_timeout {
325                    // Probing timed out
326                    self.handle_probe_timeout(peer).await;
327                }
328            }
329            _ => {}
330        }
331
332        self.state(peer).await
333    }
334
335    /// Begin probing candidates
336    async fn begin_probing(&self, peer: SocketAddr) {
337        let candidates = {
338            let candidates = self.candidates.read().await;
339            candidates
340                .get(&peer)
341                .map(|c| c.iter().map(|p| p.address).collect::<Vec<_>>())
342                .unwrap_or_default()
343        };
344
345        if candidates.is_empty() {
346            // No candidates to probe
347            let mut states = self.states.write().await;
348            states.insert(
349                peer,
350                MigrationState::FallbackToRelay {
351                    reason: "No candidate addresses available".to_string(),
352                    attempts: 0,
353                },
354            );
355            return;
356        }
357
358        // Limit concurrent probes
359        let probe_candidates: Vec<_> = candidates
360            .into_iter()
361            .take(self.config.max_concurrent_probes)
362            .collect();
363
364        let mut states = self.states.write().await;
365        states.insert(
366            peer,
367            MigrationState::ProbeInProgress {
368                candidates: probe_candidates.clone(),
369                started_at: Instant::now(),
370            },
371        );
372
373        // Record probe stats
374        for _ in &probe_candidates {
375            self.stats.record_probe();
376        }
377
378        tracing::info!(
379            peer = %peer,
380            candidates = probe_candidates.len(),
381            "Started probing candidate paths"
382        );
383    }
384
385    /// Handle probe timeout
386    async fn handle_probe_timeout(&self, peer: SocketAddr) {
387        let mut states = self.states.write().await;
388
389        let attempts =
390            if let Some(MigrationState::FallbackToRelay { attempts, .. }) = states.get(&peer) {
391                *attempts + 1
392            } else {
393                1
394            };
395
396        if attempts >= self.config.max_attempts {
397            states.insert(
398                peer,
399                MigrationState::FallbackToRelay {
400                    reason: "Maximum migration attempts exceeded".to_string(),
401                    attempts,
402                },
403            );
404            self.stats
405                .record_attempt(false, self.config.validation_timeout);
406            tracing::warn!(peer = %peer, "Migration failed after {} attempts", attempts);
407        } else {
408            // Schedule another attempt
409            states.insert(
410                peer,
411                MigrationState::WaitingToProbe {
412                    probe_at: Instant::now() + self.config.probe_interval,
413                },
414            );
415            tracing::debug!(peer = %peer, "Scheduling retry after probe timeout");
416        }
417    }
418
419    /// Report a validated path (called when PATH_RESPONSE received)
420    pub async fn report_validated_path(&self, peer: SocketAddr, path: SocketAddr, rtt: Duration) {
421        let mut states = self.states.write().await;
422
423        // Update candidate
424        {
425            let mut candidates = self.candidates.write().await;
426            if let Some(peer_candidates) = candidates.get_mut(&peer) {
427                if let Some(candidate) = peer_candidates.iter_mut().find(|c| c.address == path) {
428                    candidate.validated = true;
429                    candidate.rtt = Some(rtt);
430                }
431            }
432        }
433
434        // Only transition from ProbeInProgress
435        if let Some(MigrationState::ProbeInProgress { started_at, .. }) = states.get(&peer) {
436            let duration = started_at.elapsed();
437
438            states.insert(
439                peer,
440                MigrationState::MigrationPending {
441                    verified_path: path,
442                    measured_rtt: rtt,
443                },
444            );
445
446            tracing::info!(
447                peer = %peer,
448                path = %path,
449                rtt_ms = rtt.as_millis(),
450                "Direct path validated, migration pending"
451            );
452
453            self.stats.record_attempt(true, duration);
454        }
455    }
456
457    /// Complete migration to direct path
458    pub async fn complete_migration(&self, peer: SocketAddr) {
459        let mut states = self.states.write().await;
460
461        if let Some(MigrationState::MigrationPending { verified_path, .. }) = states.get(&peer) {
462            let path = *verified_path;
463            states.insert(
464                peer,
465                MigrationState::DirectEstablished {
466                    direct_path: path,
467                    migrated_at: Instant::now(),
468                },
469            );
470
471            tracing::info!(peer = %peer, path = %path, "Migration completed - direct path active");
472        }
473    }
474
475    /// Force fallback to relay
476    pub async fn fallback_to_relay(&self, peer: SocketAddr, reason: &str) {
477        let mut states = self.states.write().await;
478
479        let attempts =
480            if let Some(MigrationState::FallbackToRelay { attempts, .. }) = states.get(&peer) {
481                *attempts
482            } else {
483                0
484            };
485
486        states.insert(
487            peer,
488            MigrationState::FallbackToRelay {
489                reason: reason.to_string(),
490                attempts,
491            },
492        );
493
494        tracing::warn!(peer = %peer, reason = reason, "Forced fallback to relay");
495    }
496
497    /// Reset migration state for a peer
498    pub async fn reset(&self, peer: SocketAddr) {
499        let mut states = self.states.write().await;
500        let mut candidates = self.candidates.write().await;
501
502        states.remove(&peer);
503        candidates.remove(&peer);
504    }
505
506    /// Get all peers currently migrating
507    pub async fn migrating_peers(&self) -> Vec<SocketAddr> {
508        let states = self.states.read().await;
509        states
510            .iter()
511            .filter(|(_, state)| state.is_migrating())
512            .map(|(peer, _)| *peer)
513            .collect()
514    }
515
516    /// Get all peers with direct paths
517    pub async fn direct_peers(&self) -> Vec<SocketAddr> {
518        let states = self.states.read().await;
519        states
520            .iter()
521            .filter(|(_, state)| state.is_direct())
522            .map(|(peer, _)| *peer)
523            .collect()
524    }
525}
526
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Peer address `192.168.1.<id>:9000`
    fn peer_addr(id: u8) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::from([192, 168, 1, id])), 9000)
    }

    /// Candidate address `10.0.0.<id>:9001`
    fn candidate_addr(id: u8) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::from([10, 0, 0, id])), 9001)
    }

    /// Default configuration with the initial delay shortened to 1 ms
    fn short_delay_config() -> MigrationConfig {
        MigrationConfig {
            initial_delay: Duration::from_millis(1),
            ..Default::default()
        }
    }

    /// Short-delay configuration whose validation timeout is long enough
    /// (10 s) that a probe round never times out during a test
    fn probing_config() -> MigrationConfig {
        MigrationConfig {
            validation_timeout: Duration::from_secs(10),
            ..short_delay_config()
        }
    }

    /// Schedules a migration, waits out the initial delay and polls once
    async fn start_and_poll(coordinator: &MigrationCoordinator, peer: SocketAddr) -> MigrationState {
        coordinator.start_migration(peer).await;
        tokio::time::sleep(Duration::from_millis(10)).await;
        coordinator.poll(peer).await
    }

    #[tokio::test]
    async fn test_coordinator_creation() {
        let coordinator = MigrationCoordinator::new(MigrationConfig::default());

        // An unknown peer is reported as relay-only.
        assert!(matches!(
            coordinator.state(peer_addr(1)).await,
            MigrationState::RelayOnly
        ));
    }

    #[tokio::test]
    async fn test_add_candidates() {
        let coordinator = MigrationCoordinator::new(MigrationConfig::default());
        let peer = peer_addr(1);

        coordinator
            .add_candidates(peer, vec![candidate_addr(1), candidate_addr(2)])
            .await;

        let stored = coordinator.candidates.read().await;
        assert_eq!(stored.get(&peer).map(Vec::len), Some(2));
    }

    #[tokio::test]
    async fn test_start_migration() {
        let coordinator = MigrationCoordinator::new(short_delay_config());
        let peer = peer_addr(1);

        coordinator.start_migration(peer).await;

        assert!(matches!(
            coordinator.state(peer).await,
            MigrationState::WaitingToProbe { .. }
        ));
    }

    #[tokio::test]
    async fn test_begin_probing_no_candidates() {
        let coordinator = MigrationCoordinator::new(short_delay_config());

        // With nothing to probe, the first poll after the delay falls back.
        let state = start_and_poll(&coordinator, peer_addr(1)).await;
        assert!(matches!(state, MigrationState::FallbackToRelay { .. }));
    }

    #[tokio::test]
    async fn test_begin_probing_with_candidates() {
        let coordinator = MigrationCoordinator::new(probing_config());
        let peer = peer_addr(1);
        coordinator
            .add_candidates(peer, vec![candidate_addr(1), candidate_addr(2)])
            .await;

        // With candidates registered, the first poll after the delay probes.
        let state = start_and_poll(&coordinator, peer).await;
        assert!(matches!(state, MigrationState::ProbeInProgress { .. }));
    }

    #[tokio::test]
    async fn test_report_validated_path() {
        let coordinator = MigrationCoordinator::new(probing_config());
        let peer = peer_addr(1);
        let candidate = candidate_addr(1);
        coordinator.add_candidates(peer, vec![candidate]).await;
        start_and_poll(&coordinator, peer).await;

        coordinator
            .report_validated_path(peer, candidate, Duration::from_millis(50))
            .await;

        assert!(matches!(
            coordinator.state(peer).await,
            MigrationState::MigrationPending { .. }
        ));
    }

    #[tokio::test]
    async fn test_complete_migration() {
        let coordinator = MigrationCoordinator::new(probing_config());
        let peer = peer_addr(1);
        let candidate = candidate_addr(1);
        coordinator.add_candidates(peer, vec![candidate]).await;
        start_and_poll(&coordinator, peer).await;

        coordinator
            .report_validated_path(peer, candidate, Duration::from_millis(50))
            .await;
        coordinator.complete_migration(peer).await;

        let state = coordinator.state(peer).await;
        assert!(matches!(state, MigrationState::DirectEstablished { .. }));
        assert!(state.is_direct());
        assert!(!state.is_relayed());
    }

    #[tokio::test]
    async fn test_fallback_to_relay() {
        let coordinator = MigrationCoordinator::new(MigrationConfig::default());
        let peer = peer_addr(1);

        coordinator.fallback_to_relay(peer, "Test fallback").await;

        assert!(matches!(
            coordinator.state(peer).await,
            MigrationState::FallbackToRelay { .. }
        ));
    }

    #[tokio::test]
    async fn test_reset() {
        let coordinator = MigrationCoordinator::new(MigrationConfig::default());
        let peer = peer_addr(1);

        coordinator
            .add_candidates(peer, vec![candidate_addr(1)])
            .await;
        coordinator.start_migration(peer).await;
        coordinator.reset(peer).await;

        // Both the state and the candidate list are gone after a reset.
        assert!(matches!(
            coordinator.state(peer).await,
            MigrationState::RelayOnly
        ));
        let candidates = coordinator.candidates.read().await;
        assert!(!candidates.contains_key(&peer));
    }

    #[tokio::test]
    async fn test_stats() {
        let coordinator = MigrationCoordinator::new(MigrationConfig::default());
        let stats = coordinator.stats();

        for (success, millis) in [(true, 100), (true, 200), (false, 150)] {
            stats.record_attempt(success, Duration::from_millis(millis));
        }

        assert_eq!(stats.attempts.load(Ordering::Relaxed), 3);
        assert_eq!(stats.successful.load(Ordering::Relaxed), 2);
        assert_eq!(stats.failed.load(Ordering::Relaxed), 1);
        assert!((stats.success_rate() - 66.67).abs() < 1.0);
    }

    #[tokio::test]
    async fn test_migrating_and_direct_peers() {
        let coordinator = MigrationCoordinator::new(probing_config());
        let waiting_peer = peer_addr(1);
        let direct_peer = peer_addr(2);
        let candidate = candidate_addr(1);

        // First peer: migration scheduled but never polled.
        coordinator.add_candidates(waiting_peer, vec![candidate]).await;
        coordinator.start_migration(waiting_peer).await;

        // Second peer: driven all the way to a direct path.
        coordinator.add_candidates(direct_peer, vec![candidate]).await;
        start_and_poll(&coordinator, direct_peer).await;
        coordinator
            .report_validated_path(direct_peer, candidate, Duration::from_millis(50))
            .await;
        coordinator.complete_migration(direct_peer).await;

        let migrating = coordinator.migrating_peers().await;
        let direct = coordinator.direct_peers().await;

        assert!(migrating.contains(&waiting_peer));
        assert!(!migrating.contains(&direct_peer));
        assert!(direct.contains(&direct_peer));
        assert!(!direct.contains(&waiting_peer));
    }
}