1use dashmap::DashMap;
23use libp2p::{Multiaddr, PeerId};
24use parking_lot::RwLock;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use thiserror::Error;
28use tracing::{debug, info, warn};
29
30#[derive(Debug, Error)]
32pub enum MigrationError {
33 #[error("No active connection to migrate")]
34 NoActiveConnection,
35
36 #[error("Migration already in progress")]
37 MigrationInProgress,
38
39 #[error("Migration failed: {0}")]
40 MigrationFailed(String),
41
42 #[error("Timeout during migration")]
43 MigrationTimeout,
44
45 #[error("Invalid migration state")]
46 InvalidState,
47
48 #[error("No suitable migration path available")]
49 NoMigrationPath,
50}
51
52#[derive(Debug, Clone)]
54pub struct MigrationConfig {
55 pub auto_migrate: bool,
57
58 pub migration_timeout: Duration,
60
61 pub max_retry_attempts: usize,
63
64 pub retry_backoff: Duration,
66
67 pub migration_cooldown: Duration,
69
70 pub keep_old_path: bool,
72
73 pub validate_new_path: bool,
75}
76
77impl Default for MigrationConfig {
78 fn default() -> Self {
79 Self {
80 auto_migrate: true,
81 migration_timeout: Duration::from_secs(30),
82 max_retry_attempts: 3,
83 retry_backoff: Duration::from_secs(2),
84 migration_cooldown: Duration::from_secs(10),
85 keep_old_path: true,
86 validate_new_path: true,
87 }
88 }
89}
90
91impl MigrationConfig {
92 pub fn mobile() -> Self {
97 Self {
98 auto_migrate: true,
99 migration_timeout: Duration::from_secs(15),
100 max_retry_attempts: 5,
101 retry_backoff: Duration::from_millis(500),
102 migration_cooldown: Duration::from_secs(5),
103 keep_old_path: true,
104 validate_new_path: true,
105 }
106 }
107
108 pub fn conservative() -> Self {
113 Self {
114 auto_migrate: false,
115 migration_timeout: Duration::from_secs(60),
116 max_retry_attempts: 2,
117 retry_backoff: Duration::from_secs(5),
118 migration_cooldown: Duration::from_secs(30),
119 keep_old_path: true,
120 validate_new_path: true,
121 }
122 }
123}
124
125#[derive(Debug, Clone, Copy, PartialEq, Eq)]
127pub enum MigrationState {
128 Idle,
130 Initiated,
132 Validating,
134 Migrating,
136 Completed,
138 Failed,
140}
141
142#[derive(Debug, Clone)]
144pub struct MigrationAttempt {
145 pub peer_id: PeerId,
147 pub old_address: Multiaddr,
149 pub new_address: Multiaddr,
151 pub state: MigrationState,
153 pub started_at: Instant,
155 pub retry_count: usize,
157 pub error: Option<String>,
159}
160
161#[derive(Debug, Clone, Default)]
163pub struct MigrationStats {
164 pub total_attempts: usize,
166 pub successful_migrations: usize,
168 pub failed_migrations: usize,
170 pub in_progress: usize,
172 pub avg_duration_ms: u64,
174 pub total_retries: usize,
176}
177
178pub struct ConnectionMigrationManager {
180 config: MigrationConfig,
182 active_migrations: Arc<DashMap<PeerId, MigrationAttempt>>,
184 last_migration: Arc<DashMap<PeerId, Instant>>,
186 stats: Arc<RwLock<MigrationStats>>,
188 migration_durations: Arc<RwLock<Vec<u64>>>,
190}
191
192impl ConnectionMigrationManager {
193 pub fn new(config: MigrationConfig) -> Self {
195 Self {
196 config,
197 active_migrations: Arc::new(DashMap::new()),
198 last_migration: Arc::new(DashMap::new()),
199 stats: Arc::new(RwLock::new(MigrationStats::default())),
200 migration_durations: Arc::new(RwLock::new(Vec::new())),
201 }
202 }
203
204 pub fn mobile() -> Self {
206 Self::new(MigrationConfig::mobile())
207 }
208
209 pub fn conservative() -> Self {
211 Self::new(MigrationConfig::conservative())
212 }
213
214 pub fn initiate_migration(
226 &self,
227 peer_id: PeerId,
228 old_address: Multiaddr,
229 new_address: Multiaddr,
230 ) -> Result<(), MigrationError> {
231 if self.active_migrations.contains_key(&peer_id) {
233 return Err(MigrationError::MigrationInProgress);
234 }
235
236 if let Some(last) = self.last_migration.get(&peer_id) {
238 if last.elapsed() < self.config.migration_cooldown {
239 debug!(
240 "Migration cooldown active for peer {} ({:?} remaining)",
241 peer_id,
242 self.config.migration_cooldown - last.elapsed()
243 );
244 return Err(MigrationError::InvalidState);
245 }
246 }
247
248 info!(
249 "Initiating connection migration for peer {} from {} to {}",
250 peer_id, old_address, new_address
251 );
252
253 let attempt = MigrationAttempt {
255 peer_id,
256 old_address,
257 new_address,
258 state: MigrationState::Initiated,
259 started_at: Instant::now(),
260 retry_count: 0,
261 error: None,
262 };
263
264 self.active_migrations.insert(peer_id, attempt);
266
267 let mut stats = self.stats.write();
269 stats.total_attempts += 1;
270 stats.in_progress += 1;
271
272 Ok(())
273 }
274
275 pub fn update_migration_state(
277 &self,
278 peer_id: &PeerId,
279 new_state: MigrationState,
280 ) -> Result<(), MigrationError> {
281 if let Some(mut attempt) = self.active_migrations.get_mut(peer_id) {
282 debug!(
283 "Migration state change for peer {}: {:?} -> {:?}",
284 peer_id, attempt.state, new_state
285 );
286 attempt.state = new_state;
287 Ok(())
288 } else {
289 Err(MigrationError::NoActiveConnection)
290 }
291 }
292
293 pub fn complete_migration(&self, peer_id: &PeerId) -> Result<(), MigrationError> {
295 if let Some((_, attempt)) = self.active_migrations.remove(peer_id) {
296 let duration_ms = attempt.started_at.elapsed().as_millis() as u64;
297
298 info!(
299 "Migration completed for peer {} in {} ms",
300 peer_id, duration_ms
301 );
302
303 self.last_migration.insert(*peer_id, Instant::now());
305
306 {
308 let mut stats = self.stats.write();
309 stats.successful_migrations += 1;
310 stats.in_progress = stats.in_progress.saturating_sub(1);
311 stats.total_retries += attempt.retry_count;
312 }
313
314 {
316 let mut durations = self.migration_durations.write();
317 durations.push(duration_ms);
318
319 if durations.len() > 100 {
321 durations.remove(0);
322 }
323
324 let avg = durations.iter().sum::<u64>() / durations.len() as u64;
326 self.stats.write().avg_duration_ms = avg;
327 }
328
329 Ok(())
330 } else {
331 Err(MigrationError::NoActiveConnection)
332 }
333 }
334
335 pub fn fail_migration(&self, peer_id: &PeerId, error: String) -> Result<(), MigrationError> {
337 if let Some((_, mut attempt)) = self.active_migrations.remove(peer_id) {
338 warn!("Migration failed for peer {}: {}", peer_id, error);
339
340 attempt.error = Some(error);
341 attempt.state = MigrationState::Failed;
342
343 let mut stats = self.stats.write();
345 stats.failed_migrations += 1;
346 stats.in_progress = stats.in_progress.saturating_sub(1);
347 stats.total_retries += attempt.retry_count;
348
349 Ok(())
350 } else {
351 Err(MigrationError::NoActiveConnection)
352 }
353 }
354
355 pub fn retry_migration(&self, peer_id: &PeerId) -> Result<(), MigrationError> {
357 if let Some(mut attempt) = self.active_migrations.get_mut(peer_id) {
358 if attempt.retry_count >= self.config.max_retry_attempts {
359 return Err(MigrationError::MigrationFailed(
360 "Max retry attempts reached".to_string(),
361 ));
362 }
363
364 attempt.retry_count += 1;
365 attempt.state = MigrationState::Initiated;
366 attempt.error = None;
367
368 info!(
369 "Retrying migration for peer {} (attempt {})",
370 peer_id,
371 attempt.retry_count + 1
372 );
373
374 Ok(())
375 } else {
376 Err(MigrationError::NoActiveConnection)
377 }
378 }
379
380 pub fn is_migrating(&self, peer_id: &PeerId) -> bool {
382 self.active_migrations.contains_key(peer_id)
383 }
384
385 pub fn get_migration_state(&self, peer_id: &PeerId) -> Option<MigrationState> {
387 self.active_migrations
388 .get(peer_id)
389 .map(|attempt| attempt.state)
390 }
391
392 pub fn get_active_migrations(&self) -> Vec<MigrationAttempt> {
394 self.active_migrations
395 .iter()
396 .map(|entry| entry.value().clone())
397 .collect()
398 }
399
400 pub fn stats(&self) -> MigrationStats {
402 self.stats.read().clone()
403 }
404
405 pub fn can_migrate(&self, peer_id: &PeerId) -> bool {
407 if self.active_migrations.contains_key(peer_id) {
408 return false;
409 }
410
411 if let Some(last) = self.last_migration.get(peer_id) {
412 last.elapsed() >= self.config.migration_cooldown
413 } else {
414 true
415 }
416 }
417
418 pub fn config(&self) -> &MigrationConfig {
420 &self.config
421 }
422
423 pub fn cleanup_timeouts(&self) {
425 let timeout = self.config.migration_timeout;
426 let mut timed_out = Vec::new();
427
428 for entry in self.active_migrations.iter() {
429 if entry.value().started_at.elapsed() > timeout {
430 timed_out.push(*entry.key());
431 }
432 }
433
434 for peer_id in timed_out {
435 self.fail_migration(&peer_id, "Migration timeout".to_string())
436 .ok();
437 }
438 }
439}
440
441#[cfg(test)]
442mod tests {
443 use super::*;
444 use std::str::FromStr;
445
446 fn test_peer_id() -> PeerId {
447 PeerId::random()
448 }
449
450 fn test_addr() -> Multiaddr {
451 Multiaddr::from_str("/ip4/127.0.0.1/tcp/4001").unwrap()
452 }
453
454 fn test_addr2() -> Multiaddr {
455 Multiaddr::from_str("/ip4/192.168.1.1/tcp/4001").unwrap()
456 }
457
458 #[test]
459 fn test_migration_config_default() {
460 let config = MigrationConfig::default();
461 assert!(config.auto_migrate);
462 assert!(config.keep_old_path);
463 assert!(config.validate_new_path);
464 }
465
466 #[test]
467 fn test_migration_config_mobile() {
468 let config = MigrationConfig::mobile();
469 assert!(config.auto_migrate);
470 assert_eq!(config.max_retry_attempts, 5);
471 assert_eq!(config.migration_cooldown, Duration::from_secs(5));
472 }
473
474 #[test]
475 fn test_migration_config_conservative() {
476 let config = MigrationConfig::conservative();
477 assert!(!config.auto_migrate);
478 assert_eq!(config.max_retry_attempts, 2);
479 assert_eq!(config.migration_cooldown, Duration::from_secs(30));
480 }
481
482 #[test]
483 fn test_initiate_migration() {
484 let manager = ConnectionMigrationManager::new(MigrationConfig::default());
485 let peer = test_peer_id();
486 let old_addr = test_addr();
487 let new_addr = test_addr2();
488
489 let result = manager.initiate_migration(peer, old_addr, new_addr);
490 assert!(result.is_ok());
491
492 let stats = manager.stats();
493 assert_eq!(stats.total_attempts, 1);
494 assert_eq!(stats.in_progress, 1);
495 }
496
497 #[test]
498 fn test_migration_in_progress_error() {
499 let manager = ConnectionMigrationManager::new(MigrationConfig::default());
500 let peer = test_peer_id();
501 let old_addr = test_addr();
502 let new_addr = test_addr2();
503
504 manager
505 .initiate_migration(peer, old_addr.clone(), new_addr.clone())
506 .unwrap();
507
508 let result = manager.initiate_migration(peer, old_addr, new_addr);
509 assert!(matches!(result, Err(MigrationError::MigrationInProgress)));
510 }
511
512 #[test]
513 fn test_complete_migration() {
514 let manager = ConnectionMigrationManager::new(MigrationConfig::default());
515 let peer = test_peer_id();
516
517 manager
518 .initiate_migration(peer, test_addr(), test_addr2())
519 .unwrap();
520 let result = manager.complete_migration(&peer);
521
522 assert!(result.is_ok());
523
524 let stats = manager.stats();
525 assert_eq!(stats.successful_migrations, 1);
526 assert_eq!(stats.in_progress, 0);
527 }
528
529 #[test]
530 fn test_fail_migration() {
531 let manager = ConnectionMigrationManager::new(MigrationConfig::default());
532 let peer = test_peer_id();
533
534 manager
535 .initiate_migration(peer, test_addr(), test_addr2())
536 .unwrap();
537 let result = manager.fail_migration(&peer, "Test error".to_string());
538
539 assert!(result.is_ok());
540
541 let stats = manager.stats();
542 assert_eq!(stats.failed_migrations, 1);
543 assert_eq!(stats.in_progress, 0);
544 }
545
546 #[test]
547 fn test_retry_migration() {
548 let manager = ConnectionMigrationManager::new(MigrationConfig::default());
549 let peer = test_peer_id();
550
551 manager
552 .initiate_migration(peer, test_addr(), test_addr2())
553 .unwrap();
554
555 let result = manager.retry_migration(&peer);
556 assert!(result.is_ok());
557
558 let attempt = manager.active_migrations.get(&peer).unwrap();
559 assert_eq!(attempt.retry_count, 1);
560 }
561
562 #[test]
563 fn test_retry_limit() {
564 let config = MigrationConfig {
565 max_retry_attempts: 2,
566 ..Default::default()
567 };
568 let manager = ConnectionMigrationManager::new(config);
569 let peer = test_peer_id();
570
571 manager
572 .initiate_migration(peer, test_addr(), test_addr2())
573 .unwrap();
574
575 assert!(manager.retry_migration(&peer).is_ok());
577 assert!(manager.retry_migration(&peer).is_ok());
579 assert!(matches!(
581 manager.retry_migration(&peer),
582 Err(MigrationError::MigrationFailed(_))
583 ));
584 }
585
586 #[test]
587 fn test_is_migrating() {
588 let manager = ConnectionMigrationManager::new(MigrationConfig::default());
589 let peer = test_peer_id();
590
591 assert!(!manager.is_migrating(&peer));
592
593 manager
594 .initiate_migration(peer, test_addr(), test_addr2())
595 .unwrap();
596 assert!(manager.is_migrating(&peer));
597
598 manager.complete_migration(&peer).unwrap();
599 assert!(!manager.is_migrating(&peer));
600 }
601
602 #[test]
603 fn test_migration_state_updates() {
604 let manager = ConnectionMigrationManager::new(MigrationConfig::default());
605 let peer = test_peer_id();
606
607 manager
608 .initiate_migration(peer, test_addr(), test_addr2())
609 .unwrap();
610
611 assert_eq!(
612 manager.get_migration_state(&peer),
613 Some(MigrationState::Initiated)
614 );
615
616 manager
617 .update_migration_state(&peer, MigrationState::Validating)
618 .unwrap();
619 assert_eq!(
620 manager.get_migration_state(&peer),
621 Some(MigrationState::Validating)
622 );
623
624 manager
625 .update_migration_state(&peer, MigrationState::Migrating)
626 .unwrap();
627 assert_eq!(
628 manager.get_migration_state(&peer),
629 Some(MigrationState::Migrating)
630 );
631 }
632
633 #[test]
634 fn test_can_migrate() {
635 let config = MigrationConfig {
636 migration_cooldown: Duration::from_millis(100),
637 ..Default::default()
638 };
639 let manager = ConnectionMigrationManager::new(config);
640 let peer = test_peer_id();
641
642 assert!(manager.can_migrate(&peer));
643
644 manager
645 .initiate_migration(peer, test_addr(), test_addr2())
646 .unwrap();
647 assert!(!manager.can_migrate(&peer));
648
649 manager.complete_migration(&peer).unwrap();
650 assert!(!manager.can_migrate(&peer)); std::thread::sleep(Duration::from_millis(150));
653 assert!(manager.can_migrate(&peer)); }
655
656 #[test]
657 fn test_get_active_migrations() {
658 let manager = ConnectionMigrationManager::new(MigrationConfig::default());
659 let peer1 = test_peer_id();
660 let peer2 = test_peer_id();
661
662 manager
663 .initiate_migration(peer1, test_addr(), test_addr2())
664 .unwrap();
665 manager
666 .initiate_migration(peer2, test_addr(), test_addr2())
667 .unwrap();
668
669 let active = manager.get_active_migrations();
670 assert_eq!(active.len(), 2);
671 }
672
673 #[test]
674 fn test_average_duration_calculation() {
675 let manager = ConnectionMigrationManager::new(MigrationConfig::default());
676 let peer1 = test_peer_id();
677 let peer2 = test_peer_id();
678
679 manager
680 .initiate_migration(peer1, test_addr(), test_addr2())
681 .unwrap();
682 std::thread::sleep(Duration::from_millis(10));
683 manager.complete_migration(&peer1).unwrap();
684
685 manager
686 .initiate_migration(peer2, test_addr(), test_addr2())
687 .unwrap();
688 std::thread::sleep(Duration::from_millis(10));
689 manager.complete_migration(&peer2).unwrap();
690
691 let stats = manager.stats();
692 assert!(stats.avg_duration_ms > 0);
693 }
694}