1use std::{
2 collections::{HashMap, VecDeque},
3 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
4 time::Duration,
5};
6
7use crate::shared::ConnectionId;
8use tracing::{debug, info, trace, warn};
9
10use crate::{Instant, VarInt};
11
/// Aggregate NAT traversal state: candidate tables, candidate pairs,
/// in-flight path validations, coordination progress, statistics and the
/// security / network / resource helpers declared in this file.
#[derive(Debug)]
pub(super) struct NatTraversalState {
    /// Role of this endpoint (see [`NatTraversalRole`]).
    pub(super) role: NatTraversalRole,
    /// Local address candidates keyed by a `VarInt`
    /// (presumably the candidate sequence number — confirm against callers).
    pub(super) local_candidates: HashMap<VarInt, AddressCandidate>,
    /// Remote address candidates, keyed the same way as `local_candidates`.
    pub(super) remote_candidates: HashMap<VarInt, AddressCandidate>,
    /// Local/remote candidate pairs.
    pub(super) candidate_pairs: Vec<CandidatePair>,
    /// Lookup from a socket address to a `usize`
    /// (presumably an index into `candidate_pairs` — confirm).
    pub(super) pair_index: HashMap<SocketAddr, usize>,
    /// Path validations currently in flight, keyed by socket address.
    pub(super) active_validations: HashMap<SocketAddr, PathValidationState>,
    /// State of the current coordination round, if one exists.
    pub(super) coordination: Option<CoordinationState>,
    /// Next sequence number to hand out (usage not visible in this chunk).
    pub(super) next_sequence: VarInt,
    /// Upper bound on the number of candidates.
    pub(super) max_candidates: u32,
    /// Timeout applied to coordination.
    pub(super) coordination_timeout: Duration,
    /// Counters describing traversal activity.
    pub(super) stats: NatTraversalStats,
    /// Rate limiting and address validation state.
    pub(super) security_state: SecurityValidationState,
    /// Network quality bookkeeping.
    pub(super) network_monitor: NetworkConditionMonitor,
    /// Resource limits and cleanup bookkeeping.
    pub(super) resource_manager: ResourceCleanupCoordinator,
    /// Optional bootstrap coordinator (type declared outside this chunk).
    pub(super) bootstrap_coordinator: Option<BootstrapCoordinator>,
}
/// Role an endpoint plays in NAT traversal.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NatTraversalRole {
    /// Client endpoint.
    Client,
    /// Server endpoint.
    Server {
        /// Whether this server can relay
        /// (what is relayed is not visible in this chunk).
        can_relay: bool,
    },
    /// Bootstrap endpoint.
    Bootstrap,
}
/// A single address candidate together with its bookkeeping.
#[derive(Debug, Clone)]
pub(super) struct AddressCandidate {
    /// The candidate socket address.
    pub(super) address: SocketAddr,
    /// Candidate priority (presumably higher is preferred — confirm).
    pub(super) priority: u32,
    /// How the candidate was learned.
    pub(super) source: CandidateSource,
    /// When the candidate was discovered; the cleanup code in this file
    /// ages candidates out based on this timestamp.
    pub(super) discovered_at: Instant,
    /// Current lifecycle state; `Failed` and `Removed` candidates are
    /// evicted by `cleanup_stale_candidates`.
    pub(super) state: CandidateState,
    /// Number of attempts made with this candidate.
    pub(super) attempt_count: u32,
    /// Time of the most recent attempt, if any.
    pub(super) last_attempt: Option<Instant>,
}
/// Origin of an address candidate.
///
/// `classify_candidate_type` maps `Local` to a host candidate, `Observed`
/// and `Predicted` to server-reflexive, and `Peer` to peer-reflexive.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CandidateSource {
    /// A local address.
    Local,
    /// An observed address, optionally tagged with the observing node's id.
    Observed { by_node: Option<VarInt> },
    /// An address attributed to the peer.
    Peer,
    /// A predicted address.
    Predicted,
}
/// Lifecycle state of an address candidate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CandidateState {
    /// Newly added, not yet validated.
    New,
    /// Validation in progress.
    Validating,
    /// Validation succeeded.
    Valid,
    /// Validation failed; evicted by the cleanup passes in this file.
    Failed,
    /// Marked as removed; evicted by `cleanup_stale_candidates`.
    Removed,
}
/// Bookkeeping for one in-flight path validation.
#[derive(Debug)]
pub(super) struct PathValidationState {
    /// Challenge value associated with this validation.
    pub(super) challenge: u64,
    /// When the validation was sent; cleanup expires entries based on it.
    pub(super) sent_at: Instant,
    /// Retries performed so far.
    pub(super) retry_count: u32,
    /// Maximum retries permitted.
    pub(super) max_retries: u32,
    /// Coordination round this validation belongs to, if any.
    pub(super) coordination_round: Option<VarInt>,
    /// Adaptive timeout state for this validation.
    pub(super) timeout_state: AdaptiveTimeoutState,
    /// Time of the most recent retry, if any.
    pub(super) last_retry_at: Option<Instant>,
}
/// State of a single coordination round.
#[derive(Debug)]
pub(super) struct CoordinationState {
    /// Round identifier.
    pub(super) round: VarInt,
    /// Targets to punch towards in this round.
    pub(super) punch_targets: Vec<PunchTarget>,
    /// When the round started; `cleanup_old_coordination` expires the
    /// round based on this timestamp.
    pub(super) round_start: Instant,
    /// When punching starts (or started — not determinable from this chunk).
    pub(super) punch_start: Instant,
    /// Duration of the round.
    pub(super) round_duration: Duration,
    /// Current phase of the round.
    pub(super) state: CoordinationPhase,
    /// Whether a punch request has been sent.
    pub(super) punch_request_sent: bool,
    /// Whether a punch from the peer has been received.
    pub(super) peer_punch_received: bool,
    /// Retries performed so far.
    pub(super) retry_count: u32,
    /// Maximum retries permitted.
    pub(super) max_retries: u32,
    /// Adaptive timeout state for this round.
    pub(super) timeout_state: AdaptiveTimeoutState,
    /// Time of the most recent retry, if any.
    pub(super) last_retry_at: Option<Instant>,
}
/// Phase of a coordination round.
///
/// The transitions between phases are not visible in this chunk; the
/// variant order below is the declaration order only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum CoordinationPhase {
    /// No coordination in progress.
    Idle,
    /// Requesting coordination.
    Requesting,
    /// Coordinating.
    Coordinating,
    /// Preparing.
    Preparing,
    /// Punching.
    Punching,
    /// Validating.
    Validating,
    /// The round succeeded.
    Succeeded,
    /// The round failed; `cleanup_old_coordination` drops failed rounds.
    Failed,
}
/// One target of a hole-punching attempt.
#[derive(Debug, Clone)]
pub(super) struct PunchTarget {
    /// Remote address to punch towards.
    pub(super) remote_addr: SocketAddr,
    /// Sequence number of the remote candidate
    /// (presumably its key in the remote candidate table — confirm).
    pub(super) remote_sequence: VarInt,
    /// Challenge value associated with this target.
    pub(super) challenge: u64,
}
/// Action to take in response to a timeout.
///
/// The code that produces and consumes these values is outside this chunk.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) enum TimeoutAction {
    /// Retry discovery.
    RetryDiscovery,
    /// Retry coordination.
    RetryCoordination,
    /// Start validation.
    StartValidation,
    /// Traversal is complete.
    Complete,
    /// Traversal failed.
    Failed,
}
196
/// A pairing of one local and one remote candidate address.
#[derive(Debug, Clone)]
pub(super) struct CandidatePair {
    /// Sequence number of the remote candidate.
    pub(super) remote_sequence: VarInt,
    /// Local address of the pair.
    pub(super) local_addr: SocketAddr,
    /// Remote address of the pair.
    pub(super) remote_addr: SocketAddr,
    /// Pair priority (presumably from `calculate_pair_priority` — confirm).
    pub(super) priority: u64,
    /// Current state of the pair.
    pub(super) state: PairState,
    /// Classification of the pair by candidate types.
    pub(super) pair_type: PairType,
    /// When the pair was created; cleanup ages pairs out based on it.
    pub(super) created_at: Instant,
    /// Time of the most recent check, if any.
    pub(super) last_check: Option<Instant>,
}
/// State of a candidate pair.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum PairState {
    /// Waiting to be checked; aggressive cleanup drops old waiting pairs.
    Waiting,
    /// The check succeeded.
    Succeeded,
    /// The check failed; `cleanup_failed_pairs` evicts failed pairs.
    Failed,
    /// Frozen (the conditions for unfreezing are not visible in this chunk).
    Frozen,
}
/// Classification of a candidate pair by the types of its two candidates,
/// as produced by `classify_pair_type` (local type first, remote second).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) enum PairType {
    /// Host local candidate, host remote candidate.
    HostToHost,
    /// Host local candidate, server-reflexive remote candidate.
    HostToServerReflexive,
    /// Server-reflexive local candidate, host remote candidate.
    ServerReflexiveToHost,
    /// Both candidates are server-reflexive.
    ServerReflexiveToServerReflexive,
    /// At least one candidate is peer-reflexive.
    PeerReflexive,
}
/// Candidate type used for priority calculation and pair classification.
///
/// `calculate_candidate_priority` assigns type preferences of 126 (host),
/// 110 (peer-reflexive) and 100 (server-reflexive).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CandidateType {
    /// Host candidate.
    Host,
    /// Server-reflexive candidate.
    ServerReflexive,
    /// Peer-reflexive candidate.
    PeerReflexive,
}
253fn calculate_candidate_priority(
256 candidate_type: CandidateType,
257 local_preference: u16,
258 component_id: u8,
259) -> u32 {
260 let type_preference = match candidate_type {
261 CandidateType::Host => 126,
262 CandidateType::PeerReflexive => 110,
263 CandidateType::ServerReflexive => 100,
264 };
265 (1u32 << 24) * type_preference + (1u32 << 8) * local_preference as u32 + component_id as u32
267}
268
/// Computes the 64-bit priority of a candidate pair.
///
/// With `g` the local and `d` the remote candidate priority, the result is
/// `2^32 * min(g, d) + 2 * max(g, d) + (g > d ? 1 : 0)`.
///
/// Remote priorities may be arbitrary `u32` values; when both inputs are
/// close to `u32::MAX` the exact sum does not fit in a `u64`. The sum
/// saturates at `u64::MAX` in that case rather than panicking (debug
/// builds) or wrapping to a tiny priority (release builds).
fn calculate_pair_priority(local_priority: u32, remote_priority: u32) -> u64 {
    let g = u64::from(local_priority);
    let d = u64::from(remote_priority);
    let tie_breaker = u64::from(g > d);

    // `min(g, d) < 2^32`, so the shift cannot lose bits; only the additions
    // can exceed the `u64` range.
    (g.min(d) << 32)
        .saturating_add(2 * g.max(d))
        .saturating_add(tie_breaker)
}
277
278fn classify_candidate_type(source: CandidateSource) -> CandidateType {
280 match source {
281 CandidateSource::Local => CandidateType::Host,
282 CandidateSource::Observed { .. } => CandidateType::ServerReflexive,
283 CandidateSource::Peer => CandidateType::PeerReflexive,
284 CandidateSource::Predicted => CandidateType::ServerReflexive, }
286}
287fn classify_pair_type(local_type: CandidateType, remote_type: CandidateType) -> PairType {
289 match (local_type, remote_type) {
290 (CandidateType::Host, CandidateType::Host) => PairType::HostToHost,
291 (CandidateType::Host, CandidateType::ServerReflexive) => PairType::HostToServerReflexive,
292 (CandidateType::ServerReflexive, CandidateType::Host) => PairType::ServerReflexiveToHost,
293 (CandidateType::ServerReflexive, CandidateType::ServerReflexive) => {
294 PairType::ServerReflexiveToServerReflexive
295 }
296 (CandidateType::PeerReflexive, _) | (_, CandidateType::PeerReflexive) => {
297 PairType::PeerReflexive
298 }
299 }
300}
301fn are_candidates_compatible(local: &AddressCandidate, remote: &AddressCandidate) -> bool {
303 match (local.address, remote.address) {
305 (SocketAddr::V4(_), SocketAddr::V4(_)) => true,
306 (SocketAddr::V6(_), SocketAddr::V6(_)) => true,
307 _ => false, }
309}
/// Counters describing NAT traversal activity.
///
/// The code that updates these counters is outside this chunk; the
/// per-field descriptions follow the field names.
#[derive(Debug, Default, Clone)]
pub(crate) struct NatTraversalStats {
    /// Remote candidates received.
    pub(super) remote_candidates_received: u32,
    /// Local candidates sent.
    pub(super) local_candidates_sent: u32,
    /// Successful validations.
    pub(super) validations_succeeded: u32,
    /// Failed validations.
    pub(super) validations_failed: u32,
    /// Coordination rounds.
    pub(super) coordination_rounds: u32,
    /// Successful coordinations.
    pub(super) successful_coordinations: u32,
    /// Failed coordinations.
    pub(super) failed_coordinations: u32,
    /// Coordinations that timed out.
    pub(super) timed_out_coordinations: u32,
    /// Coordination failures
    /// (NOTE(review): relationship to `failed_coordinations` is unclear).
    pub(super) coordination_failures: u32,
    /// Direct connections.
    pub(super) direct_connections: u32,
    /// Rejections for security reasons.
    pub(super) security_rejections: u32,
    /// Rate-limit violations.
    pub(super) rate_limit_violations: u32,
    /// Rejections due to invalid addresses.
    pub(super) invalid_address_rejections: u32,
    /// Coordination attempts flagged as suspicious.
    pub(super) suspicious_coordination_attempts: u32,
}
/// Rate limiting and address validation state.
#[derive(Debug)]
pub(super) struct SecurityValidationState {
    /// Timestamps of accepted candidates inside the current rate window,
    /// oldest first.
    candidate_rate_tracker: VecDeque<Instant>,
    /// Maximum number of candidates accepted per `rate_window`.
    max_candidates_per_window: u32,
    /// Length of the sliding window shared by both rate limiters.
    rate_window: Duration,
    /// Accepted coordination requests inside the current rate window,
    /// oldest first.
    coordination_requests: VecDeque<CoordinationRequest>,
    /// Maximum number of coordination requests accepted per `rate_window`.
    max_coordination_per_window: u32,
    /// Cached per-address validation verdicts.
    address_validation_cache: HashMap<SocketAddr, AddressValidationResult>,
    /// Intended lifetime of cache entries.
    /// NOTE(review): no code visible in this chunk reads this field.
    validation_cache_timeout: Duration,
}
/// Record of one accepted coordination request, kept for rate limiting.
#[derive(Debug, Clone)]
struct CoordinationRequest {
    /// When the request was accepted.
    timestamp: Instant,
}
/// Verdict of address validation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AddressValidationResult {
    /// The address passed every check.
    Valid,
    /// The address can never be used (e.g. unspecified or broadcast).
    Invalid,
    /// The address is well-formed but looks untrustworthy
    /// (e.g. multicast, documentation range, privileged port).
    Suspicious,
}
/// Adaptive timeout derived from smoothed RTT estimates, exponential
/// backoff and random jitter.
#[derive(Debug, Clone)]
pub(super) struct AdaptiveTimeoutState {
    /// Most recently computed timeout, clamped to `[min_timeout, max_timeout]`.
    current_timeout: Duration,
    /// Lower clamp for `current_timeout`.
    min_timeout: Duration,
    /// Upper clamp for `current_timeout`.
    max_timeout: Duration,
    /// Timeout used while no RTT estimate is available.
    base_timeout: Duration,
    /// Current backoff factor: doubled on each timeout, multiplied by 0.8
    /// (never below 1.0) on each success.
    backoff_multiplier: f64,
    /// Upper bound for `backoff_multiplier`.
    max_backoff_multiplier: f64,
    /// Relative jitter amplitude applied to the computed timeout.
    jitter_factor: f64,
    /// Smoothed RTT estimate, once at least one sample has been seen.
    srtt: Option<Duration>,
    /// RTT variation estimate, once at least one sample has been seen.
    rttvar: Option<Duration>,
    /// Most recent RTT sample.
    last_rtt: Option<Duration>,
    /// Timeouts since the last success.
    consecutive_timeouts: u32,
    /// Total number of successful responses recorded.
    successful_responses: u32,
}
/// Network quality bookkeeping.
///
/// The code that maintains these fields is outside this chunk; the
/// descriptions below follow the field names and types only.
#[derive(Debug)]
pub(super) struct NetworkConditionMonitor {
    /// Recent RTT samples.
    rtt_samples: VecDeque<Duration>,
    /// Maximum number of samples (presumably the cap on `rtt_samples`).
    max_samples: usize,
    /// Packet loss rate (presumably a fraction in `[0, 1]` — confirm).
    packet_loss_rate: f64,
    /// Congestion window (units not visible in this chunk).
    congestion_window: u32,
    /// Quality score (scale not visible in this chunk).
    quality_score: f64,
    /// When the quality score was last updated.
    last_quality_update: Instant,
    /// Interval between quality updates.
    quality_update_interval: Duration,
    /// Timeout statistics.
    timeout_stats: TimeoutStatistics,
}
/// Aggregated timeout / response statistics.
///
/// The code that maintains these fields is outside this chunk.
#[derive(Debug, Default)]
struct TimeoutStatistics {
    /// Total timeouts observed.
    total_timeouts: u64,
    /// Total responses observed.
    total_responses: u64,
    /// Average response time.
    avg_response_time: Duration,
    /// Timeout rate (presumably timeouts over total events — confirm).
    timeout_rate: f64,
    /// When these statistics were last updated, if ever.
    last_update: Option<Instant>,
}
438impl SecurityValidationState {
439 fn new() -> Self {
441 Self {
442 candidate_rate_tracker: VecDeque::new(),
443 max_candidates_per_window: 20, rate_window: Duration::from_secs(60),
445 coordination_requests: VecDeque::new(),
446 max_coordination_per_window: 5, address_validation_cache: HashMap::new(),
448 validation_cache_timeout: Duration::from_secs(300), }
450 }
451 fn new_with_limits(
453 max_candidates_per_window: u32,
454 max_coordination_per_window: u32,
455 rate_window: Duration,
456 ) -> Self {
457 Self {
458 candidate_rate_tracker: VecDeque::new(),
459 max_candidates_per_window,
460 rate_window,
461 coordination_requests: VecDeque::new(),
462 max_coordination_per_window,
463 address_validation_cache: HashMap::new(),
464 validation_cache_timeout: Duration::from_secs(300),
465 }
466 }
467 fn is_adaptive_rate_limited(&mut self, peer_id: [u8; 32], now: Instant) -> bool {
472 self.cleanup_rate_tracker(now);
474 self.cleanup_coordination_tracker(now);
475 let _current_candidate_rate =
477 self.candidate_rate_tracker.len() as f64 / self.rate_window.as_secs_f64();
478 let _current_coordination_rate =
479 self.coordination_requests.len() as f64 / self.rate_window.as_secs_f64();
480
481 let peer_reputation = self.calculate_peer_reputation(peer_id);
483 let adaptive_candidate_limit =
484 (self.max_candidates_per_window as f64 * peer_reputation) as u32;
485 let adaptive_coordination_limit =
486 (self.max_coordination_per_window as f64 * peer_reputation) as u32;
487
488 if self.candidate_rate_tracker.len() >= adaptive_candidate_limit as usize {
490 debug!(
491 "Adaptive candidate rate limit exceeded for peer {:?}: {} >= {}",
492 hex::encode(&peer_id[..8]),
493 self.candidate_rate_tracker.len(),
494 adaptive_candidate_limit
495 );
496 return true;
497 }
498
499 if self.coordination_requests.len() >= adaptive_coordination_limit as usize {
500 debug!(
501 "Adaptive coordination rate limit exceeded for peer {:?}: {} >= {}",
502 hex::encode(&peer_id[..8]),
503 self.coordination_requests.len(),
504 adaptive_coordination_limit
505 );
506 return true;
507 }
508
509 false
510 }
511
    /// Returns the reputation factor used to scale rate limits for a peer.
    ///
    /// Currently a stub: the peer id is ignored and every peer gets `1.0`,
    /// so adaptive limits equal the configured limits.
    fn calculate_peer_reputation(&self, _peer_id: [u8; 32]) -> f64 {
        1.0
    }
527
528 fn validate_amplification_limits(
533 &mut self,
534 source_addr: SocketAddr,
535 target_addr: SocketAddr,
536 now: Instant,
537 ) -> Result<(), NatTraversalError> {
538 let amplification_key = (source_addr, target_addr);
540 if self.is_amplification_suspicious(amplification_key, now) {
549 warn!(
550 "Potential amplification attack detected: {} -> {}",
551 source_addr, target_addr
552 );
553 return Err(NatTraversalError::SuspiciousCoordination);
554 }
555
556 Ok(())
557 }
558
    /// Decides whether a `(source, target)` address pair looks like an
    /// amplification attempt.
    ///
    /// Currently a stub: both arguments are ignored and the answer is
    /// always `false`.
    fn is_amplification_suspicious(
        &self,
        _amplification_key: (SocketAddr, SocketAddr),
        _now: Instant,
    ) -> bool {
        false
    }
575
576 fn generate_secure_coordination_round(&self) -> VarInt {
581 let secure_random: u64 = rand::random();
583 let bounded_random = secure_random % 1000000; VarInt::from_u64(bounded_random).unwrap_or(VarInt::from_u32(1))
587 }
588
589 fn enhanced_address_validation(
597 &mut self,
598 addr: SocketAddr,
599 source_addr: SocketAddr,
600 now: Instant,
601 ) -> Result<AddressValidationResult, NatTraversalError> {
602 let basic_result = self.validate_address(addr, now);
604 match basic_result {
605 AddressValidationResult::Invalid => {
606 return Err(NatTraversalError::InvalidAddress);
607 }
608 AddressValidationResult::Suspicious => {
609 return Err(NatTraversalError::SuspiciousCoordination);
610 }
611 AddressValidationResult::Valid => {
612 }
614 }
615
616 self.validate_amplification_limits(source_addr, addr, now)?;
618
619 if self.is_address_in_suspicious_range(addr) {
621 warn!("Address in suspicious range detected: {}", addr);
622 return Err(NatTraversalError::SuspiciousCoordination);
623 }
624
625 if self.is_coordination_pattern_suspicious(source_addr, addr, now) {
626 warn!(
627 "Suspicious coordination pattern detected: {} -> {}",
628 source_addr, addr
629 );
630 return Err(NatTraversalError::SuspiciousCoordination);
631 }
632
633 Ok(AddressValidationResult::Valid)
634 }
635
636 fn is_address_in_suspicious_range(&self, addr: SocketAddr) -> bool {
638 match addr.ip() {
639 IpAddr::V4(ipv4) => {
640 let octets = ipv4.octets();
642 if octets[0] == 0 || octets[0] == 127 {
644 return true;
645 }
646
647 if octets[0] == 192 && octets[1] == 0 && octets[2] == 2 {
649 return true;
650 }
651 if octets[0] == 198 && octets[1] == 51 && octets[2] == 100 {
652 return true;
653 }
654 if octets[0] == 203 && octets[1] == 0 && octets[2] == 113 {
655 return true;
656 }
657
658 false
659 }
660 IpAddr::V6(ipv6) => {
661 if ipv6.is_loopback() || ipv6.is_unspecified() {
663 return true;
664 }
665
666 let segments = ipv6.segments();
668 if segments[0] == 0x2001 && segments[1] == 0x0db8 {
669 return true;
670 }
671
672 false
673 }
674 }
675 }
676
    /// Decides whether a coordination request from `_source_addr` towards
    /// `_target_addr` matches a suspicious pattern.
    ///
    /// Currently a stub: all arguments are ignored and the answer is
    /// always `false`.
    fn is_coordination_pattern_suspicious(
        &self,
        _source_addr: SocketAddr,
        _target_addr: SocketAddr,
        _now: Instant,
    ) -> bool {
        false
    }
694
695 fn is_candidate_rate_limited(&mut self, now: Instant) -> bool {
697 self.cleanup_rate_tracker(now);
699 if self.candidate_rate_tracker.len() >= self.max_candidates_per_window as usize {
701 return true;
702 }
703
704 self.candidate_rate_tracker.push_back(now);
706 false
707 }
708
709 fn is_coordination_rate_limited(&mut self, now: Instant) -> bool {
711 self.cleanup_coordination_tracker(now);
713 if self.coordination_requests.len() >= self.max_coordination_per_window as usize {
715 return true;
716 }
717
718 let request = CoordinationRequest { timestamp: now };
720 self.coordination_requests.push_back(request);
721 false
722 }
723
724 fn cleanup_rate_tracker(&mut self, now: Instant) {
726 let cutoff = now - self.rate_window;
727 while let Some(&front_time) = self.candidate_rate_tracker.front() {
728 if front_time < cutoff {
729 self.candidate_rate_tracker.pop_front();
730 } else {
731 break;
732 }
733 }
734 }
735 fn cleanup_coordination_tracker(&mut self, now: Instant) {
737 let cutoff = now - self.rate_window;
738 while let Some(front_request) = self.coordination_requests.front() {
739 if front_request.timestamp < cutoff {
740 self.coordination_requests.pop_front();
741 } else {
742 break;
743 }
744 }
745 }
746 fn validate_address(&mut self, addr: SocketAddr, now: Instant) -> AddressValidationResult {
748 if let Some(&cached_result) = self.address_validation_cache.get(&addr) {
750 return cached_result;
751 }
752 let result = self.perform_address_validation(addr);
753
754 self.address_validation_cache.insert(addr, result);
756
757 if self.address_validation_cache.len() > 1000 {
759 self.cleanup_address_cache(now);
760 }
761
762 result
763 }
764
765 fn perform_address_validation(&self, addr: SocketAddr) -> AddressValidationResult {
767 match addr.ip() {
768 IpAddr::V4(ipv4) => {
769 if ipv4.is_unspecified() || ipv4.is_broadcast() {
771 return AddressValidationResult::Invalid;
772 }
773 if ipv4.is_multicast() || ipv4.is_documentation() {
775 return AddressValidationResult::Suspicious;
776 }
777
778 if ipv4.octets()[0] == 0 || ipv4.octets()[0] == 127 {
780 return AddressValidationResult::Invalid;
781 }
782
783 if self.is_suspicious_ipv4(ipv4) {
785 return AddressValidationResult::Suspicious;
786 }
787 }
788 IpAddr::V6(ipv6) => {
789 if ipv6.is_unspecified() || ipv6.is_multicast() {
791 return AddressValidationResult::Invalid;
792 }
793
794 if self.is_suspicious_ipv6(ipv6) {
796 return AddressValidationResult::Suspicious;
797 }
798 }
799 }
800
801 if addr.port() == 0 || addr.port() < 1024 {
803 return AddressValidationResult::Suspicious;
804 }
805
806 AddressValidationResult::Valid
807 }
808
809 fn is_suspicious_ipv4(&self, ipv4: Ipv4Addr) -> bool {
811 let octets = ipv4.octets();
812 if octets[0] == octets[1] && octets[1] == octets[2] && octets[2] == octets[3] {
815 return true;
816 }
817
818 false
821 }
822
823 fn is_suspicious_ipv6(&self, ipv6: Ipv6Addr) -> bool {
825 let segments = ipv6.segments();
826 if segments.iter().all(|&s| s == segments[0]) {
828 return true;
829 }
830
831 false
832 }
833
834 fn cleanup_address_cache(&mut self, _now: Instant) {
836 if self.address_validation_cache.len() > 500 {
839 let keys_to_remove: Vec<_> = self
840 .address_validation_cache
841 .keys()
842 .take(self.address_validation_cache.len() / 2)
843 .copied()
844 .collect();
845 for key in keys_to_remove {
846 self.address_validation_cache.remove(&key);
847 }
848 }
849 }
850
851 fn validate_punch_me_now_frame(
859 &mut self,
860 frame: &crate::frame::PunchMeNow,
861 source_addr: SocketAddr,
862 peer_id: [u8; 32],
863 now: Instant,
864 ) -> Result<(), NatTraversalError> {
865 if self.is_coordination_rate_limited(now) {
867 debug!(
868 "PUNCH_ME_NOW frame rejected: coordination rate limit exceeded for peer {:?}",
869 hex::encode(&peer_id[..8])
870 );
871 return Err(NatTraversalError::RateLimitExceeded);
872 }
873 let addr_validation = self.validate_address(frame.address, now);
875 match addr_validation {
876 AddressValidationResult::Invalid => {
877 debug!(
878 "PUNCH_ME_NOW frame rejected: invalid address {:?} from peer {:?}",
879 frame.address,
880 hex::encode(&peer_id[..8])
881 );
882 return Err(NatTraversalError::InvalidAddress);
883 }
884 AddressValidationResult::Suspicious => {
885 debug!(
886 "PUNCH_ME_NOW frame rejected: suspicious address {:?} from peer {:?}",
887 frame.address,
888 hex::encode(&peer_id[..8])
889 );
890 return Err(NatTraversalError::SuspiciousCoordination);
891 }
892 AddressValidationResult::Valid => {
893 }
895 }
896
897 if !self.validate_address_consistency(frame.address, source_addr) {
900 debug!(
901 "PUNCH_ME_NOW frame rejected: address consistency check failed. Frame claims {:?}, but received from {:?}",
902 frame.address, source_addr
903 );
904 return Err(NatTraversalError::SuspiciousCoordination);
905 }
906
907 if !self.validate_coordination_parameters(frame) {
909 debug!(
910 "PUNCH_ME_NOW frame rejected: invalid coordination parameters from peer {:?}",
911 hex::encode(&peer_id[..8])
912 );
913 return Err(NatTraversalError::SuspiciousCoordination);
914 }
915
916 if let Some(target_peer_id) = frame.target_peer_id {
918 if !self.validate_target_peer_request(peer_id, target_peer_id, frame) {
919 debug!(
920 "PUNCH_ME_NOW frame rejected: invalid target peer request from {:?} to {:?}",
921 hex::encode(&peer_id[..8]),
922 hex::encode(&target_peer_id[..8])
923 );
924 return Err(NatTraversalError::SuspiciousCoordination);
925 }
926 }
927
928 if !self.validate_resource_limits(frame) {
930 debug!(
931 "PUNCH_ME_NOW frame rejected: resource limits exceeded from peer {:?}",
932 hex::encode(&peer_id[..8])
933 );
934 return Err(NatTraversalError::ResourceLimitExceeded);
935 }
936
937 debug!(
938 "PUNCH_ME_NOW frame validation passed for peer {:?}",
939 hex::encode(&peer_id[..8])
940 );
941 Ok(())
942 }
943
944 fn validate_address_consistency(
949 &self,
950 claimed_addr: SocketAddr,
951 observed_addr: SocketAddr,
952 ) -> bool {
953 match (claimed_addr.ip(), observed_addr.ip()) {
957 (IpAddr::V4(claimed_ip), IpAddr::V4(observed_ip)) => {
958 if claimed_ip == observed_ip {
960 return true;
961 }
962
963 if self.are_in_same_private_network_v4(claimed_ip, observed_ip) {
965 return true;
966 }
967
968 !claimed_ip.is_private() && !observed_ip.is_private()
971 }
972 (IpAddr::V6(claimed_ip), IpAddr::V6(observed_ip)) => {
973 claimed_ip == observed_ip || self.are_in_same_prefix_v6(claimed_ip, observed_ip)
975 }
976 _ => {
977 false
979 }
980 }
981 }
982
983 fn are_in_same_private_network_v4(&self, ip1: Ipv4Addr, ip2: Ipv4Addr) -> bool {
985 let ip1_octets = ip1.octets();
987 let ip2_octets = ip2.octets();
988 if ip1_octets[0] == 10 && ip2_octets[0] == 10 {
990 return true;
991 }
992
993 if ip1_octets[0] == 172
995 && ip2_octets[0] == 172
996 && (16..=31).contains(&ip1_octets[1])
997 && (16..=31).contains(&ip2_octets[1])
998 {
999 return true;
1000 }
1001
1002 if ip1_octets[0] == 192
1004 && ip1_octets[1] == 168
1005 && ip2_octets[0] == 192
1006 && ip2_octets[1] == 168
1007 {
1008 return true;
1009 }
1010
1011 false
1012 }
1013
1014 fn are_in_same_prefix_v6(&self, ip1: Ipv6Addr, ip2: Ipv6Addr) -> bool {
1016 let segments1 = ip1.segments();
1018 let segments2 = ip2.segments();
1019 segments1[0] == segments2[0]
1020 && segments1[1] == segments2[1]
1021 && segments1[2] == segments2[2]
1022 && segments1[3] == segments2[3]
1023 }
1024
1025 fn validate_coordination_parameters(&self, frame: &crate::frame::PunchMeNow) -> bool {
1027 if frame.round.into_inner() > 1000000 {
1029 return false;
1030 }
1031 if frame.paired_with_sequence_number.into_inner() > 10000 {
1033 return false;
1034 }
1035
1036 match frame.address.ip() {
1038 IpAddr::V4(ipv4) => {
1039 !ipv4.is_unspecified() && !ipv4.is_broadcast() && !ipv4.is_multicast()
1041 }
1042 IpAddr::V6(ipv6) => {
1043 !ipv6.is_unspecified() && !ipv6.is_multicast()
1045 }
1046 }
1047 }
1048
1049 fn validate_target_peer_request(
1051 &self,
1052 requesting_peer: [u8; 32],
1053 target_peer: [u8; 32],
1054 _frame: &crate::frame::PunchMeNow,
1055 ) -> bool {
1056 if requesting_peer == target_peer {
1058 return false;
1059 }
1060 true
1066 }
1067
    /// Returns `true` while the number of tracked coordination requests is
    /// strictly below `max_coordination_per_window`. `_frame` is not
    /// inspected.
    ///
    /// NOTE(review): `validate_punch_me_now_frame` calls
    /// `is_coordination_rate_limited` first, and that appends the current
    /// request to `coordination_requests`. The request that brings the queue
    /// to exactly the limit therefore fails this check. Confirm whether the
    /// effective limit of `max - 1` is intended.
    fn validate_resource_limits(&self, _frame: &crate::frame::PunchMeNow) -> bool {
        self.coordination_requests.len() < self.max_coordination_per_window as usize
    }
1079}
1080
1081impl AdaptiveTimeoutState {
1082 pub(crate) fn new() -> Self {
1084 let base_timeout = Duration::from_millis(1000); Self {
1086 current_timeout: base_timeout,
1087 min_timeout: Duration::from_millis(100),
1088 max_timeout: Duration::from_secs(30),
1089 base_timeout,
1090 backoff_multiplier: 1.0,
1091 max_backoff_multiplier: 8.0,
1092 jitter_factor: 0.1, srtt: None,
1094 rttvar: None,
1095 last_rtt: None,
1096 consecutive_timeouts: 0,
1097 successful_responses: 0,
1098 }
1099 }
1100 fn update_success(&mut self, rtt: Duration) {
1102 self.last_rtt = Some(rtt);
1103 self.successful_responses += 1;
1104 self.consecutive_timeouts = 0;
1105 match self.srtt {
1107 None => {
1108 self.srtt = Some(rtt);
1109 self.rttvar = Some(rtt / 2);
1110 }
1111 Some(srtt) => {
1112 let rttvar = self.rttvar.unwrap_or(rtt / 2);
1113 let abs_diff = rtt.abs_diff(srtt);
1114
1115 self.rttvar = Some(rttvar * 3 / 4 + abs_diff / 4);
1116 self.srtt = Some(srtt * 7 / 8 + rtt / 8);
1117 }
1118 }
1119
1120 self.backoff_multiplier = (self.backoff_multiplier * 0.8).max(1.0);
1122
1123 self.calculate_current_timeout();
1125 }
1126
1127 fn update_timeout(&mut self) {
1129 self.consecutive_timeouts += 1;
1130 self.backoff_multiplier = (self.backoff_multiplier * 2.0).min(self.max_backoff_multiplier);
1132
1133 self.calculate_current_timeout();
1135 }
1136
1137 fn calculate_current_timeout(&mut self) {
1139 let base_timeout = if let (Some(srtt), Some(rttvar)) = (self.srtt, self.rttvar) {
1140 srtt + rttvar * 4
1142 } else {
1143 self.base_timeout
1144 };
1145 let timeout = base_timeout.mul_f64(self.backoff_multiplier);
1147
1148 let jitter = 1.0 + (rand::random::<f64>() - 0.5) * 2.0 * self.jitter_factor;
1150 let timeout = timeout.mul_f64(jitter);
1151
1152 self.current_timeout = timeout.clamp(self.min_timeout, self.max_timeout);
1154 }
1155
1156 fn get_timeout(&self) -> Duration {
1158 self.current_timeout
1159 }
1160 fn should_retry(&self, max_retries: u32) -> bool {
1162 self.consecutive_timeouts < max_retries
1163 }
1164 fn get_retry_delay(&self) -> Duration {
1166 let delay = self.current_timeout.mul_f64(self.backoff_multiplier);
1167 delay.clamp(self.min_timeout, self.max_timeout)
1168 }
1169}
/// Limits and timeouts that drive resource cleanup.
#[derive(Debug)]
pub(super) struct ResourceManagementConfig {
    /// Maximum number of concurrent path validations.
    max_active_validations: usize,
    /// Maximum number of local candidates.
    max_local_candidates: usize,
    /// Maximum number of remote candidates.
    max_remote_candidates: usize,
    /// Maximum number of candidate pairs.
    max_candidate_pairs: usize,
    /// Maximum coordination history length
    /// (not read by any code visible in this chunk).
    max_coordination_history: usize,
    /// Minimum interval between periodic cleanups.
    cleanup_interval: Duration,
    /// Age after which candidates and candidate pairs are considered stale.
    candidate_timeout: Duration,
    /// Age after which an in-flight validation is considered expired.
    validation_timeout: Duration,
    /// Age after which a coordination round is considered expired.
    coordination_timeout: Duration,
    /// Memory pressure above which `should_cleanup` requests a cleanup.
    memory_pressure_threshold: f64,
    /// Memory pressure threshold for aggressive cleanup
    /// (not read by any code visible in this chunk).
    aggressive_cleanup_threshold: f64,
}
/// Resource usage and cleanup statistics.
///
/// Only `cleanup_operations`, `resources_cleaned` and `memory_pressure` are
/// written by code visible in this chunk; the remaining descriptions follow
/// the field names.
#[derive(Debug, Default)]
pub(super) struct ResourceStats {
    /// Number of active validations.
    active_validations: usize,
    /// Number of local candidates.
    local_candidates: usize,
    /// Number of remote candidates.
    remote_candidates: usize,
    /// Number of candidate pairs.
    candidate_pairs: usize,
    /// Peak memory usage (units not visible in this chunk).
    peak_memory_usage: usize,
    /// Number of cleanup passes performed.
    cleanup_operations: u64,
    /// Total number of resources removed by cleanup passes.
    resources_cleaned: u64,
    /// Number of allocation failures.
    allocation_failures: u64,
    /// Time of the last cleanup, if any.
    last_cleanup: Option<Instant>,
    /// Last pressure value computed by `calculate_memory_pressure`
    /// (usage divided by the sum of the configured limits).
    memory_pressure: f64,
}
/// Coordinates periodic and pressure-driven cleanup of traversal resources.
#[derive(Debug)]
pub(super) struct ResourceCleanupCoordinator {
    /// Limits and timeouts in effect.
    config: ResourceManagementConfig,
    /// Usage and cleanup statistics.
    stats: ResourceStats,
    /// Time of the last cleanup pass, if any.
    last_cleanup: Option<Instant>,
    /// Number of cleanup passes performed.
    cleanup_counter: u64,
    /// Set once shutdown has been requested; forces `should_cleanup` to
    /// return `true`.
    shutdown_requested: bool,
}
impl ResourceManagementConfig {
    /// Default limits: 100 validations, 50 local and 100 remote candidates,
    /// 200 pairs, cleanup every 30 s, and timeouts of 300 s (candidates),
    /// 30 s (validations) and 60 s (coordination).
    fn new() -> Self {
        Self {
            max_active_validations: 100,
            max_local_candidates: 50,
            max_remote_candidates: 100,
            max_candidate_pairs: 200,
            max_coordination_history: 10,
            cleanup_interval: Duration::from_secs(30),
            candidate_timeout: Duration::from_secs(300),
            validation_timeout: Duration::from_secs(30),
            coordination_timeout: Duration::from_secs(60),
            memory_pressure_threshold: 0.75,
            aggressive_cleanup_threshold: 0.90,
        }
    }

