1use std::collections::HashMap;
43use std::net::SocketAddr;
44use std::sync::Arc;
45use std::sync::atomic::{AtomicU64, Ordering};
46use std::time::{Duration, Instant};
47use tokio::sync::RwLock;
48
/// Tunables for relay-to-direct path migration.
#[derive(Debug, Clone)]
pub struct MigrationConfig {
    /// Delay before the next probe round is scheduled after a round times out.
    pub probe_interval: Duration,
    /// How long a probe round may stay in progress before it is treated as
    /// timed out.
    pub validation_timeout: Duration,
    /// Upper bound on the number of candidate addresses included in a single
    /// probe round.
    pub max_concurrent_probes: usize,
    /// Delay between a migration being requested and its first probe round.
    pub initial_delay: Duration,
    /// Attempt count at which a probe timeout becomes a fallback to the relay
    /// instead of another retry.
    pub max_attempts: u32,
    /// When `false`, requests to start a migration are ignored.
    pub auto_migrate: bool,
}

impl Default for MigrationConfig {
    /// Returns the stock settings: probe again every 5 s, 3 s validation
    /// timeout, at most 4 candidates per round, 2 s initial delay, 5 attempts,
    /// automatic migration enabled.
    fn default() -> Self {
        MigrationConfig {
            auto_migrate: true,
            max_attempts: 5,
            max_concurrent_probes: 4,
            initial_delay: Duration::from_millis(2_000),
            validation_timeout: Duration::from_millis(3_000),
            probe_interval: Duration::from_millis(5_000),
        }
    }
}
78
/// Where a peer currently stands in the relay-to-direct migration process.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationState {
    /// Traffic goes through the relay and no migration has been scheduled.
    RelayOnly,
    /// A probe round is scheduled but has not started yet.
    WaitingToProbe {
        /// Earliest instant at which probing may begin.
        probe_at: Instant,
    },
    /// Candidate paths are being probed.
    ProbeInProgress {
        /// Addresses included in this probe round.
        candidates: Vec<SocketAddr>,
        /// When this probe round started.
        started_at: Instant,
    },
    /// A direct path has been validated; the switch-over has not happened yet.
    MigrationPending {
        /// The validated direct address.
        verified_path: SocketAddr,
        /// Round-trip time reported for the validated path.
        measured_rtt: Duration,
    },
    /// The peer is using a direct path.
    DirectEstablished {
        /// The direct address in use.
        direct_path: SocketAddr,
        /// When the migration was completed.
        migrated_at: Instant,
    },
    /// Migration was abandoned and the peer stays on the relay.
    FallbackToRelay {
        /// Human-readable explanation of why migration was abandoned.
        reason: String,
        /// Attempt count recorded when the fallback happened.
        attempts: u32,
    },
}

impl MigrationState {
    /// Returns `true` for every state other than
    /// [`MigrationState::DirectEstablished`].
    pub fn is_relayed(&self) -> bool {
        !self.is_direct()
    }

    /// Returns `true` while a migration is scheduled, probing, or validated
    /// but not yet completed.
    pub fn is_migrating(&self) -> bool {
        match self {
            Self::WaitingToProbe { .. }
            | Self::ProbeInProgress { .. }
            | Self::MigrationPending { .. } => true,
            Self::RelayOnly
            | Self::DirectEstablished { .. }
            | Self::FallbackToRelay { .. } => false,
        }
    }

    /// Returns `true` only for [`MigrationState::DirectEstablished`].
    pub fn is_direct(&self) -> bool {
        match self {
            Self::DirectEstablished { .. } => true,
            _ => false,
        }
    }
}
140
/// Lock-free counters describing migration activity.
///
/// All fields are independent atomics updated with relaxed ordering; readers
/// may observe them at slightly different points in time.
#[derive(Debug, Default)]
pub struct MigrationStats {
    /// Total number of recorded migration attempts (successful or not).
    pub attempts: AtomicU64,
    /// Number of attempts recorded as successful.
    pub successful: AtomicU64,
    /// Number of attempts recorded as failed.
    pub failed: AtomicU64,
    /// Number of candidate paths that have been probed.
    pub paths_probed: AtomicU64,
    /// Running mean duration of successful attempts, in whole milliseconds.
    pub avg_migration_time_ms: AtomicU64,
}

