use std::{
    collections::{HashMap, VecDeque},
    future::Future,
    pin::Pin,
    sync::Arc,
    time::{Duration, Instant},
};

use tokio::{
    sync::RwLock,
    time::{sleep, timeout},
};
use tracing::{debug, info, warn};

use crate::{
    monitoring::{ErrorCategory, MonitoringError},
    nat_traversal_api::{NatTraversalError, PeerId},
};

pub struct ErrorRecoveryManager {
    config: RecoveryConfig,
    recovery_sessions: Arc<RwLock<HashMap<String, RecoverySession>>>,
    retry_policies: HashMap<ErrorCategory, RetryPolicy>,
    fallback_strategies: Vec<FallbackStrategy>,
    circuit_breaker: Arc<CircuitBreaker>,
    migration_handler: Arc<ConnectionMigrationHandler>,
    cleanup_manager: Arc<ResourceCleanupManager>,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RecoveryConfig {
    pub enable_auto_retry: bool,
    pub max_concurrent_recoveries: usize,
    pub default_retry_policy: RetryPolicy,
    pub enable_circuit_breaker: bool,
    pub circuit_breaker_config: CircuitBreakerConfig,
    pub enable_connection_migration: bool,
    pub cleanup_interval: Duration,
    pub recovery_timeout: Duration,
}

impl Default for RecoveryConfig {
    fn default() -> Self {
        Self {
            enable_auto_retry: true,
            max_concurrent_recoveries: 10,
            default_retry_policy: RetryPolicy::default(),
            enable_circuit_breaker: true,
            circuit_breaker_config: CircuitBreakerConfig::default(),
            enable_connection_migration: true,
            cleanup_interval: Duration::from_secs(60),
            recovery_timeout: Duration::from_secs(300),
        }
    }
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RetryPolicy {
    pub max_attempts: u32,
    pub initial_delay: Duration,
    pub max_delay: Duration,
    pub backoff_multiplier: f64,
    pub enable_jitter: bool,
    pub jitter_factor: f64,
    pub attempt_timeout: Duration,
}

impl Default for RetryPolicy {
    fn default() -> Self {
        Self {
            max_attempts: 3,
            initial_delay: Duration::from_millis(500),
            max_delay: Duration::from_secs(30),
            backoff_multiplier: 2.0,
            enable_jitter: true,
            jitter_factor: 0.1,
            attempt_timeout: Duration::from_secs(10),
        }
    }
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CircuitBreakerConfig {
    pub failure_threshold: u32,
    pub success_threshold: u32,
    pub timeout: Duration,
    pub window_size: Duration,
}

impl Default for CircuitBreakerConfig {
    fn default() -> Self {
        Self {
            failure_threshold: 5,
            success_threshold: 3,
            timeout: Duration::from_secs(60),
            window_size: Duration::from_secs(300),
        }
    }
}

#[derive(Debug, Clone)]
struct RecoverySession {
    session_id: String,
    peer_id: PeerId,
    original_error: NatTraversalError,
    current_attempt: u32,
    start_time: Instant,
    last_attempt_time: Option<Instant>,
    current_strategy: RecoveryStrategy,
    remaining_strategies: Vec<FallbackStrategy>,
    state: RecoveryState,
}

#[derive(Debug, Clone)]
pub enum RecoveryStrategy {
    Retry {
        policy: RetryPolicy,
    },
    AlternativeBootstrap {
        nodes: Vec<std::net::SocketAddr>,
    },
    RelayFallback {
        relay_servers: Vec<std::net::SocketAddr>,
    },
    ConnectionMigration {
        new_path: std::net::SocketAddr,
    },
    GracefulDegradation {
        reduced_functionality: bool,
    },
}

#[derive(Debug, Clone)]
pub enum FallbackStrategy {
    AlternativeNatMethod,
    RelayServers,
    DirectConnection,
    ReducedRequirements,
    ManualIntervention,
}

#[derive(Debug, Clone, PartialEq)]
enum RecoveryState {
    InProgress,
    Succeeded,
    Failed,
    Cancelled,
    WaitingRetry,
}

#[derive(Debug)]
struct CircuitBreaker {
    config: CircuitBreakerConfig,
    state: RwLock<CircuitBreakerState>,
    failure_count: RwLock<u32>,
    success_count: RwLock<u32>,
    last_state_change: RwLock<Instant>,
    failure_history: RwLock<VecDeque<Instant>>,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum CircuitBreakerState {
    Closed,
    Open,
    HalfOpen,
}
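
// Circuit breaker state machine (see `impl CircuitBreaker` below):
//   Closed   -> Open     once `failure_threshold` failures occur within `window_size`
//   Open     -> HalfOpen after `timeout` has elapsed, letting probe requests through
//   HalfOpen -> Closed   after `success_threshold` successes
// Failures recorded in any state count toward re-opening the breaker.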

#[derive(Debug)]
struct ConnectionMigrationHandler {
    active_migrations: RwLock<HashMap<PeerId, MigrationSession>>,
    network_detector: NetworkChangeDetector,
}

#[derive(Debug)]
struct MigrationSession {
    peer_id: PeerId,
    old_path: std::net::SocketAddr,
    new_path: std::net::SocketAddr,
    start_time: Instant,
    state: MigrationState,
}

#[derive(Debug, Clone, PartialEq)]
enum MigrationState {
    Detecting,
    Preparing,
    Migrating,
    Validating,
    Completed,
    Failed,
}

#[derive(Debug)]
struct NetworkChangeDetector {
    last_network_state: RwLock<NetworkState>,
    detection_interval: Duration,
}

#[derive(Debug, Clone, PartialEq)]
struct NetworkState {
    interfaces: HashMap<String, InterfaceState>,
    default_route: Option<std::net::SocketAddr>,
    dns_servers: Vec<std::net::IpAddr>,
}

#[derive(Debug, Clone, PartialEq)]
struct InterfaceState {
    name: String,
    status: String,
    addresses: Vec<std::net::IpAddr>,
}

#[derive(Debug)]
struct ResourceCleanupManager {
    cleanup_tasks: RwLock<Vec<CleanupTask>>,
    interval: Duration,
}

#[derive(Debug)]
struct CleanupTask {
    task_id: String,
    resource: CleanupResource,
    cleanup_time: Instant,
}

#[derive(Debug)]
enum CleanupResource {
    Connection { peer_id: PeerId },
    Session { session_id: String },
    TempFiles { paths: Vec<std::path::PathBuf> },
    MemoryBuffers { buffer_ids: Vec<String> },
}

impl ErrorRecoveryManager {
    pub async fn new(config: RecoveryConfig) -> Result<Self, MonitoringError> {
        let recovery_sessions = Arc::new(RwLock::new(HashMap::new()));

        let mut retry_policies = HashMap::new();
        retry_policies.insert(ErrorCategory::NetworkConnectivity, RetryPolicy {
            max_attempts: 5,
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(60),
            backoff_multiplier: 2.0,
            enable_jitter: true,
            jitter_factor: 0.2,
            attempt_timeout: Duration::from_secs(15),
        });

        retry_policies.insert(ErrorCategory::NatTraversal, RetryPolicy {
            max_attempts: 3,
            initial_delay: Duration::from_secs(2),
            max_delay: Duration::from_secs(30),
            backoff_multiplier: 1.5,
            enable_jitter: true,
            jitter_factor: 0.1,
            attempt_timeout: Duration::from_secs(20),
        });

        retry_policies.insert(ErrorCategory::Timeout, RetryPolicy {
            max_attempts: 4,
            initial_delay: Duration::from_millis(500),
            max_delay: Duration::from_secs(45),
            backoff_multiplier: 2.5,
            enable_jitter: true,
            jitter_factor: 0.15,
            attempt_timeout: Duration::from_secs(25),
        });

        let fallback_strategies = vec![
            FallbackStrategy::AlternativeNatMethod,
            FallbackStrategy::RelayServers,
            FallbackStrategy::DirectConnection,
            FallbackStrategy::ReducedRequirements,
        ];

        let circuit_breaker = Arc::new(CircuitBreaker::new(config.circuit_breaker_config.clone()));
        let migration_handler = Arc::new(ConnectionMigrationHandler::new().await?);
        let cleanup_manager = Arc::new(ResourceCleanupManager::new(config.cleanup_interval));

        Ok(Self {
            config,
            recovery_sessions,
            retry_policies,
            fallback_strategies,
            circuit_breaker,
            migration_handler,
            cleanup_manager,
        })
    }

    pub async fn start(&self) -> Result<(), MonitoringError> {
        info!("Starting error recovery manager");

        self.start_cleanup_task().await?;
        self.start_migration_monitoring().await?;
        self.start_circuit_breaker_monitoring().await?;

        info!("Error recovery manager started");
        Ok(())
    }

    pub async fn stop(&self) -> Result<(), MonitoringError> {
        info!("Stopping error recovery manager");

        let mut sessions = self.recovery_sessions.write().await;
        for (_, session) in sessions.iter_mut() {
            session.state = RecoveryState::Cancelled;
        }
        sessions.clear();

        info!("Error recovery manager stopped");
        Ok(())
    }

    pub async fn initiate_recovery(
        &self,
        peer_id: PeerId,
        error: NatTraversalError,
    ) -> Result<String, MonitoringError> {
        if !self.circuit_breaker.allow_request().await {
            return Err(MonitoringError::SystemError(
                "Circuit breaker is open, recovery not allowed".to_string(),
            ));
        }

        let sessions = self.recovery_sessions.read().await;
        if sessions.len() >= self.config.max_concurrent_recoveries {
            return Err(MonitoringError::SystemError(
                "Maximum concurrent recoveries reached".to_string(),
            ));
        }
        drop(sessions);

        let session_id = uuid::Uuid::new_v4().to_string();

        info!("Initiating error recovery for peer {:?} (session: {})", peer_id, session_id);

        let strategy = self.determine_recovery_strategy(&error).await;
        let remaining_strategies = self.get_fallback_strategies(&error).await;

        let session = RecoverySession {
            session_id: session_id.clone(),
            peer_id,
            original_error: error,
            current_attempt: 0,
            start_time: Instant::now(),
            last_attempt_time: None,
            current_strategy: strategy,
            remaining_strategies,
            state: RecoveryState::InProgress,
        };

        let mut sessions = self.recovery_sessions.write().await;
        sessions.insert(session_id.clone(), session);
        drop(sessions);

        self.execute_recovery_strategy(session_id.clone()).await?;

        Ok(session_id)
    }

    // Boxed return type so the recovery path can recurse: `try_next_fallback_strategy`
    // re-enters this method when the current strategy is exhausted.
    fn execute_recovery_strategy(&self, session_id: String) -> Pin<Box<dyn Future<Output = Result<(), MonitoringError>> + Send + '_>> {
        Box::pin(async move {
            let session = {
                let sessions = self.recovery_sessions.read().await;
                sessions.get(&session_id).cloned()
            };

            let mut session = session.ok_or_else(|| {
                MonitoringError::SystemError("Recovery session not found".to_string())
            })?;

            let strategy = session.current_strategy.clone();

            match strategy {
                RecoveryStrategy::Retry { policy } => {
                    self.execute_retry_strategy(&mut session, policy).await?;
                }
                RecoveryStrategy::AlternativeBootstrap { nodes } => {
                    self.execute_alternative_bootstrap(&mut session, nodes).await?;
                }
                RecoveryStrategy::RelayFallback { relay_servers } => {
                    self.execute_relay_fallback(&mut session, relay_servers).await?;
                }
                RecoveryStrategy::ConnectionMigration { new_path } => {
                    self.execute_connection_migration(&mut session, new_path).await?;
                }
                RecoveryStrategy::GracefulDegradation { reduced_functionality } => {
                    self.execute_graceful_degradation(&mut session, reduced_functionality).await?;
                }
            }

            let mut sessions = self.recovery_sessions.write().await;
            sessions.insert(session_id, session);

            Ok(())
        })
    }

    async fn execute_retry_strategy(
        &self,
        session: &mut RecoverySession,
        policy: RetryPolicy,
    ) -> Result<(), MonitoringError> {
        // Drive the retry policy to completion here; a single attempt per call would
        // otherwise leave the session parked in `WaitingRetry` with nothing to resume it.
        while session.current_attempt < policy.max_attempts {
            session.current_attempt += 1;

            let delay = self.calculate_retry_delay(&policy, session.current_attempt);

            info!(
                "Retrying recovery for session {} (attempt {}/{}) after {:?}",
                session.session_id, session.current_attempt, policy.max_attempts, delay
            );

            session.state = RecoveryState::WaitingRetry;
            session.last_attempt_time = Some(Instant::now());

            sleep(delay).await;

            let recovery_result = timeout(
                policy.attempt_timeout,
                self.attempt_connection_recovery(session.peer_id),
            )
            .await;

            match recovery_result {
                Ok(Ok(())) => {
                    info!("Recovery succeeded for session {}", session.session_id);
                    session.state = RecoveryState::Succeeded;
                    self.circuit_breaker.record_success().await;
                    return Ok(());
                }
                Ok(Err(e)) => {
                    warn!("Recovery attempt failed for session {}: {:?}", session.session_id, e);
                    self.circuit_breaker.record_failure().await;
                }
                Err(_) => {
                    warn!("Recovery attempt timed out for session {}", session.session_id);
                    self.circuit_breaker.record_failure().await;
                }
            }
        }

        warn!("Maximum retry attempts reached for session {}", session.session_id);
        session.state = RecoveryState::Failed;
        self.try_next_fallback_strategy(session).await
    }

    async fn execute_alternative_bootstrap(
        &self,
        session: &mut RecoverySession,
        nodes: Vec<std::net::SocketAddr>,
    ) -> Result<(), MonitoringError> {
        info!("Trying alternative bootstrap nodes for session {}", session.session_id);

        for node in nodes {
            debug!("Attempting connection via bootstrap node: {}", node);

            match self.attempt_bootstrap_connection(session.peer_id, node).await {
                Ok(()) => {
                    info!("Alternative bootstrap connection succeeded for session {}", session.session_id);
                    session.state = RecoveryState::Succeeded;
                    return Ok(());
                }
                Err(e) => {
                    warn!("Alternative bootstrap node {} failed: {:?}", node, e);
                    continue;
                }
            }
        }

        warn!("All alternative bootstrap nodes failed for session {}", session.session_id);
        self.try_next_fallback_strategy(session).await
    }

    async fn execute_relay_fallback(
        &self,
        session: &mut RecoverySession,
        relay_servers: Vec<std::net::SocketAddr>,
    ) -> Result<(), MonitoringError> {
        info!("Attempting relay fallback for session {}", session.session_id);

        for relay in relay_servers {
            debug!("Attempting relay connection via: {}", relay);

            match self.attempt_relay_connection(session.peer_id, relay).await {
                Ok(()) => {
                    info!("Relay connection succeeded for session {}", session.session_id);
                    session.state = RecoveryState::Succeeded;
                    return Ok(());
                }
                Err(e) => {
                    warn!("Relay server {} failed: {:?}", relay, e);
                    continue;
                }
            }
        }

        warn!("All relay servers failed for session {}", session.session_id);
        self.try_next_fallback_strategy(session).await
    }

    async fn execute_connection_migration(
        &self,
        session: &mut RecoverySession,
        new_path: std::net::SocketAddr,
    ) -> Result<(), MonitoringError> {
        info!("Attempting connection migration for session {} to {}", session.session_id, new_path);

        match self.migration_handler.migrate_connection(session.peer_id, new_path).await {
            Ok(()) => {
                info!("Connection migration succeeded for session {}", session.session_id);
                session.state = RecoveryState::Succeeded;
                Ok(())
            }
            Err(e) => {
                warn!("Connection migration failed for session {}: {:?}", session.session_id, e);
                self.try_next_fallback_strategy(session).await
            }
        }
    }

    async fn execute_graceful_degradation(
        &self,
        session: &mut RecoverySession,
        _reduced_functionality: bool,
    ) -> Result<(), MonitoringError> {
        info!("Applying graceful degradation for session {}", session.session_id);

        session.state = RecoveryState::Succeeded;
        Ok(())
    }

    async fn try_next_fallback_strategy(&self, session: &mut RecoverySession) -> Result<(), MonitoringError> {
        if let Some(next_strategy) = session.remaining_strategies.pop() {
            info!("Trying next fallback strategy for session {}: {:?}", session.session_id, next_strategy);

            session.current_strategy = self.convert_fallback_to_strategy(next_strategy).await;
            session.current_attempt = 0;

            // Persist the updated strategy first: execute_recovery_strategy reloads the
            // session from the shared map, so without this it would re-run the old one.
            {
                let mut sessions = self.recovery_sessions.write().await;
                sessions.insert(session.session_id.clone(), session.clone());
            }

            let result = self.execute_recovery_strategy(session.session_id.clone()).await;

            // Pull the nested outcome back into the caller's copy so it is not
            // overwritten when the caller stores the session again.
            if let Some(updated) = self.recovery_sessions.read().await.get(&session.session_id) {
                *session = updated.clone();
            }

            result
        } else {
            warn!("All recovery strategies exhausted for session {}", session.session_id);
            session.state = RecoveryState::Failed;
            Ok(())
        }
    }

    fn calculate_retry_delay(&self, policy: &RetryPolicy, attempt: u32) -> Duration {
        let base_delay = policy.initial_delay.as_millis() as f64;
        let exponential_delay = base_delay * policy.backoff_multiplier.powi(attempt as i32 - 1);
        let capped_delay = exponential_delay.min(policy.max_delay.as_millis() as f64);

        let final_delay = if policy.enable_jitter {
            let jitter = capped_delay * policy.jitter_factor * (rand::random::<f64>() - 0.5);
            capped_delay + jitter
        } else {
            capped_delay
        };

        Duration::from_millis(final_delay.max(0.0) as u64)
    }

    async fn determine_recovery_strategy(&self, error: &NatTraversalError) -> RecoveryStrategy {
        match error {
            NatTraversalError::NoBootstrapNodes => {
                RecoveryStrategy::AlternativeBootstrap {
                    // Placeholder fallback address (TEST-NET-3); hostname strings such as
                    // "fallback1.example.com:9000" cannot be parsed into a SocketAddr.
                    nodes: vec!["203.0.113.10:9000".parse().expect("valid fallback address")],
                }
            }
            NatTraversalError::HolePunchingFailed => {
                RecoveryStrategy::RelayFallback {
                    // Placeholder relay address (TEST-NET-3); see the note above.
                    relay_servers: vec!["203.0.113.20:9000".parse().expect("valid relay address")],
                }
            }
            NatTraversalError::Timeout => {
                RecoveryStrategy::Retry {
                    policy: self.retry_policies.get(&ErrorCategory::Timeout)
                        .cloned()
                        .unwrap_or_else(|| self.config.default_retry_policy.clone()),
                }
            }
            _ => {
                RecoveryStrategy::Retry {
                    policy: self.config.default_retry_policy.clone(),
                }
            }
        }
    }

    async fn get_fallback_strategies(&self, _error: &NatTraversalError) -> Vec<FallbackStrategy> {
        self.fallback_strategies.clone()
    }

    async fn convert_fallback_to_strategy(&self, fallback: FallbackStrategy) -> RecoveryStrategy {
        match fallback {
            FallbackStrategy::AlternativeNatMethod => {
                RecoveryStrategy::Retry {
                    policy: self.config.default_retry_policy.clone(),
                }
            }
            FallbackStrategy::RelayServers => {
                RecoveryStrategy::RelayFallback {
                    // Placeholder relay address (TEST-NET-3), as in determine_recovery_strategy.
                    relay_servers: vec!["203.0.113.20:9000".parse().expect("valid relay address")],
                }
            }
            FallbackStrategy::DirectConnection => {
                RecoveryStrategy::GracefulDegradation {
                    reduced_functionality: true,
                }
            }
            FallbackStrategy::ReducedRequirements => {
                RecoveryStrategy::GracefulDegradation {
                    reduced_functionality: true,
                }
            }
            FallbackStrategy::ManualIntervention => {
                RecoveryStrategy::GracefulDegradation {
                    reduced_functionality: true,
                }
            }
        }
    }

    async fn attempt_connection_recovery(&self, _peer_id: PeerId) -> Result<(), MonitoringError> {
        // Simulated recovery attempt: short delay, then a randomized outcome.
        sleep(Duration::from_millis(100)).await;

        if rand::random::<f64>() > 0.3 {
            Ok(())
        } else {
            Err(MonitoringError::SystemError("Recovery failed".to_string()))
        }
    }

    async fn attempt_bootstrap_connection(&self, _peer_id: PeerId, _node: std::net::SocketAddr) -> Result<(), MonitoringError> {
        // Simulated bootstrap connection attempt.
        sleep(Duration::from_millis(200)).await;

        if rand::random::<f64>() > 0.4 {
            Ok(())
        } else {
            Err(MonitoringError::SystemError("Bootstrap connection failed".to_string()))
        }
    }

    async fn attempt_relay_connection(&self, _peer_id: PeerId, _relay: std::net::SocketAddr) -> Result<(), MonitoringError> {
        // Simulated relay connection attempt.
        sleep(Duration::from_millis(300)).await;

        if rand::random::<f64>() > 0.2 {
            Ok(())
        } else {
            Err(MonitoringError::SystemError("Relay connection failed".to_string()))
        }
    }

    async fn start_cleanup_task(&self) -> Result<(), MonitoringError> {
        debug!("Starting resource cleanup task");
        Ok(())
    }

    async fn start_migration_monitoring(&self) -> Result<(), MonitoringError> {
        debug!("Starting connection migration monitoring");
        Ok(())
    }

    async fn start_circuit_breaker_monitoring(&self) -> Result<(), MonitoringError> {
        debug!("Starting circuit breaker monitoring");
        Ok(())
    }

    pub async fn get_recovery_statistics(&self) -> RecoveryStatistics {
        let sessions = self.recovery_sessions.read().await;

        let total_sessions = sessions.len();
        let successful_recoveries = sessions.values()
            .filter(|s| s.state == RecoveryState::Succeeded)
            .count();
        let failed_recoveries = sessions.values()
            .filter(|s| s.state == RecoveryState::Failed)
            .count();
        let active_recoveries = sessions.values()
            .filter(|s| s.state == RecoveryState::InProgress)
            .count();

        RecoveryStatistics {
            total_sessions,
            successful_recoveries,
            failed_recoveries,
            active_recoveries,
            success_rate: if total_sessions > 0 {
                successful_recoveries as f64 / total_sessions as f64
            } else {
                0.0
            },
        }
    }
}

#[derive(Debug, Clone)]
pub struct RecoveryStatistics {
    pub total_sessions: usize,
    pub successful_recoveries: usize,
    pub failed_recoveries: usize,
    pub active_recoveries: usize,
    pub success_rate: f64,
}

impl CircuitBreaker {
    fn new(config: CircuitBreakerConfig) -> Self {
        Self {
            config,
            state: RwLock::new(CircuitBreakerState::Closed),
            failure_count: RwLock::new(0),
            success_count: RwLock::new(0),
            last_state_change: RwLock::new(Instant::now()),
            failure_history: RwLock::new(VecDeque::new()),
        }
    }

    async fn allow_request(&self) -> bool {
        let state = *self.state.read().await;

        match state {
            CircuitBreakerState::Closed => true,
            CircuitBreakerState::Open => {
                let last_change = *self.last_state_change.read().await;
                if last_change.elapsed() > self.config.timeout {
                    *self.state.write().await = CircuitBreakerState::HalfOpen;
                    *self.success_count.write().await = 0;
                    *self.last_state_change.write().await = Instant::now();
                    true
                } else {
                    false
                }
            }
            CircuitBreakerState::HalfOpen => true,
        }
    }

    async fn record_success(&self) {
        let state = *self.state.read().await;

        match state {
            CircuitBreakerState::HalfOpen => {
                let mut success_count = self.success_count.write().await;
                *success_count += 1;

                if *success_count >= self.config.success_threshold {
                    *self.state.write().await = CircuitBreakerState::Closed;
                    *self.failure_count.write().await = 0;
                    // Clear the windowed history so stale failures cannot immediately
                    // re-open the breaker after it has recovered.
                    self.failure_history.write().await.clear();
                    *self.last_state_change.write().await = Instant::now();
                }
            }
            _ => {
                *self.failure_count.write().await = 0;
            }
        }
    }

    async fn record_failure(&self) {
        let mut failure_count = self.failure_count.write().await;
        *failure_count += 1;

        let mut history = self.failure_history.write().await;
        history.push_back(Instant::now());

        // Drop failures that fall outside the sliding window.
        let cutoff = Instant::now() - self.config.window_size;
        while let Some(&front_time) = history.front() {
            if front_time < cutoff {
                history.pop_front();
            } else {
                break;
            }
        }

        if history.len() >= self.config.failure_threshold as usize {
            *self.state.write().await = CircuitBreakerState::Open;
            *self.last_state_change.write().await = Instant::now();
        }
    }
}

impl ConnectionMigrationHandler {
    async fn new() -> Result<Self, MonitoringError> {
        Ok(Self {
            active_migrations: RwLock::new(HashMap::new()),
            network_detector: NetworkChangeDetector::new(),
        })
    }

    async fn migrate_connection(&self, peer_id: PeerId, new_path: std::net::SocketAddr) -> Result<(), MonitoringError> {
        info!("Migrating connection for peer {:?} to {}", peer_id, new_path);

        // Simulated migration: short delay, then a randomized outcome.
        sleep(Duration::from_millis(500)).await;

        if rand::random::<f64>() > 0.1 {
            Ok(())
        } else {
            Err(MonitoringError::SystemError("Migration failed".to_string()))
        }
    }
}

impl NetworkChangeDetector {
    fn new() -> Self {
        Self {
            last_network_state: RwLock::new(NetworkState {
                interfaces: HashMap::new(),
                default_route: None,
                dns_servers: Vec::new(),
            }),
            detection_interval: Duration::from_secs(5),
        }
    }
}

impl ResourceCleanupManager {
    fn new(interval: Duration) -> Self {
        Self {
            cleanup_tasks: RwLock::new(Vec::new()),
            interval,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_error_recovery_manager_creation() {
        let config = RecoveryConfig::default();
        let manager = ErrorRecoveryManager::new(config).await.unwrap();

        let stats = manager.get_recovery_statistics().await;
        assert_eq!(stats.total_sessions, 0);
    }

    #[tokio::test]
    async fn test_retry_delay_calculation() {
        let config = RecoveryConfig::default();
        let manager = ErrorRecoveryManager::new(config).await.unwrap();

        let policy = RetryPolicy {
            initial_delay: Duration::from_millis(100),
            backoff_multiplier: 2.0,
            max_delay: Duration::from_secs(10),
            enable_jitter: false,
            jitter_factor: 0.0,
            ..RetryPolicy::default()
        };

        let delay1 = manager.calculate_retry_delay(&policy, 1);
        let delay2 = manager.calculate_retry_delay(&policy, 2);
        let delay3 = manager.calculate_retry_delay(&policy, 3);

        assert_eq!(delay1, Duration::from_millis(100));
        assert_eq!(delay2, Duration::from_millis(200));
        assert_eq!(delay3, Duration::from_millis(400));
    }
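
    // Additional check (a sketch using the same private helper as above): with a large
    // attempt number the exponential delay must be clamped to `max_delay`.
    #[tokio::test]
    async fn test_retry_delay_capped_at_max() {
        let manager = ErrorRecoveryManager::new(RecoveryConfig::default()).await.unwrap();

        let policy = RetryPolicy {
            initial_delay: Duration::from_millis(100),
            backoff_multiplier: 2.0,
            max_delay: Duration::from_secs(1),
            enable_jitter: false,
            jitter_factor: 0.0,
            ..RetryPolicy::default()
        };

        // 100 ms * 2^9 = 51.2 s uncapped; the 1 s policy ceiling must apply.
        assert_eq!(manager.calculate_retry_delay(&policy, 10), Duration::from_secs(1));
    }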

    #[tokio::test]
    async fn test_circuit_breaker() {
        let config = CircuitBreakerConfig {
            failure_threshold: 2,
            success_threshold: 1,
            timeout: Duration::from_millis(100),
            window_size: Duration::from_secs(60),
        };

        let breaker = CircuitBreaker::new(config);

        assert!(breaker.allow_request().await);

        breaker.record_failure().await;
        breaker.record_failure().await;

        assert!(!breaker.allow_request().await);

        tokio::time::sleep(Duration::from_millis(150)).await;

        assert!(breaker.allow_request().await);

        breaker.record_success().await;
        assert!(breaker.allow_request().await);
    }
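
    // Sketch of the sliding-window behaviour: failures spaced further apart than
    // `window_size` should not trip the breaker, since old entries are pruned.
    #[tokio::test]
    async fn test_failures_outside_window_do_not_trip_breaker() {
        let config = CircuitBreakerConfig {
            failure_threshold: 2,
            success_threshold: 1,
            timeout: Duration::from_secs(60),
            window_size: Duration::from_millis(100),
        };

        let breaker = CircuitBreaker::new(config);

        breaker.record_failure().await;
        tokio::time::sleep(Duration::from_millis(150)).await;
        breaker.record_failure().await;

        // The first failure has aged out of the window, so the breaker stays closed.
        assert!(breaker.allow_request().await);
    }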
}