    /// Reduced limits for the `low_memory` feature: smaller tables, shorter
    /// timeouts, more frequent cleanup and lower pressure thresholds than
    /// `new`.
    #[cfg(feature = "low_memory")]
    fn low_memory() -> Self {
        Self {
            max_active_validations: 25,
            max_local_candidates: 10,
            max_remote_candidates: 25,
            max_candidate_pairs: 50,
            max_coordination_history: 3,
            cleanup_interval: Duration::from_secs(15),
            candidate_timeout: Duration::from_secs(180),
            validation_timeout: Duration::from_secs(20),
            coordination_timeout: Duration::from_secs(30),
            memory_pressure_threshold: 0.60,
            aggressive_cleanup_threshold: 0.80,
        }
    }
}
1269impl ResourceCleanupCoordinator {
1270 fn new() -> Self {
1272 Self {
1273 config: ResourceManagementConfig::new(),
1274 stats: ResourceStats::default(),
1275 last_cleanup: None,
1276 cleanup_counter: 0,
1277 shutdown_requested: false,
1278 }
1279 }
1280 #[cfg(feature = "low_memory")]
1282 fn low_memory() -> Self {
1283 Self {
1284 config: ResourceManagementConfig::low_memory(),
1285 stats: ResourceStats::default(),
1286 last_cleanup: None,
1287 cleanup_counter: 0,
1288 shutdown_requested: false,
1289 }
1290 }
1291 fn check_resource_limits(&self, state: &NatTraversalState) -> bool {
1293 state.active_validations.len() > self.config.max_active_validations
1294 || state.local_candidates.len() > self.config.max_local_candidates
1295 || state.remote_candidates.len() > self.config.max_remote_candidates
1296 || state.candidate_pairs.len() > self.config.max_candidate_pairs
1297 }
1298 fn calculate_memory_pressure(
1300 &mut self,
1301 active_validations_len: usize,
1302 local_candidates_len: usize,
1303 remote_candidates_len: usize,
1304 candidate_pairs_len: usize,
1305 ) -> f64 {
1306 let total_limit = self.config.max_active_validations
1307 + self.config.max_local_candidates
1308 + self.config.max_remote_candidates
1309 + self.config.max_candidate_pairs;
1310 let current_usage = active_validations_len
1311 + local_candidates_len
1312 + remote_candidates_len
1313 + candidate_pairs_len;
1314
1315 let pressure = current_usage as f64 / total_limit as f64;
1316 self.stats.memory_pressure = pressure;
1317 pressure
1318 }
1319
1320 fn should_cleanup(&self, now: Instant) -> bool {
1322 if self.shutdown_requested {
1323 return true;
1324 }
1325 if let Some(last_cleanup) = self.last_cleanup {
1327 if now.duration_since(last_cleanup) >= self.config.cleanup_interval {
1328 return true;
1329 }
1330 } else {
1331 return true; }
1333
1334 if self.stats.memory_pressure > self.config.memory_pressure_threshold {
1336 return true;
1337 }
1338
1339 false
1340 }
1341
1342 fn cleanup_expired_resources(
1344 &mut self,
1345 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1346 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1347 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1348 candidate_pairs: &mut Vec<CandidatePair>,
1349 coordination: &mut Option<CoordinationState>,
1350 now: Instant,
1351 ) -> u64 {
1352 let mut cleaned = 0;
1353 cleaned += self.cleanup_expired_validations(active_validations, now);
1355
1356 cleaned += self.cleanup_stale_candidates(local_candidates, remote_candidates, now);
1358
1359 cleaned += self.cleanup_failed_pairs(candidate_pairs, now);
1361
1362 cleaned += self.cleanup_old_coordination(coordination, now);
1364
1365 self.stats.cleanup_operations += 1;
1367 self.stats.resources_cleaned += cleaned;
1368 self.last_cleanup = Some(now);
1369 self.cleanup_counter += 1;
1370
1371 debug!("Cleaned up {} expired resources", cleaned);
1372 cleaned
1373 }
1374
1375 fn cleanup_expired_validations(
1377 &mut self,
1378 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1379 now: Instant,
1380 ) -> u64 {
1381 let mut cleaned = 0;
1382 let validation_timeout = self.config.validation_timeout;
1383 active_validations.retain(|_addr, validation| {
1384 let is_expired = now.duration_since(validation.sent_at) > validation_timeout;
1385 if is_expired {
1386 cleaned += 1;
1387 trace!("Cleaned up expired validation for {:?}", _addr);
1388 }
1389 !is_expired
1390 });
1391
1392 cleaned
1393 }
1394
1395 fn cleanup_stale_candidates(
1397 &mut self,
1398 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1399 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1400 now: Instant,
1401 ) -> u64 {
1402 let mut cleaned = 0;
1403 let candidate_timeout = self.config.candidate_timeout;
1404 local_candidates.retain(|_seq, candidate| {
1406 let is_stale = now.duration_since(candidate.discovered_at) > candidate_timeout
1407 || candidate.state == CandidateState::Failed
1408 || candidate.state == CandidateState::Removed;
1409 if is_stale {
1410 cleaned += 1;
1411 trace!("Cleaned up stale local candidate {:?}", candidate.address);
1412 }
1413 !is_stale
1414 });
1415
1416 remote_candidates.retain(|_seq, candidate| {
1418 let is_stale = now.duration_since(candidate.discovered_at) > candidate_timeout
1419 || candidate.state == CandidateState::Failed
1420 || candidate.state == CandidateState::Removed;
1421 if is_stale {
1422 cleaned += 1;
1423 trace!("Cleaned up stale remote candidate {:?}", candidate.address);
1424 }
1425 !is_stale
1426 });
1427
1428 cleaned
1429 }
1430
1431 fn cleanup_failed_pairs(
1433 &mut self,
1434 candidate_pairs: &mut Vec<CandidatePair>,
1435 now: Instant,
1436 ) -> u64 {
1437 let mut cleaned = 0;
1438 let pair_timeout = self.config.candidate_timeout;
1439 candidate_pairs.retain(|pair| {
1440 let is_stale = now.duration_since(pair.created_at) > pair_timeout
1441 || pair.state == PairState::Failed;
1442 if is_stale {
1443 cleaned += 1;
1444 trace!(
1445 "Cleaned up failed candidate pair {:?} -> {:?}",
1446 pair.local_addr, pair.remote_addr
1447 );
1448 }
1449 !is_stale
1450 });
1451
1452 cleaned
1453 }
1454
1455 fn cleanup_old_coordination(
1457 &mut self,
1458 coordination: &mut Option<CoordinationState>,
1459 now: Instant,
1460 ) -> u64 {
1461 let mut cleaned = 0;
1462 if let Some(coord) = coordination {
1463 let is_expired =
1464 now.duration_since(coord.round_start) > self.config.coordination_timeout;
1465 let is_failed = coord.state == CoordinationPhase::Failed;
1466
1467 if is_expired || is_failed {
1468 let round = coord.round;
1469 *coordination = None;
1470 cleaned += 1;
1471 trace!("Cleaned up old coordination state for round {}", round);
1472 }
1473 }
1474
1475 cleaned
1476 }
1477
1478 fn aggressive_cleanup(
1480 &mut self,
1481 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1482 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1483 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1484 candidate_pairs: &mut Vec<CandidatePair>,
1485 now: Instant,
1486 ) -> u64 {
1487 let mut cleaned = 0;
1488 let aggressive_timeout = self.config.candidate_timeout / 2;
1490
1491 local_candidates.retain(|_seq, candidate| {
1493 let keep = now.duration_since(candidate.discovered_at) <= aggressive_timeout
1494 && candidate.state != CandidateState::Failed;
1495 if !keep {
1496 cleaned += 1;
1497 }
1498 keep
1499 });
1500
1501 remote_candidates.retain(|_seq, candidate| {
1502 let keep = now.duration_since(candidate.discovered_at) <= aggressive_timeout
1503 && candidate.state != CandidateState::Failed;
1504 if !keep {
1505 cleaned += 1;
1506 }
1507 keep
1508 });
1509
1510 candidate_pairs.retain(|pair| {
1512 let keep = pair.state != PairState::Waiting
1513 || now.duration_since(pair.created_at) <= aggressive_timeout;
1514 if !keep {
1515 cleaned += 1;
1516 }
1517 keep
1518 });
1519
1520 active_validations.retain(|_addr, validation| {
1522 let keep = now.duration_since(validation.sent_at) <= self.config.validation_timeout / 2;
1523 if !keep {
1524 cleaned += 1;
1525 }
1526 keep
1527 });
1528
1529 warn!(
1530 "Aggressive cleanup removed {} resources due to memory pressure",
1531 cleaned
1532 );
1533 cleaned
1534 }
1535
1536 fn request_shutdown(&mut self) {
1538 self.shutdown_requested = true;
1539 debug!("Resource cleanup coordinator shutdown requested");
1540 }
1541 fn shutdown_cleanup(
1543 &mut self,
1544 active_validations: &mut HashMap<SocketAddr, PathValidationState>,
1545 local_candidates: &mut HashMap<VarInt, AddressCandidate>,
1546 remote_candidates: &mut HashMap<VarInt, AddressCandidate>,
1547 candidate_pairs: &mut Vec<CandidatePair>,
1548 coordination: &mut Option<CoordinationState>,
1549 ) -> u64 {
1550 let mut cleaned = 0;
1551 cleaned += active_validations.len() as u64;
1553 active_validations.clear();
1554
1555 cleaned += local_candidates.len() as u64;
1556 local_candidates.clear();
1557
1558 cleaned += remote_candidates.len() as u64;
1559 remote_candidates.clear();
1560
1561 cleaned += candidate_pairs.len() as u64;
1562 candidate_pairs.clear();
1563
1564 if coordination.is_some() {
1565 *coordination = None;
1566 cleaned += 1;
1567 }
1568
1569 info!("Shutdown cleanup removed {} resources", cleaned);
1570 cleaned
1571 }
1572
1573 fn get_resource_stats(&self) -> &ResourceStats {
1575 &self.stats
1576 }
1577 fn update_stats(
1579 &mut self,
1580 active_validations_len: usize,
1581 local_candidates_len: usize,
1582 remote_candidates_len: usize,
1583 candidate_pairs_len: usize,
1584 ) {
1585 self.stats.active_validations = active_validations_len;
1586 self.stats.local_candidates = local_candidates_len;
1587 self.stats.remote_candidates = remote_candidates_len;
1588 self.stats.candidate_pairs = candidate_pairs_len;
1589 let current_usage = self.stats.active_validations
1591 + self.stats.local_candidates
1592 + self.stats.remote_candidates
1593 + self.stats.candidate_pairs;
1594
1595 if current_usage > self.stats.peak_memory_usage {
1596 self.stats.peak_memory_usage = current_usage;
1597 }
1598 }
1599
1600 pub(super) fn perform_cleanup(&mut self, now: Instant) {
1602 self.last_cleanup = Some(now);
1603 self.cleanup_counter += 1;
1604 self.stats.cleanup_operations += 1;
1606
1607 debug!("Performed resource cleanup #{}", self.cleanup_counter);
1608 }
1609}
1610
1611impl NetworkConditionMonitor {
1612 fn new() -> Self {
1614 Self {
1615 rtt_samples: VecDeque::new(),
1616 max_samples: 20,
1617 packet_loss_rate: 0.0,
1618 congestion_window: 10,
1619 quality_score: 0.8, last_quality_update: Instant::now(),
1621 quality_update_interval: Duration::from_secs(10),
1622 timeout_stats: TimeoutStatistics::default(),
1623 }
1624 }
1625 fn record_success(&mut self, rtt: Duration, now: Instant) {
1627 self.rtt_samples.push_back(rtt);
1629 if self.rtt_samples.len() > self.max_samples {
1630 self.rtt_samples.pop_front();
1631 }
1632 self.timeout_stats.total_responses += 1;
1634 self.update_timeout_stats(now);
1635
1636 self.update_quality_score(now);
1638 }
1639
1640 fn record_timeout(&mut self, now: Instant) {
1642 self.timeout_stats.total_timeouts += 1;
1643 self.update_timeout_stats(now);
1644 self.update_quality_score(now);
1646 }
1647
1648 fn update_timeout_stats(&mut self, now: Instant) {
1650 let total_attempts = self.timeout_stats.total_responses + self.timeout_stats.total_timeouts;
1651 if total_attempts > 0 {
1652 self.timeout_stats.timeout_rate =
1653 self.timeout_stats.total_timeouts as f64 / total_attempts as f64;
1654 }
1655
1656 if !self.rtt_samples.is_empty() {
1658 let total_rtt: Duration = self.rtt_samples.iter().sum();
1659 self.timeout_stats.avg_response_time = total_rtt / self.rtt_samples.len() as u32;
1660 }
1661
1662 self.timeout_stats.last_update = Some(now);
1663 }
1664
1665 fn update_quality_score(&mut self, now: Instant) {
1667 if now.duration_since(self.last_quality_update) < self.quality_update_interval {
1668 return;
1669 }
1670 let timeout_factor = 1.0 - self.timeout_stats.timeout_rate;
1672 let rtt_factor = self.calculate_rtt_factor();
1673 let consistency_factor = self.calculate_consistency_factor();
1674
1675 let new_quality = (timeout_factor * 0.4) + (rtt_factor * 0.3) + (consistency_factor * 0.3);
1677
1678 self.quality_score = self.quality_score * 0.7 + new_quality * 0.3;
1680 self.last_quality_update = now;
1681 }
1682
1683 fn calculate_rtt_factor(&self) -> f64 {
1685 if self.rtt_samples.is_empty() {
1686 return 0.5; }
1688 let avg_rtt = self.timeout_stats.avg_response_time;
1689
1690 let rtt_ms = avg_rtt.as_millis() as f64;
1692 let factor = 1.0 - (rtt_ms - 50.0) / 950.0;
1693 factor.clamp(0.0, 1.0)
1694 }
1695
1696 fn calculate_consistency_factor(&self) -> f64 {
1698 if self.rtt_samples.len() < 3 {
1699 return 0.5; }
1701 let mean_rtt = self.timeout_stats.avg_response_time;
1703 let variance: f64 = self
1704 .rtt_samples
1705 .iter()
1706 .map(|rtt| {
1707 let diff = (*rtt).abs_diff(mean_rtt);
1708 diff.as_millis() as f64
1709 })
1710 .map(|diff| diff * diff)
1711 .sum::<f64>()
1712 / self.rtt_samples.len() as f64;
1713
1714 let std_dev = variance.sqrt();
1715
1716 let consistency = 1.0 - (std_dev / 1000.0).min(1.0);
1718 consistency.clamp(0.0, 1.0)
1719 }
1720
1721 fn get_quality_score(&self) -> f64 {
1723 self.quality_score
1724 }
1725 fn get_estimated_rtt(&self) -> Option<Duration> {
1727 if self.rtt_samples.is_empty() {
1728 return None;
1729 }
1730 Some(self.timeout_stats.avg_response_time)
1731 }
1732
1733 fn is_suitable_for_coordination(&self) -> bool {
1735 self.quality_score >= 0.3 && self.timeout_stats.timeout_rate < 0.5
1737 }
1738 fn get_packet_loss_rate(&self) -> f64 {
1740 self.packet_loss_rate
1741 }
1742
1743 fn get_timeout_multiplier(&self) -> f64 {
1745 let base_multiplier = 1.0;
1746
1747 let quality_multiplier = if self.quality_score < 0.3 {
1749 2.0 } else if self.quality_score > 0.8 {
1751 0.8 } else {
1753 1.0 };
1755
1756 let loss_multiplier = 1.0 + (self.packet_loss_rate * 2.0);
1758
1759 base_multiplier * quality_multiplier * loss_multiplier
1760 }
1761
1762 fn cleanup(&mut self, now: Instant) {
1764 let _cutoff_time = now - Duration::from_secs(60);
1766
1767 if let Some(last_update) = self.timeout_stats.last_update {
1769 if now.duration_since(last_update) > Duration::from_secs(300) {
1770 self.timeout_stats = TimeoutStatistics::default();
1771 }
1772 }
1773 }
1774}
1775
1776impl NatTraversalState {
1777 pub(super) fn new(
1779 role: NatTraversalRole,
1780 max_candidates: u32,
1781 coordination_timeout: Duration,
1782 ) -> Self {
1783 let bootstrap_coordinator = if matches!(role, NatTraversalRole::Bootstrap) {
1784 Some(BootstrapCoordinator::new(BootstrapConfig::default()))
1785 } else {
1786 None
1787 };
1788 Self {
1789 role,
1790 local_candidates: HashMap::new(),
1791 remote_candidates: HashMap::new(),
1792 candidate_pairs: Vec::new(),
1793 pair_index: HashMap::new(),
1794 active_validations: HashMap::new(),
1795 coordination: None,
1796 next_sequence: VarInt::from_u32(1),
1797 max_candidates,
1798 coordination_timeout,
1799 stats: NatTraversalStats::default(),
1800 security_state: SecurityValidationState::new(),
1801 network_monitor: NetworkConditionMonitor::new(),
1802 resource_manager: ResourceCleanupCoordinator::new(),
1803 bootstrap_coordinator,
1804 }
1805 }
1806
1807 pub(super) fn add_remote_candidate(
1809 &mut self,
1810 sequence: VarInt,
1811 address: SocketAddr,
1812 priority: VarInt,
1813 now: Instant,
1814 ) -> Result<(), NatTraversalError> {
1815 if self.should_reject_new_resources(now) {
1817 debug!(
1818 "Rejecting new candidate due to resource limits: {}",
1819 address
1820 );
1821 return Err(NatTraversalError::ResourceLimitExceeded);
1822 }
1823 if self.security_state.is_candidate_rate_limited(now) {
1825 self.stats.rate_limit_violations += 1;
1826 debug!("Rate limit exceeded for candidate addition: {}", address);
1827 return Err(NatTraversalError::RateLimitExceeded);
1828 }
1829
1830 match self.security_state.validate_address(address, now) {
1832 AddressValidationResult::Invalid => {
1833 self.stats.invalid_address_rejections += 1;
1834 self.stats.security_rejections += 1;
1835 debug!("Invalid address rejected: {}", address);
1836 return Err(NatTraversalError::InvalidAddress);
1837 }
1838 AddressValidationResult::Suspicious => {
1839 self.stats.security_rejections += 1;
1840 debug!("Suspicious address rejected: {}", address);
1841 return Err(NatTraversalError::SecurityValidationFailed);
1842 }
1843 AddressValidationResult::Valid => {
1844 }
1846 }
1847
1848 if self.remote_candidates.len() >= self.max_candidates as usize {
1850 return Err(NatTraversalError::TooManyCandidates);
1851 }
1852
1853 if self
1855 .remote_candidates
1856 .values()
1857 .any(|c| c.address == address && c.state != CandidateState::Removed)
1858 {
1859 return Err(NatTraversalError::DuplicateAddress);
1860 }
1861
1862 let candidate = AddressCandidate {
1863 address,
1864 priority: priority.into_inner() as u32,
1865 source: CandidateSource::Peer,
1866 discovered_at: now,
1867 state: CandidateState::New,
1868 attempt_count: 0,
1869 last_attempt: None,
1870 };
1871
1872 self.remote_candidates.insert(sequence, candidate);
1873 self.stats.remote_candidates_received += 1;
1874
1875 trace!(
1876 "Added remote candidate: {} with priority {}",
1877 address, priority
1878 );
1879 Ok(())
1880 }
1881
1882 pub(super) fn remove_candidate(&mut self, sequence: VarInt) -> bool {
1884 if let Some(candidate) = self.remote_candidates.get_mut(&sequence) {
1885 candidate.state = CandidateState::Removed;
1886 self.active_validations.remove(&candidate.address);
1888 true
1889 } else {
1890 false
1891 }
1892 }
1893
1894 pub(super) fn add_local_candidate(
1896 &mut self,
1897 address: SocketAddr,
1898 source: CandidateSource,
1899 now: Instant,
1900 ) -> VarInt {
1901 let sequence = self.next_sequence;
1902 self.next_sequence = VarInt::from_u64(self.next_sequence.into_inner() + 1)
1903 .expect("sequence number overflow");
1904 let candidate_type = classify_candidate_type(source);
1906 let local_preference = self.calculate_local_preference(address);
1907 let priority = calculate_candidate_priority(candidate_type, local_preference, 1);
1908
1909 let candidate = AddressCandidate {
1910 address,
1911 priority,
1912 source,
1913 discovered_at: now,
1914 state: CandidateState::New,
1915 attempt_count: 0,
1916 last_attempt: None,
1917 };
1918
1919 self.local_candidates.insert(sequence, candidate);
1920 self.stats.local_candidates_sent += 1;
1921
1922 self.generate_candidate_pairs(now);
1924
1925 sequence
1926 }
1927
1928 fn calculate_local_preference(&self, addr: SocketAddr) -> u16 {
1930 match addr {
1931 SocketAddr::V4(v4) => {
1932 if v4.ip().is_loopback() {
1933 0 } else if v4.ip().is_private() {
1935 65000 } else {
1937 32000 }
1939 }
1940 SocketAddr::V6(v6) => {
1941 if v6.ip().is_loopback() {
1942 0
1943 } else if v6.ip().segments()[0] == 0xfe80 {
1944 30000 } else {
1947 50000 }
1949 }
1950 }
1951 }
1952 pub(super) fn generate_candidate_pairs(&mut self, now: Instant) {
1954 self.candidate_pairs.clear();
1955 self.pair_index.clear();
1956 let estimated_capacity = self.local_candidates.len() * self.remote_candidates.len();
1958 self.candidate_pairs.reserve(estimated_capacity);
1959 self.pair_index.reserve(estimated_capacity);
1960
1961 let mut compatibility_cache: HashMap<(SocketAddr, SocketAddr), bool> = HashMap::new();
1963
1964 for local_candidate in self.local_candidates.values() {
1965 if local_candidate.state == CandidateState::Removed {
1967 continue;
1968 }
1969
1970 let local_type = classify_candidate_type(local_candidate.source);
1972
1973 for (remote_seq, remote_candidate) in &self.remote_candidates {
1974 if remote_candidate.state == CandidateState::Removed {
1976 continue;
1977 }
1978
1979 let cache_key = (local_candidate.address, remote_candidate.address);
1981 let compatible = *compatibility_cache.entry(cache_key).or_insert_with(|| {
1982 are_candidates_compatible(local_candidate, remote_candidate)
1983 });
1984
1985 if !compatible {
1986 continue;
1987 }
1988
1989 let pair_priority =
1991 calculate_pair_priority(local_candidate.priority, remote_candidate.priority);
1992
1993 let remote_type = classify_candidate_type(remote_candidate.source);
1995 let pair_type = classify_pair_type(local_type, remote_type);
1996
1997 let pair = CandidatePair {
1998 remote_sequence: *remote_seq,
1999 local_addr: local_candidate.address,
2000 remote_addr: remote_candidate.address,
2001 priority: pair_priority,
2002 state: PairState::Waiting,
2003 pair_type,
2004 created_at: now,
2005 last_check: None,
2006 };
2007
2008 let index = self.candidate_pairs.len();
2010 self.pair_index.insert(remote_candidate.address, index);
2011 self.candidate_pairs.push(pair);
2012 }
2013 }
2014
2015 self.candidate_pairs
2017 .sort_unstable_by(|a, b| b.priority.cmp(&a.priority));
2018
2019 self.pair_index.clear();
2021 for (idx, pair) in self.candidate_pairs.iter().enumerate() {
2022 self.pair_index.insert(pair.remote_addr, idx);
2023 }
2024
2025 trace!("Generated {} candidate pairs", self.candidate_pairs.len());
2026 }
2027
2028 pub(super) fn get_next_validation_pairs(
2030 &mut self,
2031 max_concurrent: usize,
2032 ) -> Vec<&mut CandidatePair> {
2033 let mut result = Vec::with_capacity(max_concurrent);
2036 for pair in self.candidate_pairs.iter_mut() {
2037 if pair.state == PairState::Waiting {
2038 result.push(pair);
2039 if result.len() >= max_concurrent {
2040 break;
2041 }
2042 }
2043 }
2044
2045 result
2046 }
2047
2048 pub(super) fn find_pair_by_remote_addr(
2050 &mut self,
2051 addr: SocketAddr,
2052 ) -> Option<&mut CandidatePair> {
2053 if let Some(&index) = self.pair_index.get(&addr) {
2055 self.candidate_pairs.get_mut(index)
2056 } else {
2057 None
2058 }
2059 }
2060 pub(super) fn mark_pair_succeeded(&mut self, remote_addr: SocketAddr) -> bool {
2062 let (succeeded_type, succeeded_priority) = {
2064 if let Some(pair) = self.find_pair_by_remote_addr(remote_addr) {
2065 pair.state = PairState::Succeeded;
2066 (pair.pair_type, pair.priority)
2067 } else {
2068 return false;
2069 }
2070 };
2071 for other_pair in &mut self.candidate_pairs {
2073 if other_pair.pair_type == succeeded_type
2074 && other_pair.priority < succeeded_priority
2075 && other_pair.state == PairState::Waiting
2076 {
2077 other_pair.state = PairState::Frozen;
2078 }
2079 }
2080
2081 true
2082 }
2083
2084 pub(super) fn get_best_succeeded_pairs(&self) -> Vec<&CandidatePair> {
2086 let mut best_ipv4: Option<&CandidatePair> = None;
2087 let mut best_ipv6: Option<&CandidatePair> = None;
2088 for pair in &self.candidate_pairs {
2089 if pair.state != PairState::Succeeded {
2090 continue;
2091 }
2092
2093 match pair.remote_addr {
2094 SocketAddr::V4(_) => {
2095 if best_ipv4.is_none_or(|best| pair.priority > best.priority) {
2096 best_ipv4 = Some(pair);
2097 }
2098 }
2099 SocketAddr::V6(_) => {
2100 if best_ipv6.is_none_or(|best| pair.priority > best.priority) {
2101 best_ipv6 = Some(pair);
2102 }
2103 }
2104 }
2105 }
2106
2107 let mut result = Vec::new();
2108 if let Some(pair) = best_ipv4 {
2109 result.push(pair);
2110 }
2111 if let Some(pair) = best_ipv6 {
2112 result.push(pair);
2113 }
2114 result
2115 }
2116
2117 pub(super) fn get_validation_candidates(&self) -> Vec<(VarInt, &AddressCandidate)> {
2119 let mut candidates: Vec<_> = self
2120 .remote_candidates
2121 .iter()
2122 .filter(|(_, c)| c.state == CandidateState::New)
2123 .map(|(k, v)| (*k, v))
2124 .collect();
2125 candidates.sort_by(|a, b| b.1.priority.cmp(&a.1.priority));
2127 candidates
2128 }
2129
2130 pub(super) fn start_validation(
2132 &mut self,
2133 sequence: VarInt,
2134 challenge: u64,
2135 now: Instant,
2136 ) -> Result<(), NatTraversalError> {
2137 let candidate = self
2138 .remote_candidates
2139 .get_mut(&sequence)
2140 .ok_or(NatTraversalError::UnknownCandidate)?;
2141 if candidate.state != CandidateState::New {
2142 return Err(NatTraversalError::InvalidCandidateState);
2143 }
2144
2145 if Self::is_validation_suspicious(candidate, now) {
2147 self.stats.security_rejections += 1;
2148 debug!(
2149 "Suspicious validation attempt rejected for address {}",
2150 candidate.address
2151 );
2152 return Err(NatTraversalError::SecurityValidationFailed);
2153 }
2154
2155 if self.active_validations.len() >= 10 {
2157 debug!(
2158 "Too many concurrent validations, rejecting new validation for {}",
2159 candidate.address
2160 );
2161 return Err(NatTraversalError::SecurityValidationFailed);
2162 }
2163
2164 candidate.state = CandidateState::Validating;
2166 candidate.attempt_count += 1;
2167 candidate.last_attempt = Some(now);
2168
2169 let validation = PathValidationState {
2171 challenge,
2172 sent_at: now,
2173 retry_count: 0,
2174 max_retries: 3, coordination_round: self.coordination.as_ref().map(|c| c.round),
2176 timeout_state: AdaptiveTimeoutState::new(),
2177 last_retry_at: None,
2178 };
2179
2180 self.active_validations
2181 .insert(candidate.address, validation);
2182 trace!(
2183 "Started validation for candidate {} with challenge {}",
2184 candidate.address, challenge
2185 );
2186 Ok(())
2187 }
2188
2189 fn is_validation_suspicious(candidate: &AddressCandidate, now: Instant) -> bool {
2191 if candidate.attempt_count > 10 {
2193 return true;
2194 }
2195 if let Some(last_attempt) = candidate.last_attempt {
2197 let time_since_last = now.duration_since(last_attempt);
2198 if time_since_last < Duration::from_millis(100) {
2199 return true; }
2201 }
2202
2203 if candidate.state == CandidateState::Failed {
2205 let time_since_discovery = now.duration_since(candidate.discovered_at);
2206 if time_since_discovery < Duration::from_secs(60) {
2207 return true; }
2209 }
2210
2211 false
2212 }
2213
2214 pub(super) fn handle_validation_success(
2216 &mut self,
2217 remote_addr: SocketAddr,
2218 challenge: u64,
2219 now: Instant,
2220 ) -> Result<VarInt, NatTraversalError> {
2221 let sequence = self
2223 .remote_candidates
2224 .iter()
2225 .find(|(_, c)| c.address == remote_addr)
2226 .map(|(seq, _)| *seq)
2227 .ok_or(NatTraversalError::UnknownCandidate)?;
2228 let validation = self
2230 .active_validations
2231 .get_mut(&remote_addr)
2232 .ok_or(NatTraversalError::NoActiveValidation)?;
2233
2234 if validation.challenge != challenge {
2235 return Err(NatTraversalError::ChallengeMismatch);
2236 }
2237
2238 let rtt = now.duration_since(validation.sent_at);
2240 validation.timeout_state.update_success(rtt);
2241
2242 self.network_monitor.record_success(rtt, now);
2244
2245 let candidate = self
2247 .remote_candidates
2248 .get_mut(&sequence)
2249 .ok_or(NatTraversalError::UnknownCandidate)?;
2250
2251 candidate.state = CandidateState::Valid;
2252 self.active_validations.remove(&remote_addr);
2253 self.stats.validations_succeeded += 1;
2254
2255 trace!(
2256 "Validation successful for {} with RTT {:?}",
2257 remote_addr, rtt
2258 );
2259 Ok(sequence)
2260 }
2261
2262 pub(super) fn start_coordination_round(
2264 &mut self,
2265 targets: Vec<PunchTarget>,
2266 now: Instant,
2267 ) -> Result<VarInt, NatTraversalError> {
2268 if self.security_state.is_coordination_rate_limited(now) {
2270 self.stats.rate_limit_violations += 1;
2271 debug!(
2272 "Rate limit exceeded for coordination request with {} targets",
2273 targets.len()
2274 );
2275 return Err(NatTraversalError::RateLimitExceeded);
2276 }
2277 if self.is_coordination_suspicious(&targets, now) {
2279 self.stats.suspicious_coordination_attempts += 1;
2280 self.stats.security_rejections += 1;
2281 debug!(
2282 "Suspicious coordination request rejected with {} targets",
2283 targets.len()
2284 );
2285 return Err(NatTraversalError::SuspiciousCoordination);
2286 }
2287
2288 for target in &targets {
2290 match self
2291 .security_state
2292 .validate_address(target.remote_addr, now)
2293 {
2294 AddressValidationResult::Invalid => {
2295 self.stats.invalid_address_rejections += 1;
2296 self.stats.security_rejections += 1;
2297 debug!(
2298 "Invalid target address in coordination: {}",
2299 target.remote_addr
2300 );
2301 return Err(NatTraversalError::InvalidAddress);
2302 }
2303 AddressValidationResult::Suspicious => {
2304 self.stats.security_rejections += 1;
2305 debug!(
2306 "Suspicious target address in coordination: {}",
2307 target.remote_addr
2308 );
2309 return Err(NatTraversalError::SecurityValidationFailed);
2310 }
2311 AddressValidationResult::Valid => {
2312 }
2314 }
2315 }
2316
2317 let round = self.next_sequence;
2318 self.next_sequence = VarInt::from_u64(self.next_sequence.into_inner() + 1)
2319 .expect("sequence number overflow");
2320
2321 let coordination_grace = Duration::from_millis(500); let punch_start = now + coordination_grace;
2324
2325 self.coordination = Some(CoordinationState {
2326 round,
2327 punch_targets: targets,
2328 round_start: now,
2329 punch_start,
2330 round_duration: self.coordination_timeout,
2331 state: CoordinationPhase::Requesting,
2332 punch_request_sent: false,
2333 peer_punch_received: false,
2334 retry_count: 0,
2335 max_retries: 3,
2336 timeout_state: AdaptiveTimeoutState::new(),
2337 last_retry_at: None,
2338 });
2339
2340 self.stats.coordination_rounds += 1;
2341 trace!(
2342 "Started coordination round {} with {} targets",
2343 round,
2344 self.coordination.as_ref().unwrap().punch_targets.len()
2345 );
2346 Ok(round)
2347 }
2348
2349 fn is_coordination_suspicious(&self, targets: &[PunchTarget], _now: Instant) -> bool {
2351 if targets.len() > 20 {
2353 return true;
2354 }
2355 let mut seen_addresses = std::collections::HashSet::new();
2357 for target in targets {
2358 if !seen_addresses.insert(target.remote_addr) {
2359 return true; }
2361 }
2362
2363 if targets.len() > 5 {
2365 let mut ipv4_addresses: Vec<_> = targets
2367 .iter()
2368 .filter_map(|t| match t.remote_addr.ip() {
2369 IpAddr::V4(ipv4) => Some(u32::from(ipv4)),
2370 _ => None,
2371 })
2372 .collect();
2373
2374 if ipv4_addresses.len() >= 3 {
2375 ipv4_addresses.sort();
2376 let mut sequential_count = 1;
2377 for i in 1..ipv4_addresses.len() {
2378 if ipv4_addresses[i] == ipv4_addresses[i - 1] + 1 {
2379 sequential_count += 1;
2380 if sequential_count >= 3 {
2381 return true; }
2383 } else {
2384 sequential_count = 1;
2385 }
2386 }
2387 }
2388 }
2389
2390 false
2391 }
2392
2393 pub(super) fn get_coordination_phase(&self) -> Option<CoordinationPhase> {
2395 self.coordination.as_ref().map(|c| c.state)
2396 }
2397 pub(super) fn should_send_punch_request(&self) -> bool {
2399 if let Some(coord) = &self.coordination {
2400 coord.state == CoordinationPhase::Requesting && !coord.punch_request_sent
2401 } else {
2402 false
2403 }
2404 }
2405 pub(super) fn mark_punch_request_sent(&mut self) {
2407 if let Some(coord) = &mut self.coordination {
2408 coord.punch_request_sent = true;
2409 coord.state = CoordinationPhase::Coordinating;
2410 trace!("PUNCH_ME_NOW sent, waiting for peer coordination");
2411 }
2412 }
2413 pub(super) fn handle_peer_punch_request(
2415 &mut self,
2416 peer_round: VarInt,
2417 now: Instant,
2418 ) -> Result<bool, NatTraversalError> {
2419 if self.is_peer_coordination_suspicious(peer_round, now) {
2421 self.stats.suspicious_coordination_attempts += 1;
2422 self.stats.security_rejections += 1;
2423 debug!(
2424 "Suspicious peer coordination request rejected for round {}",
2425 peer_round
2426 );
2427 return Err(NatTraversalError::SuspiciousCoordination);
2428 }
2429 if let Some(coord) = &mut self.coordination {
2430 if coord.round == peer_round {
2431 match coord.state {
2432 CoordinationPhase::Coordinating | CoordinationPhase::Requesting => {
2433 coord.peer_punch_received = true;
2434 coord.state = CoordinationPhase::Preparing;
2435
2436 let network_rtt = self
2438 .network_monitor
2439 .get_estimated_rtt()
2440 .unwrap_or(Duration::from_millis(100));
2441 let quality_score = self.network_monitor.get_quality_score();
2442
2443 let base_grace = Duration::from_millis(150);
2445 let rtt_factor = (network_rtt.as_millis() as f64 / 100.0).clamp(0.5, 3.0);
2446 let quality_factor = (2.0 - quality_score).clamp(1.0, 2.0);
2447
2448 let adaptive_grace = Duration::from_millis(
2449 (base_grace.as_millis() as f64 * rtt_factor * quality_factor) as u64,
2450 );
2451
2452 coord.punch_start = now + adaptive_grace;
2453
2454 trace!(
2455 "Peer coordination received, punch starts in {:?} (RTT: {:?}, quality: {:.2})",
2456 adaptive_grace, network_rtt, quality_score
2457 );
2458 Ok(true)
2459 }
2460 CoordinationPhase::Preparing => {
2461 trace!("Peer coordination confirmed during preparation");
2463 Ok(true)
2464 }
2465 _ => {
2466 debug!(
2467 "Received coordination in unexpected phase: {:?}",
2468 coord.state
2469 );
2470 Ok(false)
2471 }
2472 }
2473 } else {
2474 debug!(
2475 "Received coordination for wrong round: {} vs {}",
2476 peer_round, coord.round
2477 );
2478 Ok(false)
2479 }
2480 } else {
2481 debug!("Received peer coordination but no active round");
2482 Ok(false)
2483 }
2484 }
2485
2486 fn is_peer_coordination_suspicious(&self, peer_round: VarInt, _now: Instant) -> bool {
2488 if peer_round.into_inner() == 0 {
2490 return true; }
2492 if let Some(coord) = &self.coordination {
2494 let our_round = coord.round.into_inner();
2495 let peer_round_num = peer_round.into_inner();
2496
2497 if peer_round_num > our_round + 100 || peer_round_num + 100 < our_round {
2499 return true;
2500 }
2501 }
2502
2503 false
2504 }
2505
2506 pub(super) fn should_start_punching(&self, now: Instant) -> bool {
2508 if let Some(coord) = &self.coordination {
2509 match coord.state {
2510 CoordinationPhase::Preparing => now >= coord.punch_start,
2511 CoordinationPhase::Coordinating => {
2512 coord.peer_punch_received && now >= coord.punch_start
2514 }
2515 _ => false,
2516 }
2517 } else {
2518 false
2519 }
2520 }
2521 pub(super) fn start_punching_phase(&mut self, now: Instant) {
2523 if let Some(coord) = &mut self.coordination {
2524 coord.state = CoordinationPhase::Punching;
2525 let network_rtt = self
2527 .network_monitor
2528 .get_estimated_rtt()
2529 .unwrap_or(Duration::from_millis(100));
2530
2531 let jitter_ms: u64 = rand::random::<u64>() % 11;
2533 let jitter = Duration::from_millis(jitter_ms);
2534 let transmission_time = coord.punch_start + network_rtt / 2 + jitter;
2535
2536 coord.punch_start = transmission_time.max(now);
2538
2539 trace!(
2540 "Starting synchronized hole punching at {:?} (RTT: {:?}, jitter: {:?})",
2541 coord.punch_start, network_rtt, jitter
2542 );
2543 }
2544 }
2545
2546 pub(super) fn get_punch_targets_from_coordination(&self) -> Option<&[PunchTarget]> {
2548 self.coordination
2549 .as_ref()
2550 .map(|c| c.punch_targets.as_slice())
2551 }
2552 pub(super) fn mark_coordination_validating(&mut self) {
2554 if let Some(coord) = &mut self.coordination {
2555 if coord.state == CoordinationPhase::Punching {
2556 coord.state = CoordinationPhase::Validating;
2557 trace!("Coordination moved to validation phase");
2558 }
2559 }
2560 }
2561 pub(super) fn handle_coordination_success(
2563 &mut self,
2564 remote_addr: SocketAddr,
2565 now: Instant,
2566 ) -> bool {
2567 if let Some(coord) = &mut self.coordination {
2568 let was_target = coord
2570 .punch_targets
2571 .iter()
2572 .any(|target| target.remote_addr == remote_addr);
2573 if was_target && coord.state == CoordinationPhase::Validating {
2574 let rtt = now.duration_since(coord.round_start);
2576 coord.timeout_state.update_success(rtt);
2577 self.network_monitor.record_success(rtt, now);
2578
2579 coord.state = CoordinationPhase::Succeeded;
2580 self.stats.direct_connections += 1;
2581 trace!(
2582 "Coordination succeeded via {} with RTT {:?}",
2583 remote_addr, rtt
2584 );
2585 true
2586 } else {
2587 false
2588 }
2589 } else {
2590 false
2591 }
2592 }
2593
2594 pub(super) fn handle_coordination_failure(&mut self, now: Instant) -> bool {
2596 if let Some(coord) = &mut self.coordination {
2597 coord.retry_count += 1;
2598 coord.timeout_state.update_timeout();
2599 self.network_monitor.record_timeout(now);
2600 if coord.timeout_state.should_retry(coord.max_retries)
2602 && self.network_monitor.is_suitable_for_coordination()
2603 {
2604 coord.state = CoordinationPhase::Requesting;
2606 coord.punch_request_sent = false;
2607 coord.peer_punch_received = false;
2608 coord.round_start = now;
2609 coord.last_retry_at = Some(now);
2610
2611 let retry_delay = coord.timeout_state.get_retry_delay();
2613
2614 let quality_multiplier = 2.0 - self.network_monitor.get_quality_score();
2616 let adjusted_delay = Duration::from_millis(
2617 (retry_delay.as_millis() as f64 * quality_multiplier) as u64,
2618 );
2619
2620 coord.punch_start = now + adjusted_delay;
2621
2622 trace!(
2623 "Coordination failed, retrying round {} (attempt {}) with delay {:?} (quality: {:.2})",
2624 coord.round,
2625 coord.retry_count + 1,
2626 adjusted_delay,
2627 self.network_monitor.get_quality_score()
2628 );
2629 true
2630 } else {
2631 coord.state = CoordinationPhase::Failed;
2632 self.stats.coordination_failures += 1;
2633
2634 if !self.network_monitor.is_suitable_for_coordination() {
2635 trace!(
2636 "Coordination failed due to poor network conditions (quality: {:.2})",
2637 self.network_monitor.get_quality_score()
2638 );
2639 } else {
2640 trace!("Coordination failed after {} attempts", coord.retry_count);
2641 }
2642 false
2643 }
2644 } else {
2645 false
2646 }
2647 }
2648
2649 pub(super) fn check_coordination_timeout(&mut self, now: Instant) -> bool {
2651 if let Some(coord) = &mut self.coordination {
2652 let timeout = coord.timeout_state.get_timeout();
2653 let elapsed = now.duration_since(coord.round_start);
2654 if elapsed > timeout {
2655 trace!(
2656 "Coordination round {} timed out after {:?} (adaptive timeout: {:?})",
2657 coord.round, elapsed, timeout
2658 );
2659 self.handle_coordination_failure(now);
2660 true
2661 } else {
2662 false
2663 }
2664 } else {
2665 false
2666 }
2667 }
2668
2669 pub(super) fn check_validation_timeouts(&mut self, now: Instant) -> Vec<SocketAddr> {
2671 let mut expired_validations = Vec::new();
2672 let mut retry_validations = Vec::new();
2673
2674 for (addr, validation) in &mut self.active_validations {
2675 let timeout = validation.timeout_state.get_timeout();
2676 let elapsed = now.duration_since(validation.sent_at);
2677
2678 if elapsed >= timeout {
2679 if validation
2680 .timeout_state
2681 .should_retry(validation.max_retries)
2682 {
2683 retry_validations.push(*addr);
2685 } else {
2686 expired_validations.push(*addr);
2688 }
2689 }
2690 }
2691
2692 for addr in retry_validations {
2694 if let Some(validation) = self.active_validations.get_mut(&addr) {
2695 validation.retry_count += 1;
2696 validation.sent_at = now;
2697 validation.last_retry_at = Some(now);
2698 validation.timeout_state.update_timeout();
2699
2700 trace!(
2701 "Retrying validation for {} (attempt {})",
2702 addr,
2703 validation.retry_count + 1
2704 );
2705 }
2706 }
2707
2708 for addr in &expired_validations {
2710 self.active_validations.remove(addr);
2711 self.network_monitor.record_timeout(now);
2712 trace!("Validation expired for {}", addr);
2713 }
2714
2715 expired_validations
2716 }
2717
2718 pub(super) fn schedule_validation_retries(&mut self, now: Instant) -> Vec<SocketAddr> {
2720 let mut retry_addresses = Vec::new();
2721
2722 for (addr, validation) in &mut self.active_validations {
2724 let elapsed = now.duration_since(validation.sent_at);
2725 let timeout = validation.timeout_state.get_timeout();
2726
2727 if elapsed > timeout
2728 && validation
2729 .timeout_state
2730 .should_retry(validation.max_retries)
2731 {
2732 validation.retry_count += 1;
2734 validation.last_retry_at = Some(now);
2735 validation.sent_at = now; validation.timeout_state.update_timeout();
2737
2738 retry_addresses.push(*addr);
2739 trace!(
2740 "Scheduled retry {} for validation to {}",
2741 validation.retry_count, addr
2742 );
2743 }
2744 }
2745
2746 retry_addresses
2747 }
2748
2749 pub(super) fn update_network_conditions(&mut self, now: Instant) {
2751 self.network_monitor.cleanup(now);
2752
2753 let multiplier = self.network_monitor.get_timeout_multiplier();
2755
2756 for validation in self.active_validations.values_mut() {
2758 if multiplier > 1.5 {
2759 validation.timeout_state.backoff_multiplier =
2761 (validation.timeout_state.backoff_multiplier * 1.2)
2762 .min(validation.timeout_state.max_backoff_multiplier);
2763 } else if multiplier < 0.8 {
2764 validation.timeout_state.backoff_multiplier =
2766 (validation.timeout_state.backoff_multiplier * 0.9).max(1.0);
2767 }
2768 }
2769 }
2770
2771 pub(super) fn should_retry_coordination(&self, now: Instant) -> bool {
2773 if let Some(coord) = &self.coordination {
2774 if coord.retry_count > 0 {
2775 if let Some(last_retry) = coord.last_retry_at {
2776 let retry_delay = coord.timeout_state.get_retry_delay();
2777 return now.duration_since(last_retry) >= retry_delay;
2778 }
2779 }
2780 }
2781 false
2782 }
2783
2784 pub(super) fn perform_resource_management(&mut self, now: Instant) -> u64 {
2786 self.resource_manager.update_stats(
2788 self.active_validations.len(),
2789 self.local_candidates.len(),
2790 self.remote_candidates.len(),
2791 self.candidate_pairs.len(),
2792 );
2793
2794 let memory_pressure = self.resource_manager.calculate_memory_pressure(
2796 self.active_validations.len(),
2797 self.local_candidates.len(),
2798 self.remote_candidates.len(),
2799 self.candidate_pairs.len(),
2800 );
2801
2802 let mut cleaned = 0;
2804
2805 if self.resource_manager.should_cleanup(now) {
2806 cleaned += self.resource_manager.cleanup_expired_resources(
2807 &mut self.active_validations,
2808 &mut self.local_candidates,
2809 &mut self.remote_candidates,
2810 &mut self.candidate_pairs,
2811 &mut self.coordination,
2812 now,
2813 );
2814
2815 if memory_pressure > self.resource_manager.config.aggressive_cleanup_threshold {
2817 cleaned += self.resource_manager.aggressive_cleanup(
2818 &mut self.active_validations,
2819 &mut self.local_candidates,
2820 &mut self.remote_candidates,
2821 &mut self.candidate_pairs,
2822 now,
2823 );
2824 }
2825 }
2826
2827 cleaned
2828 }
2829
2830 pub(super) fn should_reject_new_resources(&mut self, _now: Instant) -> bool {
2832 self.resource_manager.update_stats(
2834 self.active_validations.len(),
2835 self.local_candidates.len(),
2836 self.remote_candidates.len(),
2837 self.candidate_pairs.len(),
2838 );
2839 let memory_pressure = self.resource_manager.calculate_memory_pressure(
2840 self.active_validations.len(),
2841 self.local_candidates.len(),
2842 self.remote_candidates.len(),
2843 self.candidate_pairs.len(),
2844 );
2845 if memory_pressure > self.resource_manager.config.memory_pressure_threshold {
2847 self.resource_manager.stats.allocation_failures += 1;
2848 return true;
2849 }
2850
2851 if self.resource_manager.check_resource_limits(self) {
2853 self.resource_manager.stats.allocation_failures += 1;
2854 return true;
2855 }
2856
2857 false
2858 }
2859
2860 pub(super) fn get_next_timeout(&self, now: Instant) -> Option<Instant> {
2862 let mut next_timeout = None;
2863 if let Some(coord) = &self.coordination {
2865 match coord.state {
2866 CoordinationPhase::Requesting | CoordinationPhase::Coordinating => {
2867 let timeout_at = coord.round_start + self.coordination_timeout;
2868 next_timeout =
2869 Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2870 }
2871 CoordinationPhase::Preparing => {
2872 next_timeout = Some(
2874 next_timeout
2875 .map_or(coord.punch_start, |t: Instant| t.min(coord.punch_start)),
2876 );
2877 }
2878 CoordinationPhase::Punching | CoordinationPhase::Validating => {
2879 let timeout_at = coord.round_start + coord.timeout_state.get_timeout();
2881 next_timeout =
2882 Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2883 }
2884 _ => {}
2885 }
2886 }
2887
2888 for validation in self.active_validations.values() {
2890 let timeout_at = validation.sent_at + validation.timeout_state.get_timeout();
2891 next_timeout = Some(next_timeout.map_or(timeout_at, |t: Instant| t.min(timeout_at)));
2892 }
2893
2894 if self.resource_manager.should_cleanup(now) {
2896 let cleanup_at = now + Duration::from_secs(1);
2898 next_timeout = Some(next_timeout.map_or(cleanup_at, |t: Instant| t.min(cleanup_at)));
2899 }
2900
2901 next_timeout
2902 }
2903
2904 pub(super) fn handle_timeout(
2906 &mut self,
2907 now: Instant,
2908 ) -> Result<Vec<TimeoutAction>, NatTraversalError> {
2909 let mut actions = Vec::new();
2910 if let Some(coord) = &mut self.coordination {
2912 match coord.state {
2913 CoordinationPhase::Requesting | CoordinationPhase::Coordinating => {
2914 let timeout_at = coord.round_start + self.coordination_timeout;
2915 if now >= timeout_at {
2916 coord.retry_count += 1;
2917 if coord.retry_count >= coord.max_retries {
2918 debug!("Coordination failed after {} retries", coord.retry_count);
2919 coord.state = CoordinationPhase::Failed;
2920 actions.push(TimeoutAction::Failed);
2921 } else {
2922 debug!(
2923 "Coordination timeout, retrying ({}/{})",
2924 coord.retry_count, coord.max_retries
2925 );
2926 coord.state = CoordinationPhase::Requesting;
2927 coord.round_start = now;
2928 actions.push(TimeoutAction::RetryCoordination);
2929 }
2930 }
2931 }
2932 CoordinationPhase::Preparing => {
2933 if now >= coord.punch_start {
2935 debug!("Starting coordinated hole punching");
2936 coord.state = CoordinationPhase::Punching;
2937 actions.push(TimeoutAction::StartValidation);
2938 }
2939 }
2940 CoordinationPhase::Punching | CoordinationPhase::Validating => {
2941 let timeout_at = coord.round_start + coord.timeout_state.get_timeout();
2942 if now >= timeout_at {
2943 coord.retry_count += 1;
2944 if coord.retry_count >= coord.max_retries {
2945 debug!("Validation failed after {} retries", coord.retry_count);
2946 coord.state = CoordinationPhase::Failed;
2947 actions.push(TimeoutAction::Failed);
2948 } else {
2949 debug!(
2950 "Validation timeout, retrying ({}/{})",
2951 coord.retry_count, coord.max_retries
2952 );
2953 coord.state = CoordinationPhase::Punching;
2954 actions.push(TimeoutAction::StartValidation);
2955 }
2956 }
2957 }
2958 CoordinationPhase::Succeeded => {
2959 actions.push(TimeoutAction::Complete);
2960 }
2961 CoordinationPhase::Failed => {
2962 actions.push(TimeoutAction::Failed);
2963 }
2964 _ => {}
2965 }
2966 }
2967
2968 let mut expired_validations = Vec::new();
2970 for (addr, validation) in &mut self.active_validations {
2971 let timeout_at = validation.sent_at + validation.timeout_state.get_timeout();
2972 if now >= timeout_at {
2973 validation.retry_count += 1;
2974 if validation.retry_count >= validation.max_retries {
2975 debug!("Path validation failed for {}: max retries exceeded", addr);
2976 expired_validations.push(*addr);
2977 } else {
2978 debug!(
2979 "Path validation timeout for {}, retrying ({}/{})",
2980 addr, validation.retry_count, validation.max_retries
2981 );
2982 validation.sent_at = now;
2983 validation.last_retry_at = Some(now);
2984 actions.push(TimeoutAction::StartValidation);
2985 }
2986 }
2987 }
2988
2989 for addr in expired_validations {
2991 self.active_validations.remove(&addr);
2992 }
2993
2994 if self.resource_manager.should_cleanup(now) {
2996 self.resource_manager.perform_cleanup(now);
2997 }
2998
2999 self.network_monitor.update_quality_score(now);
3001
3002 if self.coordination.is_none()
3004 && !self.local_candidates.is_empty()
3005 && !self.remote_candidates.is_empty()
3006 {
3007 actions.push(TimeoutAction::RetryDiscovery);
3008 }
3009
3010 Ok(actions)
3011 }
3012
3013 pub(super) fn handle_address_observation(
3018 &mut self,
3019 peer_id: [u8; 32],
3020 observed_address: SocketAddr,
3021 connection_id: crate::shared::ConnectionId,
3022 peer_role: NatTraversalRole,
3023 now: Instant,
3024 ) -> Result<Option<crate::frame::AddAddress>, NatTraversalError> {
3025 if let Some(bootstrap_coordinator) = &mut self.bootstrap_coordinator {
3026 let connection_context = ConnectionContext {
3027 connection_id,
3028 original_destination: observed_address, peer_role,
3030 transport_params: None,
3031 };
3032
3033 bootstrap_coordinator.observe_peer_address(
3035 peer_id,
3036 observed_address,
3037 connection_context,
3038 now,
3039 )?;
3040
3041 let sequence = self.next_sequence;
3043 self.next_sequence =
3044 VarInt::from_u32((self.next_sequence.into_inner() + 1).try_into().unwrap());
3045
3046 let priority = VarInt::from_u32(100); let add_address_frame =
3048 bootstrap_coordinator.generate_add_address_frame(peer_id, sequence, priority);
3049
3050 Ok(add_address_frame)
3051 } else {
3052 Ok(None)
3054 }
3055 }
3056
3057 pub(super) fn handle_punch_me_now_frame(
3062 &mut self,
3063 from_peer: [u8; 32],
3064 source_addr: SocketAddr,
3065 frame: &crate::frame::PunchMeNow,
3066 now: Instant,
3067 ) -> Result<Option<crate::frame::PunchMeNow>, NatTraversalError> {
3068 if let Some(bootstrap_coordinator) = &mut self.bootstrap_coordinator {
3069 bootstrap_coordinator.process_punch_me_now_frame(from_peer, source_addr, frame, now)
3070 } else {
3071 Ok(None)
3073 }
3074 }
3075 pub(super) fn get_observed_address(&self, peer_id: [u8; 32]) -> Option<SocketAddr> {
3079 self.bootstrap_coordinator
3080 .as_ref()
3081 .and_then(|coord| coord.get_peer_record(peer_id))
3082 .map(|record| record.observed_address)
3083 }
3084
3085 pub(super) fn start_candidate_discovery(&mut self) -> Result<(), NatTraversalError> {
3087 debug!("Starting candidate discovery for NAT traversal");
3088 if self.local_candidates.is_empty() {
3090 debug!("Local candidates will be populated by discovery manager");
3093 }
3094
3095 Ok(())
3096 }
3097
3098 pub(super) fn queue_add_address_frame(
3100 &mut self,
3101 sequence: VarInt,
3102 address: SocketAddr,
3103 priority: u32,
3104 ) -> Result<(), NatTraversalError> {
3105 debug!(
3106 "Queuing ADD_ADDRESS frame: seq={}, addr={}, priority={}",
3107 sequence, address, priority
3108 );
3109
3110 let candidate = AddressCandidate {
3112 address,
3113 priority,
3114 source: CandidateSource::Local,
3115 discovered_at: Instant::now(),
3116 state: CandidateState::New,
3117 attempt_count: 0,
3118 last_attempt: None,
3119 };
3120
3121 if !self.local_candidates.values().any(|c| c.address == address) {
3123 self.local_candidates.insert(sequence, candidate);
3124 }
3125
3126 Ok(())
3127 }
3128}
3129
/// Errors reported by the NAT traversal machinery.
///
/// The `Display` text of each variant is given on the variant.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum NatTraversalError {
    /// "too many candidates"
    TooManyCandidates,
    /// "duplicate address"
    DuplicateAddress,
    /// "unknown candidate"
    UnknownCandidate,
    /// "invalid candidate state"
    InvalidCandidateState,
    /// "no active validation"
    NoActiveValidation,
    /// "challenge mismatch"
    ChallengeMismatch,
    /// "no active coordination"
    NoActiveCoordination,
    /// "security validation failed"
    SecurityValidationFailed,
    /// "rate limit exceeded"
    RateLimitExceeded,
    /// "invalid address"
    InvalidAddress,
    /// "suspicious coordination request"
    SuspiciousCoordination,
    /// "resource limit exceeded"
    ResourceLimitExceeded,
}