impl MigrationStats {
    /// Creates a statistics block with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records the outcome of one migration attempt.
    ///
    /// A failed attempt only bumps `attempts` and `failed`. A successful one
    /// bumps `attempts` and `successful` and folds `duration` into the running
    /// mean stored in `avg_migration_time_ms` (integer division, saturating at
    /// `u64::MAX`).
    pub fn record_attempt(&self, success: bool, duration: Duration) {
        self.attempts.fetch_add(1, Ordering::Relaxed);

        if !success {
            self.failed.fetch_add(1, Ordering::Relaxed);
            return;
        }

        // Use the value returned by `fetch_add` rather than re-loading the
        // counter: a concurrent success could otherwise change the divisor
        // between the increment and the load.
        let prior_successes = u128::from(self.successful.fetch_add(1, Ordering::Relaxed));
        let sample_ms = duration.as_millis();

        // `fetch_update` retries the closure if another thread changed the
        // average in the meantime, so updates are never lost. The closure
        // always returns `Some`, hence the result carries no information.
        let _ = self.avg_migration_time_ms.fetch_update(
            Ordering::Relaxed,
            Ordering::Relaxed,
            |prev_avg| {
                // Widen to u128 so `prev_avg * prior_successes` cannot
                // overflow, then clamp instead of truncating on the way back.
                let total = (u128::from(prev_avg) * prior_successes).saturating_add(sample_ms);
                let mean = total / (prior_successes + 1);
                Some(mean.min(u128::from(u64::MAX)) as u64)
            },
        );
    }

    /// Counts one probed candidate path.
    pub fn record_probe(&self) {
        self.paths_probed.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns the share of successful attempts as a percentage in
    /// `0.0..=100.0`, or `0.0` when nothing has been recorded yet.
    pub fn success_rate(&self) -> f64 {
        let attempts = self.attempts.load(Ordering::Relaxed);
        if attempts == 0 {
            return 0.0;
        }
        let successful = self.successful.load(Ordering::Relaxed);
        (successful as f64 / attempts as f64) * 100.0
    }
}
195
/// Bookkeeping for one candidate direct address of a peer.
#[derive(Debug, Clone)]
// Several fields are stored but never read back within this file, which would
// otherwise trigger the dead-code lint.
#[allow(dead_code)]
struct CandidatePath {
    /// The candidate address itself.
    address: SocketAddr,
    /// Start time of a probe on this path; `None` on a fresh candidate.
    probe_started: Option<Instant>,
    /// Round-trip time reported for this path, once known.
    rtt: Option<Duration>,
    /// Whether this path has been reported as validated.
    validated: bool,
    /// Probe counter for this path; zero on a fresh candidate.
    probe_count: u32,
}

impl CandidatePath {
    /// Creates an unprobed, unvalidated candidate for `address`.
    fn new(address: SocketAddr) -> Self {
        CandidatePath {
            probe_count: 0,
            validated: false,
            rtt: None,
            probe_started: None,
            address,
        }
    }
}
223
224#[derive(Debug)]
226pub struct MigrationCoordinator {
227 config: MigrationConfig,
229 states: RwLock<HashMap<SocketAddr, MigrationState>>,
231 candidates: RwLock<HashMap<SocketAddr, Vec<CandidatePath>>>,
233 stats: Arc<MigrationStats>,
235 relay_address: RwLock<Option<SocketAddr>>,
237}
238
239impl MigrationCoordinator {
240 pub fn new(config: MigrationConfig) -> Self {
242 Self {
243 config,
244 states: RwLock::new(HashMap::new()),
245 candidates: RwLock::new(HashMap::new()),
246 stats: Arc::new(MigrationStats::new()),
247 relay_address: RwLock::new(None),
248 }
249 }
250
251 pub fn stats(&self) -> Arc<MigrationStats> {
253 Arc::clone(&self.stats)
254 }
255
256 pub async fn set_relay(&self, relay: SocketAddr) {
258 let mut relay_addr = self.relay_address.write().await;
259 *relay_addr = Some(relay);
260 }
261
262 pub async fn state(&self, peer: SocketAddr) -> MigrationState {
264 let states = self.states.read().await;
265 states
266 .get(&peer)
267 .cloned()
268 .unwrap_or(MigrationState::RelayOnly)
269 }
270
271 pub async fn add_candidates(&self, peer: SocketAddr, addrs: Vec<SocketAddr>) {
273 let mut candidates = self.candidates.write().await;
274 let peer_candidates = candidates.entry(peer).or_default();
275
276 for addr in addrs {
277 if !peer_candidates.iter().any(|c| c.address == addr) {
278 peer_candidates.push(CandidatePath::new(addr));
279 }
280 }
281 }
282
283 pub async fn start_migration(&self, peer: SocketAddr) {
285 if !self.config.auto_migrate {
286 return;
287 }
288
289 let mut states = self.states.write().await;
290
291 if let Some(state) = states.get(&peer) {
293 if !matches!(state, MigrationState::RelayOnly) {
294 return;
295 }
296 }
297
298 states.insert(
300 peer,
301 MigrationState::WaitingToProbe {
302 probe_at: Instant::now() + self.config.initial_delay,
303 },
304 );
305
306 tracing::debug!(peer = %peer, "Scheduled migration probe");
307 }
308
309 pub async fn poll(&self, peer: SocketAddr) -> MigrationState {
311 let state = self.state(peer).await;
312
313 match &state {
314 MigrationState::WaitingToProbe { probe_at } => {
315 if Instant::now() >= *probe_at {
316 self.begin_probing(peer).await;
318 }
319 }
320 MigrationState::ProbeInProgress {
321 candidates: _,
322 started_at,
323 } => {
324 if started_at.elapsed() > self.config.validation_timeout {
325 self.handle_probe_timeout(peer).await;
327 }
328 }
329 _ => {}
330 }
331
332 self.state(peer).await
333 }
334
335 async fn begin_probing(&self, peer: SocketAddr) {
337 let candidates = {
338 let candidates = self.candidates.read().await;
339 candidates
340 .get(&peer)
341 .map(|c| c.iter().map(|p| p.address).collect::<Vec<_>>())
342 .unwrap_or_default()
343 };
344
345 if candidates.is_empty() {
346 let mut states = self.states.write().await;
348 states.insert(
349 peer,
350 MigrationState::FallbackToRelay {
351 reason: "No candidate addresses available".to_string(),
352 attempts: 0,
353 },
354 );
355 return;
356 }
357
358 let probe_candidates: Vec<_> = candidates
360 .into_iter()
361 .take(self.config.max_concurrent_probes)
362 .collect();
363
364 let mut states = self.states.write().await;
365 states.insert(
366 peer,
367 MigrationState::ProbeInProgress {
368 candidates: probe_candidates.clone(),
369 started_at: Instant::now(),
370 },
371 );
372
373 for _ in &probe_candidates {
375 self.stats.record_probe();
376 }
377
378 tracing::info!(
379 peer = %peer,
380 candidates = probe_candidates.len(),
381 "Started probing candidate paths"
382 );
383 }
384
385 async fn handle_probe_timeout(&self, peer: SocketAddr) {
387 let mut states = self.states.write().await;
388
389 let attempts =
390 if let Some(MigrationState::FallbackToRelay { attempts, .. }) = states.get(&peer) {
391 *attempts + 1
392 } else {
393 1
394 };
395
396 if attempts >= self.config.max_attempts {
397 states.insert(
398 peer,
399 MigrationState::FallbackToRelay {
400 reason: "Maximum migration attempts exceeded".to_string(),
401 attempts,
402 },
403 );
404 self.stats
405 .record_attempt(false, self.config.validation_timeout);
406 tracing::warn!(peer = %peer, "Migration failed after {} attempts", attempts);
407 } else {
408 states.insert(
410 peer,
411 MigrationState::WaitingToProbe {
412 probe_at: Instant::now() + self.config.probe_interval,
413 },
414 );
415 tracing::debug!(peer = %peer, "Scheduling retry after probe timeout");
416 }
417 }
418
419 pub async fn report_validated_path(&self, peer: SocketAddr, path: SocketAddr, rtt: Duration) {
421 let mut states = self.states.write().await;
422
423 {
425 let mut candidates = self.candidates.write().await;
426 if let Some(peer_candidates) = candidates.get_mut(&peer) {
427 if let Some(candidate) = peer_candidates.iter_mut().find(|c| c.address == path) {
428 candidate.validated = true;
429 candidate.rtt = Some(rtt);
430 }
431 }
432 }
433
434 if let Some(MigrationState::ProbeInProgress { started_at, .. }) = states.get(&peer) {
436 let duration = started_at.elapsed();
437
438 states.insert(
439 peer,
440 MigrationState::MigrationPending {
441 verified_path: path,
442 measured_rtt: rtt,
443 },
444 );
445
446 tracing::info!(
447 peer = %peer,
448 path = %path,
449 rtt_ms = rtt.as_millis(),
450 "Direct path validated, migration pending"
451 );
452
453 self.stats.record_attempt(true, duration);
454 }
455 }
456
457 pub async fn complete_migration(&self, peer: SocketAddr) {
459 let mut states = self.states.write().await;
460
461 if let Some(MigrationState::MigrationPending { verified_path, .. }) = states.get(&peer) {
462 let path = *verified_path;
463 states.insert(
464 peer,
465 MigrationState::DirectEstablished {
466 direct_path: path,
467 migrated_at: Instant::now(),
468 },
469 );
470
471 tracing::info!(peer = %peer, path = %path, "Migration completed - direct path active");
472 }
473 }
474
475 pub async fn fallback_to_relay(&self, peer: SocketAddr, reason: &str) {
477 let mut states = self.states.write().await;
478
479 let attempts =
480 if let Some(MigrationState::FallbackToRelay { attempts, .. }) = states.get(&peer) {
481 *attempts
482 } else {
483 0
484 };
485
486 states.insert(
487 peer,
488 MigrationState::FallbackToRelay {
489 reason: reason.to_string(),
490 attempts,
491 },
492 );
493
494 tracing::warn!(peer = %peer, reason = reason, "Forced fallback to relay");
495 }
496
497 pub async fn reset(&self, peer: SocketAddr) {
499 let mut states = self.states.write().await;
500 let mut candidates = self.candidates.write().await;
501
502 states.remove(&peer);
503 candidates.remove(&peer);
504 }
505
506 pub async fn migrating_peers(&self) -> Vec<SocketAddr> {
508 let states = self.states.read().await;
509 states
510 .iter()
511 .filter(|(_, state)| state.is_migrating())
512 .map(|(peer, _)| *peer)
513 .collect()
514 }
515
516 pub async fn direct_peers(&self) -> Vec<SocketAddr> {
518 let states = self.states.read().await;
519 states
520 .iter()
521 .filter(|(_, state)| state.is_direct())
522 .map(|(peer, _)| *peer)
523 .collect()
524 }
525}
526
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Ipv4Addr;