impl std::fmt::Display for NatTraversalError {
    /// Writes the fixed, lowercase description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::TooManyCandidates => "too many candidates",
            Self::DuplicateAddress => "duplicate address",
            Self::UnknownCandidate => "unknown candidate",
            Self::InvalidCandidateState => "invalid candidate state",
            Self::NoActiveValidation => "no active validation",
            Self::ChallengeMismatch => "challenge mismatch",
            Self::NoActiveCoordination => "no active coordination",
            Self::SecurityValidationFailed => "security validation failed",
            Self::RateLimitExceeded => "rate limit exceeded",
            Self::InvalidAddress => "invalid address",
            Self::SuspiciousCoordination => "suspicious coordination request",
            Self::ResourceLimitExceeded => "resource limit exceeded",
        };
        f.write_str(message)
    }
}

impl std::error::Error for NatTraversalError {}
3178
/// Plain-data snapshot of security-related counters and gauges.
///
/// The code that fills these fields in is outside this block; the field notes
/// below restate what the names indicate.
#[derive(Clone, Debug)]
pub(crate) struct SecurityStats {
    /// Total security rejections.
    pub total_security_rejections: u32,
    /// Rate-limit violations.
    pub rate_limit_violations: u32,
    /// Rejections caused by invalid addresses.
    pub invalid_address_rejections: u32,
    /// Coordination attempts flagged as suspicious.
    pub suspicious_coordination_attempts: u32,
    /// Number of active validations.
    pub active_validations: usize,
    /// Number of cached address validations.
    pub cached_address_validations: usize,
    /// Current candidate rate.
    pub current_candidate_rate: usize,
    /// Current coordination rate.
    pub current_coordination_rate: usize,
}
3199#[derive(Debug)]
3204pub(crate) struct BootstrapCoordinator {
3205 peer_registry: HashMap<PeerId, PeerObservationRecord>,
3207 coordination_sessions: HashMap<CoordinationSessionId, CoordinationSession>,
3209 address_observations: HashMap<SocketAddr, AddressObservation>,
3211 security_validator: SecurityValidationState,
3213 stats: BootstrapStats,
3215 _config: BootstrapConfig,
3217 _last_cleanup: Option<Instant>,
3219}
/// Identifier of a coordination session held by the bootstrap coordinator.
type CoordinationSessionId = u64;
/// 32-byte peer identifier.
type PeerId = [u8; 32];
3224#[derive(Debug, Clone)]
3226pub(crate) struct PeerObservationRecord {
3227 peer_id: PeerId,
3229 observed_address: SocketAddr,
3231 observed_at: Instant,
3233 connection_context: ConnectionContext,
3235 can_coordinate: bool,
3237 coordination_count: u32,
3239 success_rate: f64,
3241}
3242
3243#[derive(Debug, Clone)]
3245pub(crate) struct ConnectionContext {
3246 connection_id: ConnectionId,
3248 original_destination: SocketAddr,
3250 peer_role: NatTraversalRole,
3252 transport_params: Option<NatTraversalTransportParams>,
3254}
3255
/// NAT traversal parameters that can accompany a connection context.
#[derive(Clone, Debug)]
struct NatTraversalTransportParams {
    /// Maximum number of candidates.
    max_candidates: u32,
    /// Timeout applied to coordination.
    coordination_timeout: Duration,
    /// Whether advanced features are supported.
    supports_advanced_features: bool,
}
3266
3267#[derive(Debug, Clone)]
3269struct AddressObservation {
3270 address: SocketAddr,
3272 first_observed: Instant,
3274 observation_count: u32,
3276 validation_state: AddressValidationResult,
3278 associated_peers: Vec<PeerId>,
3280}
3281
3282#[derive(Debug, Clone)]
3284pub(crate) struct CoordinationSession {
3285 session_id: CoordinationSessionId,
3287 peer_a: PeerId,
3289 peer_b: PeerId,
3291 current_round: VarInt,
3293 started_at: Instant,
3295 phase: CoordinationPhase,
3297 target_addresses: Vec<(SocketAddr, VarInt)>, sync_state: SynchronizationState,
3301 stats: CoordinationSessionStats,
3303}
/// Readiness flags for the two peers of a coordination session.
#[derive(Clone, Debug)]
struct SynchronizationState {
    /// Whether peer A is ready.
    peer_a_ready: bool,
    /// Whether peer B is ready.
    peer_b_ready: bool,
}
/// Counters kept per coordination session.
#[derive(Clone, Debug, Default)]
struct CoordinationSessionStats {
    /// Number of successful coordinations recorded on the session.
    successful_coordinations: u32,
}
/// Configuration for the bootstrap coordinator.
///
/// Currently carries no settings; the single unit field is a placeholder.
#[derive(Clone, Debug, Default)]
pub(crate) struct BootstrapConfig {
    /// Placeholder with no meaning.
    _unused: (),
}
/// Running counters maintained by the bootstrap coordinator.
#[derive(Clone, Debug, Default)]
pub(crate) struct BootstrapStats {
    /// Peer address observations accepted so far.
    total_observations: u64,
    /// Coordination sessions created so far.
    total_coordinations: u64,
    /// Coordinations that completed successfully.
    successful_coordinations: u64,
    /// Current size of the peer registry.
    active_peers: usize,
    /// Current number of coordination sessions.
    active_sessions: usize,
    /// Requests rejected by security checks.
    security_rejections: u64,
}
3340#[derive(Debug, Clone)]
3342pub(crate) enum CoordinationSessionEvent {
3343 PhaseChanged {
3345 session_id: CoordinationSessionId,
3346 old_phase: CoordinationPhase,
3347 new_phase: CoordinationPhase,
3348 },
3349 SessionFailed {
3351 session_id: CoordinationSessionId,
3352 peer_a: PeerId,
3353 peer_b: PeerId,
3354 reason: String,
3355 },
3356 StartHolePunching {
3358 session_id: CoordinationSessionId,
3359 peer_a: PeerId,
3360 peer_b: PeerId,
3361 target_addresses: Vec<(SocketAddr, VarInt)>,
3362 },
3363 ReadyForCleanup { session_id: CoordinationSessionId },
3365}
/// Internal triggers that move a coordination session through its phases.
#[derive(Clone, Copy, Debug)]
enum SessionAdvancementEvent {
    /// Both peers have reported ready.
    BothPeersReady,
    /// The coordinating phase is complete.
    CoordinationComplete,
    /// The preparing phase is complete.
    PreparationComplete,
    /// The punching phase is complete.
    PunchingComplete,
    /// The validating phase timed out.
    ValidationTimeout,
    /// The session timed out.
    Timeout,
    /// A finished session can be cleaned up.
    ReadyForCleanup,
}
/// Recovery actions that can be chosen for a coordination.
#[derive(Clone, Copy, Debug)]
pub(crate) enum CoordinationRecoveryAction {
    /// Do nothing.
    NoAction,
    /// Retry after a backoff delay.
    RetryWithBackoff,
    /// Mark the coordination as failed.
    MarkAsFailed,
    /// Clean the coordination up.
    Cleanup,
}
3396impl BootstrapCoordinator {
3397 pub(crate) fn new(config: BootstrapConfig) -> Self {
3399 Self {
3400 peer_registry: HashMap::new(),
3401 coordination_sessions: HashMap::new(),
3402 address_observations: HashMap::new(),
3403 security_validator: SecurityValidationState::new(),
3404 stats: BootstrapStats::default(),
3405 _config: config,
3406 _last_cleanup: None,
3407 }
3408 }
3409 pub(crate) fn observe_peer_address(
3414 &mut self,
3415 peer_id: PeerId,
3416 observed_address: SocketAddr,
3417 connection_context: ConnectionContext,
3418 now: Instant,
3419 ) -> Result<(), NatTraversalError> {
3420 match self
3422 .security_validator
3423 .validate_address(observed_address, now)
3424 {
3425 AddressValidationResult::Valid => {}
3426 AddressValidationResult::Invalid => {
3427 self.stats.security_rejections += 1;
3428 return Err(NatTraversalError::InvalidAddress);
3429 }
3430 AddressValidationResult::Suspicious => {
3431 self.stats.security_rejections += 1;
3432 return Err(NatTraversalError::SecurityValidationFailed);
3433 }
3434 }
3435
3436 if self.security_validator.is_candidate_rate_limited(now) {
3438 self.stats.security_rejections += 1;
3439 return Err(NatTraversalError::RateLimitExceeded);
3440 }
3441
3442 let observation = self
3444 .address_observations
3445 .entry(observed_address)
3446 .or_insert_with(|| AddressObservation {
3447 address: observed_address,
3448 first_observed: now,
3449 observation_count: 0,
3450 validation_state: AddressValidationResult::Valid,
3451 associated_peers: Vec::new(),
3452 });
3453
3454 observation.observation_count += 1;
3455 if !observation.associated_peers.contains(&peer_id) {
3456 observation.associated_peers.push(peer_id);
3457 }
3458
3459 let peer_record = PeerObservationRecord {
3461 peer_id,
3462 observed_address,
3463 observed_at: now,
3464 connection_context,
3465 can_coordinate: true, coordination_count: 0,
3467 success_rate: 1.0,
3468 };
3469
3470 self.peer_registry.insert(peer_id, peer_record);
3471 self.stats.total_observations += 1;
3472 self.stats.active_peers = self.peer_registry.len();
3473
3474 debug!(
3475 "Observed peer {:?} at address {} (total observations: {})",
3476 peer_id, observed_address, self.stats.total_observations
3477 );
3478
3479 Ok(())
3480 }
3481
3482 pub(crate) fn generate_add_address_frame(
3487 &self,
3488 peer_id: PeerId,
3489 sequence: VarInt,
3490 priority: VarInt,
3491 ) -> Option<crate::frame::AddAddress> {
3492 self.peer_registry
3493 .get(&peer_id)
3494 .map(|peer_record| crate::frame::AddAddress {
3495 sequence,
3496 address: peer_record.observed_address,
3497 priority,
3498 })
3499 }
3500
3501 pub(crate) fn process_punch_me_now_frame(
3506 &mut self,
3507 from_peer: PeerId,
3508 source_addr: SocketAddr,
3509 frame: &crate::frame::PunchMeNow,
3510 now: Instant,
3511 ) -> Result<Option<crate::frame::PunchMeNow>, NatTraversalError> {
3512 if self
3514 .security_validator
3515 .is_adaptive_rate_limited(from_peer, now)
3516 {
3517 self.stats.security_rejections += 1;
3518 debug!(
3519 "PUNCH_ME_NOW frame rejected: adaptive rate limit exceeded for peer {:?}",
3520 hex::encode(&from_peer[..8])
3521 );
3522 return Err(NatTraversalError::RateLimitExceeded);
3523 }
3524 self.security_validator
3526 .enhanced_address_validation(frame.address, source_addr, now)
3527 .inspect_err(|&e| {
3528 self.stats.security_rejections += 1;
3529 debug!(
3530 "PUNCH_ME_NOW frame address validation failed from peer {:?}: {:?}",
3531 hex::encode(&from_peer[..8]),
3532 e
3533 );
3534 })?;
3535
3536 self.security_validator
3538 .validate_punch_me_now_frame(frame, source_addr, from_peer, now)
3539 .inspect_err(|&e| {
3540 self.stats.security_rejections += 1;
3541 debug!(
3542 "PUNCH_ME_NOW frame validation failed from peer {:?}: {:?}",
3543 hex::encode(&from_peer[..8]),
3544 e
3545 );
3546 })?;
3547
3548 if let Some(target_peer_id) = frame.target_peer_id {
3550 if let Some(target_peer) = self.peer_registry.get(&target_peer_id) {
3552 let session_id = self.generate_session_id();
3554
3555 if !self.coordination_sessions.contains_key(&session_id) {
3556 let _network_rtt = self
3558 .estimate_peer_rtt(&from_peer)
3559 .unwrap_or(Duration::from_millis(100));
3560
3561 let session = CoordinationSession {
3562 session_id,
3563 peer_a: from_peer,
3564 peer_b: target_peer_id,
3565 current_round: frame.round,
3566 started_at: now,
3567 phase: CoordinationPhase::Requesting,
3568 target_addresses: vec![(frame.address, frame.paired_with_sequence_number)],
3569 sync_state: SynchronizationState {
3570 peer_a_ready: true, peer_b_ready: false,
3572 },
3573 stats: CoordinationSessionStats::default(),
3574 };
3575
3576 self.coordination_sessions.insert(session_id, session);
3577 self.stats.total_coordinations += 1;
3578 self.stats.active_sessions = self.coordination_sessions.len();
3579 }
3580
3581 let coordination_frame = crate::frame::PunchMeNow {
3583 round: frame.round,
3584 paired_with_sequence_number: frame.paired_with_sequence_number,
3585 address: target_peer.observed_address,
3586 target_peer_id: Some(from_peer),
3587 };
3588
3589 info!(
3590 "Coordinating hole punch between {:?} and {:?} (round: {})",
3591 from_peer, target_peer_id, frame.round
3592 );
3593
3594 Ok(Some(coordination_frame))
3595 } else {
3596 warn!(
3598 "Target peer {:?} not found for coordination from {:?}",
3599 target_peer_id, from_peer
3600 );
3601 Ok(None)
3602 }
3603 } else {
3604 let session_id = if let Some(session) =
3606 self.find_coordination_session_by_peer(from_peer, frame.round)
3607 {
3608 session.sync_state.peer_b_ready = true;
3609
3610 if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
3612 session.phase = CoordinationPhase::Punching;
3613 session.stats.successful_coordinations += 1;
3614 Some(session.session_id)
3615 } else {
3616 None
3617 }
3618 } else {
3619 None
3620 };
3621
3622 if let Some(session_id) = session_id {
3624 self.stats.successful_coordinations += 1;
3625 info!(
3626 "Coordination complete for session {} (round: {})",
3627 session_id, frame.round
3628 );
3629 }
3630
3631 Ok(None)
3632 }
3633 }
3634
3635 fn find_coordination_session_by_peer(
3637 &mut self,
3638 peer_id: PeerId,
3639 round: VarInt,
3640 ) -> Option<&mut CoordinationSession> {
3641 self.coordination_sessions.values_mut().find(|session| {
3642 (session.peer_a == peer_id || session.peer_b == peer_id)
3643 && session.current_round == round
3644 })
3645 }
3646 fn generate_session_id(&self) -> CoordinationSessionId {
3648 rand::random()
3649 }
3650 pub(crate) fn generate_secure_coordination_round(&self) -> VarInt {
3652 self.security_validator.generate_secure_coordination_round()
3653 }
3654
3655 pub(crate) fn validate_coordination_security(
3657 &mut self,
3658 peer_id: PeerId,
3659 source_addr: SocketAddr,
3660 target_addr: SocketAddr,
3661 now: Instant,
3662 ) -> Result<(), NatTraversalError> {
3663 if self
3665 .security_validator
3666 .is_adaptive_rate_limited(peer_id, now)
3667 {
3668 self.stats.security_rejections += 1;
3669 return Err(NatTraversalError::RateLimitExceeded);
3670 }
3671
3672 self.security_validator
3674 .enhanced_address_validation(target_addr, source_addr, now)?;
3675
3676 self.security_validator
3678 .validate_amplification_limits(source_addr, target_addr, now)?;
3679
3680 Ok(())
3681 }
3682
3683 pub(crate) fn cleanup_expired_sessions(&mut self, now: Instant) {
3685 let session_timeout = Duration::from_secs(300); let expired_sessions: Vec<CoordinationSessionId> = self
3689 .coordination_sessions
3690 .iter()
3691 .filter(|(_, session)| now.duration_since(session.started_at) > session_timeout)
3692 .map(|(&session_id, _)| session_id)
3693 .collect();
3694
3695 for session_id in expired_sessions {
3697 if let Some(session) = self.coordination_sessions.remove(&session_id) {
3698 debug!(
3699 "Cleaned up expired coordination session {} between {:?} and {:?}",
3700 session_id,
3701 hex::encode(&session.peer_a[..8]),
3702 hex::encode(&session.peer_b[..8])
3703 );
3704 }
3705 }
3706
3707 self.stats.active_sessions = self.coordination_sessions.len();
3709
3710 let observation_timeout = Duration::from_secs(3600); self.peer_registry
3713 .retain(|_, record| now.duration_since(record.observed_at) <= observation_timeout);
3714
3715 self.stats.active_peers = self.peer_registry.len();
3717
3718 self.address_observations.retain(|_, observation| {
3720 now.duration_since(observation.first_observed) <= observation_timeout
3721 });
3722 }
3723
3724 pub(crate) fn get_stats(&self) -> &BootstrapStats {
3726 &self.stats
3727 }
3728
3729 pub(crate) fn update_peer_coordination_stats(&mut self, peer_id: PeerId, success: bool) {
3731 if let Some(peer_record) = self.peer_registry.get_mut(&peer_id) {
3732 peer_record.coordination_count += 1;
3733
3734 if success {
3735 let alpha = 0.1; peer_record.success_rate = peer_record.success_rate * (1.0 - alpha) + alpha;
3738 } else {
3739 let alpha = 0.1;
3741 peer_record.success_rate *= 1.0 - alpha;
3742 }
3743
3744 if peer_record.success_rate < 0.1 && peer_record.coordination_count > 10 {
3746 peer_record.can_coordinate = false;
3747 warn!(
3748 "Disabled coordination for peer {:?} due to low success rate: {:.2}",
3749 hex::encode(&peer_id[..8]),
3750 peer_record.success_rate
3751 );
3752 }
3753 }
3754 }
3755
3756 pub(crate) fn poll_session_state_machine(
3761 &mut self,
3762 now: Instant,
3763 ) -> Vec<CoordinationSessionEvent> {
3764 let mut events = Vec::new();
3765 let mut sessions_to_update = Vec::new();
3766
3767 for (&session_id, session) in &self.coordination_sessions {
3769 if let Some(event) = self.should_advance_session(session, now) {
3770 sessions_to_update.push((session_id, event));
3771 }
3772 }
3773
3774 for (session_id, event) in sessions_to_update {
3776 let session_events =
3777 if let Some(session) = self.coordination_sessions.get_mut(&session_id) {
3778 let peer_a = session.peer_a;
3779 let peer_b = session.peer_b;
3780
3781 match Self::advance_session_state_static(session, event, now) {
3782 Ok(session_events) => session_events,
3783 Err(e) => {
3784 warn!("Failed to advance session {} state: {:?}", session_id, e);
3785 session.phase = CoordinationPhase::Failed;
3787 vec![CoordinationSessionEvent::SessionFailed {
3788 session_id,
3789 peer_a,
3790 peer_b,
3791 reason: format!("State advancement error: {e:?}"),
3792 }]
3793 }
3794 }
3795 } else {
3796 Vec::new()
3797 };
3798
3799 events.extend(session_events);
3800 }
3801
3802 self.cleanup_completed_sessions(now);
3804
3805 events
3806 }
3807
3808 fn should_advance_session(
3810 &self,
3811 session: &CoordinationSession,
3812 now: Instant,
3813 ) -> Option<SessionAdvancementEvent> {
3814 let session_age = now.duration_since(session.started_at);
3815
3816 match session.phase {
3817 CoordinationPhase::Requesting => {
3818 if session_age > Duration::from_secs(10) {
3820 Some(SessionAdvancementEvent::Timeout)
3821 } else if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
3822 Some(SessionAdvancementEvent::BothPeersReady)
3823 } else {
3824 None
3825 }
3826 }
3827 CoordinationPhase::Coordinating => {
3828 if session_age > Duration::from_millis(500) {
3830 Some(SessionAdvancementEvent::CoordinationComplete)
3831 } else {
3832 None
3833 }
3834 }
3835 CoordinationPhase::Preparing => {
3836 if session_age > Duration::from_secs(1) {
3838 Some(SessionAdvancementEvent::PreparationComplete)
3839 } else {
3840 None
3841 }
3842 }
3843 CoordinationPhase::Punching => {
3844 if session_age > Duration::from_secs(2) {
3846 Some(SessionAdvancementEvent::PunchingComplete)
3847 } else {
3848 None
3849 }
3850 }
3851 CoordinationPhase::Validating => {
3852 if session_age > Duration::from_secs(10) {
3854 Some(SessionAdvancementEvent::ValidationTimeout)
3855 } else {
3856 None
3857 }
3858 }
3859 CoordinationPhase::Succeeded | CoordinationPhase::Failed => {
3860 if session_age > Duration::from_secs(60) {
3862 Some(SessionAdvancementEvent::ReadyForCleanup)
3863 } else {
3864 None
3865 }
3866 }
3867 CoordinationPhase::Idle => {
3868 Some(SessionAdvancementEvent::Timeout)
3870 }
3871 }
3872 }
3873
3874 fn advance_session_state_static(
3876 session: &mut CoordinationSession,
3877 event: SessionAdvancementEvent,
3878 _now: Instant,
3879 ) -> Result<Vec<CoordinationSessionEvent>, NatTraversalError> {
3880 let mut events = Vec::new();
3881 let previous_phase = session.phase;
3882
3883 match (session.phase, event) {
3884 (CoordinationPhase::Requesting, SessionAdvancementEvent::BothPeersReady) => {
3885 session.phase = CoordinationPhase::Coordinating;
3886 debug!(
3887 "Session {} advanced from Requesting to Coordinating",
3888 session.session_id
3889 );
3890 events.push(CoordinationSessionEvent::PhaseChanged {
3891 session_id: session.session_id,
3892 old_phase: previous_phase,
3893 new_phase: session.phase,
3894 });
3895 }
3896 (CoordinationPhase::Requesting, SessionAdvancementEvent::Timeout) => {
3897 session.phase = CoordinationPhase::Failed;
3898 warn!(
3899 "Session {} timed out in Requesting phase",
3900 session.session_id
3901 );
3902 events.push(CoordinationSessionEvent::SessionFailed {
3903 session_id: session.session_id,
3904 peer_a: session.peer_a,
3905 peer_b: session.peer_b,
3906 reason: "Timeout waiting for peer responses".to_string(),
3907 });
3908 }
3909 (CoordinationPhase::Coordinating, SessionAdvancementEvent::CoordinationComplete) => {
3910 session.phase = CoordinationPhase::Preparing;
3911 debug!(
3912 "Session {} advanced from Coordinating to Preparing",
3913 session.session_id
3914 );
3915 events.push(CoordinationSessionEvent::PhaseChanged {
3916 session_id: session.session_id,
3917 old_phase: previous_phase,
3918 new_phase: session.phase,
3919 });
3920 }
3921 (CoordinationPhase::Preparing, SessionAdvancementEvent::PreparationComplete) => {
3922 session.phase = CoordinationPhase::Punching;
3923 debug!(
3924 "Session {} advanced from Preparing to Punching",
3925 session.session_id
3926 );
3927 events.push(CoordinationSessionEvent::PhaseChanged {
3928 session_id: session.session_id,
3929 old_phase: previous_phase,
3930 new_phase: session.phase,
3931 });
3932 events.push(CoordinationSessionEvent::StartHolePunching {
3933 session_id: session.session_id,
3934 peer_a: session.peer_a,
3935 peer_b: session.peer_b,
3936 target_addresses: session.target_addresses.clone(),
3937 });
3938 }
3939 (CoordinationPhase::Punching, SessionAdvancementEvent::PunchingComplete) => {
3940 session.phase = CoordinationPhase::Validating;
3941 debug!(
3942 "Session {} advanced from Punching to Validating",
3943 session.session_id
3944 );
3945 events.push(CoordinationSessionEvent::PhaseChanged {
3946 session_id: session.session_id,
3947 old_phase: previous_phase,
3948 new_phase: session.phase,
3949 });
3950 }
3951 (CoordinationPhase::Validating, SessionAdvancementEvent::ValidationTimeout) => {
3952 session.phase = CoordinationPhase::Failed;
3953 warn!("Session {} validation timed out", session.session_id);
3954 events.push(CoordinationSessionEvent::SessionFailed {
3955 session_id: session.session_id,
3956 peer_a: session.peer_a,
3957 peer_b: session.peer_b,
3958 reason: "Validation timeout".to_string(),
3959 });
3960 }
3961 (phase, SessionAdvancementEvent::ReadyForCleanup) => {
3962 debug!(
3963 "Session {} ready for cleanup in phase {:?}",
3964 session.session_id, phase
3965 );
3966 events.push(CoordinationSessionEvent::ReadyForCleanup {
3967 session_id: session.session_id,
3968 });
3969 }
3970 _ => {
3971 warn!(
3973 "Invalid state transition for session {}: {:?} -> {:?}",
3974 session.session_id, session.phase, event
3975 );
3976 }
3977 }
3978
3979 Ok(events)
3980 }
3981
3982 fn cleanup_completed_sessions(&mut self, now: Instant) {
3984 let cleanup_timeout = Duration::from_secs(300); let sessions_to_remove: Vec<CoordinationSessionId> = self
3987 .coordination_sessions
3988 .iter()
3989 .filter(|(_, session)| {
3990 matches!(
3991 session.phase,
3992 CoordinationPhase::Succeeded | CoordinationPhase::Failed
3993 ) && now.duration_since(session.started_at) > cleanup_timeout
3994 })
3995 .map(|(&session_id, _)| session_id)
3996 .collect();
3997
3998 for session_id in sessions_to_remove {
3999 if let Some(session) = self.coordination_sessions.remove(&session_id) {
4000 debug!(
4001 "Cleaned up completed session {} in phase {:?}",
4002 session_id, session.phase
4003 );
4004 }
4005 }
4006
4007 self.stats.active_sessions = self.coordination_sessions.len();
4008 }
4009
4010 pub(crate) fn retry_failed_coordination(
4015 &mut self,
4016 session_id: CoordinationSessionId,
4017 now: Instant,
4018 ) -> Result<bool, NatTraversalError> {
4019 let session = self
4020 .coordination_sessions
4021 .get_mut(&session_id)
4022 .ok_or(NatTraversalError::NoActiveCoordination)?;
4023
4024 if !matches!(session.phase, CoordinationPhase::Failed) {
4026 return Ok(false);
4027 }
4028
4029 let base_delay = Duration::from_secs(1);
4031 let max_delay = Duration::from_secs(60);
4032 let retry_count = session.stats.successful_coordinations; let delay = std::cmp::min(
4035 base_delay * 2_u32.pow(retry_count.min(10)), max_delay,
4037 );
4038
4039 let _jitter_factor = 0.1;
4041 let jitter =
4042 Duration::from_millis((rand::random::<u64>() % 100) * delay.as_millis() as u64 / 1000);
4043 let total_delay = delay + jitter;
4044
4045 if now.duration_since(session.started_at) < total_delay {
4047 return Ok(false);
4048 }
4049
4050 const MAX_RETRIES: u32 = 5;
4052 if retry_count >= MAX_RETRIES {
4053 warn!(
4054 "Session {} exceeded maximum retry attempts ({})",
4055 session_id, MAX_RETRIES
4056 );
4057 return Ok(false);
4058 }
4059
4060 session.phase = CoordinationPhase::Requesting;
4062 session.started_at = now;
4063 session.sync_state.peer_a_ready = false;
4064 session.sync_state.peer_b_ready = false;
4065 session.stats.successful_coordinations += 1; info!(
4068 "Retrying coordination session {} (attempt {})",
4069 session_id,
4070 retry_count + 1
4071 );
4072 Ok(true)
4073 }
4074
4075 pub(crate) fn handle_coordination_error(
4077 &mut self,
4078 session_id: CoordinationSessionId,
4079 error: NatTraversalError,
4080 _now: Instant,
4081 ) -> CoordinationRecoveryAction {
4082 let session = match self.coordination_sessions.get_mut(&session_id) {
4083 Some(session) => session,
4084 None => return CoordinationRecoveryAction::NoAction,
4085 };
4086
4087 match error {
4088 NatTraversalError::RateLimitExceeded => {
4089 warn!("Rate limit exceeded for session {}, will retry", session_id);
4091 CoordinationRecoveryAction::RetryWithBackoff
4092 }
4093 NatTraversalError::SecurityValidationFailed
4094 | NatTraversalError::SuspiciousCoordination => {
4095 session.phase = CoordinationPhase::Failed;
4097 warn!(
4098 "Security validation failed for session {}, marking as failed",
4099 session_id
4100 );
4101 CoordinationRecoveryAction::MarkAsFailed
4102 }
4103 NatTraversalError::InvalidAddress => {
4104 warn!("Invalid address in session {}, allowing retry", session_id);
4106 CoordinationRecoveryAction::RetryWithBackoff
4107 }
4108 NatTraversalError::NoActiveCoordination => {
4109 warn!(
4111 "No active coordination for session {}, cleaning up",
4112 session_id
4113 );
4114 CoordinationRecoveryAction::Cleanup
4115 }
4116 _ => {
4117 warn!(
4119 "Coordination error for session {}: {:?}, will retry",
4120 session_id, error
4121 );
4122 CoordinationRecoveryAction::RetryWithBackoff
4123 }
4124 }
4125 }
4126
4127 fn estimate_peer_rtt(&self, peer_id: &PeerId) -> Option<Duration> {
4129 self.peer_registry
4132 .get(peer_id)
4133 .map(|_peer_record| Duration::from_millis(100))
4134 }
4135 pub(crate) fn coordinate_hole_punching(
4140 &mut self,
4141 peer_a: PeerId,
4142 peer_b: PeerId,
4143 round: VarInt,
4144 now: Instant,
4145 ) -> Result<CoordinationSessionId, NatTraversalError> {
4146 let peer_a_record = self
4148 .peer_registry
4149 .get(&peer_a)
4150 .ok_or(NatTraversalError::UnknownCandidate)?;
4151 let peer_b_record = self
4152 .peer_registry
4153 .get(&peer_b)
4154 .ok_or(NatTraversalError::UnknownCandidate)?;
4155
4156 if !peer_a_record.can_coordinate || !peer_b_record.can_coordinate {
4157 return Err(NatTraversalError::InvalidCandidateState);
4158 }
4159
4160 let session_id = self.generate_session_id();
4162
4163 let session = CoordinationSession {
4165 session_id,
4166 peer_a,
4167 peer_b,
4168 current_round: round,
4169 started_at: now,
4170 phase: CoordinationPhase::Requesting,
4171 target_addresses: vec![
4172 (peer_a_record.observed_address, VarInt::from_u32(0)),
4173 (peer_b_record.observed_address, VarInt::from_u32(1)),
4174 ],
4175 sync_state: SynchronizationState {
4176 peer_a_ready: false,
4177 peer_b_ready: false,
4178 },
4179 stats: CoordinationSessionStats::default(),
4180 };
4181
4182 self.coordination_sessions.insert(session_id, session);
4183 self.stats.total_coordinations += 1;
4184 self.stats.active_sessions = self.coordination_sessions.len();
4185
4186 info!(
4187 "Started coordination session {} between peers {:?} and {:?} (round: {})",
4188 session_id,
4189 hex::encode(&peer_a[..8]),
4190 hex::encode(&peer_b[..8]),
4191 round
4192 );
4193
4194 Ok(session_id)
4195 }
4196
4197 pub(crate) fn relay_coordination_frame(
4202 &mut self,
4203 session_id: CoordinationSessionId,
4204 from_peer: PeerId,
4205 frame: &crate::frame::PunchMeNow,
4206 _now: Instant,
4207 ) -> Result<Option<(PeerId, crate::frame::PunchMeNow)>, NatTraversalError> {
4208 let session = self
4209 .coordination_sessions
4210 .get_mut(&session_id)
4211 .ok_or(NatTraversalError::NoActiveCoordination)?;
4212
4213 if session.peer_a != from_peer && session.peer_b != from_peer {
4215 return Err(NatTraversalError::SuspiciousCoordination);
4216 }
4217
4218 let target_peer = if session.peer_a == from_peer {
4220 session.peer_b
4221 } else {
4222 session.peer_a
4223 };
4224
4225 let target_record = self
4227 .peer_registry
4228 .get(&target_peer)
4229 .ok_or(NatTraversalError::UnknownCandidate)?;
4230
4231 if session.peer_a == from_peer {
4233 session.sync_state.peer_a_ready = true;
4234 } else {
4235 session.sync_state.peer_b_ready = true;
4236 }
4237
4238 let relay_frame = crate::frame::PunchMeNow {
4240 round: frame.round,
4241 paired_with_sequence_number: frame.paired_with_sequence_number,
4242 address: target_record.observed_address,
4243 target_peer_id: Some(from_peer),
4244 };
4245
4246 if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
4248 session.phase = CoordinationPhase::Coordinating;
4249 info!(
4250 "Coordination phase complete for session {} - both peers ready",
4251 session_id
4252 );
4253 }
4254
4255 debug!(
4256 "Relaying coordination frame from {:?} to {:?} in session {}",
4257 hex::encode(&from_peer[..8]),
4258 hex::encode(&target_peer[..8]),
4259 session_id
4260 );
4261
4262 Ok(Some((target_peer, relay_frame)))
4263 }
4264
4265 pub(crate) fn advance_coordination_round(
4270 &mut self,
4271 session_id: CoordinationSessionId,
4272 now: Instant,
4273 ) -> Result<CoordinationPhase, NatTraversalError> {
4274 let session = self
4275 .coordination_sessions
4276 .get_mut(&session_id)
4277 .ok_or(NatTraversalError::NoActiveCoordination)?;
4278
4279 let previous_phase = session.phase;
4280
4281 match session.phase {
4283 CoordinationPhase::Requesting => {
4284 if session.sync_state.peer_a_ready && session.sync_state.peer_b_ready {
4286 session.phase = CoordinationPhase::Coordinating;
4287 debug!("Session {} advanced to Coordinating phase", session_id);
4288 }
4289 }
4290 CoordinationPhase::Coordinating => {
4291 let coordination_delay = Duration::from_millis(200); let punch_time = now + coordination_delay;
4294
4295 session.phase = CoordinationPhase::Preparing;
4296 debug!(
4297 "Session {} advanced to Preparing phase, punch time: {:?}",
4298 session_id, punch_time
4299 );
4300 }
4301 CoordinationPhase::Preparing => {
4302 session.phase = CoordinationPhase::Punching;
4304 debug!("Session {} advanced to Punching phase", session_id);
4305 }
4306 CoordinationPhase::Punching => {
4307 session.phase = CoordinationPhase::Validating;
4309 debug!("Session {} advanced to Validating phase", session_id);
4310 }
4311 CoordinationPhase::Validating => {
4312 let validation_timeout = Duration::from_secs(5);
4314 if now.duration_since(session.started_at) > validation_timeout {
4315 session.phase = CoordinationPhase::Failed;
4316 debug!("Session {} timed out in validation", session_id);
4317 }
4318 }
4319 CoordinationPhase::Succeeded | CoordinationPhase::Failed => {
4320 }
4322 CoordinationPhase::Idle => {
4323 session.phase = CoordinationPhase::Requesting;
4325 }
4326 }
4327
4328 if session.phase != previous_phase {
4330 match session.phase {
4331 CoordinationPhase::Succeeded => {
4332 session.stats.successful_coordinations += 1;
4333 self.stats.successful_coordinations += 1;
4334 }
4335 CoordinationPhase::Failed => {
4336 }
4338 _ => {}
4339 }
4340 }
4341
4342 Ok(session.phase)
4343 }
4344
4345 pub(crate) fn get_coordination_session(
4347 &self,
4348 session_id: CoordinationSessionId,
4349 ) -> Option<&CoordinationSession> {
4350 self.coordination_sessions.get(&session_id)
4351 }
4352
4353 pub(crate) fn get_coordination_session_mut(
4355 &mut self,
4356 session_id: CoordinationSessionId,
4357 ) -> Option<&mut CoordinationSession> {
4358 self.coordination_sessions.get_mut(&session_id)
4359 }
4360
4361 pub(crate) fn mark_coordination_success(
4363 &mut self,
4364 session_id: CoordinationSessionId,
4365 _now: Instant,
4366 ) -> Result<(), NatTraversalError> {
4367 let session = self
4368 .coordination_sessions
4369 .get_mut(&session_id)
4370 .ok_or(NatTraversalError::NoActiveCoordination)?;
4371
4372 session.phase = CoordinationPhase::Succeeded;
4373 session.stats.successful_coordinations += 1;
4374 self.stats.successful_coordinations += 1;
4375
4376 if let Some(peer_a_record) = self.peer_registry.get_mut(&session.peer_a) {
4378 peer_a_record.coordination_count += 1;
4379 peer_a_record.success_rate =
4380 (peer_a_record.success_rate * (peer_a_record.coordination_count - 1) as f64 + 1.0)
4381 / peer_a_record.coordination_count as f64;
4382 }
4383
4384 if let Some(peer_b_record) = self.peer_registry.get_mut(&session.peer_b) {
4385 peer_b_record.coordination_count += 1;
4386 peer_b_record.success_rate =
4387 (peer_b_record.success_rate * (peer_b_record.coordination_count - 1) as f64 + 1.0)
4388 / peer_b_record.coordination_count as f64;
4389 }
4390
4391 info!("Coordination session {} marked as successful", session_id);
4392 Ok(())
4393 }
4394
4395 pub(crate) fn mark_coordination_failure(
4397 &mut self,
4398 session_id: CoordinationSessionId,
4399 reason: &str,
4400 _now: Instant,
4401 ) -> Result<(), NatTraversalError> {
4402 let session = self
4403 .coordination_sessions
4404 .get_mut(&session_id)
4405 .ok_or(NatTraversalError::NoActiveCoordination)?;
4406
4407 session.phase = CoordinationPhase::Failed;
4408
4409 if let Some(peer_a_record) = self.peer_registry.get_mut(&session.peer_a) {
4411 peer_a_record.coordination_count += 1;
4412 peer_a_record.success_rate = (peer_a_record.success_rate
4413 * (peer_a_record.coordination_count - 1) as f64)
4414 / peer_a_record.coordination_count as f64;
4415 }
4416
4417 if let Some(peer_b_record) = self.peer_registry.get_mut(&session.peer_b) {
4418 peer_b_record.coordination_count += 1;
4419 peer_b_record.success_rate = (peer_b_record.success_rate
4420 * (peer_b_record.coordination_count - 1) as f64)
4421 / peer_b_record.coordination_count as f64;
4422 }
4423
4424 warn!("Coordination session {} failed: {}", session_id, reason);
4425 Ok(())
4426 }
4427
4428 pub(crate) fn get_peer_record(&self, peer_id: PeerId) -> Option<&PeerObservationRecord> {
4430 self.peer_registry.get(&peer_id)
4431 }
4432}
4433
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh `NatTraversalState` for `role`.
    ///
    /// The remaining constructor arguments are fixed at `10` and 30 seconds —
    /// presumably the candidate limit and the coordination timeout; confirm
    /// against `NatTraversalState::new`.
    fn create_test_state(role: NatTraversalRole) -> NatTraversalState {
        let limit = 10;
        let timeout = Duration::from_secs(30);
        NatTraversalState::new(role, limit, timeout)
    }

    /// Candidate source used for addresses reported back by the remote side.
    fn observed_source() -> CandidateSource {
        CandidateSource::Observed { by_node: None }
    }

    /// A single observed address ends up as a new local candidate with a
    /// non-zero priority.
    #[test]
    fn test_add_quic_discovered_address() {
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();

        let discovered_addr = SocketAddr::from(([1, 2, 3, 4], 5678));
        let seq = state.add_local_candidate(discovered_addr, observed_source(), now);

        assert_eq!(state.local_candidates.len(), 1);

        let candidate = &state.local_candidates[&seq];
        assert_eq!(candidate.address, discovered_addr);
        assert!(matches!(candidate.source, CandidateSource::Observed { .. }));
        assert_eq!(candidate.state, CandidateState::New);
        assert!(candidate.priority > 0);
    }

    /// Several observed addresses (IPv4 and IPv6) are all stored, each under
    /// the sequence number returned when it was added.
    #[test]
    fn test_add_multiple_quic_discovered_addresses() {
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();

        let addrs = vec![
            SocketAddr::from(([1, 2, 3, 4], 5678)),
            SocketAddr::from(([5, 6, 7, 8], 9012)),
            SocketAddr::from(([2001, 0xdb8, 0, 0, 0, 0, 0, 1], 443)),
        ];

        let sequences: Vec<_> = addrs
            .iter()
            .map(|addr| state.add_local_candidate(*addr, observed_source(), now))
            .collect();

        assert_eq!(state.local_candidates.len(), 3);

        for (seq, addr) in sequences.iter().zip(addrs.iter()) {
            let candidate = &state.local_candidates[seq];
            assert_eq!(candidate.address, *addr);
            assert!(matches!(candidate.source, CandidateSource::Observed { .. }));
        }
    }

    /// An observed address is retrievable from the local candidate table.
    #[test]
    fn test_quic_discovered_addresses_in_local_candidates() {
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();

        let addr = SocketAddr::from(([192, 168, 1, 100], 5000));
        let seq = state.add_local_candidate(addr, observed_source(), now);

        assert!(state.local_candidates.contains_key(&seq));

        let candidate = &state.local_candidates[&seq];
        assert_eq!(candidate.address, addr);
        assert!(matches!(candidate.source, CandidateSource::Observed { .. }));
    }

    /// One observed local address and one remote candidate produce exactly one
    /// candidate pair linking the two.
    #[test]
    fn test_quic_discovered_addresses_included_in_hole_punching() {
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();

        let local_addr = SocketAddr::from(([192, 168, 1, 100], 5000));
        state.add_local_candidate(local_addr, observed_source(), now);

        let remote_addr = SocketAddr::from(([1, 2, 3, 4], 6000));
        state
            .add_remote_candidate(VarInt::from_u32(1), remote_addr, VarInt::from_u32(100), now)
            .expect("add remote candidate should succeed");

        state.generate_candidate_pairs(now);

        assert_eq!(state.candidate_pairs.len(), 1);

        let pair = state.candidate_pairs.first().unwrap();
        assert_eq!(pair.local_addr, local_addr);
        assert_eq!(pair.remote_addr, remote_addr);
    }

    /// An observed address never gets a lower priority than a predicted one.
    #[test]
    fn test_prioritize_quic_discovered_over_predicted() {
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();

        let predicted_addr = SocketAddr::from(([1, 2, 3, 4], 5000));
        let predicted_seq =
            state.add_local_candidate(predicted_addr, CandidateSource::Predicted, now);

        let discovered_addr = SocketAddr::from(([1, 2, 3, 4], 5001));
        let discovered_seq = state.add_local_candidate(discovered_addr, observed_source(), now);

        let predicted_priority = state.local_candidates[&predicted_seq].priority;
        let discovered_priority = state.local_candidates[&discovered_seq].priority;

        assert!(discovered_priority >= predicted_priority);
    }

    /// Two local candidates (one host, one observed) against two remote
    /// candidates yield four pairs, two of which use the observed address.
    #[test]
    fn test_integration_with_nat_traversal_flow() {
        let mut state = create_test_state(NatTraversalRole::Client);
        let now = Instant::now();

        let local_addr = SocketAddr::from(([192, 168, 1, 2], 5000));
        state.add_local_candidate(local_addr, CandidateSource::Local, now);

        let discovered_addr = SocketAddr::from(([44, 55, 66, 77], 5000));
        state.add_local_candidate(discovered_addr, observed_source(), now);

        let remote1 = SocketAddr::from(([93, 184, 215, 123], 6000));
        let remote2 = SocketAddr::from(([172, 217, 16, 34], 7000));
        let priority = VarInt::from_u32(100);
        for (sequence, remote) in [(1_u32, remote1), (2_u32, remote2)].iter() {
            state
                .add_remote_candidate(VarInt::from_u32(*sequence), *remote, priority, now)
                .expect("add remote candidate should succeed");
        }

        state.generate_candidate_pairs(now);

        assert_eq!(state.candidate_pairs.len(), 4);

        let discovered_pair_count = state
            .candidate_pairs
            .iter()
            .filter(|pair| pair.local_addr == discovered_addr)
            .count();
        assert_eq!(discovered_pair_count, 2);
    }
}