    /// Peer address `192.168.1.<id>:9000`.
    fn peer_addr(id: u8) -> SocketAddr {
        SocketAddr::from((Ipv4Addr::new(192, 168, 1, id), 9000))
    }

    /// Candidate address `10.0.0.<id>:9001`.
    fn candidate_addr(id: u8) -> SocketAddr {
        SocketAddr::from((Ipv4Addr::new(10, 0, 0, id), 9001))
    }

    /// Default configuration with an initial delay short enough to wait out.
    fn short_delay_config() -> MigrationConfig {
        MigrationConfig {
            initial_delay: Duration::from_millis(1),
            ..Default::default()
        }
    }

    /// Short initial delay plus a validation timeout that will not expire
    /// while a test is running.
    fn probing_config() -> MigrationConfig {
        MigrationConfig {
            validation_timeout: Duration::from_secs(10),
            ..short_delay_config()
        }
    }

    /// Sleeps long enough for the 1 ms initial delay to have elapsed.
    async fn wait_out_initial_delay() {
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    /// Takes `peer` from a fresh coordinator state to `MigrationPending` using
    /// a single candidate and a 50 ms reported RTT.
    async fn validate_single_candidate(
        coordinator: &MigrationCoordinator,
        peer: SocketAddr,
        candidate: SocketAddr,
    ) {
        coordinator.add_candidates(peer, vec![candidate]).await;
        coordinator.start_migration(peer).await;

        wait_out_initial_delay().await;
        coordinator.poll(peer).await;

        coordinator
            .report_validated_path(peer, candidate, Duration::from_millis(50))
            .await;
    }

    #[tokio::test]
    async fn test_coordinator_creation() {
        let coordinator = MigrationCoordinator::new(MigrationConfig::default());

        // A peer the coordinator has never seen is on the relay.
        assert!(matches!(
            coordinator.state(peer_addr(1)).await,
            MigrationState::RelayOnly
        ));
    }

    #[tokio::test]
    async fn test_add_candidates() {
        let coordinator = MigrationCoordinator::new(MigrationConfig::default());
        let peer = peer_addr(1);
        let addrs = vec![candidate_addr(1), candidate_addr(2)];

        coordinator.add_candidates(peer, addrs.clone()).await;

        let stored = coordinator.candidates.read().await;
        assert_eq!(stored.get(&peer).unwrap().len(), 2);
    }

    #[tokio::test]
    async fn test_start_migration() {
        let coordinator = MigrationCoordinator::new(short_delay_config());
        let peer = peer_addr(1);

        coordinator.start_migration(peer).await;

        assert!(matches!(
            coordinator.state(peer).await,
            MigrationState::WaitingToProbe { .. }
        ));
    }

    #[tokio::test]
    async fn test_begin_probing_no_candidates() {
        let coordinator = MigrationCoordinator::new(short_delay_config());
        let peer = peer_addr(1);

        coordinator.start_migration(peer).await;
        wait_out_initial_delay().await;

        // With nothing to probe, polling falls straight back to the relay.
        assert!(matches!(
            coordinator.poll(peer).await,
            MigrationState::FallbackToRelay { .. }
        ));
    }

    #[tokio::test]
    async fn test_begin_probing_with_candidates() {
        let coordinator = MigrationCoordinator::new(probing_config());
        let peer = peer_addr(1);

        coordinator
            .add_candidates(peer, vec![candidate_addr(1), candidate_addr(2)])
            .await;
        coordinator.start_migration(peer).await;
        wait_out_initial_delay().await;

        assert!(matches!(
            coordinator.poll(peer).await,
            MigrationState::ProbeInProgress { .. }
        ));
    }

    #[tokio::test]
    async fn test_report_validated_path() {
        let coordinator = MigrationCoordinator::new(probing_config());
        let peer = peer_addr(1);

        validate_single_candidate(&coordinator, peer, candidate_addr(1)).await;

        assert!(matches!(
            coordinator.state(peer).await,
            MigrationState::MigrationPending { .. }
        ));
    }

    #[tokio::test]
    async fn test_complete_migration() {
        let coordinator = MigrationCoordinator::new(probing_config());
        let peer = peer_addr(1);

        validate_single_candidate(&coordinator, peer, candidate_addr(1)).await;
        coordinator.complete_migration(peer).await;

        let state = coordinator.state(peer).await;
        assert!(matches!(state, MigrationState::DirectEstablished { .. }));
        assert!(state.is_direct());
        assert!(!state.is_relayed());
    }

    #[tokio::test]
    async fn test_fallback_to_relay() {
        let coordinator = MigrationCoordinator::new(MigrationConfig::default());
        let peer = peer_addr(1);

        coordinator.fallback_to_relay(peer, "Test fallback").await;

        assert!(matches!(
            coordinator.state(peer).await,
            MigrationState::FallbackToRelay { .. }
        ));
    }

    #[tokio::test]
    async fn test_reset() {
        let coordinator = MigrationCoordinator::new(MigrationConfig::default());
        let peer = peer_addr(1);

        coordinator
            .add_candidates(peer, vec![candidate_addr(1)])
            .await;
        coordinator.start_migration(peer).await;
        coordinator.reset(peer).await;

        // Both the state and the candidate list must be gone.
        assert!(matches!(
            coordinator.state(peer).await,
            MigrationState::RelayOnly
        ));
        let stored = coordinator.candidates.read().await;
        assert!(stored.get(&peer).is_none());
    }

    #[tokio::test]
    async fn test_stats() {
        let coordinator = MigrationCoordinator::new(MigrationConfig::default());
        let stats = coordinator.stats();

        let outcomes = [(true, 100), (true, 200), (false, 150)];
        for &(success, millis) in outcomes.iter() {
            stats.record_attempt(success, Duration::from_millis(millis));
        }

        assert_eq!(stats.attempts.load(Ordering::Relaxed), 3);
        assert_eq!(stats.successful.load(Ordering::Relaxed), 2);
        assert_eq!(stats.failed.load(Ordering::Relaxed), 1);
        assert!((stats.success_rate() - 66.67).abs() < 1.0);
    }

    #[tokio::test]
    async fn test_migrating_and_direct_peers() {
        let coordinator = MigrationCoordinator::new(probing_config());
        let still_migrating = peer_addr(1);
        let migrated = peer_addr(2);
        let candidate = candidate_addr(1);

        // First peer: scheduled only, so it stays in a migrating state.
        coordinator
            .add_candidates(still_migrating, vec![candidate])
            .await;
        coordinator.start_migration(still_migrating).await;

        // Second peer: driven all the way to a direct path.
        validate_single_candidate(&coordinator, migrated, candidate).await;
        coordinator.complete_migration(migrated).await;

        let migrating = coordinator.migrating_peers().await;
        let direct = coordinator.direct_peers().await;

        assert!(migrating.contains(&still_migrating));
        assert!(!migrating.contains(&migrated));
        assert!(direct.contains(&migrated));
        assert!(!direct.contains(&still_migrating));
    }